仮想通貨bot 勉強記録㉖
~ポジションサイズを自動計算する~
仕事忙しすぎ&エラーとの戦いに時間使い過ぎて、更新頻度が激落ちやで。。。
◆前回までのあらすじ
自動で損切りを行うようにしたら、PFの上昇とドローダウンの低下の効果が得られました。
◆今回やること
・適切なロットサイズを計算する
今まではロットサイズを一定に保ってテストをしていましたが、証拠金と許容できる損失割合から、適切なロットを計算させてみます。
from datetime import datetime
import pybybit
import time
import matplotlib.pyplot as plt
import pandas as pd
#------------------------------------------------------------------
#====================API設定====================
apis = [
'プライベートキー',
'シークレットキー'
]
bybit = pybybit.API(*apis, testnet=True)
#===============================================
#====================バックテストの初期設定値====================
slippage = 0.001 # 手数料やスリッページ(0.075%初期値)
wait = 0 # 待機時間
start = '2019/06/01 09:00' # ローソク足取得開始時刻
get_start = int(datetime.strptime(start, '%Y/%m/%d %H:%M').timestamp()) #タイムスタンプ変換
n = 20 # ローソク足取得リクエスト回数
stop_range = 2 # ATRの何倍を損切りラインにするか
volatility_term = 28 # ATRを算出するための足の数
#/////////////////////////////////////////////////////////////////
trade_risk = 0.05 # 1トレードあたり口座の何%まで損失を許容するか
levarage = 3 # レバレッジ倍率の設定
start_funds = 1000 # シミュレーション時の初期資金
#/////////////////////////////////////////////////////////////////
#====================バックテストのパラメーター設定====================
chart_min_list = [ 240 ] # テストに使う時間軸(1 3 5 15 30 60 120 240 360 720 "D" "M" "W")
buy_term_list = [20] # テストに使う上値ブレイクアウトの期間
sell_term_list = [40] # テストに使う下値ブレイクアウトの期間
judge_price_list = [
{"BUY":"close","SELL":"close"}, # ブレイクアウト判定に終値を使用
# {"BUY":"high","SELL":"low"} # ブレイクアウト判定に高値・安値を使用
]
#------------------------------------------------------------------
#====================Bybitから証拠金取得====================
def Bybit_Balance():
Balance = bybit.rest.inverse.private_wallet_balance.result
print("現在のアカウント残高は{}$です".format( float(Balance["result"]["available_balance"]) ))
return float(Balance["result"]["available_balance"])
#====================APIから価格データ取得(ローソク足の本数指定)====================
def get_price_from_API(chart_min,get_start,n):
price = []
#200*n本のローソク足を取得して、price[]に入れる
for o in range(n):
#pybybitでローソク足取得
k = bybit.rest.inverse.public_kline_list(
symbol = "BTCUSD",
interval= chart_min,
from_ = get_start
).json()
#priceに取得したデータを入れる
price += k["result"]
#200本x足の長さ分だけタイムスタンプを進める
if chart_min =="D":
get_start += 200*60*1440
else:
get_start += 200*60*chart_min
get_start = int(datetime.strptime(start, '%Y/%m/%d %H:%M').timestamp())
return price
#====================パラメータぶんのローソク足をリスト化する====================
def get_price_amount(chart_min_list):
price_list = {} #ローソク足を入れる変数
for chart_min in chart_min_list: #for文(chart_min_listの数だけ処理を行う)
print("{0}分足取得中".format([chart_min]))
price_list[chart_min] = get_price_from_API(chart_min,get_start,n) #chart_min分足のローソク足取得リクエストをn回行う
return price_list
#====================時間と高値・安値をログに記録====================
def log_price( data,flag ):
flag["records"]["log"].append("時間: " + datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M') + " 始値" + str(data["open"]) + " 高値: " + str(data["high"]) + " 安値: " + str(data["low"]) + " 終値: " + str(data["close"]) + "\n")
return flag
#====================平均ボラティリティを計算====================
def calculate_volatility( last_data,flag ):
high_sum = sum(float(i["high"]) for i in last_data[-1 * volatility_term :])
low_sum = sum(float(i["low"]) for i in last_data[-1 * volatility_term :])
volatility = round((high_sum - low_sum) / volatility_term)
flag["records"]["log"].append("現在の{0}期間の平均ボラティリティは{1}$です\n".format( volatility_term, volatility ))
return volatility
#====================注文ロットを計算====================
def calculate_lot(last_data,data,flag ):
balance = flag["records"]["funds"]
volatility = calculate_volatility( last_data,flag )
stop = stop_range * volatility
calc_lot = int(( balance * trade_risk / (stop / float(data["close"]) )))
able_lot = int( balance * levarage )
lot = min(able_lot,calc_lot)
flag["records"]["log"].append("現在のアカウント残高は{}$です\n".format(round(balance)))
flag["records"]["lot"].append(lot)
if able_lot > calc_lot:
flag["records"]["log"].append("ロットを{}$にします\n".format(calc_lot))
else:
flag["records"]["log"].append("ロットを{}$にします\n".format(able_lot))
return lot,stop
#------------------------------------------------------------------
#====================ロジック判定====================
def donchian( data,last_data,buy_term,sell_term,judge_price ):
highest = max(i["high"] for i in last_data[(-1*buy_term) :])
lowest = min(i["low"] for i in last_data[(-1*sell_term):])
if data[ judge_price["BUY"]] > highest:
return {"side":"BUY","price" :highest}
elif data[ judge_price["SELL"]] < lowest :
return {"side":"SELL","price":lowest }
else:
return {"side" : None , "price":0}
#====================買い・売り注文====================
def entry_signal( data,last_data,flag,buy_term,sell_term,judge_price ):
signal = donchian( data,last_data,buy_term,sell_term,judge_price )
if signal["side"] == "BUY":
lot,stop = calculate_lot( last_data,data,flag )
flag["records"]["log"].append("過去{0}足の最高値{1}$を、直近の価格が{2}$でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]]))
flag["records"]["log"].append(data["close"] + "$で買いの指値注文を出します\n")
# ここに買い注文のコードを入れる
flag["order"]["lot"] = lot
flag["order"]["stop"] = stop
flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = float(data["close"])
flag["records"]["log"].append("{0}$にストップを入れます\n".format(float(flag["order"]["price"]) - stop))
if signal["side"] == "SELL":
lot,stop = calculate_lot( last_data,data,flag )
flag["records"]["log"].append("過去{0}足の最安値{1}$を、直近の価格が{2}$でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]]))
flag["records"]["log"].append(data["close"] + "$で売りの指値注文を出します\n")
# ここに売り注文のコードを入れる
flag["order"]["lot"] = lot
flag["order"]["stop"] = stop
flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"
flag["order"]["price"] = float(data["close"])
flag["records"]["log"].append("{0}$にストップを入れます\n".format(float(flag["order"]["price"]) + stop))
return flag
#====================注文状況確認====================
def check_order( flag ):
#ここに注文状況確認コード
flag["order"]["exist"] = False
flag["order"]["count"] = 0
flag["position"]["exist"] = True
flag["position"]["side"] = flag["order"]["side"]
flag["position"]["price"] = flag["order"]["price"]
flag["position"]["stop"] = flag["order"]["stop"]
flag["position"]["lot"] = flag["order"]["lot"]
return flag
#====================成行決済&ドテン注文====================
def close_position( data,last_data,flag,buy_term,sell_term,judge_price ):
if flag["position"]["exist"] == False:
return flag
flag["position"]["count"] += 1
signal = donchian( data,last_data,buy_term,sell_term,judge_price )
if flag["position"]["side"] == "BUY" and signal["side"] == "SELL":
flag["records"]["log"].append("過去{0}足の最安値{1}$を、直近の価格が{2}$でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]]))
flag["records"]["log"].append(data["close"] + "$あたりで成行注文を出してポジションを決済します\n")
# 成行決済注文コードを入れる
records( flag,data,data["close"] )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["records"]["log"].append("さらに" + data["close"] + "$で売りの指値注文を入れてドテンします\n")
lot,stop = calculate_lot( last_data,data,flag )
# 売り指値注文のコードを入れる
flag["order"]["lot"] = lot
flag["order"]["stop"] = stop
flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"
flag["order"]["price"] = float(data["close"])
flag["records"]["log"].append("{0}$にストップを入れます\n".format(float(data["close"]) + flag["order"]["stop"]))
if flag["position"]["side"] == "SELL" and signal["side"] == "BUY":
flag["records"]["log"].append("過去{0}足の最高値{1}$を、直近の価格が{2}$でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]]))
flag["records"]["log"].append(data["close"] + "$あたりで成行注文を出してポジションを決済します\n")
# 成行決済注文コードを入れる
records( flag,data,data["close"] )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["records"]["log"].append("さらに" + data["close"] + "$で買いの指値注文を入れてドテンします\n")
lot,stop = calculate_lot( last_data,data,flag )
# 買い指値注文のコードを入れる
flag["order"]["lot"] = lot
flag["order"]["stop"] = stop
flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = float(data["close"])
flag["records"]["log"].append("{0}$にストップを入れます\n".format(float(data["close"]) - flag["order"]["stop"]))
return flag
#====================損切確認====================
def stop_position( data,flag,last_data,chart_min ):
if flag["position"]["side"] == "BUY":
stop_price = flag["position"]["price"] - flag["position"]["stop"]
if float(data["low"]) < stop_price:
flag["records"]["log"].append("{0}$の損切ラインに引っかかりました。\n".format( stop_price ))
flag["records"]["log"].append(str(round(stop_price,2)) + "$あたりで成行注文を出してポジションを決済します\n")
# 決済の成行注文コードを入れる
exit_price = stop_price
records( flag,data,exit_price,"STOP" )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
if flag["position"]["side"] == "SELL":
stop_price = flag["position"]["price"] + flag["position"]["stop"]
if float(data["high"]) > stop_price:
flag["records"]["log"].append("{0}$の損切ラインに引っかかりました。\n".format( stop_price ))
flag["records"]["log"].append(str(round(stop_price,2)) + "$あたりで成行注文を出してポジションを決済します\n")
# 決済の成行注文コードを入れる
exit_price = stop_price
records( flag,data,exit_price,"STOP" )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
return flag
#------------------------------------------------------------------
#====================トレードパフォーマンス確認====================
def records(flag,data,exit_price, close_type=None):
#手数料等の計算
entry_price = float(flag["position"]["price"])
exit_price = float(exit_price)
trade_cost = flag["position"]["lot"] * slippage
flag["records"]["slippage"].append(trade_cost)
flag["records"]["log"].append("スリッページ・手数料として " + str(round(trade_cost)) + "$を考慮します\n")
# 決済日時,ポジションの保有期間を記録
flag["records"]["date"].append(datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M'))
flag["records"]["holding-periods"].append(flag["position"]["count"])
# 損切りにかかった回数をカウント
if close_type == "STOP":
flag["records"]["stop-count"].append(1)
else:
flag["records"]["stop-count"].append(0)
# 値幅の計算
buy_Price_range = exit_price - entry_price
sell_Price_range = entry_price - exit_price
# 利益率の計算
buy_return = buy_Price_range/entry_price
sell_return = sell_Price_range/entry_price
#利益・損失の確認
if flag["position"]["side"] == "BUY":
flag["records"]["return"].append( buy_return ) #獲得リターンを記録
flag["records"]["side"].append( flag["position"]["side"] ) #買いか売りかを記録
#////////////////////////////////////////////////////////////
flag["records"]["funds"] += (buy_return-slippage)*flag["position"]["lot"]
#////////////////////////////////////////////////////////////
if buy_return > 0:
flag["records"]["log"].append(str(round((buy_return-slippage)*flag["position"]["lot"],2)) + "$の利益です\n")
else:
flag["records"]["log"].append(str(round((buy_return-slippage)*flag["position"]["lot"],2)) + "$の損失です\n")
if flag["position"]["side"] == "SELL":
flag["records"]["return"].append( sell_return ) #獲得リターンを記録
flag["records"]["side"].append( flag["position"]["side"] ) #買いか売りかを記録
#////////////////////////////////////////////////////////////
flag["records"]["funds"] += (sell_return-slippage)*flag["position"]["lot"]
#////////////////////////////////////////////////////////////
if sell_return > 0:
flag["records"]["log"].append(str(round((sell_return-slippage)*flag["position"]["lot"],2)) + "$の利益です\n")
else:
flag["records"]["log"].append(str(round((sell_return-slippage)*flag["position"]["lot"],2)) + "$の損失です\n")
return flag
#====================損益曲線をプロット====================
def plot(records,buy_term,sell_term,judge_price,interval):
plt.plot( records.Date, records.Gross ) #X軸、Y軸の値を指定
plt.xlabel("Date") #X軸のラベル名
plt.ylabel("Balance") #Y軸のラベル名
plt.xticks(rotation=50) # X軸の目盛りを50度回転
plt.title("buy_term:{0},sell_term:{1},judge:{2},Interval:{3}".format(buy_term,sell_term,judge_price,interval))
plt.show() #グラフの表示
#====================ファイルを出力====================
def File_output(df,flag):
file = open("log/donchian-{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
file.writelines(flag["records"]["log"])
#pandasのdfをcsvで出力
df.to_csv("log/donchian-{0}-records.csv".format(datetime.now().strftime("%Y-%m-%d-%H-%M")))
#====================バックテストの集計====================
def backtest( flag,buy_term,sell_term,judge_price,interval ):
# 成績を記録したpandas DataFrameを作成
records = pd.DataFrame({
"Date" : pd.to_datetime(flag["records"]["date"]),#決済日時
"Side" : flag["records"]["side"], #ポジションの側
"Stop" : flag["records"]["stop-count"], #損切りを行った回数
"Rate" : flag["records"]["return"], #獲得レート
"Periods" : flag["records"]["holding-periods"], #ポジション保有期間
"Slippage" : flag["records"]["slippage"], #手数料等
"lot" : flag["records"]["lot"] #ロット
})
# 獲得利益の列を追加
records["Profit"] = records.Rate*records.lot
# 連敗回数をカウントする
consecutive_defeats = []
defeats = 0
for r in flag["records"]["return"]:
if r < 0:
defeats += 1
else:
consecutive_defeats.append( defeats )
defeats = 0
# 総利益の列を追加
records["Gross"] = records.Profit.cumsum()
#//////////////////////////////////////////////
# 資産推移の列を追加する
records["Funds"] = records.Gross + start_funds
#//////////////////////////////////////////////
# ドローダウンの列を追加
records["Drawdown"] = records.Gross.cummax().subtract( abs(records.Gross) )
records["DrawdownRate"] = records.Drawdown / records.Gross.cummax() * 100
# 買いエントリーと売りエントリーだけをそれぞれ抽出する
buy_records = records[records.Side.isin(["BUY"])]
sell_records = records[records.Side.isin(["SELL"])]
# 月別のデータを集計する
# records["月別集計"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m')))
# grouped = records.groupby("月別集計")
# month_records = pd.DataFrame({
# "Number" : grouped.Profit.count(),
# "Gross" : grouped.Profit.sum(),
# "Funds" : grouped.Funds.last(),
# "Rate" : round(grouped.Rate.mean(),2),
# "Drawdown" : grouped.Drawdown.max(),
# "Periods" : grouped.Periods.mean()
# })
print("\nバックテスト結果")
print("==============================")
print("--------買いエントリ成績--------")
print("トレード回数 : {}回".format(len(buy_records) ))
print("勝率 : {}%".format(round(len(buy_records[buy_records.Profit>0]) / len(buy_records) * 100,1)))
print("平均リターン : {}%".format(round(buy_records.Rate.mean()*100,2)))
print("総損益 : {}$".format(round( buy_records.Profit.sum() ,2)))
print("平均保有期間 : {}足".format(round(buy_records.Periods.mean(),1) ))
print("損切りの回数 : {}回".format( buy_records.Stop.sum() ))
print("\n--------売りエントリ成績--------")
print("トレード回数 : {}回".format( len(sell_records) ))
print("勝率 : {}%".format(round(len(sell_records[sell_records.Profit>0]) / len(sell_records) * 100,1)))
print("平均リターン : {}%".format(round(sell_records.Rate.mean()*100,2)))
print("総損益 : {}$".format(round( sell_records.Profit.sum() ,2)))
print("平均保有期間 : {}足".format(round(sell_records.Periods.mean(),1) ))
print("損切りの回数 : {}回".format( sell_records.Stop.sum() ))
print("\n------------総合成績--------------")
print("全トレード数 : {}回".format(len(records) ))
print("勝率 : {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1)))
print("平均リターン : {}%".format(round(records.Rate.mean()*100,2)))
print("平均保有期間 : {}足".format(round(records.Periods.mean(),1) ))
print("損切りの回数 : {}回".format( records.Stop.sum() ))
print("最大連敗回数 : {}回".format( max(consecutive_defeats) ))
print("最大の勝ちトレード : {}$".format((round(records.Profit.max(),2))))
print("最大の負けトレード : {}$".format((round(records.Profit.min(),2))))
print("最大ドローダウン : {0}$ / {1}%".format(round(-1 * records.Drawdown.max()), round( records.DrawdownRate.loc[records.Drawdown.idxmax()] )))
print("利益合計 : {}$".format((round(records[records.Profit>0].Profit.sum(),2))))
print("損失合計 : {}$".format(round(records[records.Profit<0].Profit.sum(),2),))
print("手数料合計 : {}$".format(round(-1 * records.Slippage.sum(),1)))
print("最終損益 : {}$\n".format((round(records.Profit.sum()-(records.Slippage.sum()) ,2))))
print("初期資金 : {}$".format( start_funds ))
print("最終資金 : {}$".format( round(records.Funds.iloc[-1] ,2)))
print("運用成績 : {}%".format( round(records.Funds.iloc[-1] / start_funds * 100 ,2) ))
# print("\n--------------月別成績------------")
# for index , row in monthly_records.iterrows():
# print("===================================")
# print( "{0}年{1}月".format( index.year, index.month ) )
# print("-----------------------------------")
# print("トレード数 : {}回".format( row.Count.astype(int) ))
# print("月間損益 : {}$".format( row.Profit.astype(int) ))
# print("平均リターン : {}%".format( round(row.Rate*100 ,2)))
# print("月間最大ドローダウン : {}$".format( -1 * row.Drawdown.astype(int) ))
# print("平均保有期間 : {}足".format( round(row.Periods.astype(float),1) ))
print("==============================")
result = {
"Trade-count" : len(records), #トレード回数
"Win-rate" : round(len(records[records.Profit>0]) / len(records) * 100,1),#勝率
"Return-ave" : round(records.Rate.mean(),2), #平均リターン
"DD-rate-max" : -1 * round( records.DrawdownRate.loc[records.Drawdown.idxmax()] ), #最大ドローダウンレート
"Gross" : records.Profit.sum()-(records.Slippage.sum()), #最終損益
"PF" : round( -1 * (records[records.Profit>0].Profit.sum() / records[records.Profit<0].Profit.sum()) ,2)#プロフィットファクター
}
plot(records,buy_term,sell_term,judge_price,interval)
return result
#====================テスト&集計====================
def aggregate(volatility_term):
# chart_min_listのローソク足リストを取得
price_list = get_price_amount(chart_min_list)
# テストごとの各パラメーターの組み合わせと結果を記録する配列を準備
param = {
"buy_term" : [],
"sell_term" : [],
"chart_min" : [],
"judge_price" : []
}
all_result = {
"count" : [],
"winRate" : [],
"returnRate" : [],
"Drawdown" : [],
"ProfitFactor" : [],
"Gross" : []
}
# 総当たりのためのfor文の準備
combinations = [(chart_min, buy_term, sell_term, judge_price)
for chart_min in chart_min_list
for buy_term in buy_term_list
for sell_term in sell_term_list
for judge_price in judge_price_list]
# 総当たり処理
for chart_min, buy_term, sell_term, judge_price in combinations:
price = price_list[ chart_min ]
last_data = []
need_term = max(buy_term,sell_term,volatility_term)
i = 0
# フラッグ変数の初期化
flag = {
"order":{
"exist" : False,
"side" : "",
"price" : 0,
"count" : 0,
"ATR" : 0,
"lot" :0,
"stop" :0
},
"position":{
"exist" : False,
"side" : "",
"price" : 0,
"count" :0,
"ATR" :0,
"lot" :0,
"stop" :0
},
"records":{
"date" :[],
"return" :[],
"side" :[],
"lot" :[],
"stop-count" :[],
"funds" :start_funds,
"holding-periods":[],
"slippage" :[],
"log" :[]
}
}
# price全数でバックテストを行う(ローソク足を6000本取得していたら6000回)
while i < len(price):
# ドンチャンの判定に使う期間分の安値・高値データを準備する
if len(last_data) < need_term:
last_data.append(price[i])
time.sleep(wait)
i += 1
continue
data = price[i]
flag = log_price(data,flag)
# バックテスト実施
if flag["order"]["exist"]:
flag = check_order( flag )
elif flag["position"]["exist"]:
flag = stop_position( data,flag,last_data,chart_min )
flag = close_position( data,last_data,flag,buy_term,sell_term,judge_price )
else:
flag = entry_signal( data,last_data,flag,buy_term,sell_term,judge_price )
last_data.append( data )
i += 1
time.sleep(wait)
print("テスト期間 ")
print("==============================")
print("開始時点 : " + str(datetime.fromtimestamp(float(price[0]["open_time"]))))
print("終了時点 : " + str(datetime.fromtimestamp(float(price[-1]["open_time"]))))
print("時間足 : {0}".format(chart_min))
print("パラメータ1 : " + str(buy_term) + "期間 / 買い" )
print("パラメータ2 : " + str(sell_term) + "期間 / 売り" )
print("パラメータ3 : " + str(judge_price) + "")
print(str(len(price)) + "件のローソク足データで検証")
print("==============================")
result = backtest( flag,buy_term,sell_term,judge_price,chart_min )
# 今回のループで使ったパラメータの組み合わせを配列に記録する
param["buy_term"].append( buy_term )
param["sell_term"].append( sell_term )
param["chart_min"].append( chart_min )
if judge_price["BUY"] == "high":
param["judge_price"].append( "high/low" )
else:
param["judge_price"].append( "open/close" )
# 今回のループのバックテスト結果を配列に記録する
all_result["count"].append( result["Trade-count"] )
all_result["winRate"].append( result["Win-rate"] )
all_result["returnRate"].append( result["Return-ave" ] )
all_result["Drawdown"].append( result["DD-rate-max"] )
all_result["ProfitFactor"].append( result["PF"] )
all_result["Gross"].append( result["Gross"] )
return param,all_result,flag
#====================表にまとめて、出力====================
def pandas(volatility_term):
param,all_result,flag = aggregate(volatility_term)
# 全てのパラメータによるバックテスト結果をPandasで1つの表にする
df = pd.DataFrame({
"Interval" : param["chart_min"],
"Buy_term" : param["buy_term"],
"Sell_term" : param["sell_term"],
"Judge_price" : param["judge_price"],
"Trade-count" : all_result["count"],
"Win-Rate" : all_result["winRate"],
"Reture-Ave" : all_result["returnRate"],
"DrawDownRate" : all_result["Drawdown"],
"PF" : all_result["ProfitFactor"],
"Gross" : all_result["Gross"]
})
# トレード回数が100に満たない記録は消す
df.drop( df[ df["Trade-count"] < 100].index, inplace=True )
File_output(df,flag)
pandas(volatility_term)
◆解説
前回と同様に、追加した部分を#/////で囲っているので、
その部分を解説します。
・初期値設定
#====================バックテストの初期設定値====================
slippage = 0.001 # 手数料やスリッページ(0.075%初期値)
wait = 0 # 待機時間
start = '2019/06/01 09:00' # ローソク足取得開始時刻
get_start = int(datetime.strptime(start,'%Y/%m/%d %H:%M').timestamp()) #タイムスタンプ変換
n = 50 # ローソク足取得リクエスト回数
stop_range = 2 # ATRの何倍を損切りラインにするか
volatility_term = 28 # ATRを算出するための足の数
#/////////////////////////////////////////////////////////////////
trade_risk = 0.05 # 1トレードあたり口座の何%まで損失を許容するか
levarage = 3 # レバレッジ倍率の設定
start_funds = 1000 # シミュレーション時の初期資金
#/////////////////////////////////////////////////////////////////
初期値を追加してます。
1回のトレードでとる最大リスク、レバレッジ、口座の初期資金を設定しています。
・calculate_lot(last_data,data,flag )
#====================注文ロットを計算====================
def calculate_lot(last_data,data,flag ):
balance = flag["records"]["funds"] #証拠金
volatility = calculate_volatility( last_data,flag ) #直近のボラ
stop = stop_range * volatility #損切り値幅
calc_lot = int(( balance * trade_risk / (stop / float(data["close"]) ))) #適切なロットの計算
able_lot = int( balance * levarage ) #設定可能な最大ロット
lot = min(able_lot,calc_lot) #ロットを設定
flag["records"]["log"].append("現在のアカウント残高は{}$です\n".format(round(balance)))
flag["records"]["lot"].append(lot)
if able_lot > calc_lot:
flag["records"]["log"].append("ロットを{}$にします\n".format(calc_lot))
else:
flag["records"]["log"].append("ロットを{}$にします\n".format(able_lot))
return lot,stop
ロットと損切り価格を計算する関数です。
ロットは、リスクから逆算した適切ロットと、実現可能な最大ロットのうち低い方を設定します。
適切ロットの計算は以下の通りです。
lot = (証拠金×許容リスク)/(損切り値幅/エントリー価格)
= (balance×lisk) / ( ( stop_range × volatility ) / entry_price )
適切ロットを計算できたら、実現可能な最大ロットと比較します。
実現不可能なロットは設定できないので、低い方を今回のトレードのロットとして設定します。
・records(flag,data,exit_price, close_type=None)
#====================トレードパフォーマンス確認====================
def records(flag,data,exit_price, close_type=None):
#手数料等の計算
entry_price = float(flag["position"]["price"])
exit_price = float(exit_price)
trade_cost = flag["position"]["lot"] * slippage
flag["records"]["slippage"].append(trade_cost)
flag["records"]["log"].append("スリッページ・手数料として " + str(round(trade_cost)) + "$を考慮します\n")
# 決済日時,ポジションの保有期間を記録
flag["records"]["date"].append(datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M'))
flag["records"]["holding-periods"].append(flag["position"]["count"])
# 損切りにかかった回数をカウント
if close_type == "STOP":
flag["records"]["stop-count"].append(1)
else:
flag["records"]["stop-count"].append(0)
# 値幅の計算
buy_Price_range = exit_price - entry_price
sell_Price_range = entry_price - exit_price
# 利益率の計算
buy_return = buy_Price_range/entry_price
sell_return = sell_Price_range/entry_price
#利益・損失の確認
if flag["position"]["side"] == "BUY":
flag["records"]["return"].append( buy_return ) #獲得リターンを記録
flag["records"]["side"].append( flag["position"]["side"] ) #買いか売りかを記録
#////////////////////////////////////////////////////////////
flag["records"]["funds"] += (buy_return-slippage)*flag["position"]["lot"]
#////////////////////////////////////////////////////////////
if buy_return > 0:
flag["records"]["log"].append(str(round((buy_return-slippage)*flag["position"]["lot"],2)) + "$の利益です\n")
else:
flag["records"]["log"].append(str(round((buy_return-slippage)*flag["position"]["lot"],2)) + "$の損失です\n")
if flag["position"]["side"] == "SELL":
flag["records"]["return"].append( sell_return ) #獲得リターンを記録
flag["records"]["side"].append( flag["position"]["side"] ) #買いか売りかを記録
#////////////////////////////////////////////////////////////
flag["records"]["funds"] += (sell_return-slippage)*flag["position"]["lot"]
#////////////////////////////////////////////////////////////
if sell_return > 0:
flag["records"]["log"].append(str(round((sell_return-slippage)*flag["position"]["lot"],2)) + "$の利益です\n")
else:
flag["records"]["log"].append(str(round((sell_return-slippage)*flag["position"]["lot"],2)) + "$の損失です\n")
return flag
利益・損失の確認の部分に、flag["records"]["funds"]を計算する部分を追加しています。
計算は(リターン-手数料割合)*ロットで行います。
・backtest( flag,buy_term,sell_term,judge_price,interval )
#====================バックテストの集計====================
def backtest( flag,buy_term,sell_term,judge_price,interval ):
# 成績を記録したpandas DataFrameを作成
records = pd.DataFrame({
"Date" : pd.to_datetime(flag["records"]["date"]),#決済日時
"Side" : flag["records"]["side"], #ポジションの側
"Stop" : flag["records"]["stop-count"], #損切りを行った回数
"Rate" : flag["records"]["return"], #獲得レート
"Periods" : flag["records"]["holding-periods"], #ポジション保有期間
"Slippage" : flag["records"]["slippage"], #手数料等
#///////////////////////////////////////////////////////////////
"lot" : flag["records"]["lot"] #ロット
#///////////////////////////////////////////////////////////////
})
# 獲得利益の列を追加
records["Profit"] = records.Rate*records.lot
# 連敗回数をカウントする
consecutive_defeats = []
defeats = 0
for r in flag["records"]["return"]:
if r < 0:
defeats += 1
else:
consecutive_defeats.append( defeats )
defeats = 0
# 総利益の列を追加
records["Gross"] = records.Profit.cumsum()
#//////////////////////////////////////////////
# 資産推移の列を追加する
records["Funds"] = start_funds + records.Gross
#//////////////////////////////////////////////
# ドローダウンの列を追加
records["Drawdown"] = records.Gross.cummax().subtract( abs(records.Gross) )
records["DrawdownRate"] = records.Drawdown / records.Gross.cummax() * 100
# 買いエントリーと売りエントリーだけをそれぞれ抽出する
buy_records = records[records.Side.isin(["BUY"])]
sell_records = records[records.Side.isin(["SELL"])]
# 月別のデータを集計する
# records["月別集計"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m')))
# grouped = records.groupby("月別集計")
# month_records = pd.DataFrame({
# "Number" : grouped.Profit.count(),
# "Gross" : grouped.Profit.sum(),
# "Funds" : grouped.Funds.last(),
# "Rate" : round(grouped.Rate.mean(),2),
# "Drawdown" : grouped.Drawdown.max(),
# "Periods" : grouped.Periods.mean()
# })
print("\nバックテスト結果")
print("==============================")
print("--------買いエントリ成績--------")
print("トレード回数 : {}回".format(len(buy_records) ))
print("勝率 : {}%".format(round(len(buy_records[buy_records.Profit>0]) / len(buy_records) * 100,1)))
print("平均リターン : {}%".format(round(buy_records.Rate.mean()*100,2)))
print("総損益 : {}$".format(round( buy_records.Profit.sum() ,2)))
print("平均保有期間 : {}足".format(round(buy_records.Periods.mean(),1) ))
print("損切りの回数 : {}回".format( buy_records.Stop.sum() ))
print("\n--------売りエントリ成績--------")
print("トレード回数 : {}回".format( len(sell_records) ))
print("勝率 : {}%".format(round(len(sell_records[sell_records.Profit>0]) / len(sell_records) * 100,1)))
print("平均リターン : {}%".format(round(sell_records.Rate.mean()*100,2)))
print("総損益 : {}$".format(round( sell_records.Profit.sum() ,2)))
print("平均保有期間 : {}足".format(round(sell_records.Periods.mean(),1) ))
print("損切りの回数 : {}回".format( sell_records.Stop.sum() ))
print("\n------------総合成績--------------")
print("全トレード数 : {}回".format(len(records) ))
print("勝率 : {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1)))
print("平均リターン : {}%".format(round(records.Rate.mean()*100,2)))
print("平均保有期間 : {}足".format(round(records.Periods.mean(),1) ))
print("損切りの回数 : {}回".format( records.Stop.sum() ))
print("最大連敗回数 : {}回".format( max(consecutive_defeats) ))
print("最大の勝ちトレード : {}$".format((round(records.Profit.max(),2))))
print("最大の負けトレード : {}$".format((round(records.Profit.min(),2))))
print("最大ドローダウン : {0}$ / {1}%".format(round(-1 * records.Drawdown.max()), round( records.DrawdownRate.loc[records.Drawdown.idxmax()] )))
print("利益合計 : {}$".format((round(records[records.Profit>0].Profit.sum(),2))))
print("損失合計 : {}$".format(round(records[records.Profit<0].Profit.sum(),2),))
print("手数料合計 : {}$".format(round(-1 * records.Slippage.sum(),1)))
print("最終損益 : {}$\n".format((round(records.Profit.sum()-(records.Slippage.sum()) ,2))))
print("初期資金 : {}$".format( start_funds ))
print("最終資金 : {}$".format( round(records.Funds.iloc[-1] ,2)))
print("運用成績 : {}%".format( round(records.Funds.iloc[-1] / start_funds * 100 ,2) ))
# print("\n--------------月別成績------------")
# for index , row in monthly_records.iterrows():
# print("===================================")
# print( "{0}年{1}月".format( index.year, index.month ) )
# print("-----------------------------------")
# print("トレード数 : {}回".format( row.Count.astype(int) ))
# print("月間損益 : {}$".format( row.Profit.astype(int) ))
# print("平均リターン : {}%".format( round(row.Rate*100 ,2)))
# print("月間最大ドローダウン : {}$".format( -1 * row.Drawdown.astype(int) ))
# print("平均保有期間 : {}足".format( round(row.Periods.astype(float),1) ))
print("==============================")
result = {
"Trade-count" : len(records), #トレード回数
"Win-rate" : round(len(records[records.Profit>0]) / len(records) * 100,1),#勝率
"Return-ave" : round(records.Rate.mean(),2), #平均リターン
"DD-rate-max" : -1 * round( records.DrawdownRate.loc[records.Drawdown.idxmax()] ), #最大ドローダウンレート
"Gross" : records.Profit.sum()-(records.Slippage.sum()), #最終損益
"PF" : round( -1 * (records[records.Profit>0].Profit.sum() / records[records.Profit<0].Profit.sum()) ,2)#プロフィットファクター
}
plot(records,buy_term,sell_term,judge_price,interval)
return result
recordsにlot,Fundsの項目を追加しました。
これでロットと証拠金の推移を確認できます。
・aggregate(volatility_term)
#====================テスト&集計====================
def aggregate(volatility_term):
# chart_min_listのローソク足リストを取得
price_list = get_price_amount(chart_min_list)
# テストごとの各パラメーターの組み合わせと結果を記録する配列を準備
param = {
"buy_term" : [],
"sell_term" : [],
"chart_min" : [],
"judge_price" : []
}
all_result = {
"count" : [],
"winRate" : [],
"returnRate" : [],
"Drawdown" : [],
"ProfitFactor" : [],
"Gross" : []
}
# 総当たりのためのfor文の準備
combinations = [(chart_min, buy_term, sell_term, judge_price)
for chart_min in chart_min_list
for buy_term in buy_term_list
for sell_term in sell_term_list
for judge_price in judge_price_list]
# 総当たり処理
for chart_min, buy_term, sell_term, judge_price in combinations:
price = price_list[ chart_min ]
last_data = []
need_term = max(buy_term,sell_term,volatility_term)
i = 0
# フラッグ変数の初期化
flag = {
"order":{
"exist" : False,
"side" : "",
"price" : 0,
"count" : 0,
"ATR" : 0,
"lot" :0,
"stop" :0
},
"position":{
"exist" : False,
"side" : "",
"price" : 0,
"count" :0,
"ATR" :0,
#//////////
"lot" :0,
#//////////
"stop" :0
},
"records":{
"date" :[],
"return" :[],
"side" :[],
"lot" :[],
"stop-count" :[],
#/////////////////////////////
"funds" :start_funds,
#/////////////////////////////
"holding-periods":[],
"slippage" :[],
"log" :[]
}
}
# price全数でバックテストを行う(ローソク足を6000本取得していたら6000回)
while i < len(price):
# ドンチャンの判定に使う期間分の安値・高値データを準備する
if len(last_data) < need_term:
last_data.append(price[i])
time.sleep(wait)
i += 1
continue
data = price[i]
flag = log_price(data,flag)
# バックテスト実施
if flag["order"]["exist"]:
flag = check_order( flag )
elif flag["position"]["exist"]:
flag = stop_position( data,flag,last_data,chart_min )
flag = close_position( data,last_data,flag,buy_term,sell_term,judge_price )
else:
flag = entry_signal( data,last_data,flag,buy_term,sell_term,judge_price )
last_data.append( data )
i += 1
time.sleep(wait)
print("テスト期間 ")
print("==============================")
print("開始時点 : " + str(datetime.fromtimestamp(float(price[0]["open_time"]))))
print("終了時点 : " + str(datetime.fromtimestamp(float(price[-1]["open_time"]))))
print("時間足 : {0}".format(chart_min))
print("パラメータ1 : " + str(buy_term) + "期間 / 買い" )
print("パラメータ2 : " + str(sell_term) + "期間 / 売り" )
print("パラメータ3 : " + str(judge_price) + "")
print(str(len(price)) + "件のローソク足データで検証")
print("==============================")
result = backtest( flag,buy_term,sell_term,judge_price,chart_min )
# 今回のループで使ったパラメータの組み合わせを配列に記録する
param["buy_term"].append( buy_term )
param["sell_term"].append( sell_term )
param["chart_min"].append( chart_min )
if judge_price["BUY"] == "high":
param["judge_price"].append( "high/low" )
else:
param["judge_price"].append( "open/close" )
# 今回のループのバックテスト結果を配列に記録する
all_result["count"].append( result["Trade-count"] )
all_result["winRate"].append( result["Win-rate"] )
all_result["returnRate"].append( result["Return-ave" ] )
all_result["Drawdown"].append( result["DD-rate-max"] )
all_result["ProfitFactor"].append( result["PF"] )
all_result["Gross"].append( result["Gross"] )
return param,all_result,flag
変数:flagにlot、fundsの項目を追加しています。
用途はbacktest()で解説したrecordsへの追加です。
変化点は以上です。これで、ロットを自動で設定することができ、資金管理力がアップしたはずです!
・実行結果
開始時点 : 2019-06-01 09:00:00
使用ローソク足数:4000件
パラメータ1 : 20期間 / 買い
パラメータ2 : 40期間 / 売り
パラメータ3 : {'BUY': 'close', 'SELL': 'close'}
以上の条件で固定し、トレードリスクを変化させた際のリターンを確認してみます。
①trade_lisk:0.02
------------総合成績--------------
全トレード数 : 91回
勝率 : 39.6%
平均リターン : 2.76%
平均保有期間 : 32.1足
損切りの回数 : 51回
最大連敗回数 : 10回
最大の勝ちトレード : 1692.96$
最大の負けトレード : -99.37$
最大ドローダウン : -312$ / 27%
利益合計 : 5868.97$
損失合計 : -2013.09$
手数料合計 : -106.4$
最終損益 : 3749.48$
初期資金 : 1000$
最終資金 : 4855.88$
運用成績 : 485.59%
②trade_lisk:0.05
------------総合成績--------------
全トレード数 : 91回
勝率 : 39.6%
平均リターン : 2.76%
平均保有期間 : 32.1足
損切りの回数 : 51回
最大連敗回数 : 10回
最大の勝ちトレード : 8174.68$
最大の負けトレード : -859.93$
最大ドローダウン : -1962$ / 43%
利益合計 : 25500.41$
損失合計 : -10730.3$
手数料合計 : -460.8$
最終損益 : 14309.3$
初期資金 : 1000$
最終資金 : 15770.11$
運用成績 : 1577.01%
③trade_lisk:0.1
------------総合成績--------------
全トレード数 : 91回
勝率 : 39.6%
平均リターン : 2.76%
平均保有期間 : 32.1足
損切りの回数 : 51回
最大連敗回数 : 10回
最大の勝ちトレード : 25090.39$
最大の負けトレード : -4131.49$
最大ドローダウン : -8701$ / 21%
利益合計 : 69414.01$
損失合計 : -36709.04$
手数料合計 : -1301.6$
最終損益 : 31403.36$
初期資金 : 1000$
最終資金 : 33704.97$
運用成績 : 3370.5%
・比較結果
比較した結果、以下のことが分かりました。
①トレードリスクが大きいほど、グラフの上下は激しくなる
②今回のテスト期間は右肩上がりの損益なので、リスクを取るほどリターンが大きくなる
③リスクを小さくするほどドローダウンが小さくなるわけではない
①:リスクを大きくとるほど利益・損失双方が増加するので、グラフの波が激しくなります。
②:もし右肩下がりのロジックでリスクを大きくしたら、損失の方が大きくなります。
③:分からん。。。単純に利益も損失も大きくなるわけではないのか?
損切りを設定しているから、期間によっては損失を抑えてリターンを大きくすることができるのかな?
次回は分割エントリーをやっていく予定ですッ