AT Protocol を利用して Bluesky のスレッドを取得し、扱いやすい JSON ファイルとして保存する in Python
1. 概要
本スクリプトは、Bluesky のスレッドを取得し、構造化された JSON ファイルとして保存する Python モジュールです。投稿内容、画像、動画、外部リンクなどの情報を体系的に収集し、再利用可能な形式で保存します。
2. 主要機能
Bluesky への認証
スレッドの取得と解析
投稿内容の構造化
JSON ファイルへの保存
包括的なエラーハンドリング
ログ記録
3. ソースコード
"""
Bluesky のスレッドを取得して JSON ファイルとして保存するモジュール。
このモジュールは、Bluesky のスレッドデータを取得し、投稿内容、画像、動画、外部リンクなどの
情報を構造化された JSON データとして保存します。
使用例:
from bluesky_thread_fetcher import BlueskyThreadFetcher
# 基本的な使用方法
fetcher = BlueskyThreadFetcher(
username='your_username',
password='your_password',
output_path=Path('output/thread.json')
)
fetcher.fetch_and_save_thread('at://example.com/post/123')
# 環境変数を使用する場合
load_dotenv()
username = os.getenv("BLUESKY_USERNAME")
password = os.getenv("BLUESKY_PASSWORD")
fetcher = BlueskyThreadFetcher(
username=username,
password=password,
output_path=Path('output/thread.json'),
log_path=Path('logs/fetch.log')
)
Dependencies:
- atproto
- python-dotenv
- pathlib
- logging
- json
- typing
"""
from atproto import Client
from atproto.exceptions import AtProtocolError
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime
import os
import sys
from dotenv import load_dotenv
# カスタム例外クラス
class BlueskyError(Exception):
"""Bluesky の操作に関連する基本例外クラス"""
pass
class AuthenticationError(BlueskyError):
"""認証に関連するエラー"""
pass
class ThreadFetchError(BlueskyError):
"""スレッド取得に関連するエラー"""
pass
class FileOperationError(BlueskyError):
"""ファイル操作に関連するエラー"""
pass
class ThreadContent:
"""
スレッドの内容を構造化して保持するクラス
Attributes:
text (str): 投稿テキスト
created_at (str): 投稿日時(ISO 8601形式)
images (List[Dict[str, str]]): 添付画像のリスト
video (List[Dict[str, str]]): 添付動画のリスト
external_link (List[Dict[str, str]]): 外部リンクのリスト
Example:
content = ThreadContent("投稿内容", "2024-01-01T12:00:00Z")
content.images.append({"alt": "画像の説明", "fullsize": "画像URL"})
"""
def __init__(self, text: str, created_at: str):
self.text = text
self.created_at = created_at
self.images: List[Dict[str, str]] = []
self.video: List[Dict[str, str]] = []
self.external_link: List[Dict[str, str]] = []
def to_dict(self) -> Dict:
"""
ThreadContent オブジェクトを辞書形式に変換
Returns:
Dict: 構造化されたスレッド内容
"""
content_dict = {
"text": self.text,
"datetime": self.created_at
}
if self.images:
content_dict["images"] = self.images
if self.video:
content_dict["video"] = self.video
if self.external_link:
content_dict["external"] = self.external_link
return content_dict
class BlueskyThreadFetcher:
"""
Bluesky のスレッドを取得し、JSON 形式で保存するクラス
Attributes:
username (str): Bluesky のユーザー名
password (str): Bluesky のパスワード
output_path (Path): 出力 JSON ファイルのパス
logger (logging.Logger): ロギングインスタンス
client (Optional[Client]): atproto クライアントインスタンス
thread_contents (Dict[str, ThreadContent]): 取得したスレッドの内容
"""
def __init__(
self,
username: str,
password: str,
output_path: Path,
log_path: Optional[Path] = None
):
"""
BlueskyThreadFetcher の初期化
Args:
username: Bluesky のユーザー名
password: Bluesky のパスワード
output_path: JSON ファイルの出力パス
log_path: ログファイルのパス(省略可能)
Raises:
ValueError: 必須パラメータが不足している場合
"""
if not all([username, password, output_path]):
raise ValueError("username, password, output_path は必須パラメータです")
self.username = username
self.password = password
self.output_path = Path(output_path)
self._setup_logging(log_path)
self.client: Optional[Client] = None
self.thread_contents: Dict[str, ThreadContent] = {}
def _setup_logging(self, log_path: Optional[Path]) -> None:
"""
ロギングの設定
Args:
log_path: ログファイルのパス(省略可能)
"""
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# コンソールへの出力設定
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# ファイルへの出力設定(指定がある場合)
if log_path:
try:
log_path.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(
log_path,
encoding='utf-8'
)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
except Exception as e:
self.logger.warning(f"ログファイルの設定に失敗しました: {e}")
def _authenticate(self) -> None:
"""
Bluesky への認証を行う
Raises:
AuthenticationError: 認証に失敗した場合
"""
try:
self.client = Client()
self.client.login(self.username, self.password)
self.logger.info("Bluesky への認証に成功しました")
except AtProtocolError as e:
raise AuthenticationError(f"認証に失敗しました: {e}")
def _extract_thread_content(
self,
thread: any,
author_handle: str,
count: int = 0
) -> int:
"""
スレッドから内容を抽出し、ThreadContent オブジェクトとして保存
Args:
thread: スレッドオブジェクト
author_handle: 投稿者のハンドル
count: 投稿のカウンター
Returns:
int: 更新されたカウンター
"""
if not hasattr(thread, 'post') or not thread.post:
return count
post = thread.post
if not hasattr(post, 'author') or post.author.handle != author_handle:
return count
if not hasattr(post, 'record') or not post.record.text:
return count
# ThreadContent オブジェクトの作成
content = ThreadContent(
post.record.text,
post.record.created_at
)
if hasattr(post, 'embed') and post.embed:
# 画像の処理
if hasattr(post.embed, 'images'):
for img in post.embed.images:
content.images.append({
"alt": img.alt,
"fullsize": img.fullsize
})
# 動画の処理
if hasattr(post.embed, 'playlist'):
content.video.append({
"alt": post.embed.alt,
"playlist": post.embed.playlist
})
# 外部リンクの処理
if hasattr(post.embed, 'external'):
content.external_link.append({
"title": post.embed.external.title,
"uri": post.embed.external.uri
})
self.thread_contents[str(count)] = content
count += 1
# 返信の再帰的処理
if hasattr(thread, 'replies'):
for reply in thread.replies:
count = self._extract_thread_content(
reply,
author_handle,
count
)
return count
def _save_to_json(self) -> None:
"""
取得したスレッド内容を JSON ファイルとして保存
Raises:
FileOperationError: ファイル操作に失敗した場合
"""
try:
self.output_path.parent.mkdir(parents=True, exist_ok=True)
# ThreadContent オブジェクトを辞書に変換
output_data = {
key: content.to_dict()
for key, content in self.thread_contents.items()
}
with open(self.output_path, 'w', encoding='utf-8') as f:
json.dump(
output_data,
f,
ensure_ascii=False,
indent=4
)
self.logger.info(f"JSON ファイルを保存しました: {self.output_path}")
except Exception as e:
raise FileOperationError(f"JSON ファイルの保存に失敗しました: {e}")
def fetch_and_save_thread(self, thread_uri: str) -> None:
"""
スレッドを取得して JSON ファイルとして保存する
Args:
thread_uri: 取得するスレッドの URI
Raises:
ThreadFetchError: スレッドの取得に失敗した場合
FileOperationError: ファイル操作に失敗した場合
"""
try:
self.logger.info(f"スレッド {thread_uri} の取得を開始します")
# 認証
if not self.client:
self._authenticate()
# スレッドの取得
thread_data = self.client.get_post_thread(uri=thread_uri)
# 投稿者のハンドルを取得
author_handle = thread_uri.split("//")[1].split("/")[0]
# スレッドの内容を抽出
self._extract_thread_content(thread_data.thread, author_handle)
# JSONファイルとして保存
self._save_to_json()
self.logger.info("スレッドの取得と保存が完了しました")
except AuthenticationError as e:
raise ThreadFetchError(f"認証エラー: {e}")
except AtProtocolError as e:
raise ThreadFetchError(f"スレッド取得エラー: {e}")
except FileOperationError as e:
raise e
except Exception as e:
raise ThreadFetchError(f"予期しないエラーが発生しました: {e}")
def main():
"""
メイン実行関数
環境変数:
BLUESKY_USERNAME: Bluesky のユーザー名
BLUESKY_PASSWORD: Bluesky のパスワード
Raises:
ValueError: 必要な環境変数が設定されていない場合
"""
# 環境変数の読み込み
load_dotenv()
# 環境変数からの設定読み込み
username = os.getenv("BLUESKY_USERNAME")
password = os.getenv("BLUESKY_PASSWORD")
if not all([username, password]):
raise ValueError("環境変数 BLUESKY_USERNAME と BLUESKY_PASSWORD が必要です")
# パスの設定
output_path = Path('output/thread.json')
log_path = Path('logs/fetch.log')
try:
# BlueskyThreadFetcher のインスタンス化
fetcher = BlueskyThreadFetcher(
username=username,
password=password,
output_path=output_path,
log_path=log_path
)
# スレッドの取得と保存
fetcher.fetch_and_save_thread(
'at://example.com/app.bsky.feed.post/123abcdefg456' # 実際のスレッド URI に置き換えてください
)
except (ValueError, BlueskyError) as e:
logging.error(f"エラーが発生しました: {e}")
sys.exit(1)
except KeyboardInterrupt:
logging.info("スクリプトを終了します")
sys.exit(0)
except Exception as e:
logging.error(f"予期しないエラーが発生しました: {e}")
sys.exit(1)
if __name__ == '__main__':
main()
4. クラス構成
1. カスタム例外クラス
BlueskyError # 基本例外クラス
├── AuthenticationError # 認証エラー
├── ThreadFetchError # スレッド取得エラー
└── FileOperationError # ファイル操作エラー
2. ThreadContent
スレッドの個々の投稿内容を構造化して保持するクラスです。
属性:
text: 投稿テキスト
created_at: 投稿日時
images: 添付画像のリスト
video: 添付動画のリスト
external_link: 外部リンク(リンクカード)のリスト
3. BlueskyThreadFetcher
メインクラスとして、スレッドの取得と保存を管理します。
主要メソッド:
init: 初期化
必須パラメータ: username, password, output_path
オプション: log_path
_authenticate: Bluesky 認証
クライアントの初期化と認証を実行
失敗時は AuthenticationError を発生
_extract_thread_content: スレッド内容の抽出
投稿内容の解析
画像、動画、外部リンクの収集
再帰的な返信の処理
_save_to_json: JSON 保存
構造化データの JSON 形式での保存
ディレクトリ作成とファイル書き込み
fetch_and_save_thread: メイン実行メソッド
スレッド取得から保存までの一連の処理を実行
包括的なエラーハンドリング
5. 使用方法
基本的な使用例
from bluesky_thread_fetcher import BlueskyThreadFetcher
from pathlib import Path
fetcher = BlueskyThreadFetcher(
username='your_username',
password='your_password',
output_path=Path('output.json'),
log_path=Path('fetcher.log')
)
fetcher.fetch_and_save_thread('at://example.com/app.bsky.feed.post/123abcdefg456')
環境変数を使用した実行例
from dotenv import load_dotenv
import os
load_dotenv()
username = os.getenv("BLUESKY_USERNAME")
password = os.getenv("BLUESKY_PASSWORD")
fetcher = BlueskyThreadFetcher(
username=username,
password=password,
output_path=Path('output.json')
)
6. 出力JSON形式
{
"0": {
"text": "投稿内容 1",
"datetime": "2024-01-01T12:00:00Z"
},
"1": {
"text": "投稿内容 2",
"datetime": "2024-01-01T12:05:00Z",
"images": [
{
"alt": "画像の説明 1",
"fullsize": "画像 URL"
},
{
"alt": "画像の説明 2",
"fullsize": "画像 URL"
}
],
},
"2": {
"text": "投稿内容 3",
"datetime": "2024-01-01T12:10:00Z",
"video": [
{
"alt": "動画の説明",
"playlist": "動画 URL"
}
]
}
"3": {
"text": "投稿内容 4",
"datetime": "2024-01-01T12:15:00Z",
"external": [
{
"title": "リンクタイトル",
"uri": "https://example.com"
}
]
}
}
7. エラーハンドリング
スクリプトは以下の状況で適切なエラーを発生させます:
認証失敗
スレッド取得エラー
ファイル操作エラー
必須パラメータの不足
各エラーは専用の例外クラスでキャッチされ、ログに記録されます。
8. ロギング
ログは以下の2つの出力先に記録されます:
コンソール出力(常時)
ログファイル(log_path 指定時)
ログフォーマット:
%(asctime)s - %(name)s - %(levelname)s - %(message)s
9. 注意事項
パスワードは環境変数での管理を推奨
大量のリクエストを行う場合は API の制限に注意
出力パスの親ディレクトリは自動作成される
ログファイルのパスは省略可能
10. 依存関係
atproto
python-dotenv
pathlib
logging
json
typing
11. エラー対応
よくあるエラーと解決方法
AuthenticationError
認証情報の確認
ネットワーク接続の確認
ThreadFetchError
URI の形式確認
スレッドの存在確認
API 制限の確認
FileOperationError
書き込み権限の確認
ディスク容量の確認
パスの妥当性確認