Creating Meeting Minutes with Whisper-Pyannote-ChatGPT

Japanease version is below.

Accomplishments

It is common practice to generate minutes after conducting a meeting. This task can be quite burdensome and tedious when done manually, especially considering that these minutes often go unread. To address this issue, I will introduce a Python script that can create meeting minutes from audio or video files.

This script utilizes three powerful AI models: Whisper (for speech recognition), Pyannote (for speaker identification), and ChatGPT (for text generation).

Script Overview

The script operates in the following sequence:

  1. Loading and preprocessing of audio/video files

  2. Transcription of audio using Whisper

  3. Speaker identification using Pyannote

  4. Integration of transcription and speaker information

  5. Generation of minutes using ChatGPT

Let's examine each part in detail. But first, let's discuss the environment setup. I am running this on Ubuntu 24.04 on WSL with an RTX 3070Ti GPU.

Obtaining OpenAI API Key

Create an account on the OpenAI website and acquire an API key. For detailed instructions, please refer to this article:

https://qiita.com/kofumi/items/16a9a501ffc8dd49da50

Environment Setup

Obtaining Hugging Face Access Token

Obtaining Hugging Face Access Token

https://zenn.dev/protoout/articles/73-hugging-face-setup

Installing Docker Engine

Follow the official Docker documentation for Ubuntu installation:

https://docs.docker.com/engine/install/ubuntu/

Remove potentially old Docker installations

for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt remove $pkg; done

Install Docker Engine

sudo apt update
sudo apt install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
sudo docker run hello-world
docker -v
wsl -—shutdown

Enable Docker command without sudo

sudo groupadd docker
sudo usermod -aG docker rui
wsl --shutdown

Set Docker to start automatically

To avoid manually starting Docker after each reboot:

sudo visudo

Add the following line at the end:

docker ALL=(ALL)  NOPASSWD: /usr/sbin/service docker start
sudo nano ~/.bashrc

Add the following at the end:

if [[ $(service docker status | awk '{print $4}') = "not" ]]; then
sudo service docker start > /dev/null
fi
source ~/.bashrc

Installing NVIDIA Docker

curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg \
&& curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | \
sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | \
sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list
sudo apt update
sudo apt install -y nvidia-container-toolkit
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker

Creating Docker Container

In your Ubuntu on WSL home folder:
CUDA | NVIDIA NGC

docker pull nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
docker run -it --gpus all nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
apt update && apt full-upgrade -y
apt install git wget nano ffmpeg -y

Installing Miniconda

cd ~
mkdir tmp
cd tmp

https://docs.anaconda.com/miniconda/#miniconda-latest-installer-links

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh

# yes, enter, yes
# remove a temp folder
cd ..
rm -rf tmp

exit

docker container ls -a

docker start <container id>

docker exec -it <container id> /bin/bash

Setting up Conda Environment

mkdir speech_to_text
cd speech_to_text
nano environment_speech_to_text.yml
name: speech-to-text
channels:
  - conda-forge
  - pytorch
  - nvidia
  - defaults
dependencies:
  - python=3.9
  - pip
  - cudatoolkit=11.8
  - tiktoken
  - pip:
    - openai-whisper
    - pyannote.audio
    - pydub==0.25.1
    - torch
    - torchvision
    - torchaudio
    - moviepy
    - openai
    - python-dotenv
conda env create -f environment_speech_to_text.yml
conda activate speech-to-text

Script

nano speech_to_text.py
import os
import subprocess
import whisper
from pyannote.audio import Pipeline
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
import torch
from openai import OpenAI
from dotenv import load_dotenv
import argparse
import tiktoken

def get_env_variable(var_name):
    value = os.getenv(var_name)
    if value is None:
        raise ValueError(f"Environment variable {var_name} is not set")
    return value

load_dotenv()

OPENAI_API_KEY = get_env_variable("OPENAI_API_KEY")
HF_TOKEN = get_env_variable("HF_TOKEN")

def get_device():
    return torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def convert_to_wav(input_file):
    file_ext = os.path.splitext(input_file)[1].lower()
    if file_ext in [".mp3", ".mp4", ".wma"]:
        print(f"Converting {input_file} to WAV")
        wav_file_path = input_file.replace(file_ext, ".wav")
        
        if file_ext == ".wma":
            try:
                subprocess.run(['ffmpeg', '-i', input_file, wav_file_path], check=True)
                print(f"Converted file saved as {wav_file_path}")
            except subprocess.CalledProcessError:
                raise ValueError(f"Error converting WMA file: {input_file}")
        else:
            audio = AudioSegment.from_file(input_file, format=file_ext[1:])
            audio.export(wav_file_path, format="wav")
            print(f"Converted file saved as {wav_file_path}")
        
        return wav_file_path
    elif file_ext == ".wav":
        return input_file
    else:
        raise ValueError(f"Unsupported file format: {file_ext}")

