GoogleのCloud Pub/SubでCloud Storageと連携してみた
分析屋の下滝です。
前回は、Pub/SubとBigQueryの連携をさらっと触ってみました。
今回は、Pub/Subに送信したメッセージを、Cloud Storage に格納する方法を試してみます。
Cloud Storage サブスクリプション
Pub/Sub には3つのタイプのサブスクリプションがあります。
・pull
・push
・エクスポートサブスクリプション
・BigQuery サブスクリプション
・Cloud Storage サブスクリプション
前回は、BigQuery サブスクリプションを試しました。今回は、もう一つエクスポートサブスクリプションであるCloud Storageサブスクリプションを試します。
詳しくは公式の説明を参照してください。
ざっくり、準備としては
・Pub/Sub サービス アカウントに Cloud Storage のロールを割り当てる(上記の公式を参照)
・Cloud Storage バケットを作成しておく
ことが必要になります。
詳しくは、公式の説明を参照して準備してください。
Cloud Storage サブスクリプションのワークフローは、次のような図で説明があります。
では、設定をしていきます。
Cloud Storage バケットとしては、pub_sub_topic_testというバケットを作りました。
使うトピックは、前回の記事と同じものを使います。test-topicという名前です。
Pub/Subの画面からCloud Storageサブスクリプションを作成してみます。
サブスクリプションIDには「test-topic-sub-gs」を設定しました。
トピックには「test-topic」を選択しました。
配信タイプには、「Cloud Storage への書き込み」を選択しました
バケットには「pub_sub_topic_test」を
ファイル形式は「Text」を
バケットに書き込まれるファイル名に関する設定として、ファイル名の接頭辞は、必須ではありませんが「prefix_」を、同じくファイル名の接尾辞も必須ではありませんが「_suffix」を
ストレージのバッチ最大時間はデフォルトの5を
などを選択・入力してください。
Cloud Storage サブスクリプションでのファイル作成の仕様は、次のように説明があります。
続いて、トピックへのメッセージの送信を行います。前回の記事と同じです。project_idは、適切なものを設定してください。
"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Callable
project_id = "gcp-test-1697637841257"
topic_id = "test-topic"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []
def get_callback(
publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
try:
# Wait 60 seconds for the publish call to succeed.
print(publish_future.result(timeout=60))
except futures.TimeoutError:
print(f"Publishing {data} timed out.")
return callback
data_list = ["あああ", "いいい"]
for data in data_list:
# When you publish a message, the client returns a future.
publish_future = publisher.publish(topic_path, data.encode("utf-8"))
# Non-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)
# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
print(f"Published messages with error handler to {topic_path}.")
メッセージ内容が「あああ」と「いいい」となる2通のメッセージをトピックに送ります。
実行結果は以下となります。
>python pub_sub.py
8903378788073495
8903378788073496
Published messages with error handler to projects/gcp-test-1697637841257/topics/test-topic.
8903378788073495のような数字は、メッセージIDとなります。詳しくはresultメソッドを参照してください。2通分のメッセージIDが取得できていることが分かります。
送信したメッセージがCloud Storageに格納されているか確認します。
ファイルが作成されるまでに時間がかかりました。ストレージ バッチ期間を5分にしているのでその間隔で作成されるのか、あまり関係なくリアルタイムではない仕様なのかは分かりませんでした。
作成されたファイル名は次のようになっていました。
prefix_2023-10-23T14:12:58+00:00_98a369_suffix
ファイル名の仕様としては
<file-prefix><UTC-date-time>_<uuid><file-suffix>
となるようです。
ファイルの中身は
あああ
いいい
となっていましたので、送信したメッセージが改行されて格納されていることが分かります。
今回は以上です。
なお、前回のBigQueryや今回のCloud Storageのエクスポートサブスクリプションの用途と利点に関しては、公式ページに説明があります。
株式会社分析屋について
ホームページはこちら。
noteでの会社紹介記事はこちら。
専用の採用ページはこちら。
【データ分析で日本を豊かに】
分析屋はシステム分野・ライフサイエンス分野・マーケティング分野の知見を生かし、多種多様な分野の企業様のデータ分析のご支援をさせていただいております。 「あなたの問題解決をする」をモットーに、お客様の抱える課題にあわせた解析・分析手法を用いて、問題解決へのお手伝いをいたします!
【マーケティング】
マーケティング戦略上の目的に向けて、各種のデータ統合及び加工ならびにPDCAサイクル運用全般を支援や高度なデータ分析技術により複雑な課題解決に向けての分析サービスを提供いたします。
【システム】
アプリケーション開発やデータベース構築、WEBサイト構築、運用保守業務などお客様の問題やご要望に沿ってご支援いたします。
【ライフサイエンス】
機械学習や各種アルゴリズムなどの解析アルゴリズム開発サービスを提供いたします。過去には医療系のバイタルデータを扱った解析が主でしたが、今後はそれらで培った経験・技術を工業など他の分野の企業様の問題解決にも役立てていく方針です。
【SES】
SESサービスも行っております。