見出し画像

REALITYにアーカイブ機能を! - REALITY Advent Calendar #11

REALITY Advent Calendar 11日目担当の増住です。先日の開発合宿でいわゆる「配信アーカイブ機能」をREALITYに実装した話を書きます。

ところで、「え?11日目?記事の順番おかしくない?」と思った方もいらっしゃると思いますが、この記事を書くのが遅れまくった結果なのです。ごめんなさい。

はじめに

ところで、配信アーカイブ機能の実装の話をするには、その前にまずREALITYの配信機能がどのように動いているかをお伝えする必要があります。

「ライブ配信サービス」というと、動画を配信・視聴するやり方を想像される方が多いかと思いますが、REALITYにおけるメインの配信方式はそれとは異なり、視聴者のアプリ上のアバターを動かすためのモーションデータなどの各種データをリアルタイムに送受信する方式をとっています。

画像1

↑のイメージイラスト(?)のように、配信された各種データをもとに、視聴者のアプリ側でアバターを動かす等の処理が行われています。


これによりREALITYは「低遅延・低通信量・高画質」の配信・視聴体験を実現することができているのですが、その方式の違いにより、いわゆる「配信アーカイブ機能」を実装する場合も、一般的な動画配信方式と異なるやり方が必要になってきます。
なぜならば、配信されているのは「動画」ではなくREALITYアプリが処理するためのモーション等のデータであるわけですから、アーカイブのためのデータについても同じことが言えるわけですね。

全体構成

配信アーカイブ機能のためのコンポーネントは、REALITYの他のバックエンドと同じくGCP上に構築しました。

画像2

配信データをもとにアーカイブデータを生成・保存し、それをアーカイブ配信サーバが読み取って配信サーバに流し込み、REALITYの通常の配信と同様にアプリから視聴できるようにする、という仕組みになっています。

本当はアプリから自由にアーカイブ配信サーバに繋いで視聴ができるようにしたかったんですが、そこまでやると合宿中に終わらないのでこのような形になりました。ですので、「配信アーカイブ機能」というよりは「再放送機能」というのが近いかもしれません。

データフォーマット


配信アーカイブ機能を実装する上で、そのためのデータが満たすべき要件が二つあります。

1. 全データのダウンロード完了を待たずにすぐに再生を開始できる
2. 再生中にシークできる

1. 全データのダウンロード完了を待たずにすぐに再生を開始できる
この要件を満たすために、今回はデータフォーマットとしてMessagePackを選択しました。
MessagePackのArrayの中に時系列でソートした配信データのbinaryを詰めることで、Streamimg Download & Decodeが可能になり、ダウンロード完了を待たずして再生を開始できることになります。
また、今回はトライアンドエラーを繰り返すため、スキーマレスで小回りが効く点も考慮しました。

2. 再生中にシークできる
要件1を満たすことでデータのダウンロード完了を待たずに再生を開始することはできますが、それだけでは再生中のシークに対応することはできません。なぜならシークに対応するためには、データの途中からダウンロード・再生ができなければいけないからです。
そのため、アーカイブデータを一定時間ずつで分割して保存する必要があります。結果、「シーク再生可能な単位 = アーカイブデータの分割単位」になります(今回は1分ごとに分割しました)。

配信データがデカすぎる問題

以上を踏まえて配信アーカイブデータを生成していくわけですが、ここで一つ問題があります。実際にREALITYで配信アーカイブ機能を提供しようとした場合、保存済みの配信データからアーカイブ生成対象の配信のデータを抜き出し、配信ごとにグルーピングした上でアーカイブデータを生成していくことになります。

REALITYでは、より良いサービスを提供するための分析やプラットフォームの健全な運営のためのパトロール、違反行為の通報対応などに使用するため、公開されている配信データを保存しています。

が、膨大な量の配信データに対してその処理を行おうとすると、通常のサーバ構成では処理しきれないのです。

配信データは1時間単位で切ったGCS Bucketのディレクトリに保存されていますが、なにぶん配信の数が非常に多いので(ありがとうございます)1時間分のデータだけでもすさまじい容量になります。

この問題に対処するため、アーカイブデータ生成処理を大規模並列分散処理として実装します。具体的には、Apache Beam PipelineをCloud Dataflow上で動かす構成をとります。こうすることで、テラバイト単位のデータであっても問題なく処理できます。

アーカイブデータの生成

