ファイナンス機械学習:ファイナンスにおける交差検証法 Purged K-foldクラスとCV Score

スニペット7.3において、sklearnのKFoldクラスを拡張し、重複する訓練データにパージングとエンバーゴを行い、リーケージが減少している交差検証を可能にする。

from sklearn.model_selection._split import _BaseKFold
class PurgedKFold(_BaseKFold):
    """
    Extend KFold class to work with labels that span intervals
    The train is purged of observations overlapping test-label intervals
    Test set is assumed contiguous (shuffle=False), w/o training samples in between
    """
    def __init__(self,n_splits=3,t1=None,pctEmbargo=0.):
        if not isinstance(t1,pd.Series):
            raise ValueError('Label Through Dates must be a pd.Series')
        super(PurgedKFold,self).__init__(n_splits,shuffle=False,random_state=None)
        self.t1=t1
        self.pctEmbargo=pctEmbargo
        
    def split(self,X,y=None,groups=None):
        if (X.index==self.t1.index).sum()!=len(self.t1):
            print(len(X.index),len(self.tx))
            raise ValueError('X and ThruDateValues must have the same index')
        indices=np.arange(X.shape[0])
        mbrg=int(X.shape[0]*self.pctEmbargo)
        test_starts=[
            (i[0],i[-1]+1) for i in np.array_split(np.arange(X.shape[0]),
                                                   self.n_splits)
        ]
        for i,j in test_starts:
            t0=self.t1.index[i] # start of test set
            test_indices=indices[i:j]
            maxT1Idx=self.t1.index.searchsorted(self.t1.iloc[test_indices].max())
            train_indices=self.t1.index.searchsorted(self.t1[self.t1<=t0].index)
            if maxT1Idx<X.shape[0]: # right train ( with embargo)
                train_indices=np.concatenate((train_indices, indices[maxT1Idx+mbrg:]))
            yield train_indices,test_indices

また、交差検証の結果のスコアを返すsklearnのcross_val_scoreにおけるバグ、
classes_sを識別しない

は、まだ解決されていないが、引数にweightが入っていないバグは修正されている。

スニペット7.4はこの二つのバグに対応した修正された交差検証のスコアを返すコードとなっている。

def cvScore(clf,X,y,sample_weight,scoring='neg_log_loss',
            t1=None,cv=None,cvGen=None,pctEmbargo=None):
    if scoring not in ['neg_log_loss','accuracy']:
        raise Exception('wrong scoring method.')
    from sklearn.metrics import log_loss,accuracy_score
    idx = pd.IndexSlice
    if cvGen is None:
        cvGen=snp.PurgedKFold(n_splits=cv,t1=t1,pctEmbargo=pctEmbargo) # purged
    score=[]
    mean_fpr = np.linspace(0, 1, 100)
    for train,test in cvGen.split(X=X):
        if sample_weight is not None:
            fit=clf.fit(X=X.iloc[idx[train],:],y=y.iloc[idx[train].values.ravel()],
                    sample_weight=sample_weight.iloc[idx[train]].values)
            if scoring=='neg_log_loss':
                prob=fit.predict_proba(X.iloc[idx[test],:])
                score_=-log_loss(y.iloc[idx[test]], prob,
                                    sample_weight=sample_weight.iloc[idx[test]].values,
                                    labels=clf.classes_)
            else:
                pred=fit.predict(X.iloc[idx[test],:])
                score_=accuracy_score(y.iloc[idx[test]],pred,
                                  sample_weight=sample_weight.iloc[idx[test]].values)
        else:
            fit=clf.fit(X=X.iloc[idx[train],:],y=y.iloc[idx[train]].values.ravel())
            if scoring=='neg_log_loss':
                prob=fit.predict_proba(X.iloc[idx[test],:])
                score_=-log_loss(y.iloc[idx[test]], prob,
                                    labels=clf.classes_)
            else:
                pred=fit.predict(X.iloc[idx[test],:])
                score_=accuracy_score(y.iloc[idx[test]],pred)   
            
        score.append(score_)
        
    return np.array(score)

いいなと思ったら応援しよう!