Python 並列化 Thred vs Process
pythonの並列化は、マルチスレッドとマルチプロセスの二つで行う。
マルチスレッドは同一CPUでメモリを共有し、マルチプロセスはそれぞれのCPUでメモリは共有されない。スレッドでは、メモリの書き込みや読み込みでタイミングの処理が重要になってくる。
このことから、pythonでは、グローバルインタプリタロック(GIL)によって、1プロセッサ当り1スレッドの実行に制限される。計算処理の多いプログラムにマルチスレッドを設定しても、予想通りにパフォーマンスが上がらないことがあるのはこのためである。コードを走らせる前に、スレッド数とコア数を確認した方が良い。
GILを回避するためには、mpi4pyを使うか、Cで書かれている高速ライブラリのNumpy, Scipyを使用する関数内でGILの効かない並列化を行うのが効率が良い。
ファイルの読み書きのI/Oに時間がかかっているプログラムに対しては、GILの影響はそれほど出ないので、マルチスレッドに適している。
スレッド数とコア数を確かめるコードは以下の通り。
import psutil
import os
import multiprocessing as mp
def ThreadNum():
nthreads = psutil.cpu_count(logical=True)
ncores = psutil.cpu_count(logical=False)
nthreads_per_core = nthreads // ncores
try:
nthread_available= len(os.sched_getaffinity(0))
except AttributeError:
nthread_available=mp.cpu_count()
ncores_available = nthreads_available // nthreads_per_core
print(f'{nthreads=}')
print(f'{ncores=}')
print(f'{nthreads_per_core=}')
print(f'{nthreads_available=}')
print(f'{ncores_available=}')
大型計算機システムによっても違うが、これでスレッドを指定して走らせれば、スレッドを使った並列化ができる。
以下に、ランダム列を並列で作成し、メインで繋げるコードを例として置いておく。
#!/usr/bin/env python3
import time
import numpy as np
import sys
import pandas as pd
import concurrent.futures
import psutil
import os
import multiprocessing as mp
def ThreadNum():
nthreads = psutil.cpu_count(logical=True)
ncores = psutil.cpu_count(logical=False)
nthreads_per_core = nthreads // ncores
try:
nthreads_available= len(os.sched_getaffinity(0))
except AttributeError:
nthreads_available=mp.cpu_count()
ncores_available = nthreads_available // nthreads_per_core
print(f'{nthreads=}')
print(f'{ncores=}')
print(f'{nthreads_per_core=}')
print(f'{nthreads_available=}')
print(f'{ncores_available=}')
return nthreads_available
def MakeArg(numT,Num):
subNum=Num/numT
matIndS=[]
matIndE=[]
for i in range(numT):
matIndS.append(int(0+i*subNum))
matIndE.append(min(int((i+1)*subNum),Num))
return [matIndS,matIndE]
def CreatT1(indS,indE):
t=pd.Series(dtype='float64')
for i in range (indS, indE):
val=np.random.randint(1,20)
t.loc[i]=val
return t
def main():
Num=100000
numT = ThreadNum()
args=MakeArg(numT,Num)
t1=pd.Series(dtype='float64')
startTime = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=numT) as executor:
result=executor.map(CreatT1,*args)
t1=pd.concat(result, axis=0)
endTime = time.time()
runTime = endTime - startTime
if __name__ == '__main__':
main()
同じコードをMulti Processで実行するためには、以下のように変えれば良い。
def CoreNum():
nthreads = psutil.cpu_count(logical=True)
ncores = psutil.cpu_count(logical=False)
nthreads_per_core = nthreads // ncores
try:
nthreads_available= len(os.sched_getaffinity(0))
except AttributeError:
nthreads_available=mp.cpu_count()
ncores_available = nthreads_available // nthreads_per_core
print(f'{nthreads=}')
print(f'{ncores=}')
print(f'{nthreads_per_core=}')
print(f'{nthreads_available=}')
print(f'{ncores_available=}')
return ncores_available
def main():
Num=100000
numC = CoreNum()
args=MakeArg(numC,Num)
t1=pd.Series(dtype='float64')
startTime = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=numC) as executor:
result=executor.map(CreatT1,*args)
t1=pd.concat(result, axis=0)
endTime = time.time()
runTime = endTime - startTime
print (f'Time:{runTime}[sec]')
print(Num, numC, len(t1))
スレッド数を変えて実行時間を計測したものと、同じシステムで、コア数を変えて実行時間計測の比較をしてみたのが以下のグラフである。

スレッドでの実行では、CPU使用率は50%であったのが、Coreを使用した結果、97%までに上昇した。
大規模計算を行う前に、使用するシステム上で並列化効率を測っておくのは重要である。