
OpenAI Realtime API (for 4o-mini-realtime) Python code

import pyaudio
import base64
import json
import wave
import os
import websockets
import asyncio

# Hardcoded (masked) API key; the same value is also exported as OPENAI_API_KEY.
API_KEY = os.environ["OPENAI_API_KEY"] = "********************************************"
WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-mini-realtime-preview-2024-12-17"
HEADERS = {
    "Authorization": "Bearer " + API_KEY,
    "OpenAI-Beta": "realtime=v1"
}

def base64_to_pcm16(base64_audio):
    """Decode a base64 string back into raw PCM16 bytes."""
    return base64.b64decode(base64_audio)

def record_audio(filename, chunk=2048, format=pyaudio.paInt16, channels=1, rate=24000, record_seconds=5):
    """Record `record_seconds` of mono PCM16 audio from the microphone into a WAV file."""
    p = pyaudio.PyAudio()
    stream = p.open(format=format, channels=channels, rate=rate, input=True, frames_per_buffer=chunk)
    frames = []

    print("Recording...")
    for _ in range(0, int(rate / chunk * record_seconds)):
        data = stream.read(chunk)
        frames.append(data)
    print("Recording complete.")

    stream.stop_stream()
    stream.close()
    p.terminate()

    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(p.get_sample_size(format))
        wf.setframerate(rate)
        wf.writeframes(b''.join(frames))

def play_audio(filename):
    """Play a WAV file through the default output device."""
    p = pyaudio.PyAudio()
    with wave.open(filename, 'rb') as wf:
        stream = p.open(format=p.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True)

        data = wf.readframes(2048)
        while data:
            stream.write(data)
            data = wf.readframes(2048)

    stream.stop_stream()
    stream.close()
    p.terminate()

async def send_audio(websocket, filename):
    """Stream a recorded WAV file to the server as input_audio_buffer.append events.

    With the API's default server-side VAD turn detection, the server decides
    when the user has finished speaking and generates a response on its own.
    """
    with wave.open(filename, 'rb') as wf:
        chunk = 2048
        data = wf.readframes(chunk)
        while data:
            base64_audio = base64.b64encode(data).decode("utf-8")
            audio_event = {
                "type": "input_audio_buffer.append",
                "audio": base64_audio
            }
            await websocket.send(json.dumps(audio_event))
            data = wf.readframes(chunk)

async def receive_audio(websocket, output_filename):
    """Collect the audio deltas for one response and save them as a WAV file."""
    with wave.open(output_filename, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # 16-bit PCM
        wf.setframerate(24000)
        while True:
            response = await websocket.recv()
            response_data = json.loads(response)
            event_type = response_data.get("type")
            if event_type == "response.audio.delta":
                base64_audio_response = response_data.get("delta")
                if base64_audio_response:
                    pcm16_audio = base64_to_pcm16(base64_audio_response)
                    wf.writeframes(pcm16_audio)
            elif event_type == "response.audio_transcript.done":
                # Used here as the end-of-response marker for this turn.
                break

async def main():
    async with websockets.connect(WS_URL, additional_headers=HEADERS) as websocket:
        print("Connected to WebSocket.")

        # Ask the model to start the conversation with an initial response.
        init_request = {
            "type": "response.create",
            "response": {
                "modalities": ["audio", "text"],
                "instructions": "楽しく会話しましょう!",
                "voice": "shimmer"
            }
        }
        await websocket.send(json.dumps(init_request))
        print("Initial request sent.")

        input_filename = "input.wav"
        output_filename = "output.wav"
        # Turn-based loop: record 5 seconds, upload it, save the reply, play it back.
        while True:
            record_audio(input_filename)
            await send_audio(websocket, input_filename)
            await receive_audio(websocket, output_filename)
            play_audio(output_filename)

if __name__ == "__main__":
    asyncio.run(main())
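
To run this, the only third-party dependencies are PyAudio and websockets (pip install pyaudio websockets); everything else is in the standard library. Note that PyAudio may additionally need the PortAudio system library installed, and that depending on your websockets version the header argument to websockets.connect is additional_headers (newer releases) or extra_headers (older ones).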

I made small modifications to asap's Realtime API code to adapt it for 4o-mini-realtime. Because recording, sending, receiving, and playback run sequentially rather than in parallel, responsiveness is somewhat worse than the original; a sketch of a parallel variant is shown below.
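
For reference, here is a minimal, untested sketch of what a full-duplex version could look like, closer to asap's original parallel design. It reuses WS_URL and HEADERS from the listing above, relies on the API's default server-side VAD for turn-taking, and runs capture and playback as two concurrent asyncio tasks; the function names (mic_to_ws, ws_to_speaker, duplex_main) are my own placeholders.

import asyncio
import base64
import json

import pyaudio
import websockets

async def mic_to_ws(websocket, rate=24000, chunk=2048):
    # Continuously push microphone PCM16 to the server; stream.read blocks,
    # so it runs in a worker thread to keep the event loop responsive.
    p = pyaudio.PyAudio()
    stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate, input=True, frames_per_buffer=chunk)
    try:
        while True:
            data = await asyncio.to_thread(stream.read, chunk)
            await websocket.send(json.dumps({
                "type": "input_audio_buffer.append",
                "audio": base64.b64encode(data).decode("utf-8"),
            }))
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

async def ws_to_speaker(websocket, rate=24000):
    # Play audio deltas as they arrive instead of buffering a whole WAV file.
    p = pyaudio.PyAudio()
    stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True)
    try:
        async for message in websocket:
            event = json.loads(message)
            if event.get("type") == "response.audio.delta":
                await asyncio.to_thread(stream.write, base64.b64decode(event["delta"]))
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

async def duplex_main():
    async with websockets.connect(WS_URL, additional_headers=HEADERS) as websocket:
        await asyncio.gather(mic_to_ws(websocket), ws_to_speaker(websocket))

You would start it with asyncio.run(duplex_main()). One caveat: with a live microphone and speaker, the model can hear its own output, so in practice you would want headphones or echo cancellation.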

asap's original article:
https://zenn.dev/asap/articles/4368fd306b592a
