chatbotが自発的な質問をするための標準inputではないinput #1
import json
import threading
import openai
import requests
def input_with_timeout(s, timeout):
n_return = None
def my_input(text):
nonlocal n_return
n_return = input(text)
t = threading.Thread(target=my_input, args=(s,))
t.daemon = True # スレッドがメインスレッドと一緒に終了するように設定
t.start()
t.join(timeout=timeout) # スレッドが終了するまで待機
if t.is_alive(): # タイムアウト後もスレッドが生きている場合
print() # 改行
return None # Noneを返す
return n_return # ユーザーの入力を返す
def get_weather(location):
# 気象庁データの取得。エリアのコード(pathCode) 東京は130000
jma_url = f"https://www.jma.go.jp/bosai/forecast/data/forecast/{location}.json"
jma_json = requests.get(jma_url).json()
jma_weather = jma_json[0]["timeSeries"][0]["areas"][0]["weathers"][0]
jma_weather = jma_weather.replace(' ', '')
return json.dumps({"weather":jma_weather})
def chat(u, messages, tools, client):
messages.append({"role": "user", "content": u})
resp = client.chat.completions.create(
model="gpt-3.5-turbo-0125",
messages=messages,
tools=tools,
tool_choice="auto",
)
resp_mess = resp.choices[0].message
tool_calls = resp_mess.tool_calls
if tool_calls:
available_functions = {
"get_weather": get_weather
} # only one function in this example, but you can have multiple
messages.append(resp_mess) # extend conversation with assistant's reply
for tool_call in tool_calls:
function_name = tool_call.function.name
function_to_call = available_functions[function_name]
function_args = json.loads(tool_call.function.arguments)
function_response = function_to_call(
location=function_args.get("location"),
)
messages.append(
{
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": function_response,
}
) # extend conversation with function response
second_resp = client.chat.completions.create(
model="gpt-3.5-turbo-0125",
messages=messages,
) # get a new response from the model where it can see the function response
a = second_resp.choices[0].message.content
messages.append({"role": "assistant", "content": a})
return a
a = resp.choices[0].message.content
messages.append({"role": "assistant", "content": a})
return a
tools = [
{"type": "function",
"function":{
"name": "get_weather",
"description": "ask for the weather",
"parameters": {
"type": "object",
"properties": {
# 引数の情報
"location": {
"type": "string",
"description": "location",
},
},
"required": ["weather"]
}
}
}
]
client = openai.OpenAI()
messages = []
while True:
u = input_with_timeout('You: ', 50)
if u is None: # 50秒間何も入力がなかった場合
u = "東京の天気の情報を取得します。会話の中に取り入れてuserに伝えてください。tokyoのlocationは130000です。"
messages.append({"role": "system", "content": u})
a = chat(u, messages, tools, client)
print(" chatbot: " + a)
continue
print()
if u == "":
break
a = chat(u, messages, tools, client)
print(" chatbot: " + a)
まず自前のinputを作ります。webを参考に少し修正しました。そして50秒入力がなかった場合強制function callingで天気の情報を気象庁から取得します。天気を参考にchatbotが返答します。ここの所はループを何回もするとワンパターンになってしまうので何か工夫が必要です。関数をRAGに変更して多岐にわたる質問のバリエーションを作った方が良いかもしれません。
またはニュースを取得するなど。
追記:
自前のinputを少し改善しました。2024/05/12