今回アーカイブデータ生成処理を実装するにあたって、折角の機会なのでちょっとだけチャレンジしようと、Apache Beam Pipelineの実装を通常使われるJavaではなくKotlinで行ってみました。ほとんど詰まることもなくサクサクかけたので、かなりお勧めです。

@JvmStatic 
fun main(args: Array<String>) {
	val options = PipelineOptionsFactory
			.fromArgs(*args)
			.withValidation()
			.`as`(Options::class.java)
	val targetLiveId = options.targetLiveId
	val outputBucket = options.outputPath

	val pipeline = Pipeline.create(options)
	pipeline.apply("ReadRawDataMessage", AvroIO.read(RawDataMessage::class.java).from(options.rawDataInputPattern))
			
            // MapElements Transformのような処理を書くと、KotlinのLambdaの書きやすさを痛感します
            .apply("MapToLiveMessage", MapElements
					.into(TypeDescriptor.of(LiveMessage::class.java))
					.via(ProcessFunction { it.toLiveMessage() }))
			
            .apply("FileterLiveMessages", Filter.by(ProcessFunction { it.mediaId == targetLiveId }))
			
            // アーカイブ再生に対応するために、一部の配信データに手を加えます
            .apply("ModifyMotion", MapElements
					.into(TypeDescriptor.of(LiveMessage::class.java))
					.via(ModifyMotion(options.overrideVliveId)))
			
            // シークできるようにするため1分単位で配信ごとにデータをGroupByします
            .apply("WithKeyForFilePath", WithKeys.of {
				val min = ZonedDateTime
						.ofInstant(Instant.ofEpochMilli(it.timestamp), ZoneId.of("Asia/Tokyo"))
						.truncatedTo(ChronoUnit.MINUTES)
				val minFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")
				"liveId=${targetLiveId}/channel=${it.channel}/${min.format(minFormatter)}.mpac"
			}).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(LiveMessage::class.java)))
            .apply("GroupByKey", GroupByKey.create())
            
            // アーカイブデータのMessagePack Binaryを生成します
			.apply("ToMsgPackBinary", ParDo.of(ToMsgPackBinary()))

            // アーカイブデータをそれぞれ期待されるGCS BucketのPathに書き込みます
			.apply("WriteFile", FileIO.writeDynamic<String, KV<String, ByteArray>>()
					.by { it.key }
					.withDestinationCoder(StringUtf8Coder.of())
					.via(FileSink())
					.to(outputBucket)
					.withTempDirectory("${outputBucket}/tmp/")
					.withNaming { fileName -> FileNaming(fileName ?: "unknown") })

	pipeline.run().waitUntilFinish()
}

アーカイブ配信してみる

そんなこんなでアーカイブデータが生成されたので、早速これを視聴してみます。アーカイブ配信サーバはNode.js + TypeScriptで実装します。

import { decodeArrayStream } from '@msgpack/msgpack';
import * as GCS from '@google-cloud/storage'

// ...

// GCS SDKのStreaming Donwload機能を使う
const stream = this.storage.bucket(this.bucket)
    .file(file)
    .createReadStream();
    
// AsyncIterableでMessagePackのStreaming Decodeを処理する
for await (const item of decodeArrayStream(stream)) {
    const msg = item as Message
    
    // 配信サーバへのデータ送信Queueに入れる
    queue.push(msg)
}
GCS SDKと@msgpack/msgpackを組み合わせることで、GCS BucketからMessagePack BinaryをStreaming Download & Decodeする処理をシンプルに書けます

最後にこのアーカイブ配信サーバを起動すれば、そのままアプリから通常の配信と同じように視聴できます。まあ、現段階では(先述の通り)配信アーカイブというよりは再放送といった趣ですが‥

まとめ

というわけで、REALITYでも配信アーカイブ機能が実現できるということは実証できました。

ただ、これをそのまま公開する機能として出せるか?というと、色々考えなければいけない点(例えば過去の配信の互換性とか)が多いというのが正直なところです。とはいえ、今回の試みを通して、REALITYの配信データには、「ライブ配信」という形式を超えたポテンシャルがあることを改めて痛感したので、そのうち何らかの形で皆様の手元で動く機能として提供する日が来るかもしれません、多分。

明日のアドベントカレンダーはサーバサイドエンジニアのみるぽちさんによる「はじめてのM5Stack」です。お楽しみに!


この記事が参加している募集