見出し画像

画面共有機能付きボイスAIチャットボット

AIと対話しながら、PCの画面も共有することができるチャットボットをご紹介します。

このAIを使用するときは、イヤホンを使う必要があります。そうしないとAIが自分の声を拾ってしまってエラーが起きます。

主な機能

  • 音声でのやり取り: 音声でAIとコミュニケーション。

  • 画面共有: AIがリアルタイムでPC画面内容を分析し、回答。

  • 環境設定: `.env`ファイルを使用してAPIキーを安全に管理。

メリット

  • ハンズフリー操作: タイピング不要でAIとコミュニケーション可能。

  • 無料で使える:API無料のLLMを使用。

技術詳細

  • 音声処理: PyAudioを使用して音声ストリームの取得と再生。

  • 画面キャプチャ: MSSとPillowを使用して画面画像の取得と処理。

  • 使用API: GoogleのLLM(gemini-2.0-flash-exp)でAIの応答を生成。

セットアップの手順

1. Gemini APIの取得

1. Google AI Studioにアクセスします。
2. Googleアカウントでログインします。
3. APIキーを作成します。

2. 各ファイルのセットアップ

  1. 以下のコードで、ディレクトリに 'main.py' ファイルを作成して下さい。

# This project implements a voice chat bot that allows you to converse with AI while sharing your PC screen.
# You can interact with the AI using your voice while the AI can see your screen in real-time.

import asyncio
import base64
import io
import os
import sys
import traceback
from dotenv import load_dotenv

import pyaudio
import PIL.Image
import mss

from google import genai

if sys.version_info < (3, 11, 0):
    import taskgroup, exceptiongroup
    asyncio.TaskGroup = taskgroup.TaskGroup
    asyncio.ExceptionGroup = exceptiongroup.ExceptionGroup

FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000
RECEIVE_SAMPLE_RATE = 24000
CHUNK_SIZE = 1024

MODEL = "models/gemini-2.0-flash-exp" # gemini-2.0-flash-exp / gemini-2.0-flash-thinking-exp-1219

load_dotenv()  # Load variables from .env

api_key = os.getenv('GEMINI_API_KEY')

client = genai.Client(
    api_key=api_key,
    http_options={"api_version": "v1alpha", "timeout": 3000}
)

CONFIG = {"generation_config": {"response_modalities": ["AUDIO"]}}

pya = pyaudio.PyAudio()

