ファイナンス機械学習: 標本の重み付け 逐次ブーストラップ モンテカルロ法
独自性の低い標本サンプルから、標準な方法でブーストラップ抽出を行うサンプリングと、冗長医をコントロールするように確立を変えていく逐次ブーストラップサンプリングの比較をモンテカルロ法で行う。
ランダムなt1行列を作成するコードの実装はスニペット4.7で与えられている。
def getRndT1(numObs, numBars, maxH):
t1 = pd.Series(dtype='float64')
for i in range(numObs):
idx = np.random.randint(0, numBars)
val = idx + np.random.randint(1, maxH)
t1.loc[idx] = val
return t1.sort_index()
これで得られたt1行列に対し、インプライドインディケータ行列indMを導出する。
サンプル数だけ、重複を許すブーストラップ抽出を、ランダムに行う標準方法と、indMを用いて確率を計算しながら取り出す逐次型を行い、双方の抽出サンプルに対し、平均独自性$${\displaystyle{\frac{\sum^{I}_{i=1}\bar{u}_i}{I}}}$$を計算し、1000,000回分の分布状態を観察する。
演算回数が多いので、並列化が必須であり、並列化はMultiProcessを使い、以下のように行なった。
#!/usr/bin/env python3
import time
import numpy as np
import sys
import pandas as pd
import concurrent.futures
import psutil
import os
import multiprocessing as mp
def CoreNum():
nthreads = psutil.cpu_count(logical=True)
ncores = psutil.cpu_count(logical=False)
nthreads_per_core = nthreads // ncores
nthreads_available = len(os.sched_getaffinity(0))
ncores_available = nthreads_available // nthreads_per_core
print(f'{nthreads=}')
print(f'{ncores=}')
print(f'{nthreads_per_core=}')
print(f'{nthreads_available=}')
print(f'{ncores_available=}')
return ncores_available
def getRndT1(numObs, numBars, maxH):
t1 = pd.Series(dtype='float64')
for i in range(numObs):
idx = np.random.randint(0, numBars)
val = idx + np.random.randint(1, maxH)
t1.loc[idx] = val
return t1.sort_index()
def getAvgUniqueness(indM):
# Average uniqueness from indicator matrix
c=indM.sum(axis=1) # concurrency
u=indM.div(c,axis=0) # uniqueness
avgU=u[u>0].mean() # avg. uniqueness
return avgU
def seqBootstrap(indM,sLength=None):
# Generate a sample via sequential bootstrap
if sLength is None:sLength=indM.shape[1]
phi=[]
while len(phi)<sLength:
avgU=pd.Series(dtype='float64')
for i in indM:
indM_=indM[phi+[i]] # reduce indM
avgU.loc[i]=getAvgUniqueness(indM_).iloc[-1]
prob=avgU/avgU.sum() # draw prob
phi+=[np.random.choice(indM.columns,p=prob)]
return phi
def getIndMatrix(barIx,t1):
# Get Indicator matrix
indM=(pd.DataFrame(0,index=barIx,columns=range(t1.shape[0])))
for i,(t0,t1) in enumerate(t1.items()):indM.loc[t0:t1,i]=1.
return indM
def auxMC(numObs, numBars, maxH):
t1 = getRndT1(numObs, numBars, maxH)
barIdx = range(t1.max() + 1)
indM = getIndMatrix(barIdx, t1)
phi = np.random.choice(indM.columns, size=indM.shape[1])
stdU = getAvgUniqueness(indM[phi]).mean()
phi = seqBootstrap(indM)
seqU = getAvgUniqueness(indM[phi]).mean()
return {'stdU': stdU, 'seqU': seqU}
def MC(numObs, numBars, maxH, numIters):
out = pd.DataFrame()
for i in range(numIters):
out = pd.concat((out, pd.DataFrame([auxMC(numObs, numBars, maxH)])))
return out
def main():
numC = CoreNum()
numObs=10
numBars=100
maxH=5
numIters=1000000
Iters=int(numIters/numC)
items = ((numObs,numBars,maxH,Iters) for i in range(numC))
results = pd.DataFrame()
startTime = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=numC) as executor:
futures=[executor.submit(MC, numObs,numBars,maxH,Iters) for _ in range(numC)]
concurrent.futures.wait(futures)
for future in futures:
result=future.result()
results=pd.concat([results,result], ignore_index = True)
np.savetxt(r'BstMC.txt', results, fmt='%f')
endTime = time.time()
runTime = endTime - startTime
print (f'Time:{runTime}[sec]')
if __name__ == '__main__':
main()
ここで得られたデータから、ヒストグラムを出し、分布を比較する。
MCData = np.loadtxt("BstMC.txt", dtype='float64')
print('Mean of Std Boost:',MCData[:,0].mean())
print('Mean of Seq Boost:',MCData[:,1].mean())
plt.figure(figsize=(10,6))
plt.hist(MCData[:,0], bins=20, density=True, alpha=0.4,label='Std Boost')
plt.hist(MCData[:,1], bins=20, density=True, alpha=0.4,label='Seq Boost')
plt.xlabel('AvgUniq')
plt.legend()
plt.show
結果は以下のようになった。
逐次ブースティング方の平均独自性の分布も右により、期待値も大きくなっていることから、IIDに近い分布となっていることがわかる。