エンジニア実践的基礎: グラフ探索
背景
このシリーズでは、エンジニアとして7年目を迎える節目に、これまで蓄積した知識を整理し、共有します。すべての記事を読めば、ミドルエンジニアとしてのスキルを身につけることができます。コンピュータサイエンスの理論よりも、実践的なソフトウェアエンジニアリングの知識に焦点を当てています。コーディングインタビューにも役立つ内容です。データ構造とアルゴリズムから設計、要件定義、バージョン管理、アジャイルなチーム開発、データベース、ネットワークなど、幅広いテーマを扱います。要するに、仕事に困らないレベルの知識を網羅します。
グラフ探索とは
グラフの探索は木の探索同様に深さ優先探索(DFS)と幅優先探索(BFS)を使います。グラフには格子グラフ・隣接行列・隣接リストの3つの表現方法がありましたが、グラフ探索の実装は表現方法別に異なります。今回は代表的な格子グラフと隣接リストを使い、グラフ探索について説明します。
格子グラフ DFS
格子グラフは 0 と 1 で埋められた二次元配列です。格子グラフの DFS は木の DFS と似ていますがエッジケースに注意する必要があります。
二次元配列の境界を越えるケース
訪問済みのマスを再訪問して無限ループになるケース
格子グラフの探索では二次元のマスを移動するイメージです。上下左右に移動できるため、移動方向によっては境界を超えてしまいます。例えば、右に移動し続けると境界を超えます。
木は非巡回グラフなので探索を続ければ必ずリーフに辿り着きます。一方でグラフは巡回する場合があり、訪問済みのマスを再訪問すると無限ループになります。
境界のエッジケースは配列のインデックスをチェックすれば対応できます。無限ループは訪問したマスを記録することで防げます。マスの記録にはセットを使います。セットは O(1) で検索ができるため配列に比べて高速で訪問済かをチェックできます。
次の格子グラフを例に説明します。左上から右下のゴールに移動したいです。1 は岩で進めず、0 は進むことができます。ゴールに辿り着けるパスがあか確かめましょう。
grid = [[0, 0, 0, 0],
[1, 1, 0, 0],
[0, 0, 0, 1],
[0, 1, 0, 0]]
def dfs(grid, r, c, visit):
ROWS, COLS = len(grid), len(grid[0])
# 境界チェック
if (min(r, c) < 0 or
r == ROWS or c == COLS):
return False
# 岩チェック
if (grid[r][c] == 1):
return False
# 訪問済みチェック
if ((r, c) in visit):
return False
# ゴールチェック
if r == ROWS - 1 and c == COLS - 1:
return True
# 訪問済みに追加
visit.add((r, c))
# 4方向に進む
if (dfs(grid, r + 1, c, visit)):
return True
if (dfs(grid, r - 1, c, visit)):
return True
if (dfs(grid, r, c + 1, visit)):
return True
if (dfs(grid, r, c - 1, visit)):
return True
# 訪問済みから削除
visit.remove((r, c))
return False
print(dfs(grid, 0, 0, set())) # True
全てのマス(m行×n列)で上下左右の四方向に移動できるため、O(4^(m×n)) の時間計算量です。空間計算量は全てのますの分だけ必要なので O(m×n)です。
格子グラフ BFS
格子グラフの BFS も木と非常に似ていますが、DFS 同様に「境界超え」と「無限ループ」のエッジケースに対処する必要があります。
DFS と同じマスの例を使い、ゴールまでの最短ステップ数を数えましょう。格子グラフの BFS も木の BFS 同様にキューを使い階層ごとに移動します。
from collections import deque
grid = [[0, 0, 0, 0],
[1, 1, 0, 0],
[0, 0, 0, 1],
[0, 1, 0, 0]]
def bfs(grid):
ROWS, COLS = len(grid), len(grid[0])
visit = set()
queue = deque()
# スタート地点をキューに追加
queue.append((0, 0))
# スタート地点を訪問済みにする
visit.add((0, 0))
# 最短距離
length = 0
while queue:
# 階層ごとに処理
for i in range(len(queue)):
r, c = queue.popleft()
# ゴール地点に到達したら終了
if r == ROWS - 1 and c == COLS - 1:
return length
# 4方向に移動
neighbors = [[0, 1], [0, -1], [1, 0], [-1, 0]]
for dr, dc in neighbors:
# 境界チェック
if (min(r + dr, c + dc) < 0 or
r + dr == ROWS or c + dc == COLS):
continue
# 訪問済みチェック
if ((r + dr, c + dc) in visit):
continue
# 岩チェック
if (grid[r + dr][c + dc] == 1):
continue
# 次の地点をキューに追加
queue.append((r + dr, c + dc))
# 次の地点を訪問済みにする
visit.add((r + dr, c + dc))
# 階層を1つ進める
length += 1
print(bfs(grid)) # 6
全てのマスを1回訪問するため時間計算量は O(m×n)です。空間計算量も全てのマス分だけ必要なので O(m×n) です。
隣接リスト DFS
隣接リストの DFS も格子グラフと考え方は同じです。隣接リスト特有の、ノードを使ったグラフ表現を DFS に適用する方法に慣れるだけです。実際はこちらの方が直感的だと思います。
上の画像のようにノード1からスタートしてノード5へ DFS で到達するパスが存在するかをチェックするコードです。ループで取得した隣接ノードを再帰処理しています。
def dfs(node, target, adjList, visit):
# 訪問済チェック
if node in visit:
return False
# ゴール
if node == target:
return True
# 訪問済に追加
visit.add(node)
for neighbor in adjList[node]:
if (dfs(neighbor, target, adjList, visit)):
return True
# これ以上進めないのでバックトラック
visit.remove(node)
return False
class GraphNode:
def __init__(self, val):
self.val = val
self.neighbors = []
# ノード間のエッジ
edges = [["1", "2"], ["2", "3"], ["2", "4"], ["3", "4"], ["4", "5"]]
# 隣接リストを構築
adjList = {}
for src, dst in edges:
if src not in adjList:
adjList[src] = []
if dst not in adjList:
adjList[dst] = []
adjList[src].append(dst)
visit = set()
print(dfs("1", "5", adjList, visit)) # True
格子グラフの DFS の時間計算量は O(4^(m×n)) でした。これは全てのノードが4方向に移動できるからです。隣接リストは最悪のケースで全てのノードがお互いに接続されています。つまり移動方向はノードの数だけあります。ノードの数を V とすると、O(V^V) が隣接ノードの時間計算量です。
from collections import deque
def bfs(node, target, adjList):
length = 0
queue = deque()
visit = set()
# スタート地点をキューに追加
queue.append(node)
# スタート地点を訪問済みにする
visit.add(node)
while queue:
# 階層ごとに処理
for i in range(len(queue)):
curr = queue.popleft()
# ゴール地点に到達したら終了
if curr == target:
return length
# 隣接ノードをキューに追加
for neighbor in adjList[curr]:
# 訪問済チェック
if neighbor not in visit:
visit.add(neighbor)
queue.append(neighbor)
# 階層を進める
length += 1
# ゴール地点に到達できなかった場合
return None
# GraphNode used for adjacency list
class GraphNode:
def __init__(self, val):
self.val = val
self.neighbors = []
# ノード間のエッジ
edges = [["1", "2"], ["2", "3"], ["2", "4"], ["3", "4"], ["4", "5"]]
# 隣接リストを構築
adjList = {}
for src, dst in edges:
if src not in adjList:
adjList[src] = []
if dst not in adjList:
adjList[dst] = []
adjList[src].append(dst)
print(bfs("1", "5", adjList)) # 3
隣接ノードの BFS はノードごとの処理とエッジの処理に分かれます。それぞれ1度ずつ処理されるので、時間計算量は O(V + E) になります。