Creating Meeting Minutes with Faster-Whisper, Pyannote, and ChatGPT

What This Achieves

Whenever you hold a meeting, someone is usually expected to write up the minutes. Doing this by hand is a fairly heavy and tedious task (even though nobody reads them). So in this post I introduce a Python script that creates meeting minutes from an audio or video file.

The script uses three powerful AI models: Whisper (speech recognition), Pyannote (speaker diarization), and ChatGPT (text generation).

I previously wrote an article about a version that used Whisper.

That one took quite a long time to process, so this time I'm introducing a version that uses Faster-Whisper.

And it is fast!!!

Script Overview

The script works in the following steps (a simplified sketch of the flow follows the list):

  1. Load and preprocess the audio/video file

  2. Transcribe the audio with Faster-Whisper

  3. Identify the speakers with Pyannote

  4. Merge the transcription with the speaker information

  5. Generate meeting minutes with ChatGPT
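
Roughly speaking, these steps map onto the functions of the script shown later in this article. The following is only a simplified sketch of the flow (the file name is a placeholder), not the script's actual main():

# Simplified pipeline sketch using the functions defined in faster_whisper_script.py
audio_file = extract_audio_from_video("meeting.mp4")                    # 1. load / preprocess
wav_file = convert_to_wav(audio_file)
text, segments = transcribe_audio(wav_file, "large-v3")                 # 2. transcription with Faster-Whisper
diarization = diarize_audio(wav_file, min_speakers=1, max_speakers=5)   # 3. speaker diarization with Pyannote
speaker_segments = assign_speakers_to_segments(segments, diarization)   # 4. merge transcription and speakers
minutes = generate_minutes(speaker_segments)                            # 5. minutes generation with ChatGPT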

Let's go through each part in detail.
But first, the environment setup. I'm running everything on Ubuntu 24.04 on WSL with an RTX 3070 Ti.

Getting an OpenAI API Key

Create an account on the OpenAI website and obtain an API key. For the detailed steps, refer to the following article.

https://qiita.com/kofumi/items/16a9a501ffc8dd49da50
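
If you want a quick check that the key works, something like the following should be enough (just a sketch; it assumes the openai package is installed and OPENAI_API_KEY is set in your environment):

import os
from openai import OpenAI

# Listing models fails with an authentication error if the key is invalid
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
print([m.id for m in client.models.list()][:5])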

Environment Setup

Getting a Hugging Face Access Token

For this one, too, there is a good article to refer to.

https://zenn.dev/protoout/articles/73-hugging-face-setup
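
Note that pyannote/speaker-diarization-3.1 is a gated model, so in addition to creating a token you also need to accept the model's user conditions on its Hugging Face page. To check that the token itself is valid, a minimal sketch like this works once the conda environment described below is set up (huggingface_hub is pulled in together with pyannote.audio):

from huggingface_hub import HfApi

# Prints your account information if the token is valid, raises an error otherwise
print(HfApi().whoami(token="hf_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ"))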

Installing Docker Engine

https://docs.docker.com/engine/install/ubuntu/

Remove any old Docker packages that may already be installed:

for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt remove $pkg; done

Install Docker Engine:

sudo apt update
sudo apt install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
sudo docker run hello-world
docker -v
wsl --shutdown

Allow using the docker command without sudo

sudo groupadd docker
sudo usermod -aG docker rui  # replace "rui" with your own username
wsl --shutdown

Start Docker automatically

Starting Docker by hand after every restart is a hassle, so set it up to start automatically.

sudo visudo

Add the following at the end of the file (the %docker entry lets members of the docker group start the Docker service without a password prompt):

%docker ALL=(ALL) NOPASSWD: /usr/sbin/service docker start

Next, edit your .bashrc:

nano ~/.bashrc

Append the following at the end of the file:

if [[ $(service docker status | awk '{print $4}') = "not" ]]; then
    sudo service docker start > /dev/null
fi

Then reload the configuration:

source ~/.bashrc

Installing the NVIDIA Container Toolkit

curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg \
&& curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | \
sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | \
sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list
sudo apt update
sudo apt install -y nvidia-container-toolkit
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker

Creating a Docker Container

On Ubuntu on WSL, run the following from your home folder. The base image is the CUDA image from NVIDIA NGC (CUDA | NVIDIA NGC).

docker pull nvcr.io/nvidia/cuda:12.2.2-cudnn8-devel-ubuntu22.04
docker run -it --gpus all nvcr.io/nvidia/cuda:12.2.2-cudnn8-devel-ubuntu22.04
apt update && apt full-upgrade -y
apt install git wget nano ffmpeg -y

Installing Miniconda

cd ~
mkdir tmp
cd tmp

https://docs.anaconda.com/miniconda/#miniconda-latest-installer-links
Copy the link for the Linux 64-bit installer.

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh

# yes, enter, yes
# remove the tmp folder
cd ..
rm -rf tmp

exit

To get back into the container later, find its ID, start it, and attach a shell:

docker container ls -a

docker start <container id>

docker exec -it <container id> /bin/bash

Building the conda Environment

mkdir faster_whisper
cd faster_whisper
nano environment_faster_whisper.yml
name: faster-whisper
channels:
  - conda-forge
  - pytorch
  - nvidia
  - defaults
dependencies:
  - python=3.11
  - pip
  - nvidia/label/cuda-12.2.2::cuda-toolkit
  - tiktoken
  - pip:
    - faster-whisper
    - pyannote.audio
    - pydub==0.25.1
    - torch
    - torchvision
    - torchaudio
    - moviepy
    - openai
    - python-dotenv

Create and activate the environment:

conda env create -f environment_faster_whisper.yml
conda activate faster-whisper
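
Before moving on, it's worth checking that PyTorch inside the new environment can actually see the GPU. Start python in the activated environment and paste the following (a minimal sanity check; the exact version strings will differ):

import torch

print(torch.__version__)
print(torch.cuda.is_available())            # should print True inside the GPU container
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))    # e.g. the RTX 3070 Ti
print(torch.backends.cudnn.is_available())  # cuDNN should also be available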

The Script

nano faster_whisper_script.py
import os
import subprocess
from faster_whisper import WhisperModel
from pyannote.audio import Pipeline
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
import torch
from openai import OpenAI
from dotenv import load_dotenv
import argparse
import tiktoken

def get_env_variable(var_name):
    value = os.getenv(var_name)
    if value is None:
        raise ValueError(f"Environment variable {var_name} is not set")
    return value

# Load the .env file
load_dotenv()

# Get the OpenAI API key and Hugging Face token from environment variables
OPENAI_API_KEY = get_env_variable("OPENAI_API_KEY")
HF_TOKEN = get_env_variable("HF_TOKEN")

def get_device():
    return 'cuda' if torch.cuda.is_available() else 'cpu'

def convert_to_wav(input_file):
    file_ext = os.path.splitext(input_file)[1].lower()
    if file_ext in [".mp3", ".mp4", ".wma"]:
        print(f"Converting {input_file} to WAV")
        wav_file_path = input_file.replace(file_ext, ".wav")
        
        if file_ext == ".wma":
            try:
                subprocess.run(['ffmpeg', '-i', input_file, wav_file_path], check=True)
                print(f"Converted file saved as {wav_file_path}")
            except subprocess.CalledProcessError:
                raise ValueError(f"Error converting WMA file: {input_file}")
        else:
            audio = AudioSegment.from_file(input_file, format=file_ext[1:])
            audio.export(wav_file_path, format="wav")
            print(f"Converted file saved as {wav_file_path}")
        
        return wav_file_path
    elif file_ext == ".wav":
        return input_file
    else:
        raise ValueError(f"Unsupported file format: {file_ext}")

def extract_audio_from_video(video_file):
    audio_file = video_file.replace(".mp4", ".wav")
    print(f"Extracting audio from {video_file}")
    try:
        video = VideoFileClip(video_file)
        audio = video.audio
        audio.write_audiofile(audio_file)
        video.close()
        audio.close()
        print(f"Audio extracted and saved as {audio_file}")
    except Exception as e:
        print(f"An error occurred while extracting audio: {str(e)}")
        raise
    return audio_file

def transcribe_audio(file_path, model_name, beam_size=5, compute_type="float16"):
    print(f"Transcribing audio from {file_path} using model {model_name}")
    print(f"Beam size: {beam_size}, Compute type: {compute_type}")
    device = get_device()
    model = WhisperModel(model_name, device=device, compute_type=compute_type)
    
    try:
        # First, try multi-segment language detection
        language_info = model.detect_language_multi_segment(file_path)
        detected_language = language_info[0]
    except AttributeError:
        # If the multi-segment method is unavailable, fall back to standard detection
        print("Multi-segment language detection not available. Using standard detection.")
        segments, info = model.transcribe(file_path, beam_size=beam_size)
        detected_language = info.language
    
    print(f"Detected language: {detected_language}")
    
    segments, info = model.transcribe(file_path, beam_size=beam_size, language=detected_language)
    
    transcription = ""
    formatted_segments = []
    
    for segment in segments:
        transcription += segment.text + " "
        formatted_segments.append({
            "start": segment.start,
            "end": segment.end,
            "text": segment.text
        })
    
    print(f"Transcription completed")
    return transcription.strip(), formatted_segments

def diarize_audio(file_path, num_speakers=None, min_speakers=None, max_speakers=None):
    print(f"Performing speaker diarization on {file_path}")
    device = get_device()
    print(f"Using device: {device}")
    
    try:
        pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN)
        pipeline.to(torch.device(device))  # pyannote expects a torch.device object, not a string
    except Exception as e:
        print(f"Failed to load the pipeline: {e}")
        return None
    
    if num_speakers is not None:
        diarization = pipeline(file_path, num_speakers=num_speakers)
    elif min_speakers is not None and max_speakers is not None:
        diarization = pipeline(file_path, min_speakers=min_speakers, max_speakers=max_speakers)
    else:
        diarization = pipeline(file_path)

    print(f"Speaker diarization completed: {diarization}")
    return diarization

def assign_speakers_to_segments(segments, diarization):
    if diarization is None:
        print("Diarization is None. Skipping speaker assignment.")
        return []

    print("Assigning speakers to segments")
    speaker_segments = []
    current_speaker = None
    current_segment = None
    unmatched_segments = []

    for segment in segments:
        start_time = segment['start']
        end_time = segment['end']
        text = segment['text']
        matched = False

        for turn, _, speaker in diarization.itertracks(yield_label=True):
            overlap = min(end_time, turn.end) - max(start_time, turn.start)
            if overlap > 0:  # any overlap counts as a match
                if current_speaker == speaker:
                    # Append to the current segment
                    current_segment['end'] = end_time
                    current_segment['text'] += " " + text
                else:
                    # Save the current segment and start a new one
                    if current_segment:
                        speaker_segments.append(current_segment)
                    current_speaker = speaker
                    current_segment = {
                        "start": start_time,
                        "end": end_time,
                        "speaker": speaker,
                        "text": text
                    }
                matched = True
                break
        
        if not matched:
            print(f"No matching speaker found for segment: {text} [{start_time} - {end_time}]")
            unmatched_segments.append({
                "start": start_time,
                "end": end_time,
                "speaker": "UNKNOWN",
                "text": text
            })

    if current_segment:
        speaker_segments.append(current_segment)

    # Merge unmatched segments into speaker_segments
    all_segments = speaker_segments + unmatched_segments
    all_segments.sort(key=lambda x: x['start'])  # Sort by start time

    print("Speakers assigned to segments")
    return all_segments

def count_tokens(text):
    encoding = tiktoken.encoding_for_model("gpt-4-turbo")
    return len(encoding.encode(text))

def split_transcript(transcript, max_tokens_per_chunk):
    words = transcript.split()
    chunks = []
    current_chunk = []
    current_chunk_tokens = 0

    for word in words:
        word_tokens = count_tokens(word)
        if current_chunk_tokens + word_tokens > max_tokens_per_chunk:
            chunks.append(" ".join(current_chunk))
            current_chunk = [word]
            current_chunk_tokens = word_tokens
        else:
            current_chunk.append(word)
            current_chunk_tokens += word_tokens

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks

def generate_minutes(speaker_segments):
    print("Generating meeting minutes using OpenAI API")
    client = OpenAI(api_key=OPENAI_API_KEY)

    transcript = "\n".join([f"Speaker {seg['speaker']} [{seg['start']} - {seg['end']}]: {seg['text']}" for seg in speaker_segments])
    max_tokens_per_chunk = 4096  # Adjust this based on your token limits and model used

    chunks = split_transcript(transcript, max_tokens_per_chunk)
    all_minutes = []

    for chunk in chunks:
        messages = [
            {"role": "system", "content": "以下の会話の議事録を作成してください。"},
            {"role": "user", "content": chunk}
        ]

        try:
            response = client.chat.completions.create(
                messages=messages,
                model="gpt-4",
                max_tokens=4096,  # Adjust the max tokens as necessary
                n=1,
                stop=None,
                temperature=0.5,
            )

            if response.choices and len(response.choices) > 0:
                minutes = response.choices[0].message.content.strip()
                all_minutes.append(minutes)
            else:
                print("No choices in the response")
                return None

        except Exception as e:
            print(f"An error occurred: {e}")
            return None

    return "\n".join(all_minutes)

def parse_arguments():
    parser = argparse.ArgumentParser(description="Speech to Text with Speaker Diarization")
    parser.add_argument("audio_file", help="Path to the audio or video file")
    parser.add_argument("--model", choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"], 
                        default="base", help="Faster-whisper model to use")
    parser.add_argument("--num_speakers", type=int, help="Number of speakers (if known)")
    parser.add_argument("--min_speakers", type=int, help="Minimum number of speakers")
    parser.add_argument("--max_speakers", type=int, help="Maximum number of speakers")
    parser.add_argument("--beam_size", type=int, default=5, help="Beam size for transcription")
    parser.add_argument("--compute_type", choices=["float16", "float32", "int8"], 
                        default="float16", help="Compute type for the model")
    return parser.parse_args()

def main(args):
    print(f"Processing file: {args.audio_file}")
    file_ext = os.path.splitext(args.audio_file)[1].lower()
    if file_ext == ".mp4":
        audio_file = extract_audio_from_video(args.audio_file)
    else:
        audio_file = args.audio_file
    
    if file_ext in [".mp3", ".mp4", ".wav"]:
        audio_file = convert_to_wav(audio_file)
    
    transcription, segments = transcribe_audio(audio_file, args.model, args.beam_size, args.compute_type)
    print(f"Transcription: {transcription}")
    diarization = diarize_audio(audio_file, num_speakers=args.num_speakers, 
                                min_speakers=args.min_speakers, max_speakers=args.max_speakers)
    speaker_segments = assign_speakers_to_segments(segments, diarization)

    output_file = os.path.splitext(audio_file)[0] + "_output.txt"
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("Speakers assigned to segments\n")
        for segment in speaker_segments:
            segment_text = f"Speaker {segment['speaker']} [{segment['start']} - {segment['end']}]: {segment['text']}"
            print(segment_text)
            f.write(segment_text + "\n")
    
    print(f"Results written to {output_file}")

    minutes = generate_minutes(speaker_segments)
    if minutes is not None:
        minutes_file = os.path.splitext(audio_file)[0] + "_minutes.txt"
        with open(minutes_file, 'w', encoding='utf-8') as f:
            f.write(minutes)
        print(f"Meeting minutes written to {minutes_file}")
    else:
        print("Failed to generate meeting minutes.")

if __name__ == "__main__":
    args = parse_arguments()
    main(args)

Creating the .env File

nano .env
OPENAI_API_KEY=sk-XXXXXXXXXXXXXXXXXXXXXX
HF_TOKEN=hf_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ
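
To confirm the .env file is picked up, here is a quick check using the same python-dotenv calls as the script (it only reports whether the variables are set, without printing the secrets):

import os
from dotenv import load_dotenv

load_dotenv()
# Report presence only; never print the actual key values
print("OPENAI_API_KEY set:", os.getenv("OPENAI_API_KEY") is not None)
print("HF_TOKEN set:", os.getenv("HF_TOKEN") is not None)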

Usage

Help

python faster_whisper_script.py --help
usage: faster_whisper_script.py [-h] [--model {tiny,base,small,medium,large,large-v2,large-v3}] [--num_speakers NUM_SPEAKERS] [--min_speakers MIN_SPEAKERS] [--max_speakers MAX_SPEAKERS]
                                [--beam_size BEAM_SIZE] [--compute_type {float16,float32,int8}]
                                audio_file

Speech to Text with Speaker Diarization

positional arguments:
  audio_file            Path to the audio or video file

options:
  -h, --help            show this help message and exit
  --model {tiny,base,small,medium,large,large-v2,large-v3}
                        Faster-whisper model to use
  --num_speakers NUM_SPEAKERS
                        Number of speakers (if known)
  --min_speakers MIN_SPEAKERS
                        Minimum number of speakers
  --max_speakers MAX_SPEAKERS
                        Maximum number of speakers
  --beam_size BEAM_SIZE
                        Beam size for transcription
  --compute_type {float16,float32,int8}
                        Compute type for the model

Copying a File from Windows into the Container

# On the Ubuntu command line
# Find the container name with: docker container ls -a
docker cp "/mnt/c/Windows_path/VVVV.wav" docker_container_name:root/speech_to_text

Example

python faster_whisper_script.py XXXX.wav --model large-v3 --min_speakers 1 --max_speakers 5

Copying a File from the Container to Windows

docker cp docker_container_name:/root/faster_whisper/xxx.txt "/mnt/c/Windows path/"

Summary

By combining recent AI models, this Python script can automatically produce high-quality meeting minutes from audio or video. It can greatly streamline tasks such as documenting meetings or transcribing lectures.

Switching to Faster-Whisper this time cut the processing time substantially.

Hopefully this frees up time for teaching and research, which would make everyone happier.

AI is advancing at a remarkable pace. I encourage you to make use of technologies like these to build a more efficient working environment for yourselves!

If you have questions or suggestions for improvement, please let me know in the comments.
