2013-08-11

RabbitMQ 3.1の導入とCluster構成を検証する

RabbitMQ 3.1の導入と冗長化の検証をしたのでメモ。
検証のための構成はフロントのAPサーバー、RabbitMQが動作するキューサーバー、ワーカーそれぞれ二台づつ。キューサーバーが片方死んでも全体が動作し続けられる事、両方がダウンしたとしてもデータは損失しない事が確認できれば良い。要するに単一障害点にならないようにRabbitMQを使いたい。


サーバーの準備

仮想マシン6台はVagrantを使えば一発で用意できる、メモリ16GB積んでてよかった。ホスト名を後でいじるとrabbitmqctlで停止・再起動がうまくいかなくなった。ホスト名周りはEC2で使う時に面倒な事になりそうだ。

各サーバーの /etc/hosts にrabbit1とrabbit2は追加しておく。

RabbitMQ 3.1 のインストール

APTリポジトリの追加が必要、公式ページに手順があるのでその通りに。

起動確認

vagrant@rabbit1:~$ sudo rabbitmq-server
vagrant@rabbit2:~$ sudo rabbitmq-server

管理画面の有効化

vagrant@rabbit1:~$ sudo rabbitmq-plugins enable rabbitmq_management 
vagrant@rabbit1:~$ sudo rabbitmqctl stop
vagrant@rabbit1:~$ sudo rabbitmq-server

vagrant@rabbit2:~$ sudo rabbitmq-plugins enable rabbitmq_management 
vagrant@rabbit2:~$ sudo rabbitmqctl stop
vagrant@rabbit2:~$ sudo rabbitmq-server

それぞれのサーバーのポート15672で管理画面が起動する(2.x系の場合は55672)。Basic認証がかかっているが有効化直後は guest/guest で参照できる。

疎通テスト

クラスタ構成にする前にそれぞれのrabbitmqノードと疎通できるかチェックする。Hello Worldが届けばOK。
# sender.py
import pika

params = pika.ConnectionParameters('rabbit1')
connection = pika.BlockingConnection(params)
channel = connection.channel()
# Make Queue
channel.queue_declare(queue='mq_test')
# Publish
channel.basic_publish(
                exchange='',
                routing_key='mq_test',
                body='Hello World',
                )
print " [x] sent Hello World"
# receiver.py
import pika

params = pika.ConnectionParameters('rabbit1')
connection = pika.BlockingConnection(params)

channel = connection.channel()
channel.queue_declare(queue='mq_test')

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback, queue='mq_test', no_ack=True)
channel.start_consuming()

メッセージの永続化

先のPublisherのコードだと、receiverがメッセージを受け取る前にrabbitmqを停止させるとキューの内容が消失した。それではまずいのでキューとメッセージにオプションを追加する。
# durable=True オプション付きでキューを宣言する
channel.queue_declare(queue='cluster_test', durable=True)
# 永続化オプション付きでメッセージをpublishする
channel.basic_publish(
                exchange='',
                routing_key='cluster_test',
                body=msg,
                properties=pika.BasicProperties(
                        # To Persistent
                        delivery_mode=2,
                )
        )

管理画面上でキューのパラメータに[D]と表示され、rabbitmqの起動再起動を繰りかえしても内容が保持されるようになった。

ACKの送出

先のComsumerの実装だと、ワーカーが正常に処理を完了したかどうかに関係無くキューのメッセージが消える。正常応答がComsumerから返ってきた場合のみキューのメッセージが消えるようにするには次の通り。
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    # ackを返す (返さないとキューから消えない)
    ch.basic_ack(delivery_tag = method.delivery_tag)

# no_ack=Trueを削除
channel.basic_consume(callback, queue='mq_test')

Cluster化

次はrabbit1とrabbit2をクラスタ構成にする。まずは各ノードのerlang cookie (/var/lib/rabbitmq/.erlang.cookie) を同じにしてそれぞれ再起動。
rabbit2をrabbi1に参加させるには次のコマンド。
vagrant@rabbit2:~$ sudo rabbitmqctl stop_app
vagrant@rabbit2:~$ sudo rabbitmqctl join_cluster rabbit@rabbit1
vagrant@rabbit2:~$ sudo rabbitmqctl start_app
クラスタの状態を確認、二つともdiskノードになっている。
vagrant@rabbit2:~$ sudo rabbitmqctl cluster_status
ls: cannot access /etc/rabbitmq/rabbitmq.conf.d: No such file or directory
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]},
 {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]},
 {partitions,[]}]
