WhisperとChatGPTを使って動画に対する感想、レポートおよび要約を生成する

実現したこと

コロナを機に様々な研修などがオンラインあるいは動画視聴などでおこなわれるようになりました(便利になったモノだ。遠い目)

問題は動画視聴などの後に感想、レポートや要約などが求められることがあることです。(誰も読まないのに{多大な労力が必要になります。

そこで、これらを自動化するPythonスクリプトを書きましたので紹介します。

このスクリプトは、音声や動画から自動で文字起こしを行い、さらにその内容を要約するPythonスクリプトを紹介します。このスクリプトは、WhisperやPyannote、OpenAI GPTなどの最新のAI技術を組み合わせることで、効率的な音声コンテンツの処理を実現します。

スクリプトの概要

様々な入力形式に対応

・ローカルの音声ファイル(WAV, MP3, M4A, FLAC)
・ローカルの動画ファイル(MP4, AVI, MOV, MKV)
・オンライン動画(YouTubeなど)からの音声抽出

高度な音声処理

・Whisperモデルによる高精度な文字起こし
・Pyannoteによる話者の識別(話者分離)
・長時間音声の自動分割処理

インテリジェントな要約生成

・OpenAI GPTモデルによる要約生成
・カスタマイズ可能な要約スタイル
・トークン制限の自動管理

それでは、各部分を詳しく見ていきましょう。

と、その前に環境構築から。私はRTX 3070Tiを積んでUbuntu 24.04 on WSL上で動かしています。

環境構築

OpanAI APIキー取得

OpenAIのウェブサイトでアカウントを作成し、APIキーを取得します。詳細な手順はこちらの記事を参照してください。

Hugging faceアクセストークン取得

こちらも良記事を参考にして下さい。

Docker engineインストール

インストールされている可能性のある古いDockerを削除

for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt remove $pkg; done

Docker engineのインストール

sudo apt update
sudo apt install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
sudo docker run hello-world
docker -v
wsl -—shutdown

sudoなしでdockerコマンドを使えるようにする

sudo groupadd docker
sudo usermod -aG docker rui
wsl --shutdown

Dockerを常時起動にする

再起動するたびにDockerを立ち上げるのがめんどくさいので設定する。

sudo visudo

最終行に下記を加える

docker ALL=(ALL)  NOPASSWD: /usr/sbin/service docker start
sudo nano ~/.bashrc

最終行に下記を追記する。

if [[ $(service docker status | awk '{print $4}') = "not" ]]; then
sudo service docker start > /dev/null
fi
source ~/.bashrc

NVIDIA dockerインストール

curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg \
&& curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | \
sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | \
sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list
sudo apt update
sudo apt install -y nvidia-container-toolkit
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker

Docker container作製

Ubuntu on WSL
ホームフォルダで。

docker pull nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
docker run -it --gpus all nvcr.io/nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
apt update && apt full-upgrade -y
apt install git wget nano ffmpeg -y

minicondaインストール

cd ~
mkdir tmp
cd tmp

linux 64-bitのリンクをコピー

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh

# yes, enter, yes
# tmpフォルダを削除
cd ..
rm -rf tmp

exit

docker container ls -a

docker start <container id>

docker exec -it <container id> /bin/bash

conda環境構築

mkdir movie-to-text
cd movie-to-text
nano movie-to-text.yml
name: movie-to-text
channels:
  - conda-forge
  - pytorch
  - nvidia
  - defaults
dependencies:
  - python=3.9
  - pip
  - cudatoolkit=11.8
  - tiktoken
  - yt-dlp
  - pip:
    - openai-whisper
    - pyannote.audio
    - pydub==0.25.1
    - torch
    - torchvision
    - torchaudio
    - openai
    - python-dotenv
conda env create -f movie-to-text.ymlconda activate speech-to-text
conda activate movie-to-text

スクリプト

nano movie-to-text-chatgpt.py
import os
import subprocess
import whisper
from pyannote.audio import Pipeline
from pydub import AudioSegment
import torch
from openai import OpenAI
from dotenv import load_dotenv
import argparse
import tiktoken
import yt_dlp

def get_env_variable(var_name):
    value = os.getenv(var_name)
    if value is None:
        raise ValueError(f"Environment variable {var_name} is not set")
    return value

# Load .env file
load_dotenv()

# Get API keys from environment variables
OPENAI_API_KEY = get_env_variable("OPENAI_API_KEY")
HF_TOKEN = get_env_variable("HF_TOKEN")

def get_device():
    return torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def download_or_use_local_file(input_path, output_filename):
    if input_path.startswith(('http://', 'https://', 'www.')):
        return download_audio(input_path, output_filename)
    elif os.path.isfile(input_path):
        _, file_extension = os.path.splitext(input_path)
        if file_extension.lower() in ['.wav', '.mp3', '.m4a', '.flac']:
            return input_path
        elif file_extension.lower() in ['.mp4', '.avi', '.mov', '.mkv']:
            return extract_audio_from_video(input_path, output_filename)
        else:
            raise ValueError(f"Unsupported file format: {file_extension}")
    else:
        raise FileNotFoundError(f"File not found: {input_path}")

def download_audio(video_url, output_filename):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '192',
        }],
        'outtmpl': output_filename,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            ydl.download([video_url])
            print(f"Successfully downloaded audio from {video_url}")
        except Exception as e:
            print(f"Error downloading from {video_url}: {str(e)}")
            return None
    
    actual_filename = output_filename + '.wav'
    if os.path.exists(actual_filename):
        return actual_filename
    elif os.path.exists(output_filename):
        return output_filename
    else:
        raise FileNotFoundError(f"Failed to download audio: neither {actual_filename} nor {output_filename} found")

