RabbitMQ:01 環境設定と動作確認
【1】はじめに:MQ(メッセージキューイング)概要
MQ(メッセージ・キューイング)の仕組みは、
異なるITシステムやソフトウェア間でデータをやり取りする時に、相手側から直接データを送受信するのではなく、間に一時的に格納しておく場所(メッセージキュー)を用意しておき、それを使ってデータ送受信をする。
というもの。
MQの仕組みにより、送信側は送りたいタイミングでデータを送信し、受信側は受信したいときに受信する、という相手側の処理完了を待たなくてよい非同期通信が実現できる。(※)
MQの仕組みは「レガシーシステムにおけるシステム間のデータ連携」や「クラウド上のシステム間連携・アプリ」等のバックエンド側でよく見かける。
※「電子メールの動き」をイメージするとわかりやすいかもしれない。電子メールのやり取りでは、
『送信側は任意のタイミングでメールサーバ宛にメールを送信する。一方、受信側も任意のタイミングでメールサーバに溜まっている電子メールを受信する。』という仕組みになっている。
【2】代表的なミドルウェア(ソフトウェア)・サービス
「MQ実現には結局のところ、データを送受信するサーバを立てる」ことになる。それを可能にする代表的なミドルウェア(ソフトウェア)として
・Microsoft Message Queue Server(MSMQ)
・IBM WebSphere MQ
・Apache ActiveMQ
・RabbitMQ ※今回使うもの
などがある。
クラウドサービスなら、例えば以下のような製品・サービスがある。
■ AWS
Amazon SQS(Simple Queue Service):
1対1のキュータイプ型。「PTPモデル」で「PULL」 のみ。
Amazon SNS(Simple Notification Service):
「Pub/Sub モデル」で「PUSH」のみ。SMS・プッシュ通知・http/https・メール・Lambda 等への汎用通知サービス用。「SNS → SQS」とつないで使うこともできる。
Amazon MQ:
Apache ActiveMQのAWSフルマネージド版みたいなもの。
■ Azure
Azure Queue Storage:
「PTP モデル」で「PULL 」のみ。
Azure Service Bus:
「Pub/Sub モデル」で「PULL・PUSH」可能。
Cloud Pub/Sub:
「Pub/Sub モデル」で「PULL・PUSH」可能。
Cloud Tasks:
「PTPモデル」で「PULL」のみ。
▲「PTPモデル、Pub/Subモデル」、「PULL、PUSH」といった用語が出てきているが詳細は略。
ようは「同一データは1対1の送受信か、1対多の送受信か」とか、「受信データは送りつけられるのか、受信側から取りに行くのか」みたいな違いってこと。
長々と説明したが、ここでは「Python」+「RabbitMQ」を使って、MQを使ってみることにする。
【3】RabbitMQ(CloudAMQP)のセットアップ
RabbitMQのサイトは以下。
…なのだが、ローカル上に環境を構築するのが面倒なので、
今回は「CloudAMQP(無料枠)」を使ってクラウド上にRabbitMQサーバを準備しておき、pythonプログラム(pikaライブラリ)で接続する。
※CloudAMQPの公式サイトは以下
【3-1】アカウントとインスタンス(無料枠)の作成
無料枠の利用制約は「pricing」から一番下までスクロールして確認する。
画面右上の「Sign Up」からCloudAMQP上にアカウントを作成する。
↓ アカウント作成したら「Team(プロジェクト相当)」を作る。
▲注意点は「GDPR」の部分。
【GDPR(General Data Protection Regulation):EU一般データ保護規】
EU域内における個人データ保護に関する規則。
「企業の所在地に依ることなく、EU域内の居住者に関する個人情報」の収集および処理に際して順守すべきルールが規定されている。
インターネット上で「EU域内の顧客」に商品やサービスを提供する場合にはGDPRが適用される。違反すると罰則金を科される可能性がある。
今回は適当に作るテストデータで実験する。個人情報は取り扱わないので「No」でも大丈夫。しかし、実利用を考えた場合「日本とEUでは個人情報として取り扱う範囲も違う」ので十分に確認して注意しよう。
↓ 引き続き「インスタンス」を作成する
「インスタンス名」と「プラン」を設定する。この「Plan欄」で「無料枠(free)のプラン」を選ぶ。
↓ 選択できるリージョン・データセンターの中から適当なものを選ぶ。
↓ 設定内容をプレビューして、問題なければインスタンスを作成する。
ここまででアカウントとインスタンス(無料枠)の作成が完了した。
【3-2】RabbitMQでのキューイングと管理画面の見方
ざっくりいうとRabbitMQには
Producer:送信者相当
Exchange:送信キュー相当
Queue:受信キュー相当
Consumer:受信者相当
の4つが出てくる。
①「Producer」が「Exchange」宛にデータを送信する
②「Exchange」から事前に決めたルールに従って、特定の「Queue」宛にデータを送信する
③「Consumer」は指定した「Queue」の状態を監視しており、状態に変化があればデータ取得などが起動する
■管理画面
CloudAMQPの管理画面は
・インスタンスに対する管理画面
・インスタンス上で立ち上がっているRabbitMQの管理画面
の2つからなる。
【インスタンスに対する管理画面】
▲「CloudAMQPへの接続情報:AMQP URL」が生成されているので控えておく。後でプログラムの際に使用する。
【インスタンス上で立ち上がっているRabbitMQの管理画面】
▲RabbitMQにはあらかじめいくつかの「Exchange」が用意されている。「Queue」側も「admin」という名前のものが用意されている。
【4】動作確認(Python)
pythonでRabbitMQの動作確認をする。この時に使用するのが「pika」。
■ライブラリのインストール
pip install pika
あるいは「requirements.txt」に「pika」を書いておき「pip install -r requirements.txt」でもOK。
■producer.py:Producer(送信側)の作成
※ 「AMQP URLは個々に違う」のでこのままコピペしても動かない(悪用されないようにnote掲載後「instance」と「team」は削除済み)。管理画面で確認した控えておいた「AMQP URL」を指定する。
import pika
from pika import connection
# 控えておいた AMQP URLを設定する
params = pika.URLParameters('amqps://jhuxgygz:cQHA-vEUngBY6r3dloKv4fkFmZ-5sxkc@cougar.rmq.cloudamqp.com/jhuxgygz')
connection = pika.BlockingConnection(params)
channel = connection.channel()
def publish():
# デフォルトで用意してあるexchangeに送信⇒デフォルトのadminキュー宛
channel.basic_publish(exchange='', routing_key='admin',body='hello')
if __name__ == '__main__':
publish()
▲今回は「adminキュー」宛に「hello」というメッセージを送信している。
【動作確認】
▲adminキューに1つメッセージがたまったのがわかる。
■consumer.py:Consumer(受信側)の作成
※ 「AMQP URLは個々に違う」のでこのままコピペしても動かない(悪用されないようにnote掲載後「instance」と「team」は削除済み)。管理画面で確認した控えておいた「AMQP URL」を指定する。
import pika
params = pika.URLParameters('amqps://jhuxgygz:cQHA-vEUngBY6r3dloKv4fkFmZ-5sxkc@cougar.rmq.cloudamqp.com/jhuxgygz')
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='admin') # 対象とするキューを指定
# 監視するキューに対するコールバック関数
def callback(ch, method, properties,body):
print('recieved')
print(body)
#ch.basic_ack(delivery_tag = method.delivery_tag) # 受信してACKを返してキュー削除でもOK
# コールバック登録
channel.basic_consume(queue='admin', on_message_callback=callback, auto_ack=True) # auto_ack:ON
try:
print('started Comsuming')
channel.start_consuming() # キュー監視の監視開始(ループ開始⇒終わるときはとりあえず強制終了でOK)
except KeyboardInterrupt:
channel.stop_consuming()
channel.close()
【動作確認】
以上で「RabbitMQ(CloudAMQP)」の設定と「python(pikaライブラリ)」を使った動作確認ができた。