OpenAI RealtimeAPI のPythonサンプルコードを公開
はじめに
2024年10月に、OpenAIは待望の「Advanced Voice」をChatGPTに実装し、リアルタイムでの音声対話が可能になりました。さらに同月、開発者向けにリアルタイムAPIが公開されました。これにより、開発者は音声入力を活用したリアルタイム対話型のアプリケーションを構築できるようになり、音声アシスタントやカスタマーサポートのような用途に活用できます。
リアルタイムAPIの可能性
リアルタイムAPIは、音声入力を低遅延で処理する技術を提供し、会話の途中でユーザーが話しかけることも可能です。このAPIは、WebSocketを使用して継続的な音声データの送受信を行い、音声入力を即座に解析・応答する仕組みになっています。この技術により、スマートスピーカーや音声アシスタントのようなアプリケーションに、より自然な人間らしい対話体験をもたらすことが可能です。
サンプルコード
Youtubeで実演したコードはこちら。
import asyncio
import websockets
import pyaudio
import numpy as np
import base64
import json
import os
from dotenv import load_dotenv
import argparse
import threading
import time
# 環境変数の読み込み
load_dotenv()
# 定数
CHUNK = 2048
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 24000
WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
class AudioChat:
def __init__(self, api_key, voice="echo"):
self.api_key = api_key
self.voice = voice
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"OpenAI-Beta": "realtime=v1"
}
self.p = pyaudio.PyAudio()
self.input_stream = None
self.output_stream = None
self.is_listening = False
self.is_paused = False
def print_welcome_message(self):
print("===== リアルタイム音声チャットへようこそ! =====")
print("使い方:")
print(" - マイクに向かって話しかけてください")
print(" - '!quit'と言うか、Ctrl+Cを押すと終了します")
print(" - '!pause'と言うと一時停止、'!resume'で再開します")
print(" - '!topic <トピック>'と言うと新しい会話トピックを設定できます")
print("準備ができたら何か話しかけてください!")
print("==============================================")
def show_listening_indicator(self):
while self.is_listening and not self.is_paused:
print("🎤", end="", flush=True)
time.sleep(0.5)
print("\b ", end="", flush=True)
time.sleep(0.5)
async def send_audio(self, websocket):
self.print_welcome_message()
print("マイクの初期化中...")
await asyncio.sleep(2) # マイクの初期化を模倣
print("準備完了!話しかけてください。")
self.is_listening = True
threading.Thread(target=self.show_listening_indicator, daemon=True).start()
while True:
if self.is_paused:
await asyncio.sleep(0.1)
continue
try:
audio_data = self.input_stream.read(CHUNK, exception_on_overflow=False)
base64_audio = base64.b64encode(audio_data).decode("utf-8")
audio_event = {
"type": "input_audio_buffer.append",
"audio": base64_audio
}
await websocket.send(json.dumps(audio_event))
await asyncio.sleep(0)
except Exception as e:
print(f"音声送信エラー: {e}")
await asyncio.sleep(1)
async def receive_audio(self, websocket):
print("アシスタント: 準備ができました。どのようなお手伝いができますか?")
while True:
try:
response = await websocket.recv()
response_data = json.loads(response)
if response_data["type"] == "response.audio_transcript.delta":
transcript = response_data["delta"]
print(transcript, end="", flush=True)
# コマンドの処理
if transcript.strip().lower() == "!quit":
print("\n終了します...")
return
elif transcript.strip().lower() == "!pause":
self.is_paused = True
print("\n一時停止しました。'!resume'で再開します。")
elif transcript.strip().lower() == "!resume":
self.is_paused = False
print("\n再開しました。")
elif transcript.strip().lower().startswith("!topic"):
new_topic = transcript.strip()[7:]
print(f"\n新しいトピックを設定しました: {new_topic}")
await self.set_new_topic(websocket, new_topic)
elif response_data["type"] == "response.audio_transcript.done":
print("\nアシスタント: ", end="", flush=True)
elif response_data["type"] == "response.audio.delta":
base64_audio_response = response_data["delta"]
if base64_audio_response:
pcm16_audio = base64.b64decode(base64_audio_response)
self.output_stream.write(pcm16_audio)
except Exception as e:
print(f"音声受信エラー: {e}")
await asyncio.sleep(1)
async def set_new_topic(self, websocket, topic):
new_topic_request = {
"type": "response.create",
"response": {
"modalities": ["audio", "text"],
"instructions": f"新しいトピックについて話し合います: {topic}",
"voice": self.voice
}
}
await websocket.send(json.dumps(new_topic_request))
async def chat_session(self):
async with websockets.connect(WS_URL, extra_headers=self.headers) as websocket:
print("WebSocketに接続しました。")
init_request = {
"type": "response.create",
"response": {
"modalities": ["audio", "text"],
"instructions": "ユーザーをサポートしてください。",
"voice": self.voice
}
}
await websocket.send(json.dumps(init_request))
print("初期リクエストを送信しました。")
self.input_stream = self.p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
self.output_stream = self.p.open(format=FORMAT, channels=CHANNELS, rate=RATE, output=True, frames_per_buffer=CHUNK)
print("マイク入力とサーバーからの音声再生を開始します...")
try:
send_task = asyncio.create_task(self.send_audio(websocket))
receive_task = asyncio.create_task(self.receive_audio(websocket))
await asyncio.gather(send_task, receive_task)
except KeyboardInterrupt:
print("終了中...")
finally:
self.cleanup()
def cleanup(self):
if self.input_stream:
self.input_stream.stop_stream()
self.input_stream.close()
if self.output_stream:
self.output_stream.stop_stream()
self.output_stream.close()
self.p.terminate()
def main():
parser = argparse.ArgumentParser(description="OpenAI APIを使用したリアルタイム音声チャット")
parser.add_argument("--voice", choices=["alloy", "echo", "shimmer"], default="echo", help="アシスタントの音声を選択")
args = parser.parse_args()
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError("環境変数にOPENAI_API_KEYが設定されていません")
chat = AudioChat(api_key, args.voice)
asyncio.get_event_loop().run_until_complete(chat.chat_session())
if __name__ == "__main__":
main()