def extract_audio_from_video(video_path, output_filename):
    output_audio = output_filename + '.wav'
    command = ['ffmpeg', '-i', video_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2', output_audio]
    subprocess.run(command, check=True)
    print(f"Audio extracted from video: {output_audio}")
    return output_audio

def transcribe_audio(file_path, model_name, max_chunk_size=600):
    print(f"Transcribing audio from {file_path} using model {model_name}")
    
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Audio file not found: {file_path}")
    
    device = get_device()
    print(f"Using device: {device}")
    
    model = whisper.load_model(model_name, device=device)
    audio = AudioSegment.from_wav(file_path)

    duration = len(audio) / 1000  # Duration in seconds
    chunk_size = min(max_chunk_size, duration)  # Adjust chunk size based on audio length

    transcriptions = []
    segments = []

    for i in range(0, int(duration), int(chunk_size)):
        chunk = audio[i*1000:(i+int(chunk_size))*1000]
        chunk_path = f"temp_chunk_{i}.wav"
        try:
            chunk.export(chunk_path, format="wav")
            result = model.transcribe(chunk_path)
            transcriptions.append(result["text"])
            for segment in result["segments"]:
                segment["start"] += i
                segment["end"] += i
                segments.append(segment)
        except Exception as e:
            print(f"Error processing chunk {i}: {e}")
        finally:
            if os.path.exists(chunk_path):
                os.remove(chunk_path)

    transcription = " ".join(transcriptions)
    print(f"Transcription completed. Total segments: {len(segments)}")
    return transcription, segments

def diarize_audio(file_path, num_speakers=None, min_speakers=None, max_speakers=None):
    print(f"Performing speaker diarization on {file_path}")
    device = get_device()
    print(f"Using device: {device}")
    
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN)
    pipeline.to(device)
    
    diarization_kwargs = {}
    if num_speakers is not None:
        diarization_kwargs['num_speakers'] = num_speakers
    if min_speakers is not None:
        diarization_kwargs['min_speakers'] = min_speakers
    if max_speakers is not None:
        diarization_kwargs['max_speakers'] = max_speakers

    diarization = pipeline(file_path, **diarization_kwargs)
    print(f"Speaker diarization completed")
    return diarization

def assign_speakers_to_segments(segments, diarization):
    print("Assigning speakers to segments")
    speaker_segments = []
    for segment in segments:
        start_time = segment['start']
        end_time = segment['end']
        text = segment['text']
        speaker = "UNKNOWN"

        if diarization is not None:
            for turn, _, spk in diarization.itertracks(yield_label=True):
                if turn.start <= start_time < turn.end or turn.start < end_time <= turn.end:
                    speaker = spk
                    break

        speaker_segments.append({
            "start": start_time,
            "end": end_time,
            "speaker": speaker,
            "text": text
        })

    print("Speakers assigned to segments")
    return speaker_segments

def count_tokens(text):
    encoding = tiktoken.encoding_for_model("gpt-4")
    return len(encoding.encode(text))

