Creating Meeting Minutes with Whisper, Pyannote, and ChatGPT
What I built
After most meetings, someone is expected to produce minutes. Doing this by hand is a fairly heavy and tedious task (even though nobody reads them). So here is a Python script that generates meeting minutes from an audio or video file.
The script uses three powerful AI models: Whisper (speech recognition), Pyannote (speaker diarization), and ChatGPT (text generation).
Script overview
The script works through the following steps:
Load and preprocess the audio/video file
Transcribe the audio with Whisper
Identify the speakers with Pyannote
Merge the transcription with the speaker information
Generate the minutes with ChatGPT
Let's look at each part in detail.
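Conceptually, the whole pipeline boils down to something like the minimal sketch below. It assumes a WAV file named meeting.wav (a placeholder name) and that OPENAI_API_KEY and HF_TOKEN are already set in the environment; the full script later in this article adds format conversion, chunking, segment merging, and error handling on top of this.

import os
import whisper
from openai import OpenAI
from pyannote.audio import Pipeline

audio_path = "meeting.wav"  # placeholder input file

# 1. Transcribe with Whisper
result = whisper.load_model("base").transcribe(audio_path)

# 2. Diarize with Pyannote
diarization = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1", use_auth_token=os.environ["HF_TOKEN"]
)(audio_path)

# 3. Label each Whisper segment with the speaker whose turn overlaps it
lines = []
for seg in result["segments"]:
    speaker = "UNKNOWN"
    for turn, _, spk in diarization.itertracks(yield_label=True):
        if min(seg["end"], turn.end) - max(seg["start"], turn.start) > 0:
            speaker = spk
            break
    lines.append(f"{speaker}: {seg['text']}")

# 4. Ask ChatGPT to turn the labelled transcript into minutes
client = OpenAI()  # reads OPENAI_API_KEY from the environment
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": "以下の会話の議事録を作成してください。"},
        {"role": "user", "content": "\n".join(lines)},
    ],
)
print(response.choices[0].message.content)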
But first, the environment setup. I run this on an RTX 3070 Ti under Ubuntu 24.04 on WSL.
Getting an OpenAI API key
Create an account on the OpenAI website and obtain an API key. See this article for the detailed steps.
Environment setup
Getting a Hugging Face access token
Again, refer to one of the good articles covering this. Note that you also need to accept the user conditions for the pyannote/speaker-diarization-3.1 model (and the segmentation model it depends on) on the Hugging Face site, otherwise the diarization pipeline cannot be downloaded.
Installing Docker Engine
Remove any old Docker packages that may already be installed:
for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt remove $pkg; done
Install Docker Engine:
sudo apt update
sudo apt install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
echo \
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
sudo docker run hello-world
docker -v
wsl --shutdown
Allow running docker without sudo
sudo groupadd docker
sudo usermod -aG docker $USER
wsl --shutdown
Start Docker automatically
Starting Docker by hand after every reboot is a hassle, so configure it to start automatically.
sudo visudo
Add the following at the end of the file (the % prefix makes the rule apply to members of the docker group that your user was added to above):
%docker ALL=(ALL) NOPASSWD: /usr/sbin/service docker start
sudo nano ~/.bashrc
Append the following at the end:
if [[ $(service docker status | awk '{print $4}') = "not" ]]; then
sudo service docker start > /dev/null
fi
source ~/.bashrc
Installing the NVIDIA Container Toolkit
curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg \
&& curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | \
sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | \
sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list
sudo apt update
sudo apt install -y nvidia-container-toolkit
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker
Creating the Docker container
Ubuntu on WSL
Run the following in your home folder.
CUDA | NVIDIA NGC
docker pull nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
docker run -it --gpus all nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
apt update && apt full-upgrade -y
apt install git wget nano ffmpeg -y
Installing Miniconda
cd ~
mkdir tmp
cd tmp
https://docs.anaconda.com/miniconda/#miniconda-latest-installer-links
Copy the link for the Linux 64-bit installer
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
# yes, enter, yes
# remove the tmp folder
cd ..
rm -rf tmp
exit
docker container ls -a
docker start <container id>
docker exec -it <container id> /bin/bash
Creating the conda environment
mkdir speech_to_text
cd speech_to_text
nano environment_speech_to_text.yml
name: speech-to-text
channels:
- conda-forge
- pytorch
- nvidia
- defaults
dependencies:
- python=3.9
- pip
- cudatoolkit=11.8
- tiktoken
- pip:
- openai-whisper
- pyannote.audio
- pydub==0.25.1
- torch
- torchvision
- torchaudio
- moviepy
- openai
- python-dotenv
conda env create -f environment_speech_to_text.yml
conda activate speech-to-text
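Before writing the script, it may be worth confirming that PyTorch inside the environment can actually see the GPU. A minimal check, run with python inside the activated environment, could look like this:

import torch

print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("Device:", torch.cuda.get_device_name(0))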
The script
nano speech_to_text.py
import os
import subprocess
import whisper
from pyannote.audio import Pipeline
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
import torch
from openai import OpenAI
from dotenv import load_dotenv
import argparse
import tiktoken
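# Read a required environment variable, failing fast with a clear error if it is missing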
def get_env_variable(var_name):
value = os.getenv(var_name)
if value is None:
raise ValueError(f"Environment variable {var_name} is not set")
return value
load_dotenv()
OPENAI_API_KEY = get_env_variable("OPENAI_API_KEY")
HF_TOKEN = get_env_variable("HF_TOKEN")
def get_device():
return torch.device('cuda' if torch.cuda.is_available() else 'cpu')
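# Convert mp3/mp4/wma input to WAV (wma via ffmpeg, the others via pydub); WAV files are passed through unchanged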
def convert_to_wav(input_file):
file_ext = os.path.splitext(input_file)[1].lower()
if file_ext in [".mp3", ".mp4", ".wma"]:
print(f"Converting {input_file} to WAV")
wav_file_path = input_file.replace(file_ext, ".wav")
if file_ext == ".wma":
try:
subprocess.run(['ffmpeg', '-i', input_file, wav_file_path], check=True)
print(f"Converted file saved as {wav_file_path}")
except subprocess.CalledProcessError:
raise ValueError(f"Error converting WMA file: {input_file}")
else:
audio = AudioSegment.from_file(input_file, format=file_ext[1:])
audio.export(wav_file_path, format="wav")
print(f"Converted file saved as {wav_file_path}")
return wav_file_path
elif file_ext == ".wav":
return input_file
else:
raise ValueError(f"Unsupported file format: {file_ext}")
def extract_audio_from_video(video_file):
audio_file = video_file.replace(".mp4", ".wav")
print(f"Extracting audio from {video_file}")
try:
video = VideoFileClip(video_file)
audio = video.audio
audio.write_audiofile(audio_file)
video.close()
audio.close()
print(f"Audio extracted and saved as {audio_file}")
except Exception as e:
print(f"An error occurred while extracting audio: {str(e)}")
raise
return audio_file
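# Transcribe in chunks of at most max_chunk_size seconds and shift each segment's timestamps back onto the original timeline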
def transcribe_audio(file_path, model_name, max_chunk_size=600):
print(f"Transcribing audio from {file_path} using model {model_name}")
device = get_device()
model = whisper.load_model(model_name, device=device)
audio = AudioSegment.from_wav(file_path)
duration = len(audio) / 1000 # Duration in seconds
chunk_size = min(max_chunk_size, duration)
transcriptions = []
segments = []
for i in range(0, int(duration), int(chunk_size)):
chunk = audio[i*1000:(i+int(chunk_size))*1000]
chunk_path = f"temp_chunk_{i}.wav"
try:
chunk.export(chunk_path, format="wav")
result = model.transcribe(chunk_path)
transcriptions.append(result["text"])
for segment in result["segments"]:
segment["start"] += i
segment["end"] += i
segments.append(segment)
finally:
if os.path.exists(chunk_path):
os.remove(chunk_path)
transcription = " ".join(transcriptions)
print(f"Transcription completed")
return transcription, segments
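# Run pyannote speaker diarization on the WAV file; speaker-count hints are forwarded to the pipeline when provided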
def diarize_audio(file_path, num_speakers=None, min_speakers=None, max_speakers=None):
print(f"Performing speaker diarization on {file_path}")
device = get_device()
print(f"Using device: {device}")
try:
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN)
pipeline.to(device)
except Exception as e:
print(f"Failed to load the pipeline: {e}")
return None
if num_speakers is not None:
diarization = pipeline(file_path, num_speakers=num_speakers)
elif min_speakers is not None and max_speakers is not None:
diarization = pipeline(file_path, min_speakers=min_speakers, max_speakers=max_speakers)
else:
diarization = pipeline(file_path)
print(f"Speaker diarization completed: {diarization}")
return diarization
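# Label each Whisper segment with the first overlapping diarization turn, merging consecutive segments from the same speaker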
def assign_speakers_to_segments(segments, diarization):
if diarization is None:
print("Diarization is None. Skipping speaker assignment.")
return []
print("Assigning speakers to segments")
speaker_segments = []
current_speaker = None
current_segment = None
unmatched_segments = []
for segment in segments:
start_time = segment['start']
end_time = segment['end']
text = segment['text']
matched = False
for turn, _, speaker in diarization.itertracks(yield_label=True):
overlap = min(end_time, turn.end) - max(start_time, turn.start)
if overlap > 0:
if current_speaker == speaker:
# Append to the current segment
current_segment['end'] = end_time
current_segment['text'] += " " + text
else:
# Save the current segment and start a new one
if current_segment:
speaker_segments.append(current_segment)
current_speaker = speaker
current_segment = {
"start": start_time,
"end": end_time,
"speaker": speaker,
"text": text
}
matched = True
break
if not matched:
print(f"No matching speaker found for segment: {text} [{start_time} - {end_time}]")
unmatched_segments.append({
"start": start_time,
"end": end_time,
"speaker": "UNKNOWN",
"text": text
})
if current_segment:
speaker_segments.append(current_segment)
# Merge unmatched segments into speaker_segments
all_segments = speaker_segments + unmatched_segments
all_segments.sort(key=lambda x: x['start']) # Sort by start time
print("Speakers assigned to segments")
return all_segments
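# Count tokens with tiktoken so the transcript can be split to fit the model's context window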
def count_tokens(text):
encoding = tiktoken.encoding_for_model("gpt-4-turbo")
return len(encoding.encode(text))
def split_transcript(transcript, max_tokens_per_chunk):
words = transcript.split()
chunks = []
current_chunk = []
current_chunk_tokens = 0
for word in words:
word_tokens = count_tokens(word)
if current_chunk_tokens + word_tokens > max_tokens_per_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = [word]
current_chunk_tokens = word_tokens
else:
current_chunk.append(word)
current_chunk_tokens += word_tokens
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
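# Send the speaker-labelled transcript to the ChatGPT API chunk by chunk and join the returned minutes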
def generate_minutes(speaker_segments):
print("Generating meeting minutes using OpenAI API")
client = OpenAI(api_key=OPENAI_API_KEY)
transcript = "\n".join([f"Speaker {seg['speaker']} [{seg['start']} - {seg['end']}]: {seg['text']}" for seg in speaker_segments])
max_tokens_per_chunk = 4096 # Adjust this based on your token limits and model used
chunks = split_transcript(transcript, max_tokens_per_chunk)
all_minutes = []
for chunk in chunks:
messages = [
{"role": "system", "content": "以下の会話の議事録を作成してください。"},
{"role": "user", "content": chunk}
]
try:
response = client.chat.completions.create(
messages=messages,
model="gpt-4o-mini",
max_tokens=4096, # Adjust the max tokens as necessary
n=1,
stop=None,
temperature=0.5,
)
if response.choices and len(response.choices) > 0:
minutes = response.choices[0].message.content.strip()
all_minutes.append(minutes)
else:
print("No choices in the response")
return None
except Exception as e:
print(f"An error occurred: {e}")
return None
return "\n".join(all_minutes)
def parse_arguments():
parser = argparse.ArgumentParser(description="Speech to Text with Speaker Diarization")
parser.add_argument("audio_file", help="Path to the audio or video file")
parser.add_argument("--model", choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"],
default="base", help="Whisper model to use")
parser.add_argument("--num_speakers", type=int, help="Number of speakers (if known)")
parser.add_argument("--min_speakers", type=int, help="Minimum number of speakers")
parser.add_argument("--max_speakers", type=int, help="Maximum number of speakers")
return parser.parse_args()
def main(args):
print(f"Processing file: {args.audio_file}")
file_ext = os.path.splitext(args.audio_file)[1].lower()
if file_ext == ".mp4":
audio_file = extract_audio_from_video(args.audio_file)
else:
audio_file = args.audio_file
    if file_ext in [".mp3", ".mp4", ".wma", ".wav"]:  # include .wma so it is converted as well
audio_file = convert_to_wav(audio_file)
transcription, segments = transcribe_audio(audio_file, args.model)
print(f"Transcription: {transcription}")
diarization = diarize_audio(audio_file, num_speakers=args.num_speakers,
min_speakers=args.min_speakers, max_speakers=args.max_speakers)
speaker_segments = assign_speakers_to_segments(segments, diarization)
output_file = os.path.splitext(audio_file)[0] + "_output.txt"
with open(output_file, 'w', encoding='utf-8') as f:
f.write("Speakers assigned to segments\n")
for segment in speaker_segments:
segment_text = f"Speaker {segment['speaker']} [{segment['start']} - {segment['end']}]: {segment['text']}"
print(segment_text)
f.write(segment_text + "\n")
print(f"Results written to {output_file}")
minutes = generate_minutes(speaker_segments)
if minutes is not None:
minutes_file = os.path.splitext(audio_file)[0] + "_minutes.txt"
with open(minutes_file, 'w', encoding='utf-8') as f:
f.write(minutes)
print(f"Meeting minutes written to {minutes_file}")
else:
print("Failed to generate meeting minutes.")
if __name__ == "__main__":
args = parse_arguments()
main(args)
Creating .env
nano .env
OPENAI_API_KEY=sk-XXXXXXXXXXXXXXXXXXXXXX
HF_TOKEN=hf_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ
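To make sure the script will pick up both keys, a quick sanity check could look like the following (run it in the same folder; this helper is just an illustration, not part of the setup above):

import os
from dotenv import load_dotenv

load_dotenv()
for name in ("OPENAI_API_KEY", "HF_TOKEN"):
    print(f"{name}: {'set' if os.getenv(name) else 'MISSING'}")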
Usage
Help
python speech_to_text.py --help
usage: speech_to_text.py [-h] [--model {tiny,base,small,medium,large,large-v2,large-v3}] [--num_speakers NUM_SPEAKERS] [--min_speakers MIN_SPEAKERS] [--max_speakers MAX_SPEAKERS]
audio_file
Speech to Text with Speaker Diarization
positional arguments:
audio_file Path to the audio or video file
optional arguments:
-h, --help show this help message and exit
--model {tiny,base,small,medium,large,large-v2,large-v3}
Whisper model to use
--num_speakers NUM_SPEAKERS
Number of speakers (if known)
--min_speakers MIN_SPEAKERS
Minimum number of speakers
--max_speakers MAX_SPEAKERS
Maximum number of speakers
Copying a file from Windows into the container
# on the Ubuntu command line
# find the container name with docker container ls -a
docker cp "/mnt/c/Windows_path/VVVV.wav" docker_container_name:root/speech_to_text
Example
python speech_to_text.py VVVSSS0.mp4 --model large-v3 --min_speakers 1 --max_speakers 2
Copying a file from the container back to Windows
docker cp docker_container_name:root/speech_to_text/xxx.txt "/mnt/c/Windows path/"
Summary
By combining recent AI models, this Python script can automatically produce good-quality meeting minutes from audio or video files, which greatly streamlines work such as documenting meetings or transcribing lectures.
Hopefully this frees up time for teaching and research and makes life a little happier.
Progress in AI is remarkable. I encourage you to take advantage of tools like these to build a more efficient working environment!
If you have questions or suggestions for improvement, please let me know in the comments.