
Pythonで学ぶ深度推定
深度推定とは
深度推定とは、入力した2D画像から物体までの距離を推定する技術です。
物体からの距離を推定したい際に深度推定が活用出来ます。
活用アイデアとしてまず思いつくのは自動運転技術において人間の目の代わりとして活用出来ます。
手元に単眼カメラがあれば、深度推定と物体認識アルゴリズムを組み合わせることにより物体の認識と距離の推定が行うことも可能です。
Pythonで深度推定を行うまでの流れ
最近Keras公式ドキュメントに「深度推定」のチュートリアルが出ましたので、これを活用することでPythonで深度推定を試せます。
補足:上記チュートリアル記事は英語で記載されているため、この記事では日本語にて記載します。コードはそのまま活用します。
一連の流れとしては以下の通りです。
⓪環境構築のセットアップ(Colabを使って進めることを想定)
①必要なモジュールの読み込み
②データセットをダウンロードする
③データセットの準備
④ハイパーパラメータを定数として定義
⑤データ・パイプラインの構築
⑥サンプルデータの可視化
⑦3D点群の可視化
➇モデルの構築
⑨損失の定義
➉モデルの学習
⑪モデルの出力の可視化
⓪環境構築のセットアップ
今回はGoogle Colabを利用します。下記のテキストにColabの使い方を記載していますので、初めて触るという方は一度下記テキストをお読みください。
①必要なモジュールの読み込み
必要なモジュールの読み込みと、シード値を固定します。
import os
import sys
import tensorflow as tf
from tensorflow.keras import layers
import pandas as pd
import numpy as np
import cv2
import matplotlib.pyplot as plt
tf.random.set_seed(123)
②データセットをダウンロードする
Keras公式チュートリアルではDIODE(A Dense Indoor and Outdoor Depth Dataset)というデータセットを使用しているのでこちらを利用します。
ただし、モデルの学習と評価のサブセットを生成するために、検証セットを使用します。元のデータセットの学習データではなく、検証セットを使用する理由は、学習データは81GBのデータで構成されており、ダウンロードが非常困難です。一方で検証セットはわずか2.6GBであるためです。
ということで以下のようにコードを記述します。
annotation_folder = "/dataset/"
if not os.path.exists(os.path.abspath(".") + annotation_folder):
annotation_zip = tf.keras.utils.get_file(
"val.tar.gz",
cache_subdir=os.path.abspath("."),
origin="http://diode-dataset.s3.amazonaws.com/val.tar.gz",
extract=True,
)
③データセットの準備
学習に利用するデータには屋内の画像のみを使用します。
path = "val/indoors"
filelist = []
for root, dirs, files in os.walk(path):
for file in files:
filelist.append(os.path.join(root, file))
filelist.sort()
data = {
"image": [x for x in filelist if x.endswith(".png")],
"depth": [x for x in filelist if x.endswith("_depth.npy")],
"mask": [x for x in filelist if x.endswith("_depth_mask.npy")],
}
df = pd.DataFrame(data)
df = df.sample(frac=1, random_state=42)
④ハイパーパラメータを定数として定義
エポックやバッチサイズなどを定数として定義します。
HEIGHT = 256
WIDTH = 256
LR = 0.0002
EPOCHS = 30
BATCH_SIZE = 32
⑤データ・パイプラインの構築
チュートリアルでは①RGB画像のパス、②深度ファイル、③深度マスクファイルを含むデータフレームを受け取り、RGB画像を読み込んでリサイズしています。深度ファイルとデプスマスクファイルを読み込んで、デプスマップ画像を生成するために処理し、サイズを変更し、バッチのRGB画像とデプスマップ画像を返します。
これを実現するパイプラインのコードは以下になります。
(※少々長いです)
class DataGenerator(tf.keras.utils.Sequence):
def __init__(self, data, batch_size=6,
dim=(768, 1024),
n_channels=3, shuffle=True):
self.data = data
self.indices = self.data.index.tolist()
self.dim = dim
self.n_channels = n_channels
self.batch_size = batch_size
self.shuffle = shuffle
self.min_depth = 0.1
self.on_epoch_end()
def __len__(self):
return int(np.ceil(len(self.data) / self.batch_size))
def __getitem__(self, index):
if (index + 1) * self.batch_size > len(self.indices):
self.batch_size = len(self.indices) - index * self.batch_size
# Generate one batch of data
# Generate indices of the batch
index = self.indices[index * self.batch_size : (index + 1) * self.batch_size]
# Find list of IDs
batch = [self.indices[k] for k in index]
x, y = self.data_generation(batch)
return x, y
def on_epoch_end(self):
"""
エポック毎にインデックスを更新
"""
self.index = np.arange(len(self.indices))
if self.shuffle == True:
np.random.shuffle(self.index)
def load(self, image_path, depth_map, mask):
"""
入力画像とターゲット画像を読み込む
"""
image_ = cv2.imread(image_path)
image_ = cv2.cvtColor(image_, cv2.COLOR_BGR2RGB)
image_ = cv2.resize(image_, self.dim)
image_ = tf.image.convert_image_dtype(image_, tf.float32)
depth_map = np.load(depth_map).squeeze()
mask = np.load(mask)
mask = mask > 0
max_depth = min(300, np.percentile(depth_map, 99))
depth_map = np.clip(depth_map, self.min_depth, max_depth)
depth_map = np.log(depth_map, where=mask)
depth_map = np.ma.masked_where(~mask, depth_map)
depth_map = np.clip(depth_map, 0.1, np.log(max_depth))
depth_map = cv2.resize(depth_map, self.dim)
depth_map = np.expand_dims(depth_map, axis=2)
depth_map = tf.image.convert_image_dtype(depth_map, tf.float32)
return image_, depth_map
def data_generation(self, batch):
x = np.empty((self.batch_size, *self.dim, self.n_channels))
y = np.empty((self.batch_size, *self.dim, 1))
for i, batch_id in enumerate(batch):
x[i,], y[i,] = self.load(
self.data["image"][batch_id],
self.data["depth"][batch_id],
self.data["mask"][batch_id],
)
return x, y
⑥サンプルデータの可視化
サンプル画像を可視化します。
def visualize_depth_map(samples, test=False, model=None):
input, target = samples
cmap = plt.cm.jet
cmap.set_bad(color="black")
if test:
pred = model.predict(input)
fig, ax = plt.subplots(6, 3, figsize=(50, 50))
for i in range(6):
ax[i, 0].imshow((input[i].squeeze()))
ax[i, 1].imshow((target[i].squeeze()), cmap=cmap)
ax[i, 2].imshow((pred[i].squeeze()), cmap=cmap)
else:
fig, ax = plt.subplots(6, 2, figsize=(50, 50))
for i in range(6):
ax[i, 0].imshow((input[i].squeeze()))
ax[i, 1].imshow((target[i].squeeze()), cmap=cmap)
visualize_samples = next(
iter(DataGenerator(data=df, batch_size=6, dim=(HEIGHT, WIDTH)))
)
visualize_depth_map(visualize_samples)
実行すると以下の画像が表示されます。
⑦3D点群の可視化
3D点群の可視化します。
depth_vis = np.flipud(visualize_samples[1][1].squeeze()) # target
img_vis = np.flipud(visualize_samples[0][1].squeeze()) # input
fig = plt.figure(figsize=(15, 10))
ax = plt.axes(projection="3d")
STEP = 3
for x in range(0, img_vis.shape[0], STEP):
for y in range(0, img_vis.shape[1], STEP):
ax.scatter(
[depth_vis[x, y]] * 3,
[y] * 3,
[x] * 3,
c=tuple(img_vis[x, y, :3] / 255),
s=3,
)
ax.view_init(45, 135)
➇モデルの構築
ベースのモデルはU-Netを利用します。ダウンスケーリングブロックには付加的なスキップ接続が実装されています。
(コードが少々長いです)
class DownscaleBlock(layers.Layer):
def __init__(
self, filters, kernel_size=(3, 3), padding="same", strides=1, **kwargs
):
super().__init__(**kwargs)
self.convA = layers.Conv2D(filters, kernel_size, strides, padding)
self.convB = layers.Conv2D(filters, kernel_size, strides, padding)
self.reluA = layers.LeakyReLU(alpha=0.2)
self.reluB = layers.LeakyReLU(alpha=0.2)
self.bn2a = tf.keras.layers.BatchNormalization()
self.bn2b = tf.keras.layers.BatchNormalization()
self.pool = layers.MaxPool2D((2, 2), (2, 2))
def call(self, input_tensor):
d = self.convA(input_tensor)
x = self.bn2a(d)
x = self.reluA(x)
x = self.convB(x)
x = self.bn2b(x)
x = self.reluB(x)
x += d
p = self.pool(x)
return x, p
class UpscaleBlock(layers.Layer):
def __init__(
self, filters, kernel_size=(3, 3), padding="same", strides=1, **kwargs
):
super().__init__(**kwargs)
self.us = layers.UpSampling2D((2, 2))
self.convA = layers.Conv2D(filters, kernel_size, strides, padding)
self.convB = layers.Conv2D(filters, kernel_size, strides, padding)
self.reluA = layers.LeakyReLU(alpha=0.2)
self.reluB = layers.LeakyReLU(alpha=0.2)
self.bn2a = tf.keras.layers.BatchNormalization()
self.bn2b = tf.keras.layers.BatchNormalization()
self.conc = layers.Concatenate()
def call(self, x, skip):
x = self.us(x)
concat = self.conc([x, skip])
x = self.convA(concat)
x = self.bn2a(x)
x = self.reluA(x)
x = self.convB(x)
x = self.bn2b(x)
x = self.reluB(x)
return x
class BottleNeckBlock(layers.Layer):
def __init__(
self, filters, kernel_size=(3, 3), padding="same", strides=1, **kwargs
):
super().__init__(**kwargs)
self.convA = layers.Conv2D(filters, kernel_size, strides, padding)
self.convB = layers.Conv2D(filters, kernel_size, strides, padding)
self.reluA = layers.LeakyReLU(alpha=0.2)
self.reluB = layers.LeakyReLU(alpha=0.2)
def call(self, x):
x = self.convA(x)
x = self.reluA(x)
x = self.convB(x)
x = self.reluB(x)
return x
⑨損失の定義
3つのロスを最適化していきます。
①構造的類似性インデックス(SSIM)
(SSIMはモデルの性能向上に貢献します)
②L1ロス(ここではPoint-wise depth)
③深度平滑性損失
class DepthEstimationModel(tf.keras.Model):
def __init__(self):
super().__init__()
self.ssim_loss_weight = 0.85
self.l1_loss_weight = 0.1
self.edge_loss_weight = 0.9
self.loss_metric = tf.keras.metrics.Mean(name="loss")
f = [16, 32, 64, 128, 256]
self.downscale_blocks = [
DownscaleBlock(f[0]),
DownscaleBlock(f[1]),
DownscaleBlock(f[2]),
DownscaleBlock(f[3]),
]
self.bottle_neck_block = BottleNeckBlock(f[4])
self.upscale_blocks = [
UpscaleBlock(f[3]),
UpscaleBlock(f[2]),
UpscaleBlock(f[1]),
UpscaleBlock(f[0]),
]
self.conv_layer = layers.Conv2D(1, (1, 1), padding="same", activation="tanh")
def calculate_loss(self, target, pred):
# Edges
dy_true, dx_true = tf.image.image_gradients(target)
dy_pred, dx_pred = tf.image.image_gradients(pred)
weights_x = tf.exp(tf.reduce_mean(tf.abs(dx_true)))
weights_y = tf.exp(tf.reduce_mean(tf.abs(dy_true)))
# 奥行きのある滑らかさ
smoothness_x = dx_pred * weights_x
smoothness_y = dy_pred * weights_y
depth_smoothness_loss = tf.reduce_mean(abs(smoothness_x)) + tf.reduce_mean(
abs(smoothness_y)
)
# Structural similarity (SSIM) index
ssim_loss = tf.reduce_mean(
1
- tf.image.ssim(
target, pred, max_val=WIDTH, filter_size=7, k1=0.01 ** 2, k2=0.03 ** 2
)
)
l1_loss = tf.reduce_mean(tf.abs(target - pred))
loss = (
(self.ssim_loss_weight * ssim_loss)
+ (self.l1_loss_weight * l1_loss)
+ (self.edge_loss_weight * depth_smoothness_loss)
)
return loss
@property
def metrics(self):
return [self.loss_metric]
def train_step(self, batch_data):
input, target = batch_data
with tf.GradientTape() as tape:
pred = self(input, training=True)
loss = self.calculate_loss(target, pred)
gradients = tape.gradient(loss, self.trainable_variables)
self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
self.loss_metric.update_state(loss)
return {
"loss": self.loss_metric.result(),
}
def test_step(self, batch_data):
input, target = batch_data
pred = self(input, training=False)
loss = self.calculate_loss(target, pred)
self.loss_metric.update_state(loss)
return {
"loss": self.loss_metric.result(),
}
def call(self, x):
c1, p1 = self.downscale_blocks[0](x)
c2, p2 = self.downscale_blocks[1](p1)
c3, p3 = self.downscale_blocks[2](p2)
c4, p4 = self.downscale_blocks[3](p3)
bn = self.bottle_neck_block(p4)
u1 = self.upscale_blocks[0](bn, c4)
u2 = self.upscale_blocks[1](u1, c3)
u3 = self.upscale_blocks[2](u2, c2)
u4 = self.upscale_blocks[3](u3, c1)
return self.conv_layer(u4)
➉モデルの学習
いよいよモデルの学習です。
optimizer = tf.keras.optimizers.Adam(
learning_rate=LR,
amsgrad=False,
)
model = DepthEstimationModel()
# Define the loss function
cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction="none"
)
# Compile the model
model.compile(optimizer, loss=cross_entropy)
train_loader = DataGenerator(
data=df[:260].reset_index(drop="true"), batch_size=BATCH_SIZE, dim=(HEIGHT, WIDTH)
)
validation_loader = DataGenerator(
data=df[260:].reset_index(drop="true"), batch_size=BATCH_SIZE, dim=(HEIGHT, WIDTH)
)
model.fit(
train_loader,
epochs=EPOCHS,
validation_data=validation_loader,
)
⑪モデルの出力の可視化
検証セットに対するモデルの出力を可視化します。
1枚目の画像はRGB画像で、2枚目の画像はground truth depth map画像、3枚目の画像は予測されたデプスマップ画像になります。
test_loader = next(
iter(
DataGenerator(
data=df[265:].reset_index(drop="true"), batch_size=6, dim=(HEIGHT, WIDTH)
)
)
)
visualize_depth_map(test_loader, test=True, model=model)
test_loader = next(
iter(
DataGenerator(
data=df[300:].reset_index(drop="true"), batch_size=6, dim=(HEIGHT, WIDTH)
)
)
)
visualize_depth_map(test_loader, test=True, model=model)
おわりに
今回はKeras公式チュートリアルに沿って、そのまま進めてみました。
チュートリアルにはColabのボタンがあり、そこから実行環境がすぐにできます。
深度推定を活用することで、自動運転技術や自動運搬におけるロボットシステム、家庭用ロボットなど製造業において応用範囲がありそうです。
参考記事
【新プラン開始!】
6ヶ月チャットによる質問し放題で動画コンテンツでAI企画立案、Pythonプログラミング、Webスクレイピング、機械学習、深層学習、Web開発、SQLなどなどAIエンジニア、データアナリストに必要なスキルをパッケージ化したプランもかなりお得にご利用頂けるので是非 新プラン(チャットサポート+動画プラン)をご活用してみてください!