Creating Meeting Minutes with Whisper-Pyannote-ChatGPT
The Japanese version is below.
Accomplishments
It is common practice to write up minutes after a meeting. Done manually, this task is burdensome and tedious, especially since the minutes often go unread. To address this, I will introduce a Python script that creates meeting minutes from audio or video files.
This script utilizes three powerful AI models: Whisper (for speech recognition), Pyannote (for speaker identification), and ChatGPT (for text generation).
Script Overview
The script operates in the following sequence (summarized in the sketch right after this list):
Loading and preprocessing of audio/video files
Transcription of audio using Whisper
Speaker identification using Pyannote
Integration of transcription and speaker information
Generation of minutes using ChatGPT
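To make the flow concrete, here is a minimal sketch of how the five steps chain together. The function names are the ones defined in the full script later in this article, and meeting.mp4 is a placeholder file name.
# Minimal pipeline sketch; the functions come from speech_to_text.py (shown in full below)
from speech_to_text import (convert_to_wav, extract_audio_from_video, transcribe_audio,
                            diarize_audio, assign_speakers_to_segments, generate_minutes)

audio_file = convert_to_wav(extract_audio_from_video("meeting.mp4"))     # 1. load & preprocess
transcription, segments = transcribe_audio(audio_file, "large-v3")       # 2. Whisper transcription
diarization = diarize_audio(audio_file, min_speakers=1, max_speakers=2)  # 3. Pyannote speaker diarization
speaker_segments = assign_speakers_to_segments(segments, diarization)    # 4. merge text and speakers
minutes = generate_minutes(speaker_segments)                             # 5. ChatGPT minutes generation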
Let's examine each part in detail. But first, let's discuss the environment setup. I am running this on Ubuntu 24.04 on WSL with an RTX 3070Ti GPU.
Obtaining OpenAI API Key
Create an account on the OpenAI website and acquire an API key. For detailed instructions, please refer to this article:
https://qiita.com/kofumi/items/16a9a501ffc8dd49da50
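If you want to confirm the key works before going further, a minimal API call is enough. This is an optional sanity check, not part of the main script; it assumes the key is stored in a .env file as described later in this article.
# Optional sanity check for the OpenAI API key
import os
from dotenv import load_dotenv
from openai import OpenAI

load_dotenv()  # reads OPENAI_API_KEY from a .env file in the current directory
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "ping"}],
    max_tokens=5,
)
print(response.choices[0].message.content)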
Environment Setup
Obtaining Hugging Face Access Token
Create a Hugging Face account and generate an access token. For detailed instructions, please refer to this article:
https://zenn.dev/protoout/articles/73-hugging-face-setup
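Note that the pyannote models used below are gated: while logged in to Hugging Face, you also need to accept the user conditions of pyannote/speaker-diarization-3.1 and pyannote/segmentation-3.0. To confirm the token itself is valid, an optional check like the following can be used (it assumes the token is stored in the .env file described later).
# Optional sanity check for the Hugging Face access token
import os
from dotenv import load_dotenv
from huggingface_hub import HfApi

load_dotenv()  # reads HF_TOKEN from a .env file in the current directory
info = HfApi().whoami(token=os.getenv("HF_TOKEN"))
print(f"Authenticated as: {info['name']}")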
Installing Docker Engine
Follow the official Docker documentation for Ubuntu installation:
https://docs.docker.com/engine/install/ubuntu/
Remove potentially old Docker installations
for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt remove $pkg; done
Install Docker Engine
sudo apt update
sudo apt install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
echo \
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
sudo docker run hello-world
docker -v
wsl --shutdown
Enable Docker command without sudo
sudo groupadd docker
sudo usermod -aG docker $USER
wsl --shutdown
Set Docker to start automatically
To avoid manually starting Docker after each reboot:
sudo visudo
Add the following line at the end:
%docker ALL=(ALL) NOPASSWD: /usr/sbin/service docker start
(The %docker prefix applies the rule to all members of the docker group.)
sudo nano ~/.bashrc
Add the following at the end:
if [[ $(service docker status | awk '{print $4}') = "not" ]]; then
    sudo service docker start > /dev/null
fi
source ~/.bashrc
Installing NVIDIA Docker
curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg \
&& curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | \
sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | \
sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list
sudo apt update
sudo apt install -y nvidia-container-toolkit
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker
Creating Docker Container
In your Ubuntu on WSL home folder:
The container image used here is the CUDA image from NVIDIA NGC (CUDA | NVIDIA NGC):
docker pull nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
docker run -it --gpus all nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
apt update && apt full-upgrade -y
apt install git wget nano ffmpeg -y
Installing Miniconda
cd ~
mkdir tmp
cd tmp
# Installer links: https://docs.anaconda.com/miniconda/#miniconda-latest-installer-links
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
# yes, enter, yes
# remove a temp folder
cd ..
rm -rf tmp
exit
docker container ls -a
docker start <container id>
docker exec -it <container id> /bin/bash
Setting up Conda Environment
cd ~
mkdir speech_to_text
cd speech_to_text
nano environment_speech_to_text.yml
name: speech-to-text
channels:
  - conda-forge
  - pytorch
  - nvidia
  - defaults
dependencies:
  - python=3.9
  - pip
  - cudatoolkit=11.8
  - tiktoken
  - pip:
    - openai-whisper
    - pyannote.audio
    - pydub==0.25.1
    - torch
    - torchvision
    - torchaudio
    - moviepy<2.0  # the script imports from moviepy.editor, which was removed in moviepy 2.x
    - openai
    - python-dotenv
conda env create -f environment_speech_to_text.yml
conda activate speech-to-text
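Before writing the script, it is worth a quick check that PyTorch inside the container can see the GPU. Run python and try:
# Quick GPU visibility check inside the container
import torch
print(torch.__version__)
print(torch.cuda.is_available())      # should print True
print(torch.cuda.get_device_name(0))  # e.g. NVIDIA GeForce RTX 3070 Ti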
Script
nano speech_to_text.py
import os
import subprocess
import whisper
from pyannote.audio import Pipeline
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
import torch
from openai import OpenAI
from dotenv import load_dotenv
import argparse
import tiktoken
def get_env_variable(var_name):
value = os.getenv(var_name)
if value is None:
raise ValueError(f"Environment variable {var_name} is not set")
return value
load_dotenv()
OPENAI_API_KEY = get_env_variable("OPENAI_API_KEY")
HF_TOKEN = get_env_variable("HF_TOKEN")
def get_device():
return torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def convert_to_wav(input_file):
file_ext = os.path.splitext(input_file)[1].lower()
if file_ext in [".mp3", ".mp4", ".wma"]:
print(f"Converting {input_file} to WAV")
wav_file_path = input_file.replace(file_ext, ".wav")
if file_ext == ".wma":
try:
subprocess.run(['ffmpeg', '-i', input_file, wav_file_path], check=True)
print(f"Converted file saved as {wav_file_path}")
except subprocess.CalledProcessError:
raise ValueError(f"Error converting WMA file: {input_file}")
else:
audio = AudioSegment.from_file(input_file, format=file_ext[1:])
audio.export(wav_file_path, format="wav")
print(f"Converted file saved as {wav_file_path}")
return wav_file_path
elif file_ext == ".wav":
return input_file
else:
raise ValueError(f"Unsupported file format: {file_ext}")
def extract_audio_from_video(video_file):
audio_file = video_file.replace(".mp4", ".wav")
print(f"Extracting audio from {video_file}")
try:
video = VideoFileClip(video_file)
audio = video.audio
audio.write_audiofile(audio_file)
video.close()
audio.close()
print(f"Audio extracted and saved as {audio_file}")
except Exception as e:
print(f"An error occurred while extracting audio: {str(e)}")
raise
return audio_file
def transcribe_audio(file_path, model_name, max_chunk_size=600):
print(f"Transcribing audio from {file_path} using model {model_name}")
device = get_device()
model = whisper.load_model(model_name, device=device)
audio = AudioSegment.from_wav(file_path)
duration = len(audio) / 1000 # Duration in seconds
chunk_size = min(max_chunk_size, duration)
transcriptions = []
segments = []
for i in range(0, int(duration), int(chunk_size)):
chunk = audio[i*1000:(i+int(chunk_size))*1000]
chunk_path = f"temp_chunk_{i}.wav"
try:
chunk.export(chunk_path, format="wav")
result = model.transcribe(chunk_path)
transcriptions.append(result["text"])
for segment in result["segments"]:
segment["start"] += i
segment["end"] += i
segments.append(segment)
finally:
if os.path.exists(chunk_path):
os.remove(chunk_path)
transcription = " ".join(transcriptions)
print(f"Transcription completed")
return transcription, segments
def diarize_audio(file_path, num_speakers=None, min_speakers=None, max_speakers=None):
print(f"Performing speaker diarization on {file_path}")
device = get_device()
print(f"Using device: {device}")
try:
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN)
pipeline.to(device)
except Exception as e:
print(f"Failed to load the pipeline: {e}")
return None
if num_speakers is not None:
diarization = pipeline(file_path, num_speakers=num_speakers)
elif min_speakers is not None and max_speakers is not None:
diarization = pipeline(file_path, min_speakers=min_speakers, max_speakers=max_speakers)
else:
diarization = pipeline(file_path)
print(f"Speaker diarization completed: {diarization}")
return diarization
def assign_speakers_to_segments(segments, diarization):
if diarization is None:
print("Diarization is None. Skipping speaker assignment.")
return []
print("Assigning speakers to segments")
speaker_segments = []
current_speaker = None
current_segment = None
unmatched_segments = []
for segment in segments:
start_time = segment['start']
end_time = segment['end']
text = segment['text']
matched = False
for turn, _, speaker in diarization.itertracks(yield_label=True):
overlap = min(end_time, turn.end) - max(start_time, turn.start)
if overlap > 0:
if current_speaker == speaker:
# Append to the current segment
current_segment['end'] = end_time
current_segment['text'] += " " + text
else:
# Save the current segment and start a new one
if current_segment:
speaker_segments.append(current_segment)
current_speaker = speaker
current_segment = {
"start": start_time,
"end": end_time,
"speaker": speaker,
"text": text
}
matched = True
break
if not matched:
print(f"No matching speaker found for segment: {text} [{start_time} - {end_time}]")
unmatched_segments.append({
"start": start_time,
"end": end_time,
"speaker": "UNKNOWN",
"text": text
})
if current_segment:
speaker_segments.append(current_segment)
# Merge unmatched segments into speaker_segments
all_segments = speaker_segments + unmatched_segments
all_segments.sort(key=lambda x: x['start']) # Sort by start time
print("Speakers assigned to segments")
return all_segments
def count_tokens(text):
    # The gpt-4-turbo encoding is used only to estimate chunk sizes; exact counts are not critical here
    encoding = tiktoken.encoding_for_model("gpt-4-turbo")
    return len(encoding.encode(text))
def split_transcript(transcript, max_tokens_per_chunk):
words = transcript.split()
chunks = []
current_chunk = []
current_chunk_tokens = 0
for word in words:
word_tokens = count_tokens(word)
if current_chunk_tokens + word_tokens > max_tokens_per_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = [word]
current_chunk_tokens = word_tokens
else:
current_chunk.append(word)
current_chunk_tokens += word_tokens
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
def generate_minutes(speaker_segments):
print("Generating meeting minutes using OpenAI API")
client = OpenAI(api_key=OPENAI_API_KEY)
transcript = "\n".join([f"Speaker {seg['speaker']} [{seg['start']} - {seg['end']}]: {seg['text']}" for seg in speaker_segments])
max_tokens_per_chunk = 4096 # Adjust this based on your token limits and model used
chunks = split_transcript(transcript, max_tokens_per_chunk)
all_minutes = []
for chunk in chunks:
messages = [
{"role": "system", "content": "以下の会話の議事録を作成してください。"},
{"role": "user", "content": chunk}
]
try:
response = client.chat.completions.create(
messages=messages,
model="gpt-4o-mini",
max_tokens=4096, # Adjust the max tokens as necessary
n=1,
stop=None,
temperature=0.5,
)
if response.choices and len(response.choices) > 0:
minutes = response.choices[0].message.content.strip()
all_minutes.append(minutes)
else:
print("No choices in the response")
return None
except Exception as e:
print(f"An error occurred: {e}")
return None
return "\n".join(all_minutes)
def parse_arguments():
parser = argparse.ArgumentParser(description="Speech to Text with Speaker Diarization")
parser.add_argument("audio_file", help="Path to the audio or video file")
parser.add_argument("--model", choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"],
default="base", help="Whisper model to use")
parser.add_argument("--num_speakers", type=int, help="Number of speakers (if known)")
parser.add_argument("--min_speakers", type=int, help="Minimum number of speakers")
parser.add_argument("--max_speakers", type=int, help="Maximum number of speakers")
return parser.parse_args()
def main(args):
print(f"Processing file: {args.audio_file}")
file_ext = os.path.splitext(args.audio_file)[1].lower()
if file_ext == ".mp4":
audio_file = extract_audio_from_video(args.audio_file)
else:
audio_file = args.audio_file
    if file_ext in [".mp3", ".mp4", ".wav", ".wma"]:  # include .wma so convert_to_wav can handle it via ffmpeg
audio_file = convert_to_wav(audio_file)
transcription, segments = transcribe_audio(audio_file, args.model)
print(f"Transcription: {transcription}")
diarization = diarize_audio(audio_file, num_speakers=args.num_speakers,
min_speakers=args.min_speakers, max_speakers=args.max_speakers)
speaker_segments = assign_speakers_to_segments(segments, diarization)
output_file = os.path.splitext(audio_file)[0] + "_output.txt"
with open(output_file, 'w', encoding='utf-8') as f:
f.write("Speakers assigned to segments\n")
for segment in speaker_segments:
segment_text = f"Speaker {segment['speaker']} [{segment['start']} - {segment['end']}]: {segment['text']}"
print(segment_text)
f.write(segment_text + "\n")
print(f"Results written to {output_file}")
minutes = generate_minutes(speaker_segments)
if minutes is not None:
minutes_file = os.path.splitext(audio_file)[0] + "_minutes.txt"
with open(minutes_file, 'w', encoding='utf-8') as f:
f.write(minutes)
print(f"Meeting minutes written to {minutes_file}")
else:
print("Failed to generate meeting minutes.")
if __name__ == "__main__":
args = parse_arguments()
main(args)
Create a .env file in the same directory as the script:
nano .env
OPENAI_API_KEY=sk-XXXXXXXXXXXXXXXXXXXXXX
HF_TOKEN=hf_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ
Usage
The help message:
python speech_to_text.py --help
usage: speech_to_text.py [-h] [--model {tiny,base,small,medium,large,large-v2,large-v3}] [--num_speakers NUM_SPEAKERS] [--min_speakers MIN_SPEAKERS] [--max_speakers MAX_SPEAKERS]
audio_file
Speech to Text with Speaker Diarization
positional arguments:
audio_file Path to the audio or video file
optional arguments:
-h, --help show this help message and exit
--model {tiny,base,small,medium,large,large-v2,large-v3}
Whisper model to use
--num_speakers NUM_SPEAKERS
Number of speakers (if known)
--min_speakers MIN_SPEAKERS
Minimum number of speakers
--max_speakers MAX_SPEAKERS
Maximum number of speakers
To copy files from Windows to the container:
# On Ubuntu command line
# Use docker container ls -a to find the container name
docker cp "/mnt/c/Windows_path/VVVV.wav" docker_container_name:/root/speech_to_text
Example usage:
python speech_to_text.py VVVSSS0.mp4 --model large-v3 --min_speakers 1 --max_speakers 2
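If everything is set up correctly, this produces two files next to the extracted audio: VVVSSS0_output.txt containing the speaker-labelled transcript and VVVSSS0_minutes.txt containing the generated minutes.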
To copy files from the container to Windows:
docker cp docker_container_name:/root/speech_to_text/xxx.txt "/mnt/c/Windows path/"
Conclusion
This Python script combines cutting-edge AI technologies to automatically generate high-quality minutes from audio or video files. This can significantly improve the efficiency of tasks such as recording meetings or transcribing lectures.
The hope is that this will allow more time for educational and research activities, leading to increased productivity and satisfaction.
The progress of AI is remarkable. I encourage you to utilize such AI technologies to create more efficient work environments!
If you have any questions or suggestions for improvement, please feel free to leave a comment.