ファイナンス機械学習:ファイナンスにおける交差検証法 Purged K-foldクラスとCV Score
スニペット7.3において、sklearnのKFoldクラスを拡張し、重複する訓練データにパージングとエンバーゴを行い、リーケージが減少している交差検証を可能にする。
from sklearn.model_selection._split import _BaseKFold
class PurgedKFold(_BaseKFold):
"""
Extend KFold class to work with labels that span intervals
The train is purged of observations overlapping test-label intervals
Test set is assumed contiguous (shuffle=False), w/o training samples in between
"""
def __init__(self,n_splits=3,t1=None,pctEmbargo=0.):
if not isinstance(t1,pd.Series):
raise ValueError('Label Through Dates must be a pd.Series')
super(PurgedKFold,self).__init__(n_splits,shuffle=False,random_state=None)
self.t1=t1
self.pctEmbargo=pctEmbargo
def split(self,X,y=None,groups=None):
if (X.index==self.t1.index).sum()!=len(self.t1):
print(len(X.index),len(self.tx))
raise ValueError('X and ThruDateValues must have the same index')
indices=np.arange(X.shape[0])
mbrg=int(X.shape[0]*self.pctEmbargo)
test_starts=[
(i[0],i[-1]+1) for i in np.array_split(np.arange(X.shape[0]),
self.n_splits)
]
for i,j in test_starts:
t0=self.t1.index[i] # start of test set
test_indices=indices[i:j]
maxT1Idx=self.t1.index.searchsorted(self.t1.iloc[test_indices].max())
train_indices=self.t1.index.searchsorted(self.t1[self.t1<=t0].index)
if maxT1Idx<X.shape[0]: # right train ( with embargo)
train_indices=np.concatenate((train_indices, indices[maxT1Idx+mbrg:]))
yield train_indices,test_indices
また、交差検証の結果のスコアを返すsklearnのcross_val_scoreにおけるバグ、
classes_sを識別しない
は、まだ解決されていないが、引数にweightが入っていないバグは修正されている。
スニペット7.4はこの二つのバグに対応した修正された交差検証のスコアを返すコードとなっている。
def cvScore(clf,X,y,sample_weight,scoring='neg_log_loss',
t1=None,cv=None,cvGen=None,pctEmbargo=None):
if scoring not in ['neg_log_loss','accuracy']:
raise Exception('wrong scoring method.')
from sklearn.metrics import log_loss,accuracy_score
idx = pd.IndexSlice
if cvGen is None:
cvGen=snp.PurgedKFold(n_splits=cv,t1=t1,pctEmbargo=pctEmbargo) # purged
score=[]
mean_fpr = np.linspace(0, 1, 100)
for train,test in cvGen.split(X=X):
if sample_weight is not None:
fit=clf.fit(X=X.iloc[idx[train],:],y=y.iloc[idx[train].values.ravel()],
sample_weight=sample_weight.iloc[idx[train]].values)
if scoring=='neg_log_loss':
prob=fit.predict_proba(X.iloc[idx[test],:])
score_=-log_loss(y.iloc[idx[test]], prob,
sample_weight=sample_weight.iloc[idx[test]].values,
labels=clf.classes_)
else:
pred=fit.predict(X.iloc[idx[test],:])
score_=accuracy_score(y.iloc[idx[test]],pred,
sample_weight=sample_weight.iloc[idx[test]].values)
else:
fit=clf.fit(X=X.iloc[idx[train],:],y=y.iloc[idx[train]].values.ravel())
if scoring=='neg_log_loss':
prob=fit.predict_proba(X.iloc[idx[test],:])
score_=-log_loss(y.iloc[idx[test]], prob,
labels=clf.classes_)
else:
pred=fit.predict(X.iloc[idx[test],:])
score_=accuracy_score(y.iloc[idx[test]],pred)
score.append(score_)
return np.array(score)