def truncate_transcript(transcript, max_input_tokens, system_message):
    system_tokens = count_tokens(system_message)
    available_tokens = max_input_tokens - system_tokens

    words = transcript.split()
    truncated = []
    current_tokens = 0

    for word in words:
        word_tokens = count_tokens(word)
        if current_tokens + word_tokens > available_tokens:
            break
        truncated.append(word)
        current_tokens += word_tokens

    truncated_transcript = " ".join(truncated)
    print(f"Truncated transcript from {count_tokens(transcript)} to {count_tokens(truncated_transcript)} tokens")
    return truncated_transcript

def generate_summary(speaker_segments, gpt_model="gpt-4o-mini", temperature=0.5):
    print(f"Generating summary using OpenAI API with model {gpt_model} and temperature {temperature}")
    client = OpenAI(api_key=OPENAI_API_KEY)

    full_transcript = "\n".join([f"Speaker {seg['speaker']} [{seg['start']:.2f} - {seg['end']:.2f}]: {seg['text']}" for seg in speaker_segments])

    total_tokens = count_tokens(full_transcript)
    print(f"Total tokens in transcript: {total_tokens}")

    max_input_tokens = 128000
    max_output_tokens = 4096
    system_message = "あなたは大学薬学部の教員です。以下の会話の要約、レポートと感想の3つを別々にアカデミック調で書いてください。生成する要約、レポートと乾燥はそれぞれA4で1〜2ページ程度の分量としてください。なお、ハルシネーションは絶対にやめてください。"

    if total_tokens > max_input_tokens:
        print(f"Transcript exceeds {max_input_tokens} tokens. Truncating...")
        full_transcript = truncate_transcript(full_transcript, max_input_tokens, system_message)

    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": full_transcript}
    ]

    try:
        response = client.chat.completions.create(
            messages=messages,
            model=gpt_model,
            max_tokens=max_output_tokens,
            n=1,
            stop=None,
            temperature=temperature,
        )

        if response.choices and len(response.choices) > 0:
            summary = response.choices[0].message.content.strip()
            return summary
        else:
            print("No choices in the response")
            return None

    except Exception as e:
        print(f"An error occurred: {e}")
        return None

def main(input_path, whisper_model="base", num_speakers=None, min_speakers=None, max_speakers=None, gpt_model="gpt-4o-mini", temperature=0.5):
    print(f"Processing input: {input_path}")

    base_filename = "temp_audio"
    try:
        audio_filename = download_or_use_local_file(input_path, base_filename)
        print(f"Audio file to process: {audio_filename}")
    except Exception as e:
        print(f"Error processing input file: {e}")
        return

    try:
        transcription, segments = transcribe_audio(audio_filename, whisper_model)
        print(f"Transcription completed. Length: {len(transcription)} characters")
    except Exception as e:
        print(f"Error during transcription: {e}")
        return

    try:
        diarization = diarize_audio(audio_filename, num_speakers=num_speakers, 
                                    min_speakers=min_speakers, max_speakers=max_speakers)
        print("Speaker diarization completed")
    except Exception as e:
        print(f"Error during speaker diarization: {e}")
        diarization = None

    speaker_segments = assign_speakers_to_segments(segments, diarization)

    output_file = "transcription_output.txt"
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            for segment in speaker_segments:
                segment_text = f"Speaker {segment['speaker']} [{segment['start']:.2f} - {segment['end']:.2f}]: {segment['text']}"
                f.write(segment_text + "\n")
        print(f"Transcription results written to {output_file}")
    except Exception as e:
        print(f"Error writing transcription to file: {e}")

    try:
        summary = generate_summary(speaker_segments, gpt_model, temperature)
        if summary is not None:
            summary_file = "summary.txt"
            with open(summary_file, 'w', encoding='utf-8') as f:
                f.write(summary)
            print(f"Summary written to {summary_file}")
        else:
            print("Failed to generate summary.")
    except Exception as e:
        print(f"Error generating or saving summary: {e}")

    if audio_filename != input_path:
        try:
            os.remove(audio_filename)
            print(f"Temporary audio file {audio_filename} removed")
        except Exception as e:
            print(f"Error removing temporary audio file: {e}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Video/Audio Transcription and Summarization")
    parser.add_argument("input_path", help="Path to local video/audio file or URL of online video")
    parser.add_argument("--model", choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"], 
                        default="base", help="Whisper model to use")
    parser.add_argument("--num_speakers", type=int, help="Number of speakers (if known)")
    parser.add_argument("--min_speakers", type=int, help="Minimum number of speakers")
    parser.add_argument("--max_speakers", type=int, help="Maximum number of speakers")
    parser.add_argument("--gpt_model", default="gpt-4o-mini", 
                        help="GPT model to use for summarization. Examples: gpt-4o, gpt-4o-mini, gpt-4-turbo, gpt-4, gpt-3.5-turbo")
    parser.add_argument("--temperature", type=float, default=0.5, 
                        help="Temperature for GPT model (0.0 to 1.0). Controls randomness in generation. "
                             "Lower values (e.g., 0.2) make the output more focused and deterministic. "
                             "Higher values (e.g., 0.8) make the output more creative and diverse. "
                             "0.5 is a balanced default.")
    args = parser.parse_args()

    main(args.input_path, args.model, args.num_speakers, args.min_speakers, args.max_speakers, args.gpt_model, args.temperature)

