見出し画像

AT Protocol を利用して Bluesky のスレッドを取得し、扱いやすい JSON ファイルとして保存する in Python


1. 概要

本スクリプトは、Bluesky のスレッドを取得し、構造化された JSON ファイルとして保存する Python モジュールです。投稿内容、画像、動画、外部リンクなどの情報を体系的に収集し、再利用可能な形式で保存します。

2. 主要機能

  • Bluesky への認証

  • スレッドの取得と解析

  • 投稿内容の構造化

  • JSON ファイルへの保存

  • 包括的なエラーハンドリング

  • ログ記録

3. ソースコード

"""
Bluesky のスレッドを取得して JSON ファイルとして保存するモジュール。

このモジュールは、Bluesky のスレッドデータを取得し、投稿内容、画像、動画、外部リンクなどの
情報を構造化された JSON データとして保存します。

使用例:
    from bluesky_thread_fetcher import BlueskyThreadFetcher
    
    # 基本的な使用方法
    fetcher = BlueskyThreadFetcher(
        username='your_username',
        password='your_password',
        output_path=Path('output/thread.json')
    )
    fetcher.fetch_and_save_thread('at://example.com/post/123')

    # 環境変数を使用する場合
    load_dotenv()
    username = os.getenv("BLUESKY_USERNAME")
    password = os.getenv("BLUESKY_PASSWORD")
    
    fetcher = BlueskyThreadFetcher(
        username=username,
        password=password,
        output_path=Path('output/thread.json'),
        log_path=Path('logs/fetch.log')
    )

Dependencies:
    - atproto
    - python-dotenv
    - pathlib
    - logging
    - json
    - typing
"""

from atproto import Client
from atproto.exceptions import AtProtocolError
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime
import os
import sys
from dotenv import load_dotenv


# カスタム例外クラス
class BlueskyError(Exception):
    """Bluesky の操作に関連する基本例外クラス"""
    pass


class AuthenticationError(BlueskyError):
    """認証に関連するエラー"""
    pass


class ThreadFetchError(BlueskyError):
    """スレッド取得に関連するエラー"""
    pass


class FileOperationError(BlueskyError):
    """ファイル操作に関連するエラー"""
    pass


class ThreadContent:
    """
    スレッドの内容を構造化して保持するクラス
    
    Attributes:
        text (str): 投稿テキスト
        created_at (str): 投稿日時(ISO 8601形式)
        images (List[Dict[str, str]]): 添付画像のリスト
        video (List[Dict[str, str]]): 添付動画のリスト
        external_link (List[Dict[str, str]]): 外部リンクのリスト
    
    Example:
        content = ThreadContent("投稿内容", "2024-01-01T12:00:00Z")
        content.images.append({"alt": "画像の説明", "fullsize": "画像URL"})
    """
    def __init__(self, text: str, created_at: str):
        self.text = text
        self.created_at = created_at
        self.images: List[Dict[str, str]] = []
        self.video: List[Dict[str, str]] = []
        self.external_link: List[Dict[str, str]] = []

    def to_dict(self) -> Dict:
        """
        ThreadContent オブジェクトを辞書形式に変換
        
        Returns:
            Dict: 構造化されたスレッド内容
        """
        content_dict = {
            "text": self.text,
            "datetime": self.created_at
        }
        if self.images:
            content_dict["images"] = self.images
        if self.video:
            content_dict["video"] = self.video
        if self.external_link:
            content_dict["external"] = self.external_link
        return content_dict


