cryptowatchのAPIを用いて戦略のバックテストを行うコード
こんにちは、alumiです。
自動取引botを作る際に勝てる戦略を探すためのバックテストコードのテンプレみたいなものを公開しときます。pythonです。
テクニカル指標を用いて、固定値幅で利確損切りを行う様々な戦略のバックテストが行えます。もちろん少し改造すれば試せる戦略の幅はもっと広がります。
最終的にmatplotlibで簡単な損益推移グラフをプロットします。
注意
・このコードを見てぼんやりと何やってるかわかる方向けです。
・自分のbotに沿わせて作ったものなのでいろいろと不備はあるかもしれません。
・用意してないテクニカル指標の関数は自分で作ってください。
・EMAの期間は最大100までです(多分)。
・ドローダウンは資産100万固定として計算した値が出ます。
説明とか
・デフォルトの戦略はMACDのクロスでエントリーし固定幅で利確、損切りです。何分足か、固定幅をどれくらいの値にするかは中盤の#設定の欄を変えてみてください。
・オリジナルの戦略を試してみようという方は、各テクニカル指標の関数が最新の値だけを返すものなのか、直近の値を一定数リストにして返すものなのかに注意しつつ(コードを自分で読み取っていただいて判断して欲しいです)buy_signal()とsell_signal()を変更してみてください。
・途中、同じAPIを叩いて終値の配列とohlcの配列の2種類別々に作っていてなんだこれ、と思われる方もいるかもしれませんが、ATRの計算だけohlcデータが必要だった関係でこうしてます。ATRを用いない方は後者は消してもらっていいです。
以下コードです。
# coding: UTF-8
import hashlib
import hmac
import requests
import datetime
import json
from pprint import pprint
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
#-----------------------------------------------------------------
# cryptowatchから終値を取り出す
def get_price_data():
response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params = { "periods" : period ,"after" : 1})
response = response.json()
close_data = []
for i in range(6000):
close_data.append(response["result"][str(period)][i][4])
arr = np.array(close_data)
return pd.Series(arr)
#--------------------------------------------------------------------
#テクニカル指標実装系関数
#EMA
#EMA_periodは期間、nはろうそく何本分前の値か
def EMA(EMA_period,n):
EMA_data = []
for i in range(2*EMA_period):
EMA_data.insert(0,close[data_n-1-i])
if n == 0:
arr = np.array(EMA_data)[-EMA_period:]
else:
arr = np.array(EMA_data)[-n-EMA_period:-n]
EMA = pd.Series(arr).ewm(span=EMA_period).mean()
return EMA[EMA_period-1]
#MACD
#a=短期EMA_period,b=長期EMA_period,s=シグナル期間
def MACD_and_signal(a,b,s):
MACD = []
for i in range(a):
MACD.insert(0,EMA(a,i)-EMA(b,i))
arr = np.array(MACD)[-s-1:]
Signal = pd.Series(arr).rolling(s).mean()
return MACD,Signal
#ATR
#nは期間、n=14が普通
def ATR(n):
data = []
for i in range(2*n-1):
p1 = response[data_n-i-1][2]-response[data_n-i-1][3] #当日高値-当日安値
p2 = response[data_n-i-1][2]-response[data_n-i-2][4] #当日高値-前日終値
p3 = response[data_n-i-1][3]-response[data_n-i-2][4] #当日安値-前日終値
tr = max(abs(p1),abs(p2),abs(p3))
data.insert(0,tr)
arr = np.array(data)[-n:]
ATR = pd.Series(arr).ewm(span=n).mean()
return ATR[n-1]
#BB
#pは期間,nは偏差の倍率
def BB(p,n):
Bands_period = p
Deviation = n
Base = close.rolling(Bands_period).mean()
sigma = close.rolling(Bands_period).std(ddof=0)
Upper = Base+sigma*Deviation
Lower = Base-sigma*Deviation
print(Upper)
print(Lower)
return Base,Upper,Lower
#RSI
#pは期間
def RSI(p):
RSI_period = p
diff = close.diff(1)
positive = diff.clip_lower(0).ewm(alpha=1.0/RSI_period).mean()
negative = diff.clip_upper(0).ewm(alpha=1.0/RSI_period).mean()
RSI = 100-100/(1-positive/negative)
return RSI
# AKAGAMIさんのコードのほぼパクリですのでご了承ください
def vixfix():
cl_list = []
low_list = []
for i in range(data_n):
cl_list.insert(0,response[-i-1][4])
low_list.insert(0,response[-i-1][3])
cl = pd.Series(np.array(cl_list))
low = pd.Series(np.array(low_list))
prd = 22
bbl = 20
mult = 2.0
lb = 50
ph = 0.85
pl = 1.01
hp = False
sd = False
wvf = (cl.rolling(window=prd,min_periods=1).max()-low)/cl.rolling(window=prd,min_periods=1).max()*100
sDev = mult*wvf.rolling(window=bbl,min_periods=1).std()
midLine = wvf.rolling(window=bbl,min_periods=1).mean()
lowerBand = midLine -sDev
upperBand = midLine + sDev
rangeHigh = wvf.rolling(window=lb,min_periods=1).max()*ph
rangeLow = wvf.rolling(window=lb,min_periods=1).min()*pl
return wvf,upperBand,rangeHigh
#RCI
#nは期間
def RCI(n):
close_data = []
d = 0
for i in range(n):
close_data.insert(0,response["result"][str(period)][-i-1][4])
ser = pd.Series(np.array(close_data))
number = pd.Series(np.arange(n))
df = pd.DataFrame([ser,number]).T
df = df.sort_values(by=0)
df_r = df.reset_index(drop=True)
#print(df)
#print(df_r)
for i in range(n):
print(df_r[1][i])
d += (df_r[1][i]-i)**2
#print(d)
RCI = (1.0-6.0*d/(n*(n*n-1)))*100.0
return RCI
#ストキャスティクス
#期間固定になってます
def stoch():
Kperiod = 14 #%K期間
Dperiod = 3 #%D期間
Slowing = 3 #平滑化期間
high_list = []
low_list = []
for i in range(data_n):
high_list.insert(0,response["result"][str(period)][-i-1][2])
low_list.insert(0,response["result"][str(period)][-i-1][3])
high = pd.Series(np.array(high_list))
low = pd.Series(np.array(low_list))
Hline = high.rolling(Kperiod).max()
Lline = low.rolling(Kperiod).min()
sumlow = (close-Lline).rolling(Slowing).sum()
sumhigh = (Hline-Lline).rolling(Slowing).sum()
Stoch = sumlow/sumhigh*100
Signal = Stoch.rolling(Dperiod).mean()
return Stoch,Signal
#--------------------------------------------------------------------
# 戦略(試したいように自分で作ってください)
# 例としてMACDのクロスでエントリーする戦略を書いときます
def sell_signal():
MACD,Signal = MACD_and_signal(12,26,9)
if MACD[11] < Signal[9] and MACD[10] > Signal[8]:
return True
else: return False
def buy_signal():
MACD,Signal = MACD_and_signal(12,26,9)
if MACD[11] > Signal[9] and MACD[10] < Signal[8]:
return True
else: return False
#--------------------------------------------------------------
# 設定
# ==========================
# 何秒足か
period = 300
# 利確幅
p_width = 10000
# 損切り幅
l_width = 10000
# ==========================
# 終値配列の長さ
data_n = 100
flag = {
"check":True,
"sell_position":False,
"buy_position":False
}
close_data = get_price_data()
response_data = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params = { "periods" : period , "after" : 1})
response_data = response_data.json()
i = profit = loss = count1 = count2 = drawdown = count_position1 = count_position2 = m = 0
input = int(input("何件分のデータで試しますか(最大6000件):"))
start = 6000-input
asset_list = []
time_data = []
limit = 6000-start-(data_n+1)
while i < limit:
while(flag["check"]):
response = []
closelist = []
for j in range(data_n):
response.append(response_data["result"][str(period)][i+j+start])
closelist.append(close_data[i+j+start])
arr = np.array(closelist)
close = pd.Series(arr)
if sell_signal():
print(datetime.datetime.fromtimestamp(response[data_n-1][0]))
print(close[data_n-1])
print("売り注文をします")
price = close[data_n-1]
flag["sell_position"] = True
flag["check"] = False
if buy_signal():
print(datetime.datetime.fromtimestamp(response[data_n-1][0]))
print(close[data_n-1])
print("買い注文をします")
price = close[data_n-1]
flag["buy_position"] = True
flag["check"] = False
i += 1
if i > limit:
break
position_time = 0
while(flag["sell_position"]):
response = []
closelist = []
for j in range(data_n):
response.append(response_data["result"][str(period)][i+j+start])
closelist.append(close_data[i+j+start])
arr = np.array(closelist)
close = pd.Series(arr)
if response[data_n-1][3] < price-p_width:
print(datetime.datetime.fromtimestamp(response[data_n-1][0]))
print(close[data_n-1])
print("利確:+"+str(p_width))
print("ポジションを持っていた時間:"+str(position_time)+"分")
count_position1 += position_time
count1 += 1
profit += p_width
flag["sell_position"] = False
flag["check"] = True
if response[data_n-1][2] > price+l_width:
print(datetime.datetime.fromtimestamp(response[data_n-1][0]))
print(close[data_n-1])
print("損切り:-"+str(l_width))
print("ポジションを持っていた時間:"+str(position_time)+"分")
count_position2 += position_time
count2 += 1
loss += l_width
flag["sell_position"] = False
flag["check"] = True
i += 1
position_time += period/60
if i > limit:
break
while(flag["buy_position"]):
response = []
closelist = []
for j in range(data_n):
response.append(response_data["result"][str(period)][i+j+start])
closelist.append(close_data[i+j+start])
arr = np.array(closelist)
close = pd.Series(arr)
if response[data_n-1][2] > price+p_width:
print(datetime.datetime.fromtimestamp(response[data_n-1][0]))
print(close[data_n-1])
print("利確:+"+str(p_width))
print("ポジションを持っていた時間:"+str(position_time)+"分")
count_position1 += position_time
count1 += 1
profit += p_width
flag["buy_position"] = False
flag["check"] = True
if response[data_n-1][3] < price-l_width:
print(datetime.datetime.fromtimestamp(response[data_n-1][0]))
print(close[data_n-1])
print("損切り:-"+str(l_width))
print("ポジションを持っていた時間:"+str(position_time)+"分")
count_position2 += position_time
count2 += 1
loss += l_width
flag["buy_position"] = False
flag["check"] = True
i += 1
position_time += period/60
if i > limit:
break
asset_list.append(profit-loss)
time_data.append(datetime.datetime.fromtimestamp(response[data_n-1][0]))
if m < profit - loss:
m = profit - loss
if drawdown < m-(profit-loss):
drawdown = m-(profit-loss)
print("------------------------")
print("利益合計:"+str(profit))
print("損失合計:"+str(loss))
print("儲け:"+str(profit-loss))
print("利確回数:"+str(count1))
print("利確平均:"+str(profit/count1))
print("利確した時の平均ポジション保有時間"+str(count_position1/count1)+" 分")
print("損切り回数:"+str(count2))
print("損切り平均:"+str(loss/count2))
print("損切りした時の平均ポジション保有時間:"+str(count_position2/count2)+" 分")
print("勝率:"+str(count1/(count1+count2)*100)+" %")
print("損益率:"+str((profit/count1)/(loss/count2)))
print("profit factor:"+str(profit/loss))
print("最大ドローダウン:"+str(drawdown/1000000.0)+" %")
print("------------------------")
x = np.array(time_data)
y = np.array(asset_list)
plt.plot(x,y)
plt.xticks(rotation=45)
plt.show()
今の所、使い方を細かく説明する気はないのでこの程度にしておきます。
最後に私の販売しているbotを紹介しておきます。宜しくお願いします。
この記事が気に入ったらサポートをしてみませんか?