RabbitMQ 3.1の導入と冗長化の検証をしたのでメモ。
検証のための構成はフロントのAPサーバー、RabbitMQが動作するキューサーバー、ワーカーそれぞれ二台づつ。キューサーバーが片方死んでも全体が動作し続けられる事、両方がダウンしたとしてもデータは損失しない事が確認できれば良い。要するに単一障害点にならないようにRabbitMQを使いたい。
各サーバーの /etc/hosts にrabbit1とrabbit2は追加しておく。
それぞれのサーバーのポート15672で管理画面が起動する(2.x系の場合は55672)。Basic認証がかかっているが有効化直後は guest/guest で参照できる。
管理画面上でキューのパラメータに[D]と表示され、rabbitmqの起動再起動を繰りかえしても内容が保持されるようになった。
rabbit2をrabbi1に参加させるには次のコマンド。
これで上手くいったかと思いきや、rabbit1を停止させるとrabbit2にメッセージを投げた時にエラーが返ってくるようになる。
停止したノードはそのまま再起動でOK、元のクラスタ構成に戻る。
検証のための構成はフロントのAPサーバー、RabbitMQが動作するキューサーバー、ワーカーそれぞれ二台づつ。キューサーバーが片方死んでも全体が動作し続けられる事、両方がダウンしたとしてもデータは損失しない事が確認できれば良い。要するに単一障害点にならないようにRabbitMQを使いたい。
サーバーの準備
仮想マシン6台はVagrantを使えば一発で用意できる、メモリ16GB積んでてよかった。ホスト名を後でいじるとrabbitmqctlで停止・再起動がうまくいかなくなった。ホスト名周りはEC2で使う時に面倒な事になりそうだ。各サーバーの /etc/hosts にrabbit1とrabbit2は追加しておく。
RabbitMQ 3.1 のインストール
APTリポジトリの追加が必要、公式ページに手順があるのでその通りに。
起動確認
vagrant@rabbit1:~$ sudo rabbitmq-server vagrant@rabbit2:~$ sudo rabbitmq-server
管理画面の有効化
vagrant@rabbit1:~$ sudo rabbitmq-plugins enable rabbitmq_management vagrant@rabbit1:~$ sudo rabbitmqctl stop vagrant@rabbit1:~$ sudo rabbitmq-server vagrant@rabbit2:~$ sudo rabbitmq-plugins enable rabbitmq_management vagrant@rabbit2:~$ sudo rabbitmqctl stop vagrant@rabbit2:~$ sudo rabbitmq-server
それぞれのサーバーのポート15672で管理画面が起動する(2.x系の場合は55672)。Basic認証がかかっているが有効化直後は guest/guest で参照できる。
疎通テスト
クラスタ構成にする前にそれぞれのrabbitmqノードと疎通できるかチェックする。Hello Worldが届けばOK。# sender.py import pika params = pika.ConnectionParameters('rabbit1') connection = pika.BlockingConnection(params) channel = connection.channel() # Make Queue channel.queue_declare(queue='mq_test') # Publish channel.basic_publish( exchange='', routing_key='mq_test', body='Hello World', ) print " [x] sent Hello World"
# receiver.py import pika params = pika.ConnectionParameters('rabbit1') connection = pika.BlockingConnection(params) channel = connection.channel() channel.queue_declare(queue='mq_test') def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='mq_test', no_ack=True) channel.start_consuming()
メッセージの永続化
先のPublisherのコードだと、receiverがメッセージを受け取る前にrabbitmqを停止させるとキューの内容が消失した。それではまずいのでキューとメッセージにオプションを追加する。# durable=True オプション付きでキューを宣言する channel.queue_declare(queue='cluster_test', durable=True) # 永続化オプション付きでメッセージをpublishする channel.basic_publish( exchange='', routing_key='cluster_test', body=msg, properties=pika.BasicProperties( # To Persistent delivery_mode=2, ) )
管理画面上でキューのパラメータに[D]と表示され、rabbitmqの起動再起動を繰りかえしても内容が保持されるようになった。
ACKの送出
先のComsumerの実装だと、ワーカーが正常に処理を完了したかどうかに関係無くキューのメッセージが消える。正常応答がComsumerから返ってきた場合のみキューのメッセージが消えるようにするには次の通り。
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) # ackを返す (返さないとキューから消えない) ch.basic_ack(delivery_tag = method.delivery_tag) # no_ack=Trueを削除 channel.basic_consume(callback, queue='mq_test')
Cluster化
次はrabbit1とrabbit2をクラスタ構成にする。まずは各ノードのerlang cookie (/var/lib/rabbitmq/.erlang.cookie) を同じにしてそれぞれ再起動。rabbit2をrabbi1に参加させるには次のコマンド。
vagrant@rabbit2:~$ sudo rabbitmqctl stop_app vagrant@rabbit2:~$ sudo rabbitmqctl join_cluster rabbit@rabbit1 vagrant@rabbit2:~$ sudo rabbitmqctl start_appクラスタの状態を確認、二つともdiskノードになっている。
vagrant@rabbit2:~$ sudo rabbitmqctl cluster_status ls: cannot access /etc/rabbitmq/rabbitmq.conf.d: No such file or directory Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}, {partitions,[]}] …done.
いずれかのrabbitmqノードに送信する様にPublisherを改造する
HAProxyを各APサーバーに載せるのが硬そうだが、この場では生きているrabbitmqノードに接続できるまでランダムに選ぶというナイーブな get_connection を実装した。# sender.py (ap1) import random import pika def get_connection(): mq_clusters = ['rabbit1', 'rabbit2'] random.shuffle(mq_clusters) for mq in mq_clusters: try: params = pika.ConnectionParameters(mq) connection = pika.BlockingConnection(params) return connection except Exception, e: print("Try next node") raise Exception("Cannot establish connection") def send(con, msg): channel = con.channel() # 同じなので中略 for i in xrange(1, 1000): con = get_connection() msg = 'From AP1 to %s: %i' % (con.params.host, i) send(con, msg) con.close()
実行結果
# AP1 [x] sent From AP1 to rabbit1: 900 [x] sent From AP1 to rabbit1: 901 [x] sent From AP1 to rabbit1: 902 [x] sent From AP1 to rabbit1: 903 [x] sent From AP1 to rabbit1: 904 [x] sent From AP1 to rabbit1: 905 [x] sent From AP1 to rabbit2: 906 [x] sent From AP1 to rabbit1: 907 [x] sent From AP1 to rabbit1: 908 [x] sent From AP1 to rabbit2: 909 # AP2 [x] sent From AP2 to rabbit2: 815 [x] sent From AP2 to rabbit1: 816 [x] sent From AP2 to rabbit2: 817 [x] sent From AP2 to rabbit2: 818 [x] sent From AP2 to rabbit2: 819 [x] sent From AP2 to rabbit2: 820 [x] sent From AP2 to rabbit1: 821 [x] sent From AP2 to rabbit1: 822 [x] sent From AP2 to rabbit2: 823 [x] sent From AP2 to rabbit1: 824 # Worker1 [x] Received 'From AP1 to rabbit1: 903' [x] Received 'From AP1 to rabbit1: 904' [x] Received 'From AP1 to rabbit1: 905' [x] Received 'From AP2 to rabbit2: 820' [x] Received 'From AP1 to rabbit1: 907' [x] Received 'From AP1 to rabbit1: 908' [x] Received 'From AP2 to rabbit2: 823' # Worker2 [x] Received 'From AP1 to rabbit1: 900' [x] Received 'From AP1 to rabbit1: 901' [x] Received 'From AP1 to rabbit1: 902' [x] Received 'From AP2 to rabbit2: 817' [x] Received 'From AP2 to rabbit2: 818' [x] Received 'From AP2 to rabbit2: 819' [x] Received 'From AP1 to rabbit2: 906' [x] Received 'From AP2 to rabbit1: 821' [x] Received 'From AP2 to rabbit1: 822' [x] Received 'From AP1 to rabbit2: 909' [x] Received 'From AP2 to rabbit1: 824'
これで上手くいったかと思いきや、rabbit1を停止させるとrabbit2にメッセージを投げた時にエラーが返ってくるようになる。
pika.exceptions.ChannelClosed: (404, "NOT_FOUND - home node 'rabbit@rabbit1' of durable queue 'cluster_test' in vhost '/' is down or inaccessible")キューの内容がrabbit1にしか保持されていないからだ。rabbit2は生きているがrabbit1のキューにアクセスできなければ何もできない。
キューをミラーリングする設定
特定のキューの内容をクラスタの全てのノードにも持たせるには rabbitmqctl で ha-mode を指定する。vagrant@rabbit1:~$ sudo rabbitmqctl set_policy all 'cluster_test' '{"ha-mode": "all"}' vagrant@rabbit2:~$ sudo rabbitmqctl set_policy all 'cluster_test' '{"ha-mode": "all"}'キューの指定には正規表現が使えるので、全てのキューをミラーリングするには
sudo rabbitmqctl set_policy all '^.*' '{"ha-mode": "all"}'としても良い。これで、rabbit1を落してもrabbit2だけで動作するようになった。管理画面でもミラーリングができているか確認ができる。