class BlueskyThreadFetcher:
    """
    Bluesky のスレッドを取得し、JSON 形式で保存するクラス
    
    Attributes:
        username (str): Bluesky のユーザー名
        password (str): Bluesky のパスワード
        output_path (Path): 出力 JSON ファイルのパス
        logger (logging.Logger): ロギングインスタンス
        client (Optional[Client]): atproto クライアントインスタンス
        thread_contents (Dict[str, ThreadContent]): 取得したスレッドの内容
    """
    
    def __init__(
        self,
        username: str,
        password: str,
        output_path: Path,
        log_path: Optional[Path] = None
    ):
        """
        BlueskyThreadFetcher の初期化
        
        Args:
            username: Bluesky のユーザー名
            password: Bluesky のパスワード
            output_path: JSON ファイルの出力パス
            log_path: ログファイルのパス(省略可能)
            
        Raises:
            ValueError: 必須パラメータが不足している場合
        """
        if not all([username, password, output_path]):
            raise ValueError("username, password, output_path は必須パラメータです")
            
        self.username = username
        self.password = password
        self.output_path = Path(output_path)
        self._setup_logging(log_path)
        self.client: Optional[Client] = None
        self.thread_contents: Dict[str, ThreadContent] = {}
        
    def _setup_logging(self, log_path: Optional[Path]) -> None:
        """
        ロギングの設定
        
        Args:
            log_path: ログファイルのパス(省略可能)
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        
        # コンソールへの出力設定
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)
        
        # ファイルへの出力設定(指定がある場合)
        if log_path:
            try:
                log_path.parent.mkdir(parents=True, exist_ok=True)
                file_handler = logging.FileHandler(
                    log_path,
                    encoding='utf-8'
                )
                file_handler.setFormatter(formatter)
                self.logger.addHandler(file_handler)
            except Exception as e:
                self.logger.warning(f"ログファイルの設定に失敗しました: {e}")
                
    def _authenticate(self) -> None:
        """
        Bluesky への認証を行う
        
        Raises:
            AuthenticationError: 認証に失敗した場合
        """
        try:
            self.client = Client()
            self.client.login(self.username, self.password)
            self.logger.info("Bluesky への認証に成功しました")
        except AtProtocolError as e:
            raise AuthenticationError(f"認証に失敗しました: {e}")
            
    def _extract_thread_content(
        self,
        thread: any,
        author_handle: str,
        count: int = 0
    ) -> int:
        """
        スレッドから内容を抽出し、ThreadContent オブジェクトとして保存
        
        Args:
            thread: スレッドオブジェクト
            author_handle: 投稿者のハンドル
            count: 投稿のカウンター
            
        Returns:
            int: 更新されたカウンター
        """
        if not hasattr(thread, 'post') or not thread.post:
            return count
            
        post = thread.post
        if not hasattr(post, 'author') or post.author.handle != author_handle:
            return count
            
        if not hasattr(post, 'record') or not post.record.text:
            return count
            
        # ThreadContent オブジェクトの作成
        content = ThreadContent(
            post.record.text,
            post.record.created_at
        )
        
        if hasattr(post, 'embed') and post.embed:
            # 画像の処理
            if hasattr(post.embed, 'images'):
                for img in post.embed.images:
                    content.images.append({
                        "alt": img.alt,
                        "fullsize": img.fullsize
                    })
            
            # 動画の処理
            if hasattr(post.embed, 'playlist'):
                content.video.append({
                    "alt": post.embed.alt,
                    "playlist": post.embed.playlist
                })

            # 外部リンクの処理
            if hasattr(post.embed, 'external'):
                content.external_link.append({
                    "title": post.embed.external.title,
                    "uri": post.embed.external.uri
                })
                
        self.thread_contents[str(count)] = content
        count += 1
        
        # 返信の再帰的処理
        if hasattr(thread, 'replies'):
            for reply in thread.replies:
                count = self._extract_thread_content(
                    reply,
                    author_handle,
                    count
                )
                
        return count
        
    def _save_to_json(self) -> None:
        """
        取得したスレッド内容を JSON ファイルとして保存
        
        Raises:
            FileOperationError: ファイル操作に失敗した場合
        """
        try:
            self.output_path.parent.mkdir(parents=True, exist_ok=True)
            
            # ThreadContent オブジェクトを辞書に変換
            output_data = {
                key: content.to_dict()
                for key, content in self.thread_contents.items()
            }
            
            with open(self.output_path, 'w', encoding='utf-8') as f:
                json.dump(
                    output_data,
                    f,
                    ensure_ascii=False,
                    indent=4
                )
            self.logger.info(f"JSON ファイルを保存しました: {self.output_path}")
        except Exception as e:
            raise FileOperationError(f"JSON ファイルの保存に失敗しました: {e}")
            
    def fetch_and_save_thread(self, thread_uri: str) -> None:
        """
        スレッドを取得して JSON ファイルとして保存する
        
        Args:
            thread_uri: 取得するスレッドの URI
            
        Raises:
            ThreadFetchError: スレッドの取得に失敗した場合
            FileOperationError: ファイル操作に失敗した場合
        """
        try:
            self.logger.info(f"スレッド {thread_uri} の取得を開始します")
            
            # 認証
            if not self.client:
                self._authenticate()
                
            # スレッドの取得
            thread_data = self.client.get_post_thread(uri=thread_uri)
            
            # 投稿者のハンドルを取得
            author_handle = thread_uri.split("//")[1].split("/")[0]
            
            # スレッドの内容を抽出
            self._extract_thread_content(thread_data.thread, author_handle)
            
            # JSONファイルとして保存
            self._save_to_json()
            
            self.logger.info("スレッドの取得と保存が完了しました")
            
        except AuthenticationError as e:
            raise ThreadFetchError(f"認証エラー: {e}")
        except AtProtocolError as e:
            raise ThreadFetchError(f"スレッド取得エラー: {e}")
        except FileOperationError as e:
            raise e
        except Exception as e:
            raise ThreadFetchError(f"予期しないエラーが発生しました: {e}")


def main():
    """
    メイン実行関数
    
    環境変数:
        BLUESKY_USERNAME: Bluesky のユーザー名
        BLUESKY_PASSWORD: Bluesky のパスワード
    
    Raises:
        ValueError: 必要な環境変数が設定されていない場合
    """
    # 環境変数の読み込み
    load_dotenv()
    
    # 環境変数からの設定読み込み
    username = os.getenv("BLUESKY_USERNAME")
    password = os.getenv("BLUESKY_PASSWORD")
    
    if not all([username, password]):
        raise ValueError("環境変数 BLUESKY_USERNAME と BLUESKY_PASSWORD が必要です")
        
    # パスの設定
    output_path = Path('output/thread.json')
    log_path = Path('logs/fetch.log')
    
    try:
        # BlueskyThreadFetcher のインスタンス化
        fetcher = BlueskyThreadFetcher(
            username=username,
            password=password,
            output_path=output_path,
            log_path=log_path
        )
        
        # スレッドの取得と保存
        fetcher.fetch_and_save_thread(
            'at://example.com/app.bsky.feed.post/123abcdefg456'  # 実際のスレッド URI に置き換えてください
        )
        
    except (ValueError, BlueskyError) as e:
        logging.error(f"エラーが発生しました: {e}")
        sys.exit(1)
    except KeyboardInterrupt:
        logging.info("スクリプトを終了します")
        sys.exit(0)
    except Exception as e:
        logging.error(f"予期しないエラーが発生しました: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()

4. クラス構成

1. カスタム例外クラス

BlueskyError          # 基本例外クラス
├── AuthenticationError  # 認証エラー
├── ThreadFetchError    # スレッド取得エラー
└── FileOperationError  # ファイル操作エラー

2. ThreadContent

スレッドの個々の投稿内容を構造化して保持するクラスです。

属性:

  • text: 投稿テキスト

  • created_at: 投稿日時

  • images: 添付画像のリスト

  • video: 添付動画のリスト

  • external_link: 外部リンク(リンクカード)のリスト

3. BlueskyThreadFetcher

メインクラスとして、スレッドの取得と保存を管理します。

主要メソッド:

  1. init: 初期化

    • 必須パラメータ: username, password, output_path

    • オプション: log_path

  2. _authenticate: Bluesky 認証

    • クライアントの初期化と認証を実行

    • 失敗時は AuthenticationError を発生

  3. _extract_thread_content: スレッド内容の抽出

    • 投稿内容の解析

    • 画像、動画、外部リンクの収集

    • 再帰的な返信の処理

  4. _save_to_json: JSON 保存

    • 構造化データの JSON 形式での保存

    • ディレクトリ作成とファイル書き込み

  5. fetch_and_save_thread: メイン実行メソッド

    • スレッド取得から保存までの一連の処理を実行

    • 包括的なエラーハンドリング

5. 使用方法

基本的な使用例

from bluesky_thread_fetcher import BlueskyThreadFetcher
from pathlib import Path

fetcher = BlueskyThreadFetcher(
    username='your_username',
    password='your_password',
    output_path=Path('output.json'),
    log_path=Path('fetcher.log')
)

fetcher.fetch_and_save_thread('at://example.com/app.bsky.feed.post/123abcdefg456')

環境変数を使用した実行例

from dotenv import load_dotenv
import os

load_dotenv()

username = os.getenv("BLUESKY_USERNAME")
password = os.getenv("BLUESKY_PASSWORD")

fetcher = BlueskyThreadFetcher(
    username=username,
    password=password,
    output_path=Path('output.json')
)

6. 出力JSON形式

{
    "0": {
        "text": "投稿内容 1",
        "datetime": "2024-01-01T12:00:00Z"
    },
    "1": {
        "text": "投稿内容 2",
        "datetime": "2024-01-01T12:05:00Z",
        "images": [
            {
                "alt": "画像の説明 1",
                "fullsize": "画像 URL"
            },
            {
                "alt": "画像の説明 2",
                "fullsize": "画像 URL"
            }
        ],
    },
    "2": {
        "text": "投稿内容 3",
        "datetime": "2024-01-01T12:10:00Z",
        "video": [
            {
                "alt": "動画の説明",
                "playlist": "動画 URL"
            }
        ]
    }    
    "3": {
        "text": "投稿内容 4",
        "datetime": "2024-01-01T12:15:00Z",
        "external": [
            {
                "title": "リンクタイトル",
                "uri": "https://example.com"
            }
        ]
    }
}

7. エラーハンドリング

スクリプトは以下の状況で適切なエラーを発生させます:

  1. 認証失敗

  2. スレッド取得エラー

  3. ファイル操作エラー

  4. 必須パラメータの不足

各エラーは専用の例外クラスでキャッチされ、ログに記録されます。

8. ロギング

ログは以下の2つの出力先に記録されます:

  1. コンソール出力(常時)

  2. ログファイル(log_path 指定時)

ログフォーマット:

%(asctime)s - %(name)s - %(levelname)s - %(message)s

9. 注意事項

  1. パスワードは環境変数での管理を推奨

  2. 大量のリクエストを行う場合は API の制限に注意

  3. 出力パスの親ディレクトリは自動作成される

  4. ログファイルのパスは省略可能

10. 依存関係

  • atproto

  • python-dotenv

  • pathlib

  • logging

  • json

  • typing

11. エラー対応

よくあるエラーと解決方法

  1. AuthenticationError

    • 認証情報の確認

    • ネットワーク接続の確認

  2. ThreadFetchError

    • URI の形式確認

    • スレッドの存在確認

    • API 制限の確認

  3. FileOperationError

    • 書き込み権限の確認

    • ディスク容量の確認

    • パスの妥当性確認

いいなと思ったら応援しよう!