Raspberry Pi 400でスイッチのプレイ動画を保存するための準備(音声編)
最終更新日:2023年2月13日
最近ハマっている(はず)のスプラ3のウデマエ上達のため、地道な反復練習も必要だが、振り返りのためのプレイ動画も必要なのでは。
また、手元で暇そうにしているラズパイを活用できないか、あと動画だけだと味気ないので音声も付けたいという企画。
最終目標はラズパイで動作するHDMI入力動画の録画ソフト/スクリプトとなります。
目次
全体構想(機材)
実装方式
サンプルソース
実験結果
1.全体構想
Raspberry Pi 400に対するHDMI入力の情報を保存するようにします。
本体にはじめからついているHDMI端子は出力用のため、別にインターフェースを用意する必要があります。
今回は予算等々の兼ね合いでUSB入力のHDMIキャプチャボードを使うこととしました。
2.実装方式
最近はC++のアプリを開発する体力がないため?、実験しやすいPythonを利用することとします。
構成・機材はHDMIスプリッターとUSBキャプチャボードで、動画編と同じです。
3.サンプルソース
現時点のサンプルソースを記載します。サンプルと言いつつ、他サイト様の内容をかなーり参考にさせていただいております
(引用元はソース内に記載)
ソースは無保証です。少し見るとわかりますが、加筆部分はかなり雑です
あと、まともに動作しないこともあります
#SoundRecorder.py
"""Pass input directly to output.
#
# 処理内容
# 1.入力音源をそのまま出力音源へリダイレクトする
# 2.入力音源をWAVファイルへ保存する
#
# 使い方
# args = [-ws] [-wv] [-wo Wavファイル名]
#
# oSR = SoundRecorder(args)
# oSR.start()
#
# 何らかの待ち処理
#
# oSR.end()
#
# 引用元
# https://python-sounddevice.readthedocs.io/en/0.4.5/examples.html#input-to-output-pass-through
#
"""
import argparse
import time
import sounddevice as sd
import threading
import numpy as np # Make sure NumPy is loaded before it is used in the callback
#assert numpy # avoid "imported but unused" message (W0611)
import soundfile as sf
import resampy
import sys
import wave
def int_or_str(text):
"""Helper function for argument parsing."""
try:
return int(text)
except ValueError:
return text
#sd.default.device = [1, 10] # Input, Outputデバイス指定
class SoundRecorder(threading.Thread):
mArgs = "" #引数指定用
mLoopFlag = True #スレッド継続フラグ
mVolumeratio = 1.0
mRecordFlag = False #録音制御フラグ
#コンストラクタ
def __init__(self, args):
super(SoundRecorder, self).__init__()
self.mArgs = args
if self.mArgs.wave_volume != None:
self.mVolumeratio = self.mArgs.wave_volume
#デストラクタ
def __del__(self):
self.recordstop()
self.mLoopFlag = False
#外部からの処理終了指示
def end(self):
self.mLoopFlag = False
def callback(self, indata, outdata, frames, time, status):
if status:
print(status)
if self.mArgs.wave_silentmode != 1:
outdata[:] = self.filter(indata)
t1 = threading.Thread(target=self.waveout, args=(indata,))
t1.start()
t1.join()
#音声加工処理
def filter(self, indata):
outdata = indata
return outdata
def recordstart(self):
self.mRecordFlag = True
def recordstop(self):
self.mRecordFlag = False
def waveopen(self):
if self.mArgs.wave_outputfile != None:
self.wb = wave.open(self.mArgs.wave_outputfile, mode='wb')
self.wb.setnchannels(2) #初期値は2にしておく
if self.mArgs.channels != None:
self.wb.setnchannels(self.mArgs.channels)
self.wb.setsampwidth(2) # 16bit=2byte
self.wb.setframerate(44100) #初期値は44100にしておく
if self.mArgs.wave_samplerate != None:
self.wb.setframerate(self.mArgs.wave_samplerate)
def waveout(self, buf):
if self.mArgs.wave_outputfile == None:
return
if self.mRecordFlag == True:
# リサンプリング
#soundbuf = resampy.resample(buf, self.mArgs.wave_samplerate, self.mArgs.wave_samplerate)
soundbuf = buf
# 音量正規化
# これにより微小なノイズが猛烈に拡大されるときがある
#if soundbuf.max() != 0:
# soundbuf = soundbuf / soundbuf.max() * np.iinfo(np.int16).max
# 音量補正
soundbuf = soundbuf * (np.iinfo(np.int16).max - 1) * self.mVolumeratio
#print("max=", soundbuf.max())
# float -> int
soundbuf = soundbuf.astype(np.int16)
self.wb.writeframes(soundbuf.tobytes())
def waveclose(self):
if self.mArgs.wave_outputfile != None:
self.wb.close()
#メインスレッド
def run(self):
self.event = threading.Event()
print(sd.query_devices())
print("default.device=", sd.default.device,
", default.dtype=", sd.default.dtype,
", default.channels=", sd.default.channels,
", default.samplerate=", sd.default.samplerate)
print("inputdev=", self.mArgs.input_device,
", outputdev=", self.mArgs.output_device,
", channels=", self.mArgs.channels,
", wavesamplerate=", self.mArgs.wave_samplerate,
", wavevolumeratio=", self.mVolumeratio,
", waveoutputfile=", self.mArgs.wave_outputfile)
self.waveopen()
try:
with sd.Stream(device=(self.mArgs.input_device, self.mArgs.output_device),
samplerate=self.mArgs.wave_samplerate, blocksize=self.mArgs.wave_blocksize,
dtype=self.mArgs.wave_dtype, latency=self.mArgs.wave_latency,
channels=self.mArgs.channels, callback=self.callback,
finished_callback=self.event.set):
print('#' * 80)
while self.mLoopFlag:
time.sleep(0.01)
except KeyboardInterrupt:
parser.exit('')
except Exception as e:
parser.exit(type(e).__name__ + ': ' + str(e))
self.waveclose()
print("Thread End")
if __name__ == "__main__":
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
'-l', '--list-devices', action='store_true',
help='show list of audio devices and exit')
args, remaining = parser.parse_known_args()
if args.list_devices:
print(sd.query_devices())
parser.exit(0)
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
parents=[parser])
parser.add_argument('-i', '--input-device', type=int_or_str, help='input device (numeric ID or substring)')
parser.add_argument('-o', '--output-device', type=int_or_str, help='output device (numeric ID or substring)')
parser.add_argument('-c', '--channels', type=int, default=2, help='number of channels')
parser.add_argument('-wd', '--wave-dtype', help='audio data type')
parser.add_argument('-wr', '--wave-samplerate', type=float, help='sampling rate')
parser.add_argument('-wb', '--wave-blocksize', type=int, help='block size')
parser.add_argument('-wl', '--wave-latency', type=float, help='latency in seconds')
parser.add_argument('-wv', '--wave-volume', type=float, default=1.0, help='wave volume ratio')
parser.add_argument('-ws', '--wave-silentmode', action='store_true', help='silent mode')
parser.add_argument('-wo', '--wave-outputfile', help='output WAV file')
args = parser.parse_args(remaining)
oSR = SoundRecorder(args)
oSR.start()
input()
oSR.recordstart()
input()
oSR.end()
4.実験結果
キャプチャしたサンプルです(動画編と同じものです
この記事が気に入ったらサポートをしてみませんか?