見出し画像

PlanAndExecute, Self-Consistencyを組み合わせる


はじめに

以前の記事でPlanAndExecuteを実装しましたが、あまり期待通りの結果にならないという結果でした。今回はこの結果にモヤモヤしていたので、改善してみました。
どうやって改善したかというと、Self-Consistencyという手法を適用しました。

Self-Consistency

以下のページに詳しく記載されていますが、複数回の実行を行い、その数だけ出力を得る。出力結果のうち、多数派の回答を最終的な回答とするような方法です。

Self-Consistencyはfew-shot CoTによって、推論と結論の両方を使う方法が本来ですが、今回は結論だけを使う簡易版を適用してみます。

実装

事前準備

LLMとしてllama3.2をollamaを使って呼び出します。また、LLMの入出力の確認・分析のためにLanfuseを使ってログ取得できるようにします。
tempratureを0.5と高くして、推論に多様性を持たせることを期待します。推論は5回とします。

import inspect
import logging
logging.basicConfig(level=logging.WARNING)

from ollama import Client
from langfuse.decorators import observe
from langfuse.decorators import langfuse_context

client = Client(host='http://localhost:11434')
model = 'llama3.2:latest'
options={
    "temperature": 0.5,
}
n_repeat = 5

secret_key = "your secret key"
public_key = "your public key"
host = "http://localhost:3000"
langfuse_context.configure(
  secret_key=secret_key,
  public_key=public_key,
  host=host
)

ツール呼び出しとLLM実行

@observe()
def calculator(expression: str) -> str:
    """
    Calculator

    Args:
        expression: 計算すべき式

    Returns:
        string: 計算結果
    """
    try:
        return f'{expression}の計算結果は、{str(eval(expression))}です。'
    except Exception as e:
        return f"計算エラー: {str(e)}"

@observe(as_type="generation")
def execute(client, model, system_prompt, prompt, tools=None):
    langfuse_context.update_current_observation(
        input={
            "system": system_prompt,
            "prompt": prompt
        },
    )
    res = client.chat(
        model=model,
        messages=[
            {'role': 'system', 'content': system_prompt},
            {'role': 'user', 'content': prompt}
        ],
        format="json",
        options=options,
        tools=tools
    )
    langfuse_context.update_current_observation(
        output=res['message']
    )
    return res 

Planner

plan_system_prompt = '''
あなたはユーザーの目的から実行すべき計画を生成する実行計画AIです。
あなたは知識豊富で多くのことに精通しています。あなたの知識を最大限に発揮して計画を立ててください。
この計画には、正しく実行すれば正しい答えが得られるような、個々の作業を含むようにします。
計画は論理的で整理されたステップで構成してください。
余計なステップを追加しないでください。
最終ステップの結果が最終的な答えであるべきです。
各ステップに必要な情報がすべて含まれていることを確認してください。
以下のような有効なJSON形式で応答してください。
{"plan": [
    {"step1": "作業1"},
    {"step2": "作業2"}
]}
'''

plan_prompt= '''
与えられた以下の入力に対して、
正確な情報をユーザーに提供するために必要な行動の
簡単な step-by-step の作業計画を立ててください。

入力:
{objective}
'''

class Planner:
    def __init__(self, model, client):
        self.client = client
        self.model = model
        self.name = 'Planner'

    @observe()
    def run(self, objective):
        langfuse_context.update_current_observation(
            name=f'{self.name}: {inspect.currentframe().f_code.co_name}')
        res = execute(
            self.client, self.model, 
            plan_system_prompt,
            plan_prompt.format(objective=objective),
            tools=None
        )
        return res['message']['content'].strip()

Observer

PlanAndExecuteでは計画の評価・修正と回答生成を1つのプロンプトで実行しようとしていました。ここでは、計画の評価・修正と回答生成を分けることにします。
また、計画の評価・修正を行うモジュールをObserverと呼ぶことにします。