.env作成

nano .env
OPENAI_API_KEY=sk-XXXXXXXXXXXXXXXXXXXXXX
HF_TOKEN=hf_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ

使用方法

ヘルプ

python Movie-to-text-to-chatgpt.py --help
usage: Movie-to-text-to-chatgpt.py [-h] [--model {tiny,base,small,medium,large,large-v2,large-v3}]
                                               [--num_speakers NUM_SPEAKERS] [--min_speakers MIN_SPEAKERS]
                                               [--max_speakers MAX_SPEAKERS]
                                               input_path

Video/Audio Transcription and Summarization

positional arguments:
  input_path            Path to local video/audio file or URL of online video

optional arguments:
  -h, --help            show this help message and exit
  --model {tiny,base,small,medium,large,large-v2,large-v3}
                        Whisper model to use
  --num_speakers NUM_SPEAKERS
                        Number of speakers (if known)
  --min_speakers MIN_SPEAKERS
                        Minimum number of speakers
  --max_speakers MAX_SPEAKERS
                        Maximum number of speakers

ファイルをWindowsからコンテナにコピー

# on ubuntu command line
# docker container ls -aでコンテナ名を調べる
docker cp "/mnt/c/Windows_path/VVVV.wav" docker_container_name:root/movie-to-text

使用例

使用例:

1. オンラインビデオ
python Movie-file-or-url-to-text-to-chatgpt.py "https://www.youtube.com/watch?v=example" --model large-v3 --num_speakers 2 --gpt_model gpt-4o-mini

2. 動画ファイル
python Movie-file-or-url-to-text-to-chatgpt.py local_video.mp4 --model large-v3 --num_speakers 4 --gpt_model gpt-4o-mini --temperature 0.5

3. 音声ファイル
python Movie-file-or-url-to-text-to-chatgpt.py research_interview.mp3 --model large-v2 --num_speakers 2 --gpt_model gpt-4o --temperature 0.2


説明:
--model: Whisperモデルのサイズを指定(tiny/base/small/medium/large/large-v2/large-v3)
--num_speakers: 話者の数を指定
--min_speakers/--max_speakers: 話者数の範囲を指定
--gpt_model: 要約生成に使用するGPTモデルを指定
--temperature: 要約の創造性を制御(0.01.0)

出力ファイル:
1. transcription_output.txt: 文字起こしの結果が保存される
2. summary.txt: 要約、レポート、感想が保存される

特記事項:
- オンライン動画は自動的にダウンロードされ処理される
- 動画ファイルは自動的に音声が抽出される
- 長時間の音声は自動的にチャンクに分割して処理される
- 文字起こしと要約は別々のファイルに保存される
- 全ての処理は自動的に実行される

ファイルをコンテナからWindowsにコピー

docker cp docker_container_name:root/movie-to-text/xxx.txt "/mnt/c/Windows path/"

まとめ

このPythonスクリプトは、最新のAI技術を組み合わせることで、音声や動画から自動的に高品質な感想・レポート・要約の3つを作成することができます。これにより、作業を大幅に効率化できます。

これで教育・研究活動に時間が使えるようになりハッピーになると良いですね。

AIの進歩は目覚ましいものがあります。皆さんも、このようなAI技術を活用して、より効率的な作業環境を作り上げていってください!

質問や改善点があれば、ぜひコメントで教えてください。

いいなと思ったら応援しよう!