
LangchainとLanggraphのストリーミングについてまとめてみた
はじめに
LangchainとLanggraphにはStramingという機能があって、LLMアプリを作る上でユーザービリティを高めるために必要不可欠なものになってるんだけど、いろんな関数やルールがあってちょっとややこしいなと思ってまとめてみました。
次の3ステップに分けて記事書いていきます。
pythonのジェネレーターと非同期処理について
langchainのstreamについて
Langgraphのstreamについて
1,ジェネレーターと非同期処理
langchainやlanggraphの話じゃなくて、まさかのpythonの基本構文の話から入るんだけど、Langgraphのstreamを理解する時にジェネレーターと非同期処理について理解しておくとより深く理解できるようになるのでここから解説。知ってる人は飛ばしてヨシ!!
ジェネレーター
ジェネレーター(generator)とはなんでしょうか?
わかりやすく一文でまとめていたものがあったので引用します。
ジェネレータとは、Pythonに特徴的な要素の一つで、反復可能なオブジェクト(イテレータ)を簡単に作ることができる機能です。ジェネレータは通常の関数と同じように見えますが、その動作は異なります。通常の関数が結果を一度に返すのに対し、ジェネレータは一度に一つの結果を生成(yield)し、必要に応じて後から続きの結果を生成することができます。ジェネレータは、実はイテレータの特別な形の一種で、イテレータの作成をより容易にしています。
ジェネレーターの特徴はいろいろありますが、とりあえず次の3点だけ知っていれば大体OKです。
ジェネレーター関数はreturnの代わりにyieldで値を返す
関数一つだけを呼び出すと値を返さなくなる
for文で呼び出すと順番に値を返すようになる
文章だけわからないと思うので、実際にコードを見てみましょう
# Generator Sample
def count_up():
x = 0
while True:
yield x
x += 1
print(count_up())
for i in count_up():
print(i)
if i == 5:
break
<generator object count_up at 0x120c1f7c0>
0
1
2
3
4
5
generatorを知らずに直感的に見ると明らかに無限ループしそうな関数count_upを作成しました。
しかし、このcount upは無限ループしません。
上記で紹介した通り、count_upを単体で呼び出したときは自分のobject名しか返してないけど、for文で呼び出すと、順番に返しているのがわかります。
ここで勘がいい人は"あれ?関数なのに値保持してる?なんで覚えてるの?"となると思います。
そうです。generator関数と普通の関数の1番の違いは、値を保持しているかしてないかです。
generatorはどちらかというとクラスに近く、generator関数は行われた処理を記憶して値を保持する(stateする)ことができるのです。
「State」
Langgraphをやってる人なら耳が痛くなるほど聞く単語ですね。
この特徴のメリットはメモリ効率がいいことなのですが、Streamを学ぶ上で今はあまり意識しなくてOKです。
とりあえずここまで理解できたらひとまずgeneratorについてはOKです。
次の非同期処理(async)に進みます。
非同期処理
python初心者の1番の壁。非同期処理の登場です。
非同期処理とは待機中のタスクが完了するのを待たずに他のタスクを実行ことです。
pythonでは、asyncioというライブラリを使って、非同期処理を実装します。
読むだけだとさっぱりだと思うので、非同期処理を見ていきましょう。
import nest_asyncio
import asyncio
import time
nest_asyncio.apply()
async def say_after(delay, what):
await asyncio.sleep(delay)
print(f"{time.strftime('%X')} - {what}")
async def main():
print(f"{time.strftime('%X')} - start")
c1 = say_after(1, 'hello')
c2 = say_after(2, 'world')
await asyncio.gather(c1, c2)
print(f"{time.strftime('%X')} - finish")
# 非同期IOのエントリポイント
asyncio.run(main())
11:38:25 - start
11:38:26 - hello
11:38:27 - world
11:38:27 - finish
delayの時間が1秒と2秒になっていて、helloの1秒後にworldが表示されてるので、うまく非同期処理(並列処理)が実装できてるのがわかります。
登場人物(コード)を簡単に紹介します。
async
簡単にいうとこのキーワードで定義したものは非同期処理になります。
async defなら非同期で実行される関数,async forなら非同期で実行されるfor文です。
await
await + (corutine object)みたいな感じで使います。
await をつけることで、そのコルーチンはI/Oなどの待機時間の間に別の処理を割り込ませて良い、ということを明示するイメージです。
async run
asyncio.run(coro, *, debug=None)
coroutine coro を実行し、結果を返します。
この関数は、非同期イベントループの管理と 非同期ジェネレータの終了処理 およびスレッドプールのクローズ処理を行いながら、渡されたコルーチンを実行します。
です。非同期処理のエントリーポイントみたいなイメージだそうです。
asynco.gather()
aws シーケンスにある awaitable オブジェクト を 並行 実行します。
aws にある awaitable がコルーチンである場合、自動的に Task としてスケジュールされます。
awaitで定義した処理を並行で実行するための関数ですね。
asyncoの関数は色々ありますが、Streamingを覚える上では、asyncoの細かい仕様については覚えなくてOKで、大事なのは次のうまくいかない例です。
import asyncio
import time
async def say_after(delay, what):
time.sleep(delay) # asyncio.sleep ではなく、 time.sleep
print(f"{time.strftime('%X')} - {what}")
async def main():
print(f"{time.strftime('%X')} - start")
c1 = say_after(1, 'hello')
c2 = say_after(2, 'world')
await asyncio.gather(c1, c2) #2つの処理の終了を待つ
print(f"{time.strftime('%X')} - finish")
asyncio.run(main())
11:44:11 - start
11:44:12 - hello
11:44:14 - world
11:44:14 - finish
この例では、helloの後に2秒間待ってworldが出力されているため、並列処理がうまくいっていません。
なぜならtime.sleep(2)が同期処理だからです。
"非同期で実装した場合でも、同期処理が処理の中にあれば非同期にならない"
この特徴は、Stream処理を実装する上で、常に頭に入れておく必要があります。
同期処理とはどのようなものがあるでしょうか?
ファイル入出力
with open('file.txt', 'r') as f:
data = f.read()
ネットワーク操作
import requests
response = requests.get('https://example.com')
data = response.text
スリープ
import time
time.sleep(5)
データベースアクセス
import sqlite3
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM table')
rows = cursor.fetchall()
などが挙げられます。
これらの同期処理を内部に組み込むと、同期処理になってしまう。
非同期でこれらの処理を組み込みたい場合は、下記のように独自の非同期処理が必要になるということを覚えておいてください。
#入出力の非同期の例
async def read_file():
async with aiofiles.open('file.txt', mode='r') as f:
data = await f.read()
print(data)
長くなりましたが、Streamingを学ぶ上でAsyncについて知っておくべきことは下記の2つです。
Async で実装された処理は通常とは異なり、非同期に実行される
非同期で実装した場合でも、同期処理が処理の中にあれば非同期にならない
二点を覚えたら次に進みましょう
Step2 LangchainのStreaming
やっとLangchainの話です。
Langchainのstreamに関しては、こちらの公式を参考にしてるのでぜひ読んでください!
LLMの出力をstreamingするってどういうこと?
Langchainの話をする前にそもそもLLMにおけるStreamingとは何かという話です。
LLMでは処理を完全に終了するのに数秒かかることがあります。
数秒間待つとなると、処理を完全に終わるのを待ってから返信をすると、ユーザーは返信が遅いと感じてしまいます。(ユーザーは200~300ms以内に返信がないと遅いと感じるらしいです)
そんな時に使われるのが、Streamingという技術です。
LLMをstreamingすることで、全ての処理を終わるのを待たずに、生成した文字を順番に出力することが可能になります。
Streaming処理はusabilityの向上が最大のメリットとなります。
LLM modelをstreamしてみる
まずはllm model単体をstreamで出力してみましょう。
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-3.5-turbo-0125")
chunks = []
for chunk in model.stream("hello. tell me something about yourself"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)
|Hello|!| I| am| a| language| model| AI| designed| to| assist| with| various| tasks| such| as| answering| questions|,| providing| information|,| and| engaging| in| conversation|.| I| am| constantly| learning| and| improving| my| abilities| to| better| assist| users| like| yourself|.| How| can| I| help| you| today|?||
こちらのコードを実行すると、順番に生成された文字が出力されたのがわかります。
これが基本的なstream処理となります。
また、この例ではstreamからとってきたchunkのcontentのみを出力していますが、chunksにstream結果をappendして出力するとmodel.streamのから返ってくる値がわかります。
model.streamの結果は次のようになっています。
chunks
AIMessageChunk(content='', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content='Hello', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content='!', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' I', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' am', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' a', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' language', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' model', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' AI', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' designed', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' to', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' assist', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' with', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' various', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' tasks', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' such', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' as', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' answering', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' questions', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=',', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' providing', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' information', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=',', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' and', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' engaging', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' in', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' conversation', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content='.', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' I', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' am', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' constantly', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' learning', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' and', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' improving', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' my', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' abilities', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' to', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' better', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' assist', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' users', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' like', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' yourself', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content='.', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' How', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' can', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' I', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' help', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' you', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content=' today', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content='?', id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74'),
AIMessageChunk(content='', response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-3.5-turbo-0125'}, id='run-62b0e695-49f3-407f-9396-6ecf5b39ec74')]
AIMessageChunkというクラスで値が返ってきているのがわかります。
AIMessageChunkは次のような属性を持っています。
content・・生成された文字
id・・streamされたAIMessageChunkクラスのid。同じstreamから出力されたメッセージは同じidとなる
response_metadata・・metaデータ。上記の例では最後のchunkには終了理由やモデルの情報が格納されています。
一応、逆にstreamではない処理を見てみましょう。
streamではない処理を実装すると次のようになります。
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-3.5-turbo-0125")
result = model.invoke("hello. tell me something about yourself")
print(result)
content='Hello! I am an AI digital assistant designed to provide helpful information and assist with a wide range of tasks. I am constantly learning and improving to better serve you. How can I assist you today?' response_metadata={'token_usage': {'completion_tokens': 40, 'prompt_tokens': 14, 'total_tokens': 54}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None} id='run-8f0c3c5b-84f5-44f9-a036-c433ad5dd57c-0' usage_metadata={'input_tokens': 14, 'output_tokens': 40, 'total_tokens': 54}
結果を見るとわかるようにforでは回せないので、stream処理として出力できないことがわかります。
ChainをStreamしてみる
モデルにだけでなくchainに対してもstreamすることができます。
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser
chunks = []
for chunk in chain.stream({"topic": "parrot"}):
print(chunk, end="|", flush=True)
chunks.append(chunk)
chunks
|Why| did| the| par|rot| wear| a| rain|coat|?| Because| it| wanted| to| be| pol|ly| uns|aturated|!||
['',
'Why',
' did',
' the',
' par',
'rot',
' wear',
' a',
' rain',
'coat',
'?',
' Because',
' it',
' wanted',
' to',
' be',
' pol',
'ly',
' uns',
'aturated',
'!',
'']
実行すると上記のように順番にStreaming出力されているのがわかります。
また、chunksに関しては、StrOutputParserクラスの処理によって,文字列のみを出力しているので、次のように文字列だけが追加れているのがわかります。
独自の関数をchainに組み込んでStreamする(うまくいかない例)
次のコードはうまくいかない例です。
LLMから出力された文章を大文字にするだけのchainを組んでみました。
def upper(text):
return text.upper()
parser = StrOutputParser()
chain = model | parser | upper
chunks = []
for chunk in chain.stream("what your name?"):
print(chunk, end="|", flush=True)
chunks.append(chunk)
chunks
I AM A LANGUAGE MODEL AI ASSISTANT AND DO NOT HAVE A PERSONAL NAME. YOU CAN CALL ME ASSISTANT OR JUST AI. HOW CAN I ASSIST YOU TODAY?|
['I AM A LANGUAGE MODEL AI ASSISTANT AND DO NOT HAVE A PERSONAL NAME. YOU CAN CALL ME ASSISTANT OR JUST AI. HOW CAN I ASSIST YOU TODAY?']
今までの出力と比べるとおかしいのがわかります。
"I | AM | A |"のように文字ごとに区切られていないので,streaming出力がされていないです。
これは感覚的にはおかしいと思います。
先ほどの"chain = prompt | model | parser"のようなchainでは、正常にstreaming処理できていたのに、独自に生成した関数を加えただけでstreamが動かなくなってしましました。
これは、独自で実装した関数がstream処理をするための形式になっていないことが理由となります。
次のような関数にすると正しくstreamできるようになります。
独自の関数をchainに組み込んでStreamする(うまくいく例)
先ほどのコードを変更して下記のようなコードにしてみました。
def upper(text_stream):
for text in text_stream:
yield text.upper()
parser = StrOutputParser()
chain = model | parser | upper
chunks = []
for chunk in chain.stream("What your name?"):
print(chunk, end="|", flush=True)
chunks.append(chunk)
chunks
|I| AM| AN| AI| DIGITAL| ASSISTANT|,| YOU| CAN| CALL| ME| ASSISTANT|.||
['',
'I',
' AM',
' AN',
' AI',
' DIGITAL',
' ASSISTANT',
',',
' YOU',
' CAN',
' CALL',
' ME',
' ASSISTANT',
'.',
'']
すると、うまくstream処理ができているのがわかります。
コードを見てみましょう。
出てきました。generatorです。
このようにstream処理に独自の関数を組み込みたいときはgenerator関数として実装する必要があります。
なんでStrOutputParser()の処理は問題なくStream処理ができた?
ここで一つ疑問が出てくると思います。
parser = StrOutputParser()
上記の処理はstringだけを出力する処理でupper関数とやってることはほとんど変わらないにも関わらず、なぜstreamで処理できるのでしょうか?
それは、Langchainのコンポーネントはstreamに対応して作られているからです。
公式に次のようにあります。
LCEL is a declarative way to specify a "program" by chainining together different LangChain primitives. Chains created using LCEL benefit from an automatic implementation of stream and astream allowing streaming of the final output. In fact, chains created with LCEL implement the entire standard Runnable interface.
StreamできないLangchainコンポーネント
一方で、全てのLangchainコンポーネントがStreamingに対応しているわけではありません。
例えば、Retrieversなどの一部のコンポーネントはstreamできません。
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
vectorstore = FAISS.from_texts(
["harrison worked at kensho", "harrison likes spicy food"],
embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()
chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks
[[Document(page_content='harrison worked at kensho'),
Document(page_content='harrison likes spicy food')]]
見ての通り、Retrieverに対してはStreamingできていないのがわかります。
ただし、非ストリーミングコンポーネントが存在するからと言って、全ての処理でStreamingができないわけではありません。
retrieval_chain = (
{
"context": retriever.with_config(run_name="Docs"),
"question": RunnablePassthrough(),
}
| prompt
| model
| StrOutputParser()
)
for chunk in retrieval_chain.stream(
"Where did harrison work? " "Write 3 made up sentences about this place."
):
print(chunk, end="|", flush=True)
Based| on| the| given| context|,| the| only| information| provided| about| where| Harrison| worked| is| that| he| worked| at| Ken|sh|o|.| Since| there| are| no| other| details| provided| about| Ken|sh|o|,| I| do| not| have| enough| information| to| write| 3| additional| made| up| sentences| about| this| place|.| I| can| only| state| that| Harrison| worked| at| Ken|sh|o|.||
上記のように、Retrieverの後にLLMのコンポーネントをchainさせると、streamingができるLLMの処理だけちゃんとStreamingできているのがわかります。
An LCEL chain constructed using non-streaming components, will still be able to stream in a lot of cases, with streaming of partial output starting after the last non-streaming step in the chain.
非同期Streaming
次に非同期でのstreamingを見てみます。
langchainでは次のようにastream関数を使うことで非同期streamingを実装できます。
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser
async for chunk in chain.astream({"topic": "parrot"}):
print(chunk, end="|", flush=True)
|Why| was| the| par|rot| kicked| out| of| the| comedy| club|?| Because| he| kept| squ|aw|king| all| the| punch|lines|!||
このようにstream関数と同じようにstreamで処理できているのがわかります。
ただ、非同期処理に関しては、このようにnotebookで処理させてるだけでは恩恵はほとんどありません。
ただし、非同期でこのstream処理を実装しておくことで、他の処理を並行して実行することが可能になります。
例えば、複数のLLMの文字生成を同時に実行するみたいな処理をしたい時に便利です。
Event Stream:チェーンのイベントもstreamする
ここまでは結果出力に対してstreamingしてきたけど、そのチェーンが起こしたイベントをstreamで出力したいということがあります。
ここでいうイベントとは、"parserの処理が終わったよ"とか"モデルの処理が始まったよ"などの処理のことを表しています。
このようなイベントのstreamを実現するためにlang chainでは"astream_events"APIというのが用意されています。
(Beta版なので注意が必要です)
まずはコードを見てみましょう
events = []
async for event in model.astream_events("hello", version="v2"):
events.append(event)
events
[{'event': 'on_chat_model_start',
'data': {'input': 'hello'},
'name': 'ChatOpenAI',
'tags': [],
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content='', id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content='Hello', id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content='!', id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content=' How', id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content=' can', id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content=' I', id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content=' assist', id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content=' you', id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content=' today', id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content='?', id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content='', response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-3.5-turbo-0125'}, id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'parent_ids': []},
{'event': 'on_chat_model_end',
'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-3.5-turbo-0125'}, id='run-2a4ee2b1-c09c-4782-8853-dd8844c70ae4')},
'run_id': '2a4ee2b1-c09c-4782-8853-dd8844c70ae4',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo-0125',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'parent_ids': []}]
このように先ほどは出力に対するstreamだったのに対して、'on_chat_model_start'や'on_chat_model_stream'などeventごとにstreamされているのがわかります。
これらのeventの種類に関しては公式のEvent Referenceから参考してください
ではeventが出力されるようことは何に使えるのでしょうか?
例えば、特定のeventが呼ばれた時に処理を中断したり、処理を変えたりといったこともできるようになります。
下記のようにif文でeventを分類して、処理を変更できます。
async for event in model.astream_events("hello", version="v2"):
if event["event"] == 'on_chat_model_start':
print(f"------{event["metadata"]["ls_model_name"]}での生成を開始します-------")
elif event["event"] == 'on_chat_model_stream':
print(event["data"]["chunk"].content)
elif event["event"] == 'on_chat_model_end':
print("------生成が完了しました--------")
------gpt-3.5-turbo-0125での生成を開始します-------
Hello
!
How
can
I
assist
you
today
?
------生成が完了しました--------
他にも、Agentを実装するときに、どのツールが呼ばれたかなどを表示するのに使えたりするので、処理の幅が広がるので便利です。
さて、ここまでがLangChainのStreaminについてでした。
次からはLanggraphのStreamingについてまとめていきます。
Step3 LanggraphのStreaming
Langchainのマルチエージェント機能を提供するLanggraphにも当然Streaming機能があります。
LanggraphのStreamingについては、こちらの公式のHow to guideがわかりやすいので参考にしてください。
OPENAI KEYのsetupなども端折っているので、こちらを参考にお願いいたします。
LanggraphのStreamingについては、考え方や実装の仕方は概ねLangchainと同じですが、一点違うところがあるとすれば、複数のStreamingモードがあることです。
langgraphのstreamingモードには"values","updates","debug"があります。
Streamingモード"value"と"update"の違い
最もよく使う"value"と"update"の違いです。
公式による文章の違いを日本語訳してものがこちらです。
values: このストリーミングモードでは、グラフの値をストリーミングで返します。これは各ノードが呼び出された後のグラフの全てのStateです。
updates: このストリーミングモードでは、グラフへの更新をストリーミングで返します。これは各ノードが呼び出された後のグラフの更新されたStateです。
ふむふむ。updateは名前の通り更新されたストリームのみ返してvalueはそのStream内での全てのStateを返す感じですかね、
コードの出力を見て違いを確認してみます。
from typing import Literal
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.runnables import ConfigurableField
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
@tool
def get_weather(city: Literal["nyc", "sf"]):
"""Use this to get weather information."""
if city == "nyc":
return "It might be cloudy in nyc"
elif city == "sf":
return "It's always sunny in sf"
else:
raise AssertionError("Unknown city")
tools = [get_weather]
model = ChatOpenAI(model_name="gpt-4o", temperature=0)
graph = create_react_agent(model, tools)
inputs = {"messages": [("human", "what's the weather in sf")]}
print("[Stream_mode = values]")
async for chunk in graph.astream(inputs, stream_mode="values"):
print(chunk)
print("")
print("[Stream_mode = updates]")
async for chunk in graph.astream(inputs, stream_mode="updates"):
print(chunk)
print("")
[Stream_mode = values]
{'messages': [HumanMessage(content="what's the weather in sf", id='39b54097-c88f-4de7-a622-ca0de32682fb')]}
{'messages': [HumanMessage(content="what's the weather in sf", id='39b54097-c88f-4de7-a622-ca0de32682fb'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_WDY1ky3qQ0BshR8TlgMvWyBP', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_d33f7b429e', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-325d498d-a1df-4ce1-80b9-b1be8b442301-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_WDY1ky3qQ0BshR8TlgMvWyBP', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71})]}
{'messages': [HumanMessage(content="what's the weather in sf", id='39b54097-c88f-4de7-a622-ca0de32682fb'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_WDY1ky3qQ0BshR8TlgMvWyBP', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_d33f7b429e', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-325d498d-a1df-4ce1-80b9-b1be8b442301-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_WDY1ky3qQ0BshR8TlgMvWyBP', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='84cc331c-e04d-4c1e-843a-3cc7ef8f4d06', tool_call_id='call_WDY1ky3qQ0BshR8TlgMvWyBP')]}
{'messages': [HumanMessage(content="what's the weather in sf", id='39b54097-c88f-4de7-a622-ca0de32682fb'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_WDY1ky3qQ0BshR8TlgMvWyBP', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_d33f7b429e', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-325d498d-a1df-4ce1-80b9-b1be8b442301-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_WDY1ky3qQ0BshR8TlgMvWyBP', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='84cc331c-e04d-4c1e-843a-3cc7ef8f4d06', tool_call_id='call_WDY1ky3qQ0BshR8TlgMvWyBP'), AIMessage(content='The weather in San Francisco is currently sunny.', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_097dcbfe54', 'finish_reason': 'stop', 'logprobs': None}, id='run-630cfc49-ccd0-43a3-a4f3-4c3e7e239650-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})]}
[Stream_mode = updates]
{'agent': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_5FXZs0fbqjLIlkTg2ZNd8Maq', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_d33f7b429e', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-82965ee3-cd60-437f-b992-1ccedcfd7a59-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_5FXZs0fbqjLIlkTg2ZNd8Maq', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71})]}}
{'tools': {'messages': [ToolMessage(content="It's always sunny in sf", name='get_weather', tool_call_id='call_5FXZs0fbqjLIlkTg2ZNd8Maq')]}}
{'agent': {'messages': [AIMessage(content='The weather in San Francisco is currently sunny.', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_d33f7b429e', 'finish_reason': 'stop', 'logprobs': None}, id='run-8097afb8-b0f6-4d70-b453-9e4d2d667197-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})]}}
graph.astreamの一つ目のstream_modeが"value"で二つ目が"update"です。
stream_modeがvalueの場合はstreamが出力されるたびに、message 属性のリストにappendされていってるのがわかると思います。
そのため、最後のstreamには、今までのstreamの遷移が全て残っているということになります。
一方でupdateはぱっと見でもすっきりした形になっています。
こちらはkeyにnode名をとって、更新されたmessageのステートのみを返すようになっています。
LanggraphでEventストリームをする
langgraphのコンパイルgraphに対しても"astream_events"を実行できます。
かなりコードを省略しているので、こちらを参考にしてください
from langchain_core.messages import HumanMessage
inputs = [HumanMessage(content="what is the weather in sf")]
async for event in app.astream_events({"messages": inputs}, version="v1"):
kind = event["event"]
if kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
# Empty content in the context of OpenAI or Anthropic usually means
# that the model is asking for a tool to be invoked.
# So we only print non-empty content
print(content, end="|")
elif kind == "on_tool_start":
print("--")
print(
f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
)
elif kind == "on_tool_end":
print(f"Done tool: {event['name']}")
print(f"Tool output was: {event['data'].get('output')}")
print("--")
--
Starting tool: search with inputs: {'query': 'weather in San Francisco'}
Done tool: search
Tool output was: ['Cloudy with a chance of hail.']
--
The| weather| in| San| Francisco| is| currently| cloudy| with| a| chance| of| hail|.|
astream_eventで注意しなければいけないことは,streamに対応しているモデルを選択する必要があるので、その点は注意しましょう!
LLMトークン以外をStreamしたい
Langgraphを使ってる時、LLMトークン(LLMからの出力)以外もStream処理したいなぁと思うことがあるかもしれません。
そんな時はstreamしたい関数をgeneratorで実装して、RunnableGeneratorでラップすることで、Streamさせることが可能です。
こちらがコードになります。
from langchain_core.messages import AIMessage
from langchain_core.runnables import RunnableGenerator
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import HumanMessage
from langgraph.graph import START, StateGraph, MessagesState, END
# Define a new graph
workflow = StateGraph(MessagesState)
async def my_generator(state: MessagesState):
messages = [
"Four",
"score",
"and",
"seven",
"years",
"ago",
"our",
"fathers",
"...",
]
for message in messages:
yield message
async def my_node(state: MessagesState, config: RunnableConfig):
messages = []
# Tagging a node makes it easy to filter out which events to include in your stream
# It's completely optional, but useful if you have many functions with similar names
gen = RunnableGenerator(my_generator).with_config(tags=["should_stream"])
async for message in gen.astream(state):
messages.append(message)
return {"messages": [AIMessage(content=" ".join(messages))]}
workflow.add_node("model", my_node)
workflow.add_edge(START, "model")
workflow.add_edge("model", END)
app = workflow.compile()
inputs = [HumanMessage(content="What are you thinking about?")]
async for event in app.astream_events({"messages": inputs}, version="v1"):
kind = event["event"]
tags = event.get("tags", [])
if kind == "on_chain_stream" and "should_stream" in tags:
data = event["data"]
if data:
# Empty content in the context of OpenAI or Anthropic usually means
# that the model is asking for a tool to be invoked.
# So we only print non-empty content
print(data, end="|")
{'chunk': 'Four'}|{'chunk': 'score'}|{'chunk': 'and'}|{'chunk': 'seven'}|{'chunk': 'years'}|{'chunk': 'ago'}|{'chunk': 'our'}|{'chunk': 'fathers'}|{'chunk': '...'}|
この例だとstreamする意味ないですが、my_generatorのリストの値の数が10000とかになると意味があります。
見ての通り,my_generator関数はgenerator関数として実装されています。
これをRunnableGeneratorで呼び出してastreamするとstream処理が可能になります
まとめ
最後の方が駆け足になってしまいましたが、LangchainとLanggraphのstreamingに関する重要な点や実装の仕方をまとめてみました。
今日はここまで。読んでくださった方ありがとうございます。