Creating Meeting Minutes with Whisper, Pyannote, and ChatGPT

English version is below.

What this accomplishes

Whenever you hold a meeting, someone is usually expected to write up the minutes. Doing this by hand is a fairly heavy and tedious task (even though nobody ever reads them). So here is a Python script that generates meeting minutes from an audio or video file.

The script combines three powerful AI models: Whisper (speech recognition), Pyannote (speaker diarization), and ChatGPT (text generation).

Script overview

The script works in the following steps (sketched in code right after this list):

  1. Load and preprocess the audio/video file

  2. Transcribe the audio with Whisper

  3. Identify the speakers with Pyannote

  4. Merge the transcription with the speaker information

  5. Generate the meeting minutes with ChatGPT
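
Each of these steps maps to one function in the full script shown later. As a rough preview (a simplified sketch using the script's own function names; the file name is just a placeholder and error handling is omitted), the flow looks like this:

# Simplified preview of the pipeline defined in speech_to_text.py below
audio_file = convert_to_wav(extract_audio_from_video("meeting.mp4"))       # 1. load / preprocess
transcription, segments = transcribe_audio(audio_file, "large-v3")         # 2. Whisper transcription
diarization = diarize_audio(audio_file, min_speakers=1, max_speakers=2)    # 3. Pyannote diarization
speaker_segments = assign_speakers_to_segments(segments, diarization)      # 4. merge text + speakers
minutes = generate_minutes(speaker_segments)                               # 5. ChatGPT minutes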

Now let's walk through each part in detail.
But before that, the environment setup. I run this on Ubuntu 24.04 on WSL with an RTX 3070 Ti.

Getting an OpenAI API key

Create an account on the OpenAI website and obtain an API key. See this article for the detailed steps.
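
Once the Python environment described below is set up, a quick sanity check like the following can confirm the key works (a small check, not part of the main script; it assumes the key is exported as OPENAI_API_KEY and uses gpt-4o-mini only as an example model):

import os
from openai import OpenAI

# Minimal check that the API key is valid; prints a short reply if it is.
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Say hello."}],
    max_tokens=10,
)
print(response.choices[0].message.content)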

Environment setup

Getting a Hugging Face access token

For this step as well, please refer to this helpful article.
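
Note that pyannote/speaker-diarization-3.1 is a gated model, so in addition to creating the token you also need to accept the user conditions for it (and for the pyannote/segmentation-3.0 model it depends on) on Hugging Face. Once the Python environment below is in place, a small check like this confirms the token itself is valid (replace the placeholder with your own token):

from huggingface_hub import HfApi

# Prints the account name associated with the token if it is valid.
info = HfApi().whoami(token="hf_XXXXXXXXXXXXXXXX")
print(info["name"])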

Installing Docker Engine

Remove any older Docker packages that may already be installed

for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt remove $pkg; done

Install Docker Engine

sudo apt update
sudo apt install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
sudo docker run hello-world
docker -v
wsl --shutdown

Use the docker command without sudo

sudo groupadd docker
sudo usermod -aG docker rui  # replace "rui" with your own username
wsl --shutdown

Start Docker automatically

It is tedious to start Docker manually after every reboot, so configure it to start automatically.

sudo visudo

Add the following at the end of the file:

%docker ALL=(ALL) NOPASSWD: /usr/sbin/service docker start

sudo nano ~/.bashrc

Append the following at the end:

if [[ $(service docker status | awk '{print $4}') = "not" ]]; then
    sudo service docker start > /dev/null
fi

source ~/.bashrc

Installing the NVIDIA Container Toolkit

curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg \
&& curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | \
sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | \
sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list
sudo apt update
sudo apt install -y nvidia-container-toolkit
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker

Creating the Docker container

Run the following on Ubuntu on WSL, from your home directory.
The base image comes from the CUDA | NVIDIA NGC catalog.

docker pull nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
docker run -it --gpus all nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
apt update && apt full-upgrade -y
apt install git wget nano ffmpeg -y

Installing Miniconda

cd ~
mkdir tmp
cd tmp

https://docs.anaconda.com/miniconda/#miniconda-latest-installer-links
Copy the Linux 64-bit installer link from the page above.

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh

# yes, enter, yes
# remove the tmp folder
cd ..
rm -rf tmp

exit

docker container ls -a

docker start <container id>

docker exec -it <container id> /bin/bash

Setting up the conda environment

mkdir speech_to_text
cd speech_to_text
nano environment_speech_to_text.yml

name: speech-to-text
channels:
  - conda-forge
  - pytorch
  - nvidia
  - defaults
dependencies:
  - python=3.9
  - pip
  - cudatoolkit=11.8
  - tiktoken
  - pip:
    - openai-whisper
    - pyannote.audio
    - pydub==0.25.1
    - torch
    - torchvision
    - torchaudio
    - moviepy
    - openai
    - python-dotenv

conda env create -f environment_speech_to_text.yml
conda activate speech-to-text
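
Before moving on to the script, it is worth confirming that PyTorch inside the container can actually see the GPU. A quick check from a Python prompt (not part of the script itself):

import torch

# Should print True and the GPU name (e.g. an RTX 3070 Ti) if CUDA is usable inside the container.
print(torch.cuda.is_available())
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))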

The script

nano speech_to_text.py

import os
import subprocess
import whisper
from pyannote.audio import Pipeline
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
import torch
from openai import OpenAI
from dotenv import load_dotenv
import argparse
import tiktoken

def get_env_variable(var_name):
    value = os.getenv(var_name)
    if value is None:
        raise ValueError(f"Environment variable {var_name} is not set")
    return value

load_dotenv()

OPENAI_API_KEY = get_env_variable("OPENAI_API_KEY")
HF_TOKEN = get_env_variable("HF_TOKEN")

def get_device():
    return torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def convert_to_wav(input_file):
    file_ext = os.path.splitext(input_file)[1].lower()
    if file_ext in [".mp3", ".mp4", ".wma"]:
        print(f"Converting {input_file} to WAV")
        wav_file_path = input_file.replace(file_ext, ".wav")
        
        if file_ext == ".wma":
            try:
                subprocess.run(['ffmpeg', '-i', input_file, wav_file_path], check=True)
                print(f"Converted file saved as {wav_file_path}")
            except subprocess.CalledProcessError:
                raise ValueError(f"Error converting WMA file: {input_file}")
        else:
            audio = AudioSegment.from_file(input_file, format=file_ext[1:])
            audio.export(wav_file_path, format="wav")
            print(f"Converted file saved as {wav_file_path}")
        
        return wav_file_path
    elif file_ext == ".wav":
        return input_file
    else:
        raise ValueError(f"Unsupported file format: {file_ext}")

def extract_audio_from_video(video_file):
    audio_file = video_file.replace(".mp4", ".wav")
    print(f"Extracting audio from {video_file}")
    try:
        video = VideoFileClip(video_file)
        audio = video.audio
        audio.write_audiofile(audio_file)
        video.close()
        audio.close()
        print(f"Audio extracted and saved as {audio_file}")
    except Exception as e:
        print(f"An error occurred while extracting audio: {str(e)}")
        raise
    return audio_file

def transcribe_audio(file_path, model_name, max_chunk_size=600):
    print(f"Transcribing audio from {file_path} using model {model_name}")
    device = get_device()
    model = whisper.load_model(model_name, device=device)
    audio = AudioSegment.from_wav(file_path)
    duration = len(audio) / 1000  # Duration in seconds
    chunk_size = min(max_chunk_size, duration) 

    transcriptions = []
    segments = []

    for i in range(0, int(duration), int(chunk_size)):
        chunk = audio[i*1000:(i+int(chunk_size))*1000]
        chunk_path = f"temp_chunk_{i}.wav"
        try:
            chunk.export(chunk_path, format="wav")
            result = model.transcribe(chunk_path)
            transcriptions.append(result["text"])
            for segment in result["segments"]:
                segment["start"] += i
                segment["end"] += i
                segments.append(segment)
        finally:
            if os.path.exists(chunk_path):
                os.remove(chunk_path)

    transcription = " ".join(transcriptions)
    print(f"Transcription completed")
    return transcription, segments

def diarize_audio(file_path, num_speakers=None, min_speakers=None, max_speakers=None):
    print(f"Performing speaker diarization on {file_path}")
    device = get_device()
    print(f"Using device: {device}")
    
    try:
        pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN)
        pipeline.to(device)
    except Exception as e:
        print(f"Failed to load the pipeline: {e}")
        return None
    
    if num_speakers is not None:
        diarization = pipeline(file_path, num_speakers=num_speakers)
    elif min_speakers is not None and max_speakers is not None:
        diarization = pipeline(file_path, min_speakers=min_speakers, max_speakers=max_speakers)
    else:
        diarization = pipeline(file_path)

    print(f"Speaker diarization completed: {diarization}")
    return diarization

def assign_speakers_to_segments(segments, diarization):
    if diarization is None:
        print("Diarization is None. Skipping speaker assignment.")
        return []

    print("Assigning speakers to segments")
    speaker_segments = []
    current_speaker = None
    current_segment = None
    unmatched_segments = []

    for segment in segments:
        start_time = segment['start']
        end_time = segment['end']
        text = segment['text']
        matched = False

        for turn, _, speaker in diarization.itertracks(yield_label=True):
            overlap = min(end_time, turn.end) - max(start_time, turn.start)
            if overlap > 0:
                if current_speaker == speaker:
                    # Append to the current segment
                    current_segment['end'] = end_time
                    current_segment['text'] += " " + text
                else:
                    # Save the current segment and start a new one
                    if current_segment:
                        speaker_segments.append(current_segment)
                    current_speaker = speaker
                    current_segment = {
                        "start": start_time,
                        "end": end_time,
                        "speaker": speaker,
                        "text": text
                    }
                matched = True
                break
        
        if not matched:
            print(f"No matching speaker found for segment: {text} [{start_time} - {end_time}]")
            unmatched_segments.append({
                "start": start_time,
                "end": end_time,
                "speaker": "UNKNOWN",
                "text": text
            })

    if current_segment:
        speaker_segments.append(current_segment)

    # Merge unmatched segments into speaker_segments
    all_segments = speaker_segments + unmatched_segments
    all_segments.sort(key=lambda x: x['start'])  # Sort by start time

    print("Speakers assigned to segments")
    return all_segments

def count_tokens(text):
    encoding = tiktoken.encoding_for_model("gpt-4-turbo")  # approximate count; close enough for chunking even though the request below uses gpt-4o-mini
    return len(encoding.encode(text))

def split_transcript(transcript, max_tokens_per_chunk):
    words = transcript.split()
    chunks = []
    current_chunk = []
    current_chunk_tokens = 0

    for word in words:
        word_tokens = count_tokens(word)
        if current_chunk_tokens + word_tokens > max_tokens_per_chunk:
            chunks.append(" ".join(current_chunk))
            current_chunk = [word]
            current_chunk_tokens = word_tokens
        else:
            current_chunk.append(word)
            current_chunk_tokens += word_tokens

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks

def generate_minutes(speaker_segments):
    print("Generating meeting minutes using OpenAI API")
    client = OpenAI(api_key=OPENAI_API_KEY)

    transcript = "\n".join([f"Speaker {seg['speaker']} [{seg['start']} - {seg['end']}]: {seg['text']}" for seg in speaker_segments])
    max_tokens_per_chunk = 4096  # Adjust this based on your token limits and model used

    chunks = split_transcript(transcript, max_tokens_per_chunk)
    all_minutes = []

    for chunk in chunks:
        messages = [
            {"role": "system", "content": "以下の会話の議事録を作成してください。"},
            {"role": "user", "content": chunk}
        ]

        try:
            response = client.chat.completions.create(
                messages=messages,
                model="gpt-4o-mini",
                max_tokens=4096,  # Adjust the max tokens as necessary
                n=1,
                stop=None,
                temperature=0.5,
            )

            if response.choices and len(response.choices) > 0:
                minutes = response.choices[0].message.content.strip()
                all_minutes.append(minutes)
            else:
                print("No choices in the response")
                return None

        except Exception as e:
            print(f"An error occurred: {e}")
            return None

    return "\n".join(all_minutes)

def parse_arguments():
    parser = argparse.ArgumentParser(description="Speech to Text with Speaker Diarization")
    parser.add_argument("audio_file", help="Path to the audio or video file")
    parser.add_argument("--model", choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"], 
                        default="base", help="Whisper model to use")
    parser.add_argument("--num_speakers", type=int, help="Number of speakers (if known)")
    parser.add_argument("--min_speakers", type=int, help="Minimum number of speakers")
    parser.add_argument("--max_speakers", type=int, help="Maximum number of speakers")
    return parser.parse_args()

def main(args):
    print(f"Processing file: {args.audio_file}")
    file_ext = os.path.splitext(args.audio_file)[1].lower()
    if file_ext == ".mp4":
        audio_file = extract_audio_from_video(args.audio_file)
    else:
        audio_file = args.audio_file
    
    if file_ext in [".mp3", ".mp4", ".wav", ".wma"]:  # .wma also needs conversion; .wav passes straight through
        audio_file = convert_to_wav(audio_file)
    
    transcription, segments = transcribe_audio(audio_file, args.model)
    print(f"Transcription: {transcription}")
    diarization = diarize_audio(audio_file, num_speakers=args.num_speakers, 
                                min_speakers=args.min_speakers, max_speakers=args.max_speakers)
    speaker_segments = assign_speakers_to_segments(segments, diarization)

    output_file = os.path.splitext(audio_file)[0] + "_output.txt"
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("Speakers assigned to segments\n")
        for segment in speaker_segments:
            segment_text = f"Speaker {segment['speaker']} [{segment['start']} - {segment['end']}]: {segment['text']}"
            print(segment_text)
            f.write(segment_text + "\n")
    
    print(f"Results written to {output_file}")

    minutes = generate_minutes(speaker_segments)
    if minutes is not None:
        minutes_file = os.path.splitext(audio_file)[0] + "_minutes.txt"
        with open(minutes_file, 'w', encoding='utf-8') as f:
            f.write(minutes)
        print(f"Meeting minutes written to {minutes_file}")
    else:
        print("Failed to generate meeting minutes.")

if __name__ == "__main__":
    args = parse_arguments()
    main(args)

Creating the .env file

nano .env

OPENAI_API_KEY=sk-XXXXXXXXXXXXXXXXXXXXXX
HF_TOKEN=hf_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ
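
The script reads both values via python-dotenv. A quick way to confirm they are picked up (only booleans are printed, so nothing secret ends up in the terminal):

import os
from dotenv import load_dotenv

# True True means both variables were loaded from .env (or the environment).
load_dotenv()
print(bool(os.getenv("OPENAI_API_KEY")), bool(os.getenv("HF_TOKEN")))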

Usage

Help

python speech_to_text.py --help
usage: speech_to_text.py [-h] [--model {tiny,base,small,medium,large,large-v2,large-v3}] [--num_speakers NUM_SPEAKERS] [--min_speakers MIN_SPEAKERS] [--max_speakers MAX_SPEAKERS]
                         audio_file

Speech to Text with Speaker Diarization

positional arguments:
  audio_file            Path to the audio or video file

optional arguments:
  -h, --help            show this help message and exit
  --model {tiny,base,small,medium,large,large-v2,large-v3}
                        Whisper model to use
  --num_speakers NUM_SPEAKERS
                        Number of speakers (if known)
  --min_speakers MIN_SPEAKERS
                        Minimum number of speakers
  --max_speakers MAX_SPEAKERS
                        Maximum number of speakers

Copy a file from Windows into the container

# on the Ubuntu command line
# find the container name with: docker container ls -a
docker cp "/mnt/c/Windows_path/VVVV.wav" docker_container_name:root/speech_to_text

Example usage

python speech_to_text.py VVVSSS0.mp4 --model large-v3 --min_speakers 1 --max_speakers 2

Copy a file from the container back to Windows

docker cp docker_container_name:root/speech_to_text/xxx.txt "/mnt/c/Windows path/"

Summary

By combining recent AI models, this Python script can automatically create high-quality meeting minutes from audio or video. This greatly streamlines tasks such as recording meetings and transcribing lectures.

Hopefully this frees up time for education and research and makes everyone a little happier.

The progress of AI is remarkable. I encourage you to use technologies like these to build a more efficient working environment for yourself!

If you have questions or suggestions for improvement, please let me know in the comments.
