アセットマネージャーのためのファイナンス機械学習:ポートフォリオの構築 練習問題 CLA
minVarPort関数を、上限と下限の制約付きの最適化ができるCLAに置き換えて、NCO有りと無しでモンテカルトシミュレーションを行う。
上記のCLA.pyは細かなバグがあるので、修正した最終版が以下のコードになる。
#!/usr/bin/env python
#
# I (Martin Dengler) have been given the OK to upload this code to
# github, as long as I indicate:
#
# 1. That David H. Bailey and Marcos Lopez de Prado are the original
# authors.
#
# 2. That this code is provided under a GPL license for
# non-commercial purposes.
#
# 3. That the original authors retain the rights as it relates to
# commercial applications.
#
# The accompanying paper was published in an open-access application:
# http://ssrn.com/abstract=2197616
#
# The original code is here:
#
# http://www.quantresearch.info/CLA.py.txt
# http://www.quantresearch.info/CLA_Main.py.txt
#
# A sample dataset can be found here:
# http://www.quantresearch.info/CLA_Data.csv.txt
#
#
# On 20130210, v0.2
# Critical Line Algorithm
# by MLdP <lopezdeprado@lbl.gov>
#
# From: http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2197616
#
# "All code in this paper is provided 'as is', and contributed to the
# academic community for non-business purposes only, under a GNU-GPL
# license"
#
import numpy as np
#---------------------------------------------------------------
#---------------------------------------------------------------
class CLA:
def __init__(self,mean,covar,lB=None,uB=None):
# Initialize the class
self.mean=mean
self.covar=covar
if(lB is None):
self.lB=np.full_like(mean,-1.)
else: self.lB=lB
if(uB is None):
self.uB=np.ones_like(mean)
else: self.uB=uB
self.w=[] # solution
self.l=[] # lambdas
self.g=[] # gammas
self.f=[] # free weights
#---------------------------------------------------------------
def solve(self):
# Compute the turning points,free sets and weights
f,w=self.initAlgo()
self.w.append(np.copy(w)) # store solution
self.l.append(None)
self.g.append(None)
self.f.append(f[:])
while True:
#1) case a): Bound one free weight
l_in=None
if len(f)>1:
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
j=0
for i in f:
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
if l_in==None :l_in,i_in,bi_in=l,i,bi
elif l>l_in:l_in,i_in,bi_in=l,i,bi
j+=1
#2) case b): Free one bounded weight
l_out=None
if len(f)<self.mean.shape[0]:
b=self.getB(f)
for i in b:
covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
covarF_inv=np.linalg.inv(covarF)
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1,self.w[-1][i])
if (self.l[-1]==None or l<self.l[-1]):
if(l_out==None): l_out,i_out = l,i
elif l>l_out: l_out,i_out=l,i
if (l_in==None or l_in<0) and (l_out==None or l_out<0):
#3) compute minimum variance solution
self.l.append(0)
covarF,covarFB,meanF,wB=self.getMatrices(f)
meanF=np.zeros(meanF.shape)
else:
#4) decide lambda
if (l_in is not None):
if (l_out==None) or (l_in > l_out):
self.l.append(l_in)
f.remove(i_in)
covarF,covarFB,meanF,wB=self.getMatrices(f)
w[i_in]=bi_in # set value at the correct boundary
b=self.getB(f)
wB[b.index(i_in)]=bi_in
else:
self.l.append(l_out)
f.append(i_out)
covarF,covarFB,meanF,wB=self.getMatrices(f)
else:
self.l.append(l_out)
f.append(i_out)
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
#5) compute solution vector
wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
for i in range(len(f)):w[f[i]]=wF[i]
self.w.append(np.copy(w)) # store solution
self.g.append(g)
self.f.append(f[:])
if self.l[-1]==0:break
#6) Purge turning points
self.purgeNumErr(10e-10)
self.purgeExcess()
#---------------------------------------------------------------
def initAlgo(self):
# Initialize the algo
#1) Form structured array
a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
a[:]=list(zip(range(self.mean.shape[0]),b)) # fill structured array
#2) Sort structured array
b=np.sort(a,order='mu')
#3) First free weight
i,w=b.shape[0],np.copy(self.lB)
while sum(w)<1:
i-=1
w[b[i][0]]=self.uB[b[i][0]]
w[b[i][0]]+=1-sum(w)
return [b[i][0]],w
#---------------------------------------------------------------
def computeBi(self,c,bi):
if c>0:
bi=bi[1][0]
if c<0:
bi=bi[0][0]
return bi
#---------------------------------------------------------------
def computeW(self,covarF_inv,covarFB,meanF,wB):
#1) compute gamma
onesF=np.ones(meanF.shape)
g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
if wB is None:
g,w1=float(-self.l[-1]*g1/g2+1/g2),0
else:
onesB=np.ones(wB.shape)
g3=np.dot(onesB.T,wB)
g4=np.dot(covarF_inv,covarFB)
w1=np.dot(g4,wB)
g4=np.dot(onesF.T,w1)
g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
#2) compute weights
w2=np.dot(covarF_inv,onesF)
w3=np.dot(covarF_inv,meanF)
return -w1+g*w2+self.l[-1]*w3,g
#---------------------------------------------------------------
def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
#1) C
onesF=np.ones(meanF.shape)
c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
c2=np.dot(covarF_inv,meanF)
c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
c4=np.dot(covarF_inv,onesF)
c=-c1*c2[i]+c3*c4[i]
if c==0:return
#2) bi
if type(bi)==list:bi=self.computeBi(c,bi)
#3) Lambda
if wB is None:
# All free assets
return float((c4[i]-c1*bi)/c),bi
else:
onesB=np.ones(wB.shape)
l1=np.dot(onesB.T,wB)
l2=np.dot(covarF_inv,covarFB)
l3=np.dot(l2,wB)
l2=np.dot(onesF.T,l3)
return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
def getMatrices(self,f):
# Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
covarF=self.reduceMatrix(self.covar,f,f)
meanF=self.reduceMatrix(self.mean,f,[0])
b=self.getB(f)
covarFB=self.reduceMatrix(self.covar,f,b)
wB=self.reduceMatrix(self.w[-1],b,[0])
return covarF,covarFB,meanF,wB
#---------------------------------------------------------------
def getB(self,f):
return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
def diffLists(self,list1,list2):
return list(set(list1)-set(list2))
#---------------------------------------------------------------
def reduceMatrix(self,matrix,listX,listY):
# Reduce a matrix to the provided list of rows and columns
if len(listX)==0 or len(listY)==0:return
matrix_=matrix[:,listY[0]:listY[0]+1]
for i in listY[1:]:
a=matrix[:,i:i+1]
matrix_=np.append(matrix_,a,1)
matrix__=matrix_[listX[0]:listX[0]+1,:]
for i in listX[1:]:
a=matrix_[i:i+1,:]
matrix__=np.append(matrix__,a,0)
return matrix__
#---------------------------------------------------------------
def purgeNumErr(self,tol):
# Purge violations of inequality constraints (associated with ill-conditioned covar matrix)
i=0
while True:
if i==len(self.w):break
w=self.w[i]
for j in range(w.shape[0]):
if w[j]-self.lB[j]<-tol or w[j]-self.uB[j]>tol:
del self.w[i]
del self.l[i]
del self.g[i]
del self.f[i]
break
i+=1
#---------------------------------------------------------------
NCOからこのCLAを呼ぶコードは以下の通りになる。上限は全ての資産で$${+1.}$$、下限を$${-1.}$$とし、空売りを許した。
import numpy as np
import pandas as pd
import MarPat as MP
import ONC as NC
def CLA_nco(cov, mu, lB=None, uB=None, maxNumClusters=None,minVarPortf=True):
import CLA
cov = pd.DataFrame(cov)
mu = pd.Series(mu[:,0])
corr1 = MP.cov2corr(cov)
corr1, clstrs, _ = NC.clusterKMeansBase(corr1, maxNumClusters, n_init=10)
wIntra= pd.DataFrame(0, index=cov.index, columns=clstrs.keys(),dtype='float64')
lB_=None
uB_=None
for i in clstrs:
cov_ = cov.loc[clstrs[i], clstrs[i]].values
mu_ = mu.loc[clstrs[i]].values.reshape(-1,1)
if(lB is not None): lB_ = lB.loc[clstrs[i]].values.reshape(-1,1)
if(uB is not None): uB_ = uB.loc[clstrs[i]].values.reshape(-1,1)
cla=CLA.CLA(mu_,cov_,lB_,uB_)
cla.solve()
if(minVarPortf==True):
wIntra.loc[clstrs[i], i] = cla.w[-1].flatten()
else :
max_sr, max_w_sr = cla.getMaxSR()
wIntra.loc[clstrs[i], i] = max_w_sr.flatten()
cov_= wIntra.T.dot(np.dot(cov, wIntra))
mu_ = wIntra.T.dot(mu)
mu_ = mu_.values.reshape(-1,1)
if(lB is not None):
lB_ = wIntra.T.dot(lB)
lB_ = lB_.values.reshape(-1,1)
if(uB is not None):
uB_ = wIntra.T.dot(uB)
uB_ = uB_.values.reshape(-1,1)
cla=CLA.CLA(mu_,cov_.values,lB_,uB_)
cla.solve()
if(minVarPortf==True):
wInter= pd.Series(cla.w[-1].flatten(), index=cov_.index)
else:
max_sr, max_w_sr = cla.getMaxSR()
wInter= pd.Series(max_w_sr.flatten(), index=cov_.index)
nco = wIntra.mul(wInter, axis=1).sum(axis=1).values.reshape(-1,1)
return nco
def CLAExp(mu0,cov0,lB=None, uB=None,
nObs=1000, nSims=1000 , minVarPortf = True,shrink=False):
w1 = pd.DataFrame(0, index=range(0, nSims),
columns=range(0, cov0.shape[0]),dtype='float64')
w1_d = pd.DataFrame(0, index=range(0, nSims),
columns=range(0, cov0.shape[0]),dtype='float64')
for i in range(0, nSims):
mu1, cov1 = MP.simCovMu(mu0, cov0, nObs, shrink=shrink)
cla=CLA.CLA(mu1,cov1,lB,uB)
cla.solve()
if(minVarPortf == True):
w1.loc[i] = cla.w[-1].flatten()
else:
max_sr, max_w_sr = cla.getMaxSR()
w1.loc[i] = max_w_sr.flatten()
w1_d.loc[i] = CLA_nco(cov1, mu1, lB, uB,int(cov1.shape[0]/2),minVarPortf).flatten()
return w1, w1_d
def main():
nBlocks, bSize, bCorr = 10, 5, .5
q=10.0
np.random.seed(0)
mu0, cov0 = MP.formTrueMatrix(nBlocks, bSize, bCorr)
minVarPortf=True
w1, w1_d=CLAExp(mu0=mu0,cov0=cov0,minVarPortf = minVarPortf)
cla=CLA.CLA(mu0,cov0.values)
cla.solve()
w0 = cla.w[-1]
w0 = np.repeat(w0.T, w1.shape[0], axis=0)
print('minVar Markowitz RMSE: ',
round(np.mean((w1-w0).values.flatten()**2)**.5,4))
print('minVar NCO RMSE:',
round(np.mean((w1_d-w0).values.flatten()**2)**.5,4))
minVarPortf=False
w1, w1_d=CLAExp(mu0=mu0,cov0=cov0,minVarPortf = minVarPortf)
max_sr, w0 = cla.getMaxSR()
w0 = np.repeat(w0.T, w1.shape[0], axis=0)
print('maxSR Markowitz RMSE: ',
round(np.mean((w1-w0).values.flatten()**2)**.5,4))
print('maxSR NCO RMSE:',
round(np.mean((w1_d-w0).values.flatten()**2)**.5,4))
if __name__ == '__main__':
main()
このコードの結果は以下のようになった。