class AudioLoop:
    def __init__(self):
        self.audio_in_queue = None
        self.audio_out_queue = None
        self.data_out_queue = None

        self.session = None

        self.send_text_task = None
        self.receive_audio_task = None
        self.play_audio_task = None

        self.audio_stream = None
        self.play_stream = None
        
        self.is_running = True

    async def send_text(self):
        while self.is_running:
            try:
                text = await asyncio.to_thread(
                    input,
                    "message > ",
                )
                if text.lower() == "q":
                    self.is_running = False
                    break
                await self.session.send(text or ".", end_of_turn=True)
            except EOFError:
                print("\nInput stream ended. Retrying in 1 second...")
                await asyncio.sleep(1)
                continue
            except Exception as e:
                print(f"\nError in send_text: {e}")
                await asyncio.sleep(1)
                continue

    def _get_frame(self, sct):
        try:
            monitor = sct.monitors[1]  # Use the primary monitor
            sct_img = sct.grab(monitor)
            img = PIL.Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX")
            img.thumbnail([1024, 1024])

            image_io = io.BytesIO()
            img.save(image_io, format="jpeg")
            image_io.seek(0)

            mime_type = "image/jpeg"
            image_bytes = image_io.read()
            return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}
        except Exception as e:
            print(f"Error in _get_frame: {e}")
            return None

    async def get_frames(self):
        with mss.mss() as sct:
            while self.is_running:
                try:
                    frame_data = await asyncio.to_thread(self._get_frame, sct)
                    if frame_data is None:
                        await asyncio.sleep(1)
                        continue

                    await asyncio.sleep(1.0)
                    await self.data_out_queue.put(frame_data)
                except Exception as e:
                    print(f"Error in get_frames: {e}")
                    await asyncio.sleep(1)

    async def send_realtime(self):
        async def process_audio_queue():
            while self.is_running:
                try:
                    audio_msg = await self.audio_out_queue.get()
                    await self.session.send(input=audio_msg)
                except Exception as e:
                    print(f"Error in process_audio_queue: {e}")
                    await asyncio.sleep(1)

        async def process_data_queue():
            while self.is_running:
                try:
                    data_msg = await self.data_out_queue.get()
                    await self.session.send(input=data_msg)
                except Exception as e:
                    print(f"Error in process_data_queue: {e}")
                    await asyncio.sleep(1)

        await asyncio.gather(process_audio_queue(), process_data_queue())

    async def listen_audio(self):
        try:
            mic_info = pya.get_default_input_device_info()
            self.audio_stream = await asyncio.to_thread(
                pya.open,
                format=FORMAT,
                channels=CHANNELS,
                rate=SEND_SAMPLE_RATE,
                input=True,
                input_device_index=mic_info["index"],
                frames_per_buffer=CHUNK_SIZE,
            )
            if __debug__:
                kwargs = {"exception_on_overflow": False}
            else:
                kwargs = {}
            while self.is_running:
                try:
                    data = await asyncio.to_thread(self.audio_stream.read, CHUNK_SIZE, **kwargs)
                    await self.audio_out_queue.put({"data": data, "mime_type": "audio/pcm"})
                except Exception as e:
                    print(f"Error in listen_audio: {e}")
                    await asyncio.sleep(1)
        except Exception as e:
            print(f"Error initializing audio stream: {e}")

    async def receive_audio(self):
        while self.is_running:
            try:
                turn = self.session.receive()
                async for response in turn:
                    if data := response.data:
                        self.audio_in_queue.put_nowait(data)
                        continue
                    if text := response.text:
                        print(text, end="")

                while not self.audio_in_queue.empty():
                    self.audio_in_queue.get_nowait()
            except Exception as e:
                print(f"Error in receive_audio: {e}")
                await asyncio.sleep(1)

    async def play_audio(self):
        try:
            self.play_stream = await asyncio.to_thread(
                pya.open,
                format=FORMAT,
                channels=CHANNELS,
                rate=RECEIVE_SAMPLE_RATE,
                output=True,
            )
            while self.is_running:
                try:
                    bytestream = await self.audio_in_queue.get()
                    await asyncio.to_thread(self.play_stream.write, bytestream)
                except Exception as e:
                    print(f"Error in play_audio: {e}")
                    sys.exit(0)
        except Exception as e:
            print(f"Error initializing play stream: {e}")

    async def run(self):
        try:
            async with (
                client.aio.live.connect(model=MODEL, config=CONFIG) as session,
                asyncio.TaskGroup() as tg,
            ):
                self.session = session

                self.audio_in_queue = asyncio.Queue()
                self.audio_out_queue = asyncio.Queue(maxsize=5)
                self.data_out_queue = asyncio.Queue(maxsize=5)

                send_text_task = tg.create_task(self.send_text())
                tg.create_task(self.send_realtime())
                tg.create_task(self.listen_audio())
                tg.create_task(self.get_frames())
                tg.create_task(self.receive_audio())
                tg.create_task(self.play_audio())

                await send_text_task
                self.is_running = False

        except asyncio.CancelledError:
            self.is_running = False
        except ExceptionGroup as EG:
            traceback.print_exception(EG)
        finally:
            self.is_running = False
            # Stop and close the audio stream if it exists
            if self.audio_stream:
                self.audio_stream.stop_stream()
                self.audio_stream.close()
            # Stop and close the play stream if it exists
            if self.play_stream:
                self.play_stream.stop_stream()
                self.play_stream.close()
            # Terminate PyAudio object
            pya.terminate()

if __name__ == "__main__":
    main = AudioLoop()
    asyncio.run(main.run())

2. '.env' ファイルを作成し、main.pyと同じディレクトリに配置します。

'.env' ファイルに、以下のXXXXXXXXXXXXXX部分を、取得したGemini APIキーに置き換えたコードを記載して下さい。

GEMINI_API_KEY=XXXXXXXXXXXXXX

3. 必要なライブラリをインストール

pip install -U google-generativeai python-dotenv PyAudio Pillow mss

3. 'main.py' ファイルの実行

スタート方法:

1. ターミナルを開きます。
2. 次のコマンドを実行します:

python main.py

3. プログラムを終了するには、ターミナルで q を押します。

ーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーー

TwitterでぜひこのAIについてご意見をお寄せ下さい。フォローもよろしくお願いします🙇

旅人Twitter


いいなと思ったら応援しよう!