WhisperとChatGPTを使って動画に対する感想、レポートおよび要約を生成する
実現したこと
コロナを機に様々な研修などがオンラインあるいは動画視聴などでおこなわれるようになりました(便利になったモノだ。遠い目)
問題は動画視聴などの後に感想、レポートや要約などが求められることがあることです。(誰も読まないのに{多大な労力が必要になります。
そこで、これらを自動化するPythonスクリプトを書きましたので紹介します。
このスクリプトは、音声や動画から自動で文字起こしを行い、さらにその内容を要約するPythonスクリプトを紹介します。このスクリプトは、WhisperやPyannote、OpenAI GPTなどの最新のAI技術を組み合わせることで、効率的な音声コンテンツの処理を実現します。
スクリプトの概要
様々な入力形式に対応
・ローカルの音声ファイル(WAV, MP3, M4A, FLAC)
・ローカルの動画ファイル(MP4, AVI, MOV, MKV)
・オンライン動画(YouTubeなど)からの音声抽出
高度な音声処理
・Whisperモデルによる高精度な文字起こし
・Pyannoteによる話者の識別(話者分離)
・長時間音声の自動分割処理
インテリジェントな要約生成
・OpenAI GPTモデルによる要約生成
・カスタマイズ可能な要約スタイル
・トークン制限の自動管理
それでは、各部分を詳しく見ていきましょう。
と、その前に環境構築から。私はRTX 3070Tiを積んでUbuntu 24.04 on WSL上で動かしています。
環境構築
OpanAI APIキー取得
OpenAIのウェブサイトでアカウントを作成し、APIキーを取得します。詳細な手順はこちらの記事を参照してください。
Hugging faceアクセストークン取得
こちらも良記事を参考にして下さい。
Docker engineインストール
インストールされている可能性のある古いDockerを削除
for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt remove $pkg; done
Docker engineのインストール
sudo apt update
sudo apt install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
echo \
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
sudo docker run hello-world
docker -v
wsl -—shutdown
sudoなしでdockerコマンドを使えるようにする
sudo groupadd docker
sudo usermod -aG docker rui
wsl --shutdown
Dockerを常時起動にする
再起動するたびにDockerを立ち上げるのがめんどくさいので設定する。
sudo visudo
最終行に下記を加える
docker ALL=(ALL) NOPASSWD: /usr/sbin/service docker start
sudo nano ~/.bashrc
最終行に下記を追記する。
if [[ $(service docker status | awk '{print $4}') = "not" ]]; then
sudo service docker start > /dev/null
fi
source ~/.bashrc
NVIDIA dockerインストール
curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg \
&& curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | \
sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | \
sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list
sudo apt update
sudo apt install -y nvidia-container-toolkit
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker
Docker container作製
Ubuntu on WSL
ホームフォルダで。
docker pull nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
docker run -it --gpus all nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
apt update && apt full-upgrade -y
apt install git wget nano ffmpeg -y
minicondaインストール
cd ~
mkdir tmp
cd tmp
linux 64-bitのリンクをコピー
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
# yes, enter, yes
# tmpフォルダを削除
cd ..
rm -rf tmp
exit
docker container ls -a
docker start <container id>
docker exec -it <container id> /bin/bash
conda環境構築
mkdir movie-to-text
cd movie-to-text
nano movie-to-text.yml
name: movie-to-text
channels:
- conda-forge
- pytorch
- nvidia
- defaults
dependencies:
- python=3.9
- pip
- cudatoolkit=11.8
- tiktoken
- yt-dlp
- pip:
- openai-whisper
- pyannote.audio
- pydub==0.25.1
- torch
- torchvision
- torchaudio
- openai
- python-dotenv
conda env create -f movie-to-text.ymlconda activate speech-to-text
conda activate movie-to-text
スクリプト
nano movie-to-text-chatgpt.py
import os
import subprocess
import whisper
from pyannote.audio import Pipeline
from pydub import AudioSegment
import torch
from openai import OpenAI
from dotenv import load_dotenv
import argparse
import tiktoken
import yt_dlp
def get_env_variable(var_name):
value = os.getenv(var_name)
if value is None:
raise ValueError(f"Environment variable {var_name} is not set")
return value
# Load .env file
load_dotenv()
# Get API keys from environment variables
OPENAI_API_KEY = get_env_variable("OPENAI_API_KEY")
HF_TOKEN = get_env_variable("HF_TOKEN")
def get_device():
return torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def download_or_use_local_file(input_path, output_filename):
if input_path.startswith(('http://', 'https://', 'www.')):
return download_audio(input_path, output_filename)
elif os.path.isfile(input_path):
_, file_extension = os.path.splitext(input_path)
if file_extension.lower() in ['.wav', '.mp3', '.m4a', '.flac']:
return input_path
elif file_extension.lower() in ['.mp4', '.avi', '.mov', '.mkv']:
return extract_audio_from_video(input_path, output_filename)
else:
raise ValueError(f"Unsupported file format: {file_extension}")
else:
raise FileNotFoundError(f"File not found: {input_path}")
def download_audio(video_url, output_filename):
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'wav',
'preferredquality': '192',
}],
'outtmpl': output_filename,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
ydl.download([video_url])
print(f"Successfully downloaded audio from {video_url}")
except Exception as e:
print(f"Error downloading from {video_url}: {str(e)}")
return None
actual_filename = output_filename + '.wav'
if os.path.exists(actual_filename):
return actual_filename
elif os.path.exists(output_filename):
return output_filename
else:
raise FileNotFoundError(f"Failed to download audio: neither {actual_filename} nor {output_filename} found")
def extract_audio_from_video(video_path, output_filename):
output_audio = output_filename + '.wav'
command = ['ffmpeg', '-i', video_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2', output_audio]
subprocess.run(command, check=True)
print(f"Audio extracted from video: {output_audio}")
return output_audio
def transcribe_audio(file_path, model_name, max_chunk_size=600):
print(f"Transcribing audio from {file_path} using model {model_name}")
if not os.path.exists(file_path):
raise FileNotFoundError(f"Audio file not found: {file_path}")
device = get_device()
print(f"Using device: {device}")
model = whisper.load_model(model_name, device=device)
audio = AudioSegment.from_wav(file_path)
duration = len(audio) / 1000 # Duration in seconds
chunk_size = min(max_chunk_size, duration) # Adjust chunk size based on audio length
transcriptions = []
segments = []
for i in range(0, int(duration), int(chunk_size)):
chunk = audio[i*1000:(i+int(chunk_size))*1000]
chunk_path = f"temp_chunk_{i}.wav"
try:
chunk.export(chunk_path, format="wav")
result = model.transcribe(chunk_path)
transcriptions.append(result["text"])
for segment in result["segments"]:
segment["start"] += i
segment["end"] += i
segments.append(segment)
except Exception as e:
print(f"Error processing chunk {i}: {e}")
finally:
if os.path.exists(chunk_path):
os.remove(chunk_path)
transcription = " ".join(transcriptions)
print(f"Transcription completed. Total segments: {len(segments)}")
return transcription, segments
def diarize_audio(file_path, num_speakers=None, min_speakers=None, max_speakers=None):
print(f"Performing speaker diarization on {file_path}")
device = get_device()
print(f"Using device: {device}")
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN)
pipeline.to(device)
diarization_kwargs = {}
if num_speakers is not None:
diarization_kwargs['num_speakers'] = num_speakers
if min_speakers is not None:
diarization_kwargs['min_speakers'] = min_speakers
if max_speakers is not None:
diarization_kwargs['max_speakers'] = max_speakers
diarization = pipeline(file_path, **diarization_kwargs)
print(f"Speaker diarization completed")
return diarization
def assign_speakers_to_segments(segments, diarization):
print("Assigning speakers to segments")
speaker_segments = []
for segment in segments:
start_time = segment['start']
end_time = segment['end']
text = segment['text']
speaker = "UNKNOWN"
if diarization is not None:
for turn, _, spk in diarization.itertracks(yield_label=True):
if turn.start <= start_time < turn.end or turn.start < end_time <= turn.end:
speaker = spk
break
speaker_segments.append({
"start": start_time,
"end": end_time,
"speaker": speaker,
"text": text
})
print("Speakers assigned to segments")
return speaker_segments
def count_tokens(text):
encoding = tiktoken.encoding_for_model("gpt-4")
return len(encoding.encode(text))
def truncate_transcript(transcript, max_input_tokens, system_message):
system_tokens = count_tokens(system_message)
available_tokens = max_input_tokens - system_tokens
words = transcript.split()
truncated = []
current_tokens = 0
for word in words:
word_tokens = count_tokens(word)
if current_tokens + word_tokens > available_tokens:
break
truncated.append(word)
current_tokens += word_tokens
truncated_transcript = " ".join(truncated)
print(f"Truncated transcript from {count_tokens(transcript)} to {count_tokens(truncated_transcript)} tokens")
return truncated_transcript
def generate_summary(speaker_segments, gpt_model="gpt-4o-mini", temperature=0.5):
print(f"Generating summary using OpenAI API with model {gpt_model} and temperature {temperature}")
client = OpenAI(api_key=OPENAI_API_KEY)
full_transcript = "\n".join([f"Speaker {seg['speaker']} [{seg['start']:.2f} - {seg['end']:.2f}]: {seg['text']}" for seg in speaker_segments])
total_tokens = count_tokens(full_transcript)
print(f"Total tokens in transcript: {total_tokens}")
max_input_tokens = 128000
max_output_tokens = 4096
system_message = "あなたは大学薬学部の教員です。以下の会話の要約、レポートと感想の3つを別々にアカデミック調で書いてください。生成する要約、レポートと乾燥はそれぞれA4で1〜2ページ程度の分量としてください。なお、ハルシネーションは絶対にやめてください。"
if total_tokens > max_input_tokens:
print(f"Transcript exceeds {max_input_tokens} tokens. Truncating...")
full_transcript = truncate_transcript(full_transcript, max_input_tokens, system_message)
messages = [
{"role": "system", "content": system_message},
{"role": "user", "content": full_transcript}
]
try:
response = client.chat.completions.create(
messages=messages,
model=gpt_model,
max_tokens=max_output_tokens,
n=1,
stop=None,
temperature=temperature,
)
if response.choices and len(response.choices) > 0:
summary = response.choices[0].message.content.strip()
return summary
else:
print("No choices in the response")
return None
except Exception as e:
print(f"An error occurred: {e}")
return None
def main(input_path, whisper_model="base", num_speakers=None, min_speakers=None, max_speakers=None, gpt_model="gpt-4o-mini", temperature=0.5):
print(f"Processing input: {input_path}")
base_filename = "temp_audio"
try:
audio_filename = download_or_use_local_file(input_path, base_filename)
print(f"Audio file to process: {audio_filename}")
except Exception as e:
print(f"Error processing input file: {e}")
return
try:
transcription, segments = transcribe_audio(audio_filename, whisper_model)
print(f"Transcription completed. Length: {len(transcription)} characters")
except Exception as e:
print(f"Error during transcription: {e}")
return
try:
diarization = diarize_audio(audio_filename, num_speakers=num_speakers,
min_speakers=min_speakers, max_speakers=max_speakers)
print("Speaker diarization completed")
except Exception as e:
print(f"Error during speaker diarization: {e}")
diarization = None
speaker_segments = assign_speakers_to_segments(segments, diarization)
output_file = "transcription_output.txt"
try:
with open(output_file, 'w', encoding='utf-8') as f:
for segment in speaker_segments:
segment_text = f"Speaker {segment['speaker']} [{segment['start']:.2f} - {segment['end']:.2f}]: {segment['text']}"
f.write(segment_text + "\n")
print(f"Transcription results written to {output_file}")
except Exception as e:
print(f"Error writing transcription to file: {e}")
try:
summary = generate_summary(speaker_segments, gpt_model, temperature)
if summary is not None:
summary_file = "summary.txt"
with open(summary_file, 'w', encoding='utf-8') as f:
f.write(summary)
print(f"Summary written to {summary_file}")
else:
print("Failed to generate summary.")
except Exception as e:
print(f"Error generating or saving summary: {e}")
if audio_filename != input_path:
try:
os.remove(audio_filename)
print(f"Temporary audio file {audio_filename} removed")
except Exception as e:
print(f"Error removing temporary audio file: {e}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Video/Audio Transcription and Summarization")
parser.add_argument("input_path", help="Path to local video/audio file or URL of online video")
parser.add_argument("--model", choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"],
default="base", help="Whisper model to use")
parser.add_argument("--num_speakers", type=int, help="Number of speakers (if known)")
parser.add_argument("--min_speakers", type=int, help="Minimum number of speakers")
parser.add_argument("--max_speakers", type=int, help="Maximum number of speakers")
parser.add_argument("--gpt_model", default="gpt-4o-mini",
help="GPT model to use for summarization. Examples: gpt-4o, gpt-4o-mini, gpt-4-turbo, gpt-4, gpt-3.5-turbo")
parser.add_argument("--temperature", type=float, default=0.5,
help="Temperature for GPT model (0.0 to 1.0). Controls randomness in generation. "
"Lower values (e.g., 0.2) make the output more focused and deterministic. "
"Higher values (e.g., 0.8) make the output more creative and diverse. "
"0.5 is a balanced default.")
args = parser.parse_args()
main(args.input_path, args.model, args.num_speakers, args.min_speakers, args.max_speakers, args.gpt_model, args.temperature)
.env作成
nano .env
OPENAI_API_KEY=sk-XXXXXXXXXXXXXXXXXXXXXX
HF_TOKEN=hf_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ
使用方法
ヘルプ
python Movie-to-text-to-chatgpt.py --help
usage: Movie-to-text-to-chatgpt.py [-h] [--model {tiny,base,small,medium,large,large-v2,large-v3}]
[--num_speakers NUM_SPEAKERS] [--min_speakers MIN_SPEAKERS]
[--max_speakers MAX_SPEAKERS]
input_path
Video/Audio Transcription and Summarization
positional arguments:
input_path Path to local video/audio file or URL of online video
optional arguments:
-h, --help show this help message and exit
--model {tiny,base,small,medium,large,large-v2,large-v3}
Whisper model to use
--num_speakers NUM_SPEAKERS
Number of speakers (if known)
--min_speakers MIN_SPEAKERS
Minimum number of speakers
--max_speakers MAX_SPEAKERS
Maximum number of speakers
ファイルをWindowsからコンテナにコピー
# on ubuntu command line
# docker container ls -aでコンテナ名を調べる
docker cp "/mnt/c/Windows_path/VVVV.wav" docker_container_name:root/movie-to-text
使用例
使用例:
1. オンラインビデオ
python Movie-file-or-url-to-text-to-chatgpt.py "https://www.youtube.com/watch?v=example" --model large-v3 --num_speakers 2 --gpt_model gpt-4o-mini
2. 動画ファイル
python Movie-file-or-url-to-text-to-chatgpt.py local_video.mp4 --model large-v3 --num_speakers 4 --gpt_model gpt-4o-mini --temperature 0.5
3. 音声ファイル
python Movie-file-or-url-to-text-to-chatgpt.py research_interview.mp3 --model large-v2 --num_speakers 2 --gpt_model gpt-4o --temperature 0.2
説明:
--model: Whisperモデルのサイズを指定(tiny/base/small/medium/large/large-v2/large-v3)
--num_speakers: 話者の数を指定
--min_speakers/--max_speakers: 話者数の範囲を指定
--gpt_model: 要約生成に使用するGPTモデルを指定
--temperature: 要約の創造性を制御(0.0〜1.0)
出力ファイル:
1. transcription_output.txt: 文字起こしの結果が保存される
2. summary.txt: 要約、レポート、感想が保存される
特記事項:
- オンライン動画は自動的にダウンロードされ処理される
- 動画ファイルは自動的に音声が抽出される
- 長時間の音声は自動的にチャンクに分割して処理される
- 文字起こしと要約は別々のファイルに保存される
- 全ての処理は自動的に実行される
ファイルをコンテナからWindowsにコピー
docker cp docker_container_name:root/movie-to-text/xxx.txt "/mnt/c/Windows path/"
まとめ
このPythonスクリプトは、最新のAI技術を組み合わせることで、音声や動画から自動的に高品質な感想・レポート・要約の3つを作成することができます。これにより、作業を大幅に効率化できます。
これで教育・研究活動に時間が使えるようになりハッピーになると良いですね。
AIの進歩は目覚ましいものがあります。皆さんも、このようなAI技術を活用して、より効率的な作業環境を作り上げていってください!
質問や改善点があれば、ぜひコメントで教えてください。