Python 並列化 Thred vs Process

pythonの並列化は、マルチスレッドとマルチプロセスの二つで行う。
マルチスレッドは同一CPUでメモリを共有し、マルチプロセスはそれぞれのCPUでメモリは共有されない。スレッドでは、メモリの書き込みや読み込みでタイミングの処理が重要になってくる。

 このことから、pythonでは、グローバルインタプリタロック(GIL)によって、1プロセッサ当り1スレッドの実行に制限される。計算処理の多いプログラムにマルチスレッドを設定しても、予想通りにパフォーマンスが上がらないことがあるのはこのためである。コードを走らせる前に、スレッド数とコア数を確認した方が良い。

 GILを回避するためには、mpi4pyを使うか、Cで書かれている高速ライブラリのNumpy, Scipyを使用する関数内でGILの効かない並列化を行うのが効率が良い。
 ファイルの読み書きのI/Oに時間がかかっているプログラムに対しては、GILの影響はそれほど出ないので、マルチスレッドに適している。
 スレッド数とコア数を確かめるコードは以下の通り。

import psutil
import os
import multiprocessing as mp

def ThreadNum():
    nthreads = psutil.cpu_count(logical=True)
    ncores = psutil.cpu_count(logical=False)
    nthreads_per_core = nthreads // ncores
    try:
        nthread_available= len(os.sched_getaffinity(0))
    except AttributeError:
        nthread_available=mp.cpu_count()
    
    ncores_available = nthreads_available // nthreads_per_core
    print(f'{nthreads=}')
    print(f'{ncores=}')
    print(f'{nthreads_per_core=}')
    print(f'{nthreads_available=}')
    print(f'{ncores_available=}')

大型計算機システムによっても違うが、これでスレッドを指定して走らせれば、スレッドを使った並列化ができる。
 以下に、ランダム列を並列で作成し、メインで繋げるコードを例として置いておく。

#!/usr/bin/env python3
import time
import numpy as np
import sys
import pandas as pd
import concurrent.futures
import psutil
import os
import multiprocessing as mp

def ThreadNum():
    nthreads = psutil.cpu_count(logical=True)
    ncores = psutil.cpu_count(logical=False)
    nthreads_per_core = nthreads // ncores
    try:
        nthreads_available= len(os.sched_getaffinity(0))
    except AttributeError:
        nthreads_available=mp.cpu_count()

    ncores_available = nthreads_available // nthreads_per_core
    print(f'{nthreads=}')
    print(f'{ncores=}')
    print(f'{nthreads_per_core=}')
    print(f'{nthreads_available=}')
    print(f'{ncores_available=}')
    return nthreads_available

def MakeArg(numT,Num):
    subNum=Num/numT
    matIndS=[]
    matIndE=[]
    for i in range(numT):
        matIndS.append(int(0+i*subNum))
        matIndE.append(min(int((i+1)*subNum),Num))
    return [matIndS,matIndE]


def CreatT1(indS,indE):
    t=pd.Series(dtype='float64')
    for i in range (indS, indE):
       val=np.random.randint(1,20)
       t.loc[i]=val
    return t


def main():
    Num=100000
    numT = ThreadNum()

    args=MakeArg(numT,Num)

    t1=pd.Series(dtype='float64')
    startTime = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=numT) as executor:
        result=executor.map(CreatT1,*args)
    t1=pd.concat(result, axis=0)
    endTime = time.time()
    runTime = endTime - startTime

if __name__ == '__main__':
    main()

 同じコードをMulti Processで実行するためには、以下のように変えれば良い。

def CoreNum():
    nthreads = psutil.cpu_count(logical=True)
    ncores = psutil.cpu_count(logical=False)
    nthreads_per_core = nthreads // ncores
    try:
        nthreads_available= len(os.sched_getaffinity(0))
    except AttributeError:
        nthreads_available=mp.cpu_count()

    ncores_available = nthreads_available // nthreads_per_core
    print(f'{nthreads=}')
    print(f'{ncores=}')
    print(f'{nthreads_per_core=}')
    print(f'{nthreads_available=}')
    print(f'{ncores_available=}')
    return ncores_available

def main():
    Num=100000
    numC = CoreNum()

    args=MakeArg(numC,Num)

    t1=pd.Series(dtype='float64')
    startTime = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=numC) as executor:
        result=executor.map(CreatT1,*args)
    t1=pd.concat(result, axis=0)
    endTime = time.time()
    runTime = endTime - startTime
    print (f'Time:{runTime}[sec]')
    print(Num, numC, len(t1))

 スレッド数を変えて実行時間を計測したものと、同じシステムで、コア数を変えて実行時間計測の比較をしてみたのが以下のグラフである。

Create Random Matrix (size=1000_000)

スレッドでの実行では、CPU使用率は50%であったのが、Coreを使用した結果、97%までに上昇した。
 大規模計算を行う前に、使用するシステム上で並列化効率を測っておくのは重要である。

いいなと思ったら応援しよう!