![見出し画像](https://assets.st-note.com/production/uploads/images/166759532/rectangle_large_type_2_65157b7bdb720379dce51b9f45fbb8b0.png?width=1200)
openai realtime api (4o-mini-realtime用) pythonコード
import pyaudio
import base64
import json
import wave
import os
import websockets
import asyncio
# Read the API key from the environment; never hard-code secrets in source.
# (The original assigned a literal key into os.environ, which both leaks the
# secret and clobbers the caller's environment.)
API_KEY = os.environ.get("OPENAI_API_KEY", "")

# Realtime websocket endpoint, pinned to the 4o-mini realtime preview model.
WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-mini-realtime-preview-2024-12-17"

# Handshake headers; the "OpenAI-Beta: realtime=v1" header is required by the Realtime API.
HEADERS = {
    "Authorization": "Bearer " + API_KEY,
    "OpenAI-Beta": "realtime=v1"
}
def base64_to_pcm16(base64_audio):
    """Decode a base64-encoded string into raw PCM16 audio bytes."""
    return base64.b64decode(base64_audio)
def record_audio(filename, chunk=2048, format=pyaudio.paInt16, channels=1, rate=24000, record_seconds=5):
    """Record microphone audio into a WAV file.

    Args:
        filename: Path of the output WAV file.
        chunk: Frames read per buffer.
        format: PyAudio sample format (defaults to 16-bit PCM).
        channels: Number of input channels.
        rate: Sample rate in Hz (24 kHz to match the Realtime API's PCM16 audio).
        record_seconds: Recording duration in seconds.
    """
    p = pyaudio.PyAudio()
    # Bug fix: the sample width was previously queried AFTER p.terminate();
    # compute it up front while the PyAudio instance is guaranteed alive.
    sample_width = p.get_sample_size(format)
    stream = p.open(format=format, channels=channels, rate=rate,
                    input=True, frames_per_buffer=chunk)
    frames = []
    print("Recording...")
    try:
        for _ in range(int(rate / chunk * record_seconds)):
            # Don't abort the whole recording on a transient input overflow.
            frames.append(stream.read(chunk, exception_on_overflow=False))
    finally:
        # Release the audio device even if a read raises.
        print("Recording complete.")
        stream.stop_stream()
        stream.close()
        p.terminate()
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(rate)
        wf.writeframes(b''.join(frames))
def play_audio(filename):
    """Play a WAV file through the default output device.

    Args:
        filename: Path of the WAV file to play.
    """
    chunk = 2048
    p = pyaudio.PyAudio()
    try:
        # Bug fix: the wave file handle was never closed; use a context manager.
        with wave.open(filename, 'rb') as wf:
            stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                            channels=wf.getnchannels(),
                            rate=wf.getframerate(),
                            output=True)
            try:
                while data := wf.readframes(chunk):
                    stream.write(data)
            finally:
                # Release the output stream even if a write raises.
                stream.stop_stream()
                stream.close()
    finally:
        p.terminate()
async def send_audio(websocket, filename):
    """Stream a WAV file to the Realtime API as input_audio_buffer.append events.

    Args:
        websocket: An open websocket connection to the Realtime API.
        filename: Path of the WAV file whose frames are sent.
    """
    frames_per_read = 2048
    with wave.open(filename, 'rb') as wav_in:
        while chunk := wav_in.readframes(frames_per_read):
            event = {
                "type": "input_audio_buffer.append",
                "audio": base64.b64encode(chunk).decode("utf-8"),
            }
            await websocket.send(json.dumps(event))
async def receive_audio(websocket, output_filename):
    """Collect response.audio.delta events into a 24 kHz mono PCM16 WAV file.

    Reads server events until a response.audio_transcript.done event arrives,
    appending each decoded audio delta to the output file.

    Args:
        websocket: An open websocket connection to the Realtime API.
        output_filename: Path of the WAV file to write.
    """
    with wave.open(output_filename, 'wb') as wav_out:
        wav_out.setnchannels(1)
        wav_out.setsampwidth(2)  # 16-bit samples
        wav_out.setframerate(24000)
        while True:
            message = json.loads(await websocket.recv())
            if "delta" in message and message["type"] == "response.audio.delta":
                encoded = message["delta"]
                if encoded:
                    wav_out.writeframes(base64_to_pcm16(encoded))
            elif message.get("type") == "response.audio_transcript.done":
                # Transcript completion marks the end of this response's audio.
                break
async def main():
    """Connect to the Realtime API and run a record -> send -> receive -> play loop."""
    async with websockets.connect(WS_URL, additional_headers=HEADERS) as websocket:
        print("Connected to WebSocket.")
        # Kick off the session with an initial response request.
        initial_event = {
            "type": "response.create",
            "response": {
                "modalities": ["audio", "text"],
                "instructions": "楽しく会話しましょう!",
                "voice": "shimmer",
            },
        }
        await websocket.send(json.dumps(initial_event))
        print("Initial request sent.")
        input_path = "input.wav"
        output_path = "output.wav"
        # Sequential turn loop: record, upload, wait for the reply, then play it.
        while True:
            record_audio(input_path)
            await send_audio(websocket, input_path)
            await receive_audio(websocket, output_path)
            play_audio(output_path)
# Script entry point: run the conversation loop until interrupted (Ctrl-C).
if __name__ == "__main__":
    asyncio.run(main())
asapさんのrealtime api のコードを少し修正して4o-mini-realtime用に書き換えました。並列処理ではないのでレスポンスは少し悪くなっています。