検証のための構成はフロントのAPサーバー、RabbitMQが動作するキューサーバー、ワーカーそれぞれ二台づつ。キューサーバーが片方死んでも全体が動作し続けられる事、両方がダウンしたとしてもデータは損失しない事が確認できれば良い。要するに単一障害点にならないようにRabbitMQを使いたい。
サーバーの準備
仮想マシン6台はVagrantを使えば一発で用意できる、メモリ16GB積んでてよかった。ホスト名を後でいじるとrabbitmqctlで停止・再起動がうまくいかなくなった。ホスト名周りはEC2で使う時に面倒な事になりそうだ。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- mode: ruby -*- | |
# vi: set ft=ruby : | |
VAGRANTFILE_API_VERSION = "2" | |
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| | |
config.vm.box = "ubuntu-12.04-x64" | |
config.vm.define :ap_server1 do |ap| | |
ap.vm.hostname = "ap1" | |
ap.vm.network :private_network, ip: "192.168.50.13" | |
end | |
config.vm.define :ap_server2 do |ap| | |
ap.vm.hostname = "ap2" | |
ap.vm.network :private_network, ip: "192.168.50.14" | |
end | |
config.vm.define :cluster1 do |cluster| | |
cluster.vm.hostname = "rabbit1" | |
cluster.vm.network :private_network, ip: "192.168.50.15" | |
end | |
config.vm.define :cluster2 do |cluster| | |
cluster.vm.hostname = "rabbit2" | |
cluster.vm.network :private_network, ip: "192.168.50.16" | |
end | |
config.vm.define :worker1 do |worker| | |
worker.vm.hostname = "worker1" | |
worker.vm.network :private_network, ip: "192.168.50.17" | |
end | |
config.vm.define :worker2 do |worker| | |
worker.vm.hostname = "worker2" | |
worker.vm.network :private_network, ip: "192.168.50.18" | |
end | |
end |
RabbitMQ 3.1 のインストール
APTリポジトリの追加が必要、公式ページに手順があるのでその通りに。
起動確認
vagrant@rabbit1:~$ sudo rabbitmq-server vagrant@rabbit2:~$ sudo rabbitmq-server
管理画面の有効化
vagrant@rabbit1:~$ sudo rabbitmq-plugins enable rabbitmq_management vagrant@rabbit1:~$ sudo rabbitmqctl stop vagrant@rabbit1:~$ sudo rabbitmq-server vagrant@rabbit2:~$ sudo rabbitmq-plugins enable rabbitmq_management vagrant@rabbit2:~$ sudo rabbitmqctl stop vagrant@rabbit2:~$ sudo rabbitmq-server
それぞれのサーバーのポート15672で管理画面が起動する(2.x系の場合は55672)。Basic認証がかかっているが有効化直後は guest/guest で参照できる。
疎通テスト
クラスタ構成にする前にそれぞれのrabbitmqノードと疎通できるかチェックする。Hello Worldが届けばOK。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | # sender.py import pika params = pika.ConnectionParameters( 'rabbit1' ) connection = pika.BlockingConnection(params) channel = connection.channel() # Make Queue channel.queue_declare(queue = 'mq_test' ) # Publish channel.basic_publish( exchange = '', routing_key = 'mq_test' , body = 'Hello World' , ) print " [x] sent Hello World" |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | # receiver.py import pika params = pika.ConnectionParameters( 'rabbit1' ) connection = pika.BlockingConnection(params) channel = connection.channel() channel.queue_declare(queue = 'mq_test' ) def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue = 'mq_test' , no_ack = True ) channel.start_consuming() |
メッセージの永続化
先のPublisherのコードだと、receiverがメッセージを受け取る前にrabbitmqを停止させるとキューの内容が消失した。それではまずいのでキューとメッセージにオプションを追加する。1 2 3 4 5 6 7 8 9 10 11 12 | # durable=True オプション付きでキューを宣言する channel.queue_declare(queue = 'cluster_test' , durable = True ) # 永続化オプション付きでメッセージをpublishする channel.basic_publish( exchange = '', routing_key = 'cluster_test' , body = msg, properties = pika.BasicProperties( # To Persistent delivery_mode = 2 , ) ) |
管理画面上でキューのパラメータに[D]と表示され、rabbitmqの起動再起動を繰りかえしても内容が保持されるようになった。
ACKの送出
先のComsumerの実装だと、ワーカーが正常に処理を完了したかどうかに関係無くキューのメッセージが消える。正常応答がComsumerから返ってきた場合のみキューのメッセージが消えるようにするには次の通り。
1 2 3 4 5 6 7 | def callback(ch, method, properties, body): print " [x] Received %r" % (body,) # ackを返す (返さないとキューから消えない) ch.basic_ack(delivery_tag = method.delivery_tag) # no_ack=Trueを削除 channel.basic_consume(callback, queue = 'mq_test' ) |
Cluster化
次はrabbit1とrabbit2をクラスタ構成にする。まずは各ノードのerlang cookie (/var/lib/rabbitmq/.erlang.cookie) を同じにしてそれぞれ再起動。rabbit2をrabbi1に参加させるには次のコマンド。
vagrant@rabbit2:~$ sudo rabbitmqctl stop_app vagrant@rabbit2:~$ sudo rabbitmqctl join_cluster rabbit@rabbit1 vagrant@rabbit2:~$ sudo rabbitmqctl start_appクラスタの状態を確認、二つともdiskノードになっている。
vagrant@rabbit2:~$ sudo rabbitmqctl cluster_status ls: cannot access /etc/rabbitmq/rabbitmq.conf.d: No such file or directory Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}, {partitions,[]}] …done.
いずれかのrabbitmqノードに送信する様にPublisherを改造する
HAProxyを各APサーバーに載せるのが硬そうだが、この場では生きているrabbitmqノードに接続できるまでランダムに選ぶというナイーブな get_connection を実装した。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | # sender.py (ap1) import random import pika def get_connection(): mq_clusters = [ 'rabbit1' , 'rabbit2' ] random.shuffle(mq_clusters) for mq in mq_clusters: try : params = pika.ConnectionParameters(mq) connection = pika.BlockingConnection(params) return connection except Exception, e: print ( "Try next node" ) raise Exception( "Cannot establish connection" ) def send(con, msg): channel = con.channel() # 同じなので中略 for i in xrange ( 1 , 1000 ): con = get_connection() msg = 'From AP1 to %s: %i' % (con.params.host, i) send(con, msg) con.close() |
実行結果
# AP1 [x] sent From AP1 to rabbit1: 900 [x] sent From AP1 to rabbit1: 901 [x] sent From AP1 to rabbit1: 902 [x] sent From AP1 to rabbit1: 903 [x] sent From AP1 to rabbit1: 904 [x] sent From AP1 to rabbit1: 905 [x] sent From AP1 to rabbit2: 906 [x] sent From AP1 to rabbit1: 907 [x] sent From AP1 to rabbit1: 908 [x] sent From AP1 to rabbit2: 909 # AP2 [x] sent From AP2 to rabbit2: 815 [x] sent From AP2 to rabbit1: 816 [x] sent From AP2 to rabbit2: 817 [x] sent From AP2 to rabbit2: 818 [x] sent From AP2 to rabbit2: 819 [x] sent From AP2 to rabbit2: 820 [x] sent From AP2 to rabbit1: 821 [x] sent From AP2 to rabbit1: 822 [x] sent From AP2 to rabbit2: 823 [x] sent From AP2 to rabbit1: 824 # Worker1 [x] Received 'From AP1 to rabbit1: 903' [x] Received 'From AP1 to rabbit1: 904' [x] Received 'From AP1 to rabbit1: 905' [x] Received 'From AP2 to rabbit2: 820' [x] Received 'From AP1 to rabbit1: 907' [x] Received 'From AP1 to rabbit1: 908' [x] Received 'From AP2 to rabbit2: 823' # Worker2 [x] Received 'From AP1 to rabbit1: 900' [x] Received 'From AP1 to rabbit1: 901' [x] Received 'From AP1 to rabbit1: 902' [x] Received 'From AP2 to rabbit2: 817' [x] Received 'From AP2 to rabbit2: 818' [x] Received 'From AP2 to rabbit2: 819' [x] Received 'From AP1 to rabbit2: 906' [x] Received 'From AP2 to rabbit1: 821' [x] Received 'From AP2 to rabbit1: 822' [x] Received 'From AP1 to rabbit2: 909' [x] Received 'From AP2 to rabbit1: 824'
これで上手くいったかと思いきや、rabbit1を停止させるとrabbit2にメッセージを投げた時にエラーが返ってくるようになる。
pika.exceptions.ChannelClosed: (404, "NOT_FOUND - home node 'rabbit@rabbit1' of durable queue 'cluster_test' in vhost '/' is down or inaccessible")キューの内容がrabbit1にしか保持されていないからだ。rabbit2は生きているがrabbit1のキューにアクセスできなければ何もできない。
キューをミラーリングする設定
特定のキューの内容をクラスタの全てのノードにも持たせるには rabbitmqctl で ha-mode を指定する。vagrant@rabbit1:~$ sudo rabbitmqctl set_policy all 'cluster_test' '{"ha-mode": "all"}' vagrant@rabbit2:~$ sudo rabbitmqctl set_policy all 'cluster_test' '{"ha-mode": "all"}'キューの指定には正規表現が使えるので、全てのキューをミラーリングするには
sudo rabbitmqctl set_policy all '^.*' '{"ha-mode": "all"}'としても良い。これで、rabbit1を落してもrabbit2だけで動作するようになった。管理画面でもミラーリングができているか確認ができる。