Python 並列化Pool map or submit

ThreadPoolExecutorで並列化を行う場合、mapとsubmitの二つの方法がある。
mapでは、for loopなしに、多数のthreadにtaskを投げられる。故に、タスク関数の引数はlist形式で、thread毎に違う引数で同じ関数を行わせたい場合に使える。
一方、submitは非同期で実行され、taskの引数の与え方は普通の関数の引数渡しと変わりはなく、taskの進行状況のコントロールが可能である。
 ここでは簡単化のため、threadが行うtaskは、task外のデータアクセスを必要とせず、引数は不変とする。

executor.map

前記事と同じく、ランダム数列を返す並列タスクを行う。
mapは、関数の引数はリストで与えるため、引数は全スレッドで共通ではあるが、スレッド数のリストを作成する。

arg=(subNum,maxH)
args=[]
for a in arg:
     a_list=list(repeat(a,numT))
     args.append(a_list)

この引数を受ける乱数列を作るCreatT1は以下のようにする。

def CreatT1(num,maxH):
    t=pd.Series(dtype='float64')
    for i in range (num):
       val=np.random.randint(1,maxH)
       t.loc[i]=val
    return t

concurrent.futuresで並列化をする。

with concurrent.futures.ThreadPoolExecutor(max_workers=numT) as executor:
        result=executor.map(CreatT1,*args)

全体コードは、以下の通り。

import time
import numpy as np
import sys
import pandas as pd
import concurrent.futures
import psutil
import os
import multiprocessing as mp
from itertools import repeat

def ThreadNum():
    nthreads = psutil.cpu_count(logical=True)
    ncores = psutil.cpu_count(logical=False)
    nthreads_per_core = nthreads // ncores
    #nthreads_available = len(os.sched_getaffinity(0))
    nthreads_available = mp.cpu_count()

    ncores_available = nthreads_available // nthreads_per_core
    print(f'{nthreads=}')
    print(f'{ncores=}')
    print(f'{nthreads_per_core=}')
    print(f'{nthreads_available=}')
    print(f'{ncores_available=}')
    return nthreads_available

def CreatT1(num,maxH):
    t=pd.Series(dtype='float64')
    for i in range (num):
       val=np.random.randint(1,maxH)
       t.loc[i]=val
    return t

def main():
    numT = ThreadNum()
    num=10000
    subNum=int(num/numT)
    maxH=20
    
    arg=(subNum,maxH)
    args=[]
    for a in arg:
        a_list=list(repeat(a,numT))
        args.append(a_list)
    
    t1=pd.Series(dtype='float64')
    startTime = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=numT) as executor:
        result=executor.map(CreatT1,*args)
    t1=pd.concat(result,axis=0)
    endTime = time.time()
    runTime = endTime - startTime
    print (f'Time:{runTime}[sec]')
    print(t1.head)

if __name__ == '__main__':
    main()

これをM1コア上で実行すると以下の結果を得る。

executor map

executor.submit

非同期で実行されるsubmitは、taskへの引数の渡し方は、普通の関数と同じでリストにする必要はない。返り値はfutureで、taskの実行状況を表すrunnning(),done(),concelled()がある。これを使えば、

if future.runnning():
    #do something

のようにも使え、効率を上げられる。
taskの返り値は、future.result()で与えられる。
一方mapと違い、taskを投げるthread数のfor loopを明示しなければならない。

with concurrent.futures.ThreadPoolExecutor(max_workers=numT) as executor:
        futures=[executor.submit(CreatT1, subNum,maxH) for _ in range(numT)]

結果の返り値であるpd.Seriesを扱うのはtaskが全て終わった後で、for loopを使って一つにまとめる。

wait(futures)
for future in futures:
    result=future.result()
    t1=pd.concat([t1,result], ignore_index = True)

全体コードは以下の通りとなる。

import time
import numpy as np
import sys
import pandas as pd
import concurrent.futures
import psutil
import os
import multiprocessing as mp
from itertools import repeat
from concurrent.futures import wait

def ThreadNum():
    nthreads = psutil.cpu_count(logical=True)
    ncores = psutil.cpu_count(logical=False)
    nthreads_per_core = nthreads // ncores
    #nthreads_available = len(os.sched_getaffinity(0))
    nthreads_available = mp.cpu_count()

    ncores_available = nthreads_available // nthreads_per_core
    print(f'{nthreads=}')
    print(f'{ncores=}')
    print(f'{nthreads_per_core=}')
    print(f'{nthreads_available=}')
    print(f'{ncores_available=}')
    return nthreads_available

def CreatT1(num,maxH):
    t=pd.Series(dtype='float64')
    for i in range (num):
       val=np.random.randint(1,maxH)
       t.loc[i]=val
    return t

def main():
    numT = ThreadNum()
    num=10000
    subNum=int(num/numT)
    maxH=20
    
    t1=pd.Series(dtype='float64')
    startTime = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=numT) as executor:
        futures=[executor.submit(CreatT1, subNum,maxH) for _ in range(numT)]
    wait(futures)
    for future in futures:
        result=future.result()
        t1=pd.concat([t1,result], ignore_index = True)
    
    endTime = time.time()
    runTime = endTime - startTime
    print (f'Time:{runTime}[sec]')
    print(t1.head)

if __name__ == '__main__':
    main()

M1チップ上で実行した結果は以下の通りとなる。

executor submit

この記事が気に入ったらサポートをしてみませんか?