マルチAI Agent : 複数のLLMを用いた並列処理と自動要約
はじめに
業務を行う上で、複数の大規模言語モデル(LLM)に同じ依頼をして結果を比較することがよくあります。しかし、個別に実行するのは手間がかかります。そこで、LangGraphを使って、一つの問い合わせで複数のモデルからの回答を得て、さらにLLMの力を使って自動的にまとめる方法を探りました。
実装
並列処理の実装方法は、以下のLaggraphの公式ドキュメンを参考にしました。環境はcolabを使用しています。
また今回構築するグラフの構成は以下です。もっともシンプルな構成から試します。
supervisor が全体を監督し、タスクを各 agent に割り振ります。
各 agent は割り振られたタスクを実行し、その結果を summarizer に送ります。
summarizer は各エージェントからの結果をまとめ、最終的なアウトプットを生成します。
必要なパッケージのインストール
%pip install -qU langchain-google-genai
%pip install -qU langchain-anthropic
%pip install -qU langchain-core
%pip install -qU langchain-openai
%pip install -qU tavily-python
%pip install -qU langchain_community
%pip install -qU langgraph
%pip install -qU duckduckgo-search
APIキーの設定
各LLMプロバイダー(OpenAI、Anthropic、Google, Langsmith)のAPIキーを環境変数に設定します。あらかじめcolabのSecretsに設定しています。
# API キーの設定
import os
from google.colab import userdata
os.environ["ANTHROPIC_API_KEY"] = userdata.get('ANTHROPIC_API_KEY')
os.environ["GOOGLE_API_KEY"] = userdata.get("GOOGLE_API_KEY")
os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')
os.environ['LANGCHAIN_TRACING_V2'] = userdata.get('LANGCHAIN_TRACING_V2')
os.environ['LANGCHAIN_API_KEY'] = userdata.get('LANGCHAIN_API_KEY')
必要なクラスと型の定義
# 必要なライブラリとモジュールのインポート
import operator
from typing import Annotated, Any, List
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain.schema import HumanMessage, AIMessage
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
# 状態を表す型定義
class State(TypedDict):
messages: Annotated[List[str], operator.add]
iteration: int
# 基本ノードクラスの定義
class Node:
def __init__(self, name: str):
self.name = name
def process(self, state: State) -> dict:
raise NotImplementedError
def __call__(self, state: State) -> dict:
result = self.process(state)
print(f"{self.name}: Processing complete")
return result
各ノードの実装
スーパーバイザーノード
タスクの管理と初期化を行います。
# スーパーバイザーノードの定義
class SupervisorNode(Node):
def __init__(self):
super().__init__("Supervisor")
def process(self, state: State) -> dict:
state['iteration'] = state.get('iteration', 0) + 1
task = state['messages'][0] if state['messages'] else "タスクが設定されていません。"
return {"messages": [f"Iteration {state['iteration']}: {task}"]}
エージェントノード
各LLMモデルにタスクを実行させます。
# エージェントノードの定義
class AgentNode(Node):
def __init__(self, agent_name: str, llm):
super().__init__(agent_name)
self.llm = llm
def process(self, state: State) -> dict:
task = state['messages'][-1]
response = self.llm.invoke([HumanMessage(content=f"あなたは{self.name}です。次のタスクを実行してください: {task}")])
return {"messages": [f"{self.name}の回答: {response.content}"]}
サマライザーノード
各エージェントの回答を要約します。
# サマライザーノードの定義
class SummarizerNode(Node):
def __init__(self, llm):
super().__init__("Summarizer")
self.llm = llm
def process(self, state: State) -> dict:
responses = state['messages'][1:] # 初期タスクをスキップ
summary_request = "これまでの各エージェントの回答を要約してください。\n" + "\n".join(responses)
response = self.llm.invoke([HumanMessage(content=summary_request)])
return {"messages": [f"要約: {response.content}"]}
LLMモデルの初期化
def create_llm(model_class, model_name, temperature=0.7):
return model_class(model_name=model_name, temperature=temperature)
# 高性能モデル
claude = create_llm(ChatAnthropic, "claude-3-5-sonnet-20240620")
gemini = create_llm(ChatGoogleGenerativeAI, "gemini-1.5-pro-002")
openai = create_llm(ChatOpenAI, "gpt-4o")
# 軽量モデル
# claude = create_llm(ChatAnthropic, "claude-3-haiku-20240307")
# gemini = create_llm(ChatGoogleGenerativeAI, "gemini-1.5-flash-latest")
# openai = create_llm(ChatOpenAI, "gpt-4o-mini")
グラフの構築
# グラフの構築
builder = StateGraph(State)
nodes = {
"supervisor": SupervisorNode(),
"agent1": AgentNode("Agent1 (claude)", claude),
"agent2": AgentNode("Agent2 (gemini)", gemini),
"agent3": AgentNode("Agent3 (gpt)", openai),
"summarizer": SummarizerNode(openai)
}
for name, node in nodes.items():
builder.add_node(name, node)
builder.add_edge(START, "supervisor")
for agent in ["agent1", "agent2", "agent3"]:
builder.add_edge("supervisor", agent)
builder.add_edge(agent, "summarizer")
builder.add_edge("summarizer", END)
# グラフのコンパイル
graph = builder.compile()
タスクの定義と実行
# タスクの定義
task = "テーブルの上のコップの中にビー玉を入れ、机の上で逆さにしました。それからコップを冷蔵庫の中に移動しました。ビー玉はどこにありますか?"
# グラフの実行
final_state = graph.invoke(
{
"messages": [task],
"iteration": 0
}
)
# 結果の表示(最後の要約のみ)
print(final_state["messages"][-1])
結果の解説
実行すると、以下のような結果が得られます。
Supervisor: Processing complete
Agent2 (gemini): Processing complete
Agent3 (gpt): Processing complete
Agent1 (claude): Processing complete
Summarizer: Processing complete
要約: 以下に各エージェントの回答を要約します:
- **Agent1 (claude)**: ビー玉は冷蔵庫の中にあります。理由として、コップを逆さにしてもビー玉はコップの中に留まり、コップを冷蔵庫に移動させたため、ビー玉も冷蔵庫の中にあると説明しています。
- **Agent2 (gemini)**: ビー玉はテーブルの上にあります。コップを逆さにした時点でビー玉はテーブルの上に落ち、その後コップを冷蔵庫に移動してもビー玉はテーブルの上に残っていると説明しています。
- **Agent3 (gpt)**: ビー玉は冷蔵庫の中にあります。理由として、逆さにしてもビー玉はコップの中に留まるとし、コップを冷蔵庫に移動させたため、ビー玉も冷蔵庫の中にあると説明しています。
要約すると、Agent1とAgent3は「ビー玉は冷蔵庫の中にある」とし、Agent2は「ビー玉はテーブルの上にある」と回答しています。
グラフ定義の確認方法
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))