…done.

いずれかのrabbitmqノードに送信する様にPublisherを改造する

HAProxyを各APサーバーに載せるのが硬そうだが、この場では生きているrabbitmqノードに接続できるまでランダムに選ぶというナイーブな get_connection を実装した。
# sender.py (ap1)
import random
import pika

def get_connection():
    mq_clusters = ['rabbit1', 'rabbit2']
    random.shuffle(mq_clusters)
    for mq in mq_clusters:
        try:
            params = pika.ConnectionParameters(mq)
            connection = pika.BlockingConnection(params)
            return connection
        except Exception, e:
            print("Try next node")
    raise Exception("Cannot establish connection")

def send(con, msg):
    channel = con.channel()
    # 同じなので中略

for i in xrange(1, 1000):
    con = get_connection()
    msg = 'From AP1 to %s: %i' % (con.params.host, i)
    send(con, msg)
    con.close()

実行結果

# AP1
[x] sent From AP1 to rabbit1: 900 
[x] sent From AP1 to rabbit1: 901 
[x] sent From AP1 to rabbit1: 902 
[x] sent From AP1 to rabbit1: 903 
[x] sent From AP1 to rabbit1: 904 
[x] sent From AP1 to rabbit1: 905 
[x] sent From AP1 to rabbit2: 906 
[x] sent From AP1 to rabbit1: 907 
[x] sent From AP1 to rabbit1: 908 
[x] sent From AP1 to rabbit2: 909 

# AP2
[x] sent From AP2 to rabbit2: 815
[x] sent From AP2 to rabbit1: 816
[x] sent From AP2 to rabbit2: 817
[x] sent From AP2 to rabbit2: 818
[x] sent From AP2 to rabbit2: 819
[x] sent From AP2 to rabbit2: 820
[x] sent From AP2 to rabbit1: 821
[x] sent From AP2 to rabbit1: 822
[x] sent From AP2 to rabbit2: 823
[x] sent From AP2 to rabbit1: 824
 
 # Worker1
[x] Received 'From AP1 to rabbit1: 903'
[x] Received 'From AP1 to rabbit1: 904'
[x] Received 'From AP1 to rabbit1: 905'
[x] Received 'From AP2 to rabbit2: 820'
[x] Received 'From AP1 to rabbit1: 907'
[x] Received 'From AP1 to rabbit1: 908'
[x] Received 'From AP2 to rabbit2: 823'

# Worker2
[x] Received 'From AP1 to rabbit1: 900'
[x] Received 'From AP1 to rabbit1: 901'
[x] Received 'From AP1 to rabbit1: 902'
[x] Received 'From AP2 to rabbit2: 817'
[x] Received 'From AP2 to rabbit2: 818'
[x] Received 'From AP2 to rabbit2: 819'
[x] Received 'From AP1 to rabbit2: 906'
[x] Received 'From AP2 to rabbit1: 821'
[x] Received 'From AP2 to rabbit1: 822'
[x] Received 'From AP1 to rabbit2: 909'
[x] Received 'From AP2 to rabbit1: 824'

これで上手くいったかと思いきや、rabbit1を停止させるとrabbit2にメッセージを投げた時にエラーが返ってくるようになる。
pika.exceptions.ChannelClosed: (404, "NOT_FOUND - home node 'rabbit@rabbit1' of durable queue 'cluster_test' in vhost '/' is down or inaccessible")
キューの内容がrabbit1にしか保持されていないからだ。rabbit2は生きているがrabbit1のキューにアクセスできなければ何もできない。

キューをミラーリングする設定

特定のキューの内容をクラスタの全てのノードにも持たせるには rabbitmqctl で ha-mode を指定する。
vagrant@rabbit1:~$ sudo rabbitmqctl set_policy all 'cluster_test' '{"ha-mode": "all"}'
vagrant@rabbit2:~$ sudo rabbitmqctl set_policy all 'cluster_test' '{"ha-mode": "all"}' 
キューの指定には正規表現が使えるので、全てのキューをミラーリングするには
sudo rabbitmqctl set_policy all '^.*' '{"ha-mode": "all"}'
としても良い。これで、rabbit1を落してもrabbit2だけで動作するようになった。管理画面でもミラーリングができているか確認ができる。


停止したノードはそのまま再起動でOK、元のクラスタ構成に戻る。

参考


このエントリーをはてなブックマークに追加