def extract_audio_from_video(video_file):
    audio_file = video_file.replace(".mp4", ".wav")
    print(f"Extracting audio from {video_file}")
    try:
        video = VideoFileClip(video_file)
        audio = video.audio
        audio.write_audiofile(audio_file)
        video.close()
        audio.close()
        print(f"Audio extracted and saved as {audio_file}")
    except Exception as e:
        print(f"An error occurred while extracting audio: {str(e)}")
        raise
    return audio_file

def transcribe_audio(file_path, model_name, max_chunk_size=600):
    print(f"Transcribing audio from {file_path} using model {model_name}")
    device = get_device()
    model = whisper.load_model(model_name, device=device)
    audio = AudioSegment.from_wav(file_path)
    duration = len(audio) / 1000  # Duration in seconds
    chunk_size = min(max_chunk_size, duration) 

    transcriptions = []
    segments = []

    for i in range(0, int(duration), int(chunk_size)):
        chunk = audio[i*1000:(i+int(chunk_size))*1000]
        chunk_path = f"temp_chunk_{i}.wav"
        try:
            chunk.export(chunk_path, format="wav")
            result = model.transcribe(chunk_path)
            transcriptions.append(result["text"])
            for segment in result["segments"]:
                segment["start"] += i
                segment["end"] += i
                segments.append(segment)
        finally:
            if os.path.exists(chunk_path):
                os.remove(chunk_path)

    transcription = " ".join(transcriptions)
    print(f"Transcription completed")
    return transcription, segments

def diarize_audio(file_path, num_speakers=None, min_speakers=None, max_speakers=None):
    print(f"Performing speaker diarization on {file_path}")
    device = get_device()
    print(f"Using device: {device}")
    
    try:
        pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN)
        pipeline.to(device)
    except Exception as e:
        print(f"Failed to load the pipeline: {e}")
        return None
    
    if num_speakers is not None:
        diarization = pipeline(file_path, num_speakers=num_speakers)
    elif min_speakers is not None and max_speakers is not None:
        diarization = pipeline(file_path, min_speakers=min_speakers, max_speakers=max_speakers)
    else:
        diarization = pipeline(file_path)

    print(f"Speaker diarization completed: {diarization}")
    return diarization

def assign_speakers_to_segments(segments, diarization):
    if diarization is None:
        print("Diarization is None. Skipping speaker assignment.")
        return []

    print("Assigning speakers to segments")
    speaker_segments = []
    current_speaker = None
    current_segment = None
    unmatched_segments = []

    for segment in segments:
        start_time = segment['start']
        end_time = segment['end']
        text = segment['text']
        matched = False

        for turn, _, speaker in diarization.itertracks(yield_label=True):
            overlap = min(end_time, turn.end) - max(start_time, turn.start)
            if overlap > 0:
                if current_speaker == speaker:
                    # Append to the current segment
                    current_segment['end'] = end_time
                    current_segment['text'] += " " + text
                else:
                    # Save the current segment and start a new one
                    if current_segment:
                        speaker_segments.append(current_segment)
                    current_speaker = speaker
                    current_segment = {
                        "start": start_time,
                        "end": end_time,
                        "speaker": speaker,
                        "text": text
                    }
                matched = True
                break
        
        if not matched:
            print(f"No matching speaker found for segment: {text} [{start_time} - {end_time}]")
            unmatched_segments.append({
                "start": start_time,
                "end": end_time,
                "speaker": "UNKNOWN",
                "text": text
            })

    if current_segment:
        speaker_segments.append(current_segment)

    # Merge unmatched segments into speaker_segments
    all_segments = speaker_segments + unmatched_segments
    all_segments.sort(key=lambda x: x['start'])  # Sort by start time

    print("Speakers assigned to segments")
    return all_segments

def count_tokens(text):
    encoding = tiktoken.encoding_for_model("gpt-4-turbo")
    return len(encoding.encode(text))

def split_transcript(transcript, max_tokens_per_chunk):
    words = transcript.split()
    chunks = []
    current_chunk = []
    current_chunk_tokens = 0

    for word in words:
        word_tokens = count_tokens(word)
        if current_chunk_tokens + word_tokens > max_tokens_per_chunk:
            chunks.append(" ".join(current_chunk))
            current_chunk = [word]
            current_chunk_tokens = word_tokens
        else:
            current_chunk.append(word)
            current_chunk_tokens += word_tokens

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks

def generate_minutes(speaker_segments):
    print("Generating meeting minutes using OpenAI API")
    client = OpenAI(api_key=OPENAI_API_KEY)

    transcript = "\n".join([f"Speaker {seg['speaker']} [{seg['start']} - {seg['end']}]: {seg['text']}" for seg in speaker_segments])
    max_tokens_per_chunk = 4096  # Adjust this based on your token limits and model used

    chunks = split_transcript(transcript, max_tokens_per_chunk)
    all_minutes = []

    for chunk in chunks:
        messages = [
            {"role": "system", "content": "以下の会話の議事録を作成してください。"},
            {"role": "user", "content": chunk}
        ]

        try:
            response = client.chat.completions.create(
                messages=messages,
                model="gpt-4o-mini",
                max_tokens=4096,  # Adjust the max tokens as necessary
                n=1,
                stop=None,
                temperature=0.5,
            )

            if response.choices and len(response.choices) > 0:
                minutes = response.choices[0].message.content.strip()
                all_minutes.append(minutes)
            else:
                print("No choices in the response")
                return None

        except Exception as e:
            print(f"An error occurred: {e}")
            return None

    return "\n".join(all_minutes)

def parse_arguments():
    parser = argparse.ArgumentParser(description="Speech to Text with Speaker Diarization")
    parser.add_argument("audio_file", help="Path to the audio or video file")
    parser.add_argument("--model", choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"], 
                        default="base", help="Whisper model to use")
    parser.add_argument("--num_speakers", type=int, help="Number of speakers (if known)")
    parser.add_argument("--min_speakers", type=int, help="Minimum number of speakers")
    parser.add_argument("--max_speakers", type=int, help="Maximum number of speakers")
    return parser.parse_args()

def main(args):
    print(f"Processing file: {args.audio_file}")
    file_ext = os.path.splitext(args.audio_file)[1].lower()
    if file_ext == ".mp4":
        audio_file = extract_audio_from_video(args.audio_file)
    else:
        audio_file = args.audio_file
    
    if file_ext in [".mp3", ".mp4", ".wav"]:
        audio_file = convert_to_wav(audio_file)
    
    transcription, segments = transcribe_audio(audio_file, args.model)
    print(f"Transcription: {transcription}")
    diarization = diarize_audio(audio_file, num_speakers=args.num_speakers, 
                                min_speakers=args.min_speakers, max_speakers=args.max_speakers)
    speaker_segments = assign_speakers_to_segments(segments, diarization)

    output_file = os.path.splitext(audio_file)[0] + "_output.txt"
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("Speakers assigned to segments\n")
        for segment in speaker_segments:
            segment_text = f"Speaker {segment['speaker']} [{segment['start']} - {segment['end']}]: {segment['text']}"
            print(segment_text)
            f.write(segment_text + "\n")
    
    print(f"Results written to {output_file}")

    minutes = generate_minutes(speaker_segments)
    if minutes is not None:
        minutes_file = os.path.splitext(audio_file)[0] + "_minutes.txt"
        with open(minutes_file, 'w', encoding='utf-8') as f:
            f.write(minutes)
        print(f"Meeting minutes written to {minutes_file}")
    else:
        print("Failed to generate meeting minutes.")

if __name__ == "__main__":
    args = parse_arguments()
    main(args)

Create a .env file:

nano .env
OPENAI_API_KEY=sk-XXXXXXXXXXXXXXXXXXXXXX
HF_TOKEN=hf_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ

Usage

The help message:

python speech_to_text.py --help
usage: speech_to_text.py [-h] [--model {tiny,base,small,medium,large,large-v2,large-v3}] [--num_speakers NUM_SPEAKERS] [--min_speakers MIN_SPEAKERS] [--max_speakers MAX_SPEAKERS]
                         audio_file

Speech to Text with Speaker Diarization

positional arguments:
  audio_file            Path to the audio or video file

optional arguments:
  -h, --help            show this help message and exit
  --model {tiny,base,small,medium,large,large-v2,large-v3}
                        Whisper model to use
  --num_speakers NUM_SPEAKERS
                        Number of speakers (if known)
  --min_speakers MIN_SPEAKERS
                        Minimum number of speakers
  --max_speakers MAX_SPEAKERS
                        Maximum number of speakers

To copy files from Windows to the container:

# On Ubuntu command line
# Use docker container ls -a to find the container name
docker cp "/mnt/c/Windows_path/VVVV.wav" docker_container_name:root/speech_to_text

Example usage:

python speech_to_text.py VVVSSS0.mp4 --model large-v3 --min_speakers 1 --max_speakers 2

To copy files from the container to Windows:

docker cp docker_container_name:root/speech_to_text/xxx.txt "/mnt/c/Windows path/"

Conclusion

This Python script combines cutting-edge AI technologies to automatically generate high-quality minutes from audio or video files. This can significantly improve the efficiency of tasks such as recording meetings or transcribing lectures.

The hope is that this will allow more time for educational and research activities, leading to increased productivity and satisfaction.

The progress of AI is remarkable. I encourage you to utilize such AI technologies to create more efficient work environments!

If you have any questions or suggestions for improvement, please feel free to leave a comment.

この記事が気に入ったらサポートをしてみませんか?