observer_system_prompt = '''
あなたはアシスタントAIが生成した計画が正しいかどうか判断して、修正を行う観測AIです。
計画には、正しく実行すれば正しい答えが得られるような、個々の作業を含むようにします。
余計なステップを追加しないでください。
最終ステップの結果が最終的な答えであるべきです。
各ステップに必要な情報がすべて含まれていることを確認してください。
以下のような有効なJSON形式で応答してください。
{"plan": [
    {"step1": "作業1"},
    {"step2": "作業2"}
]}
'''

observer_prompt= '''
与えられた以下のタスクと実行計画とステップの実行結果に対して、以下の観点でレビューを行い、修正が必要な場合は計画を修正してください。
- 実行計画に沿って実行するとタスクを満たす応答ができるか?

タスク:
{objective}
実行計画:
{plan}
'''

class Observer:
    def __init__(self, model, client):
        self.client = client
        self.model = model
        self.name = 'Observer'

    @observe()
    def run(self, objective, plan):
        langfuse_context.update_current_observation(
            name=f'{self.name}: {inspect.currentframe().f_code.co_name}')
        res = execute(
            self.client, self.model, 
            observer_system_prompt,
            observer_prompt.format(objective=objective, plan=plan),
            tools=None
        )
        return res['message']['content'].strip()

Responder

回答生成を行うモジュールをResponderと呼ぶことにします。

responder_system_prompt = '''
あなたはAIが計画・実行した結果からユーザーへの応答を生成する応答AIです。
以下のような有効なJSON形式で応答してください。
{
    "response": "応答文",
}
'''

responder_prompt = '''
与えられた以下のタスクと実行計画と実行結果を使って、ユーザーへの応答文を生成してください。
- もし、タスクを満たす応答ができないならば、「did not finish」と応答してください。
- 実行計画の途中であっても、タスクを満たす応答ができるならば、応答文を生成してください。

タスク:
{objective}
実行計画と実行結果:
{result}
'''

class Responder:
    def __init__(self, model, client):
        self.client = client
        self.model = model
        self.name = 'Responder'

    @observe()
    def run(self, objective, result):
        langfuse_context.update_current_observation(
            name=f'{self.name}: {inspect.currentframe().f_code.co_name}')
        res = execute(
            self.client, self.model, 
            responder_system_prompt,
            responder_prompt.format(objective=objective, result=result),
            tools=None
        )
        return res['message']['content'].strip()

Executer

FunctionCallingを行うモジュールの作成。

executer_system_prompt = '''
あなたはアシスタントAIが生成した計画を1ステップづつ実行する実行するエージェントです。
以下のような有効なJSON形式で応答してください。
{
    "function_output": "実行結果",
    "response": "実行結果の解説"
}
'''

executer_prompt = '''
以下にタスクと実行計画と実行すべきステップが与えられます。
実行すべきステップを実行してください。

タスク:
{objective}
実行計画:
{plan}
実行すべきステップ:
{step}
'''

class Executer:
    def __init__(self, model, client):
        self.client = client
        self.model = model
        self.name = 'Executer'

    @observe()
    def run(self, objective, plan, step):
        langfuse_context.update_current_observation(
            name=f'{self.name}: {inspect.currentframe().f_code.co_name}')
        res = execute(
            self.client, self.model, 
            executer_system_prompt,
            executer_prompt.format(objective=objective, plan=plan, step=step),
            tools=[calculator]
        )
        return res

ここで関数呼び出し用の応答文を生成していますが、実際に実行しているのは別の場所になっています。このモジュール内で関数呼び出しを行うべきだった。。。

Merger

Self-Consistencyの多数決を行うモジュールです。

merger_system_prompt = '''
あなたはアシスタントAIが生成した複数の応答文を使って一貫性のある応答をユーザーに返却するAIです。
以下のような有効なJSON形式で応答してください。
{
    "response": "応答文"
}
'''

