見出し画像

AssitantServiceのEventHandler実装

挨拶

今更なのですが、私の記事を読んだことのある方は誰しも、「こいつ理屈わかってないな」と思われると思います。その通りです。私は工学部出身のしかも機械科卒業なので、「動けばOK」プログラマーなのです。現に同僚からは「競技プログラマー」といわれているぐらいです。なので、「記法」にこだわりはありませんし、「アルゴリズム」にもこだわりがありません。ですので、これからご紹介するのは「動くだけ」のものになります。また、抜粋ですので、コピペで動くようにはなっておりません。申し訳ございません。

前半戦のurl


要は呼び出し元でEventHandlerを作って渡せばいい

event_handler = EventHandler(client=client, thread_id=thread_id, chat=chat, db=db)
stream = get_stream(event_handler, client=client, thread_id=thread_id, chat=chat, db=db)
return StreamingResponse(stream, media_type="text/event-stream")

まず、第1点として呼び出し元がEventHandlerを作ることが肝要です。ここでget_stream関数が返してくるのは次のようなClassです。

EventHandlerとやり取りをするSSEClientクラス

# GPTにSSEってどう作ると質問して作らせました。
# SSEClientなんて言ってますが、要は非同期でイテレータブルに動くものですね。(多分)
class SSEClient:
    def __init__(self, response):
        self.response = response
        self.event_queue = asyncio.Queue()

    # このputがEventHandlerから呼ばれる通信機構です。
    # EventHandlerは非同期ではなく、同期で呼んでくるのでこういう実装になりました。
    def put(self, event):
        try:
            # nowaitでいれないと動きません
            self.event_queue.put_nowait(event)
        except Exception as e:
            print(e)
    
    # これを各決まりのようです。
    def __aiter__(self): 
        return self

    # これも決まりのようです。
    async def __anext__(self):
        try:
            # 万が一の時、延々と待ってしまうので、Timeoutを設定しました。
            event = await asyncio.wait_for(self.event_queue.get(), 30)
            if event is not None:
                # yield の代わりにこうするそうです。
                return event
            else:
                raise StopAsyncIteration  # イテレーションが終了した場合
        except asyncio.TimeoutError:
            print(e)
            raise StopAsyncIteration  # タイムアウトした場合も終了

    # 呼ばれているのかどうか確認してしてません。
    async def close(self):
        self.response.close()

ですので、これをそのまま上記のコードでStreamingResponseに渡しているのです。

改良版EventHandler

上記のSSEClientと通信できるようにしたのが、次のEventHandlerです。

class EventHandler(AssistantEventHandler):
    def __init__(self, client, thread_id, chat,db):
        super().__init__()
        self.sse_clients = []
        self.client = client
        self.thread_id = thread_id
        self.chat = chat
    
    def add_sse_client(self, sse_client):
        self.sse_clients.append(sse_client)

    def remove_sse_client(self, sse_client):
        self.sse_clients.remove(sse_client)

    # この関数がSSEClientとの通信を一手に引き受けますが、
    # 要はクラスの変数に値を入れているだけですね。
    def notify_sse_clients(self, event):
        for sse_client in self.sse_clients:
            try:
                # このputは先ほど出てきたSSECLientクラスのものです
                sse_client.put(event)
            except Exception as e:
                self.remove_sse_client(sse_client)
                print(e)


    @override
    def on_message_created(self, message: Message) -> None:
        pass

    @override
    def on_message_delta(self, delta: MessageDelta, snapshot: Message) -> None:
        event = delta.content[0].text.value
        # 旧版の Stream:Trueでもつぶしていたもの
        if event is not None:
            self.notify_sse_clients(event)
    
    @override
    def on_message_done(self, message) -> None:
        try:
            run_id = message.run_id
            # これはEventHandlerでなくても実行できるんですが。
            for attachment in message.attachments:
                file_id = attachment.file_id
                # 拡張子情報が欲しいので、ファイル名を取り出します。
                file = self.client.files.retrieve(file_id)
                # コンテンツをダウンロードします。(中身はバイナリです)
                content = self.client.files.content(file_id)
                file_name = os.path.basename(file.filename)
                # コンテンツを適当なファイル名でバイナリで書きだします。
                with open(f'outputs/{file_id}{file_name}', "wb") as f:
                    f.write(content.read())
                # それをurlに変換してSSEClientに送ることで、ユーザにurlを送ります。
                event = f'<a href="/filedownload/{file_id}{file_name}">{file_name}</a><br>'
                self.notify_sse_clients(event)
                # OpenAIからはファイルを消してしまいます。
                self.client.files.delete(file_id)
        except Exception as e:
            print(e)
        # 処理が終了したことをSSEClientに伝えます。(これは本当はダメな処理な気がします)
        self.notify_sse_clients(None)
    
    @override
    def on_text_created(self, text) -> None:
        pass

    @override
    def on_text_delta(self, delta: Text, snapshot: Text) -> None:
        pass
    
    @override
    def on_text_done(self, text: Text) -> None:
        pass

    @override
    def on_tool_call_created(self, tool_call):
        pass

    @override
    def on_tool_call_delta(self, delta: ToolCallDelta, snapshot: ToolCall) -> None:
        pass

    @override
    def on_tool_call_done(self, tool_call):
        pass

呼び出し元でのHandlerとSSEClient

下記のコードはGPTと相談しながら書き上げたものですが、おそらく、別のコードを書いても動くと思います。GPT曰く、asyncif.create_taskを使おうとすると、こういう風になるそうなのですが。
あとSSEClient(None)をしていますが、これはreturn sse_clientを実施するための方便として書いているそうです。これもGPTのアイディアでした。

async def get_stream(event_handler, client, thread_id, chat, db):
    try:
        # SSEClientはこのファイルで定義しているクラスです。
        sse_client = SSEClient(None)
        event_handler.add_sse_client(sse_client)

        # 笑ってしまうほどよくわからない手を使っています。
        async def stream_task():
            try:
                with client.beta.threads.runs.stream(
                    thread_id=thread_id,
                    assistant_id=assistant_id,
                    event_handler = event_handler
                ) as stream:
                    sse_client.response = stream
                    await asyncio.to_thread(stream.until_done)
            except Exception as e:
                print(e)
        asyncio.create_task(stream_task())
        return sse_client
        except Exception as e:
            print(e)

これで無事にすべてのイベントを処理することができるようになるはずです。いるのかというと、これから先なにかいるかもしれません(笑)

結論

細かいことを考えなければ、
client.beta.threads.runs.streamで行けばよい。というかそっちの方がいいと思います。この実装はできたけど、時間帯効果を考えると、たとえば、他のサービスをtoolをつかって具体的に呼び出すようなOpenAIのgithubにも書いてあるように画像生成が伴うようなことをするとか、自前で作ったツールを使うとか、そういう、ChatGPTのフルサービスに近いようなことをするのでなければ、それは不要ですし、もしそこまで使うんだったら、保守とかメンテナンスやエンジニアの持ってるリソースとか考えて、いっそ、GPTの有料プランを契約する、ClaudeのArtifactのほうが便利なので、Claudeを契約することの方がいいような気がします。


この記事が気に入ったらサポートをしてみませんか?