検証のための構成はフロントのAPサーバー、RabbitMQが動作するキューサーバー、ワーカーそれぞれ二台づつ。キューサーバーが片方死んでも全体が動作し続けられる事、両方がダウンしたとしてもデータは損失しない事が確認できれば良い。要するに単一障害点にならないようにRabbitMQを使いたい。
サーバーの準備
仮想マシン6台はVagrantを使えば一発で用意できる、メモリ16GB積んでてよかった。ホスト名を後でいじると
rabbitmqctlで停止・再起動がうまくいかなくなった。ホスト名周りはEC2で使う時に面倒な事になりそうだ。
各サーバーの /etc/hosts にrabbit1とrabbit2は追加しておく。
RabbitMQ 3.1 のインストール
APTリポジトリの追加が必要、公式ページに手順があるのでその通りに。
起動確認
vagrant@rabbit1:~$ sudo rabbitmq-server
vagrant@rabbit2:~$ sudo rabbitmq-server
管理画面の有効化
vagrant@rabbit1:~$ sudo rabbitmq-plugins enable rabbitmq_management
vagrant@rabbit1:~$ sudo rabbitmqctl stop
vagrant@rabbit1:~$ sudo rabbitmq-server
vagrant@rabbit2:~$ sudo rabbitmq-plugins enable rabbitmq_management
vagrant@rabbit2:~$ sudo rabbitmqctl stop
vagrant@rabbit2:~$ sudo rabbitmq-server
それぞれのサーバーのポート15672で管理画面が起動する(2.x系の場合は55672)。Basic認証がかかっているが有効化直後は guest/guest で参照できる。
疎通テスト
クラスタ構成にする前にそれぞれのrabbitmqノードと疎通できるかチェックする。Hello Worldが届けばOK。
# sender.py
import pika
params = pika.ConnectionParameters('rabbit1')
connection = pika.BlockingConnection(params)
channel = connection.channel()
# Make Queue
channel.queue_declare(queue='mq_test')
# Publish
channel.basic_publish(
exchange='',
routing_key='mq_test',
body='Hello World',
)
print " [x] sent Hello World"
# receiver.py
import pika
params = pika.ConnectionParameters('rabbit1')
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='mq_test')
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback, queue='mq_test', no_ack=True)
channel.start_consuming()
メッセージの永続化
先のPublisherのコードだと、receiverがメッセージを受け取る前にrabbitmqを停止させるとキューの内容が消失した。それではまずいのでキューとメッセージにオプションを追加する。
# durable=True オプション付きでキューを宣言する
channel.queue_declare(queue='cluster_test', durable=True)
# 永続化オプション付きでメッセージをpublishする
channel.basic_publish(
exchange='',
routing_key='cluster_test',
body=msg,
properties=pika.BasicProperties(
# To Persistent
delivery_mode=2,
)
)
管理画面上でキューのパラメータに[D]と表示され、rabbitmqの起動再起動を繰りかえしても内容が保持されるようになった。
ACKの送出
先のComsumerの実装だと、ワーカーが正常に処理を完了したかどうかに関係無くキューのメッセージが消える。正常応答がComsumerから返ってきた場合のみキューのメッセージが消えるようにするには次の通り。
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
# ackを返す (返さないとキューから消えない)
ch.basic_ack(delivery_tag = method.delivery_tag)
# no_ack=Trueを削除
channel.basic_consume(callback, queue='mq_test')
Cluster化
次はrabbit1とrabbit2をクラスタ構成にする。まずは各ノードのerlang cookie (/var/lib/rabbitmq/.erlang.cookie)
を同じにしてそれぞれ再起動。
rabbit2をrabbi1に参加させるには次のコマンド。
vagrant@rabbit2:~$ sudo rabbitmqctl stop_app
vagrant@rabbit2:~$ sudo rabbitmqctl join_cluster rabbit@rabbit1
vagrant@rabbit2:~$ sudo rabbitmqctl start_app
クラスタの状態を確認、二つともdiskノードになっている。
vagrant@rabbit2:~$ sudo rabbitmqctl cluster_status
ls: cannot access /etc/rabbitmq/rabbitmq.conf.d: No such file or directory
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]},
{running_nodes,[rabbit@rabbit1,rabbit@rabbit2]},
{partitions,[]}]
…done.
いずれかのrabbitmqノードに送信する様にPublisherを改造する
HAProxyを各APサーバーに載せるのが硬そうだが、この場では生きているrabbitmqノードに接続できるまでランダムに選ぶというナイーブな get_connection を実装した。
# sender.py (ap1)
import random
import pika
def get_connection():
mq_clusters = ['rabbit1', 'rabbit2']
random.shuffle(mq_clusters)
for mq in mq_clusters:
try:
params = pika.ConnectionParameters(mq)
connection = pika.BlockingConnection(params)
return connection
except Exception, e:
print("Try next node")
raise Exception("Cannot establish connection")
def send(con, msg):
channel = con.channel()
# 同じなので中略
for i in xrange(1, 1000):
con = get_connection()
msg = 'From AP1 to %s: %i' % (con.params.host, i)
send(con, msg)
con.close()
実行結果
# AP1
[x] sent From AP1 to rabbit1: 900
[x] sent From AP1 to rabbit1: 901
[x] sent From AP1 to rabbit1: 902
[x] sent From AP1 to rabbit1: 903
[x] sent From AP1 to rabbit1: 904
[x] sent From AP1 to rabbit1: 905
[x] sent From AP1 to rabbit2: 906
[x] sent From AP1 to rabbit1: 907
[x] sent From AP1 to rabbit1: 908
[x] sent From AP1 to rabbit2: 909
# AP2
[x] sent From AP2 to rabbit2: 815
[x] sent From AP2 to rabbit1: 816
[x] sent From AP2 to rabbit2: 817
[x] sent From AP2 to rabbit2: 818
[x] sent From AP2 to rabbit2: 819
[x] sent From AP2 to rabbit2: 820
[x] sent From AP2 to rabbit1: 821
[x] sent From AP2 to rabbit1: 822
[x] sent From AP2 to rabbit2: 823
[x] sent From AP2 to rabbit1: 824
# Worker1
[x] Received 'From AP1 to rabbit1: 903'
[x] Received 'From AP1 to rabbit1: 904'
[x] Received 'From AP1 to rabbit1: 905'
[x] Received 'From AP2 to rabbit2: 820'
[x] Received 'From AP1 to rabbit1: 907'
[x] Received 'From AP1 to rabbit1: 908'
[x] Received 'From AP2 to rabbit2: 823'
# Worker2
[x] Received 'From AP1 to rabbit1: 900'
[x] Received 'From AP1 to rabbit1: 901'
[x] Received 'From AP1 to rabbit1: 902'
[x] Received 'From AP2 to rabbit2: 817'
[x] Received 'From AP2 to rabbit2: 818'
[x] Received 'From AP2 to rabbit2: 819'
[x] Received 'From AP1 to rabbit2: 906'
[x] Received 'From AP2 to rabbit1: 821'
[x] Received 'From AP2 to rabbit1: 822'
[x] Received 'From AP1 to rabbit2: 909'
[x] Received 'From AP2 to rabbit1: 824'
これで上手くいったかと思いきや、rabbit1を停止させるとrabbit2にメッセージを投げた時にエラーが返ってくるようになる。
pika.exceptions.ChannelClosed: (404, "NOT_FOUND - home node 'rabbit@rabbit1' of durable queue 'cluster_test' in vhost '/' is down or inaccessible")
キューの内容がrabbit1にしか保持されていないからだ。rabbit2は生きているがrabbit1のキューにアクセスできなければ何もできない。
キューをミラーリングする設定
特定のキューの内容をクラスタの全てのノードにも持たせるには
rabbitmqctl で ha-mode を指定する。
vagrant@rabbit1:~$ sudo rabbitmqctl set_policy all 'cluster_test' '{"ha-mode": "all"}'
vagrant@rabbit2:~$ sudo rabbitmqctl set_policy all 'cluster_test' '{"ha-mode": "all"}'
キューの指定には正規表現が使えるので、全てのキューをミラーリングするには
sudo rabbitmqctl set_policy all '^.*' '{"ha-mode": "all"}'
としても良い。これで、rabbit1を落してもrabbit2だけで動作するようになった。管理画面でもミラーリングができているか確認ができる。
停止したノードはそのまま再起動でOK、元のクラスタ構成に戻る。
参考