merger_prompt= '''
与えられた以下の応答候補文の回答を見て、多数決を取り、もっとも多数派の内容を応答してください。

タスク:
{objective}
応答候補文:
{candidates}
'''

class Merger:
    def __init__(self, model, client):
        self.client = client
        self.model = model
        self.name = 'Merger'

    @observe()
    def run(self, objective, candidates):
        langfuse_context.update_current_observation(
            name=f'{self.name}: {inspect.currentframe().f_code.co_name}')
        res = execute(
            self.client, self.model, 
            merger_system_prompt,
            merger_prompt.format(objective=objective, candidates=candidates),
            tools=None
        )
        return res['message']['content'].strip()

処理

各モジュールを呼び出して、処理を行うコード。LangGraphを使うともっとスッキリして、拡張性の高いものになりそうです。

import json

planner = Planner(model, client)
observer = Observer(model, client)
executer = Executer(model, client)
responder = Responder(model, client)
merger = Merger(model, client)

@observe()
def process(objective):
    current_step = 0
    plan = planner.run(objective=objective)
    plan = json.loads(plan)

    for _ in range(10):
        plan = json.dumps(plan, ensure_ascii=False)
        plan = observer.run(objective=objective, plan=plan)
        plan = json.loads(plan)

        if len(plan['plan']) <= current_step:
            current_step = len(plan['plan']) - 1

        result = executer.run(objective=objective, plan=plan, step=plan['plan'][current_step])

        available_functions = {
            'calculator': calculator,
        }

        if result.message.tool_calls != None:
            for tool in result.message.tool_calls or []:
                function_to_call = available_functions.get(tool.function.name)
                if function_to_call:
                    output = function_to_call(**tool.function.arguments)
                    plan['plan'][current_step].update({'result': output})
                else:
                    print('Function not found:', tool.function.name)
                    plan['plan'][current_step].update({'result': ''})
        else:
            plan['plan'][current_step].update({'result': ''})

        result = json.dumps(plan, ensure_ascii=False)
        response = responder.run(objective=objective, result=result)
        response = json.loads(response)
        if 'did not finish' not in response['response']:
            break
        else:
            current_step += 1
            if len(plan['plan']) <= current_step:
                current_step = len(plan['plan']) - 1
    return response

@observe()
def main(objective):
    responses = []
    for _ in range(n_repeat):
        responses.append(process(objective)['response'])
    split = '\n---------------\n'
    candidates = split.join([f'候補{n}:\n {r}' for n, r in enumerate(responses)])
    res = merger.run(objective=objective, candidates=candidates)
    return json.loads(res)['response']

実行結果

objectives = [
    "1234 * 5678 を計算してください",
    "2024年は令和何年ですか?",
    "時速4kmで45分間走り続けると、何キロ進めますか?距離は速さx時間で与えられます。",
    "原価3000円の商品に20%の利益をつけて売りました。定価はいくら?定価は原価+利益で与えられます。",
    "128-256を計算して",
]

result = []
for o in objectives:
    res = main(o)
    result.append({'タスク': o, '応答': res})

for r in result:
    print(r)

出力

{'タスク': '1234 * 5678 を計算してください', '応答': '1234 * 5678 の計算結果は、7006652 です。'}
{'タスク': '2024年は令和何年ですか?', '応答': '2024年は令和6年です。'}
{'タスク': '時速4kmで45分間走り続けると、何キロ進めますか?距離は速さx時間で与えられます。', '応答': '時速4kmで45分間走ると、3キロメートル進めます。'}
{'タスク': '原価3000円の商品に20%の利益をつけて売りました。定価はいくら?定価は原価+利益で与えられます。', '応答': '定価は原価に利益を加えた値で、3000 + (3000 * 0.2) = 3600.0円となります。'}
{'タスク': '128-256を計算して', '応答': '128 は 256 = -128'}

ここまでの結果を3Bパラメータのモデルで達成できるllama3.2は、すごいモデルですね。

いいなと思ったら応援しよう!