ファイナンス機械学習:特徴量の重要度 人工データによる実験

特徴量重要度の推定手段を、人工データで検証する。
人工データの特徴量は、

  • 有益な特徴量(Imformative):ラベルの決定に用いられる特徴量

  • 冗長な特徴量(Redundant):有益な特徴量をランダムに線型結合し、代替効果を生じさせる特徴量。

  • ノイズ:観測値のラベル決定に無関係な特徴量

の3種類で構成される。このデータの作成の実装はスニペット8.7で与えられる。

def getTestData(n_features=40,n_informative=10,n_redundant=10,n_samples=10_000):
    # generate a random dataset for a classification problem
    from sklearn.datasets import make_classification
    from datetime import datetime, date
    
    trnsX,cont=make_classification(n_samples=n_samples,
                                   n_features=n_features,
                                   n_informative=n_informative,
                                   n_redundant=n_redundant,
                                   random_state=0,shuffle=False)
    df0=(pd.date_range(periods=n_samples, freq=pd.tseries.offsets.BusinessHour(),
                          end=pd.to_datetime(datetime.now())))
    print(n_samples,len(df0))
    trnsX=pd.DataFrame(trnsX,index=df0)
    cont=pd.Series(cont,index=df0).to_frame('bin')
    df0=['I_'+str(i) for i in range(n_informative)]+['R_'+str(i) for i in range(n_redundant)]
    df0+=['N_'+str(i) for i in range(n_features-len(df0))]
    trnsX.columns=df0
    cont['w']=1./cont.shape[0]
    cont['t1']=pd.Series(cont.index,index=cont.index)
    return trnsX,cont

 このテストデータに、MDI、MDA、SFIの計算方法をバギング決定木とランダムフォレストを用いて適用し、重要度が有益な特徴量に高く評価できているかどうかを調べる関数をスニペット8.8で実装している。

def getFeatImportances(trnsX,cont,n_estimators=1000,cv=10,
                    max_samples=1.,numThreads=11,pctEmbargo=0,
                    scoring='accuracy',method='SFI',minWLeaf=0.,
                    RandomForest=False,**kargs):
    # feature importance from a random forest
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.ensemble import BaggingClassifier
    import FinCV as fcv
    import Weights as wg
    #from mpEngine import mpPandasObj
    n_jobs=(-1 if numThreads>1 else 1) # run 1 thread w/ ht_helper in dirac1
    #1) prepare classifier,cv. max_features=1, to prevent masking
    if RandomForest is False:
        clf=DecisionTreeClassifier(criterion='entropy',max_features=1,
                               class_weight='balanced',
                               min_weight_fraction_leaf=minWLeaf)
        clf=BaggingClassifier(estimator=clf,n_estimators=n_estimators,
                          max_features=1.,max_samples=max_samples,
                          oob_score=True,n_jobs=n_jobs)
        fit=clf.fit(X=trnsX,y=cont['bin'],sample_weight=cont['w'].values)
    else:
     CoEvents=wg.getNumCoEvents(trnsX.index, t1=cont['t1'], period=trnsX.index)
        avgUniq=wg.getSampleTW(t1=cont['t1'],numCoEvents=CoEvents,period=trnsX.index)
        avgU=avgUniq.mean()
        clf = RandomForestClassifier(n_estimators=n_estimators,
                                     criterion='entropy',
                                     max_features=1,
                                     class_weight="balanced_subsample",
                                     bootstrap=True,
                                     oob_score=True,
                                     n_jobs=-1,
                                     max_samples=None)
        fit=clf.fit(X=trnsX,y=cont['bin'],sample_weight=cont['w'].values)
    oob=fit.oob_score_
    y=cont['bin']
    if method=='MDI':
        imp=getFeatImpMDI(fit,featNames=trnsX.columns)
        oos=cvScore(clf,X=trnsX,y=y,cv=cv,sample_weight=cont['w'],
                    t1=cont['t1'],pctEmbargo=pctEmbargo,scoring=scoring).mean()
    elif method=='MDA':
        imp,oos=getFeatImpMDA(clf,X=trnsX,y=y,cv=cv,
                           sample_weight=cont['w'],t1=cont['t1'],
                           pctEmbargo=pctEmbargo,scoring=scoring)
    elif method=='SFI':
        cvGen=fcv.PurgedKFold(n_splits=cv,t1=cont['t1'],pctEmbargo=pctEmbargo)
        oos=cvScore(clf,X=trnsX,y=y,sample_weight=cont['w'],
                    scoring=scoring,cvGen=cvGen).mean()
        imp = getAuxFeatImpSFI(featNames=trnsX.columns, clf=clf, trnsX=trnsX, cont=cont,
                               scoring=scoring, cvGen=cvGen)
    return imp,oob,oos

テストデータを作成し、必要に応じてPCA変換してから、重要度を推定し、結果を可視化するテスト関数は、重要度をプロットする関数とともに、スニペット8.9と8.10で実装されている。

def testFunc(n_features=40,n_informative=10,n_redundant=10,n_estimators=1000,
             n_samples=10000,cv=10,testlabel='testFunc',PCA=False,PCAthreads=0.95):
    # test the performance of the feat importance functions on artificial data
    # Nr noise features = n_featurs-n_informative-n_redundant
    from itertools import product
    import pathlib
    X,cont=getTestData(n_features,n_informative,n_redundant,n_samples)
    testlabel=testlabel
    if PCA is False:
        trnsX=X
    else:
        #trnsX=getOrthoFeats(X,varThres=PCAthreads).add_prefix("PCA_")
        trnsX=getOrthoFeats(X,varThres=PCAthreads)
        trnsX.index=X.index
        testlabel+='PCA'

    dict0={'minWLeaf':[0.],'scoring':['accuracy'],'method':['MDI','MDA','SFI'],
           'max_samples':[1.]}
    jobs,out=(dict(zip(dict0,i))for i in product(*dict0.values())),[]
    pdir=pathlib.Path()
    kargs={'pathOut':pathlib.PurePosixPath(pdir/testlabel),
           'n_estimators':n_estimators,'tag':testlabel,'cv':cv}

    for job in jobs:
        job['simNum']=job['method']+'_'+job['scoring']+'_'+'%.2f'%job['minWLeaf']+\
        '_'+str(job['max_samples'])
        kargs.update(job)
        imp,oob,oos=getFeatImportances(trnsX=trnsX,cont=cont,**kargs)
        plotFeatImportance(imp=imp,oob=oob,oos=oos,**kargs)
        df0=imp[['mean']]/imp['mean'].abs().sum()
        df0['type']=[i[0] for i in df0.index]
        df0=df0.groupby('type')['mean'].sum().to_dict()
        df0.update({'oob':oob,'oos':oos});df0.update(job)
    out.append(df0)
    out=(pd.DataFrame(out).sort_values(['method','scoring','minWLeaf','max_samples']))
    if PCA is False:
         out=out[['method','scoring','minWLeaf','max_samples','I','R','N','oob','oos']]
    else:
         out=out[['method','scoring','minWLeaf','max_samples','O','oob','oos']]
    outfile=str(kargs['pathOut'])+'stats.csv'
    out.to_csv(outfile)
    return

def plotFeatImportance(pathOut,imp,oob,oos,method,tag=0,simNum=0,**kargs):
    import matplotlib.pyplot as plt
    # plot mean imp bars with std
    plt.ioff()
    plt.figure(figsize=(10,imp.shape[0]/5.))
    imp=imp.sort_values('mean',ascending=True)
    ax=imp['mean'].plot(kind='barh',color='b',alpha=0.25,xerr=imp['std'],
                        error_kw={'ecolor':'r'})
    if method=='MDI':
        plt.xlim([0,imp.sum(axis=1).max()])
        plt.axvline(1./imp.shape[0],lw=1.,color='r',ls='dotted')
    ax.get_yaxis().set_visible(False)
    for i,j in zip(ax.patches,imp.index):
        ax.text(i.get_width()/2, i.get_y()+i.get_height()/2,
                j,ha='center',va='center',color='k')
    simNumStr=str(simNum)
    plt.title('tag='+tag+' | simNum='+simNumStr+' | oob='+str(round(oob,4))+' | oos='+str(round(oos,4)))
    savefile=str(pathOut)+'featImportance_'+simNumStr+'.png'
    plt.savefig(savefile, dpi=100)
    plt.clf()
    plt.close()

matplotlib.pyplotのioffで、計算中のグラフは出力せず、pngファイルで保存される。
 これらによるデフォルト(有益特徴量=10,冗長特徴量=10,全特徴量=40,n_estimators=1000, n_samples=10000,cv=10)の結果は以下の通り。

MDI
MDA
SFI

いいなと思ったら応援しよう!