
仮想通貨bot 勉強記録㉚
~バックテストまとめ~
◆前回までのあらすじ
フィルターを実装しました。色々なフィルターを作って試してみてね!
◆今回やること
・バックテストコードの修正
これまで作ったコードを修正します。
色々ミスってるところもあったし、不要なコードもあったので、これを機にお直しです。
(Twitterでミスってる箇所を教えて下さる方々、ありがとうございます!)
綺麗にするだけなので、ほとんど解説はないです。
# -*- coding: utf-8 -*-
"""
Created on Thu May 6 00:02:36 2021
@author: Mamu
"""
import pybybit
from datetime import datetime
import time
import matplotlib.pyplot as plt
import pandas as pd
#--------------------設定項目--------------------
apis = [
'API',
'シークレット'
]
bybit = pybybit.API(*apis, testnet = True)
#====================bot設定====================
symbol = "BTCUSD"
leverage = 3 # レバレッジ
wait = 0 # 待機時間
Buy_term = 20 # 買いブレイク判断期間
Sell_term = 40 # 売りブレイク判断期間
volatility_term = 28 # 平均ボラティリティの計算期間
chart_min = 240 # 時間軸#(1 3 5 15 30 60 120 240 360 720 "D" "W" "M" )
judge_price={
"Buy" : "close", # ブレイク判断 高値(high)か終値(close)を使用
"Sell": "close" # ブレイク判断 安値(low)か終値(close)を使用
}
stop_range = 2 # 何レンジ幅にストップを入れるか
trade_risk = 0.05 # 1トレードあたり口座の何%まで損失を許容するか
slippage = 0.002 # 考慮する手数料
entry_times = 5 # 何回に分けて追加ポジションを取るか
entry_range = 0.4 # 何レンジごとに追加ポジションを取るか
stop_config = "ON" # ON/OFF/Trailing
stop_AF = 0.03 # 加速係数
stop_AF_add = 0.03 # 加速係数を増やす度合
stop_AF_max = 0.3 # 加速係数の上限
filter_VER = "D" # フィルター設定/OFFで無効
#====================バックテスト用====================
start_funds = 1000 # シミュレーション時の初期資金
test_start = '2019/06/01 09:00' # ローソク足取得開始時刻
LOT_MODE = "adjustable" # fixedなら$1000固定、adjustableなら可変ロット
#--------------------価格API---------------------
#====================価格データ取得===================
def get_price(chart_min,start):
price = []
get_start = int(datetime.strptime(start,'%Y/%m/%d %H:%M').timestamp()) # タイムスタンプ変換
#200*n本のローソク足を取得して、price[]に入れる
for o in range(30): # ()内の回数だけデータを取得
#pybybitでローソク足取得
data = bybit.rest.inverse.public_kline_list(
symbol = symbol,
interval = chart_min,
from_ = get_start
).json()
#priceに取得したデータを入れる
for i in data["result"]:
price.append({
"open_time" :i["open_time"],
"open" :float(i["open"]),
"high" :float(i["high"]),
"low" :float(i["low"]),
"close" :float(i["close"]),
"volume" :float(i["volume"])
})
#200本x足の長さ分だけタイムスタンプを進める
if chart_min == "D" :
get_start += 200*60*1440
elif chart_min == "W" :
get_start += 200*60*10080
else:
get_start += 200*60*chart_min
return price
#--------------------補助ツール--------------------
#====================時間と高値・安値をログに記録====================
def log_price( data,flag ):
flag["records"]["log"].append("時間: " + datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M') + " 始値" + str(data["open"]) + " 高値: " + str(data["high"]) + " 安値: " + str(data["low"]) + " 終値: " + str(data["close"]) + "\n")
flag["records"]["close_price"].append(data["close"])
flag["records"]["open_time"].append(datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M'))
return flag
#--------------------フィルター--------------------
#====================単純移動平均を計算====================
def calculate_SMA( value,before=None ):
if before is not None:
MA = sum(i["close"] for i in last_data[-1*value + before: before]) / value
else:
MA = sum(i["close"] for i in last_data[-1*value:]) / value
return round(MA)
#====================指数移動平均を計算====================
def calculate_EMA( value,before=None ):
# 指定期間だけ前の移動平均を計算
if before is not None:
MA = sum((i["close"] for i in last_data[-2*value + before : -1*value + before]) / value)
EMA = (last_data[-1*value + before]["close"] * 2 / (value+1)) + (MA * (value-1) / (value+1))
for i in range(value-1):
EMA = (last_data[-1*value+before+1 + i]["close"] * 2 /(value+1)) + (EMA * (value-1) / (value+1))
# 最新の移動平均を計算
else:
MA = sum(i["close"] for i in last_data[-2*value: -1*value]) / value
EMA = (last_data[-1*value]["close"] * 2 / (value+1)) + (MA * (value-1) / (value+1))
for i in range(value-1):
EMA = (last_data[-1*value+1 + i]["close"] * 2 /(value+1)) + (EMA * (value-1) / (value+1))
return round(EMA)
#====================エントリーフィルター====================
def filter( signal ):
# フィルターがOFFなら何もしない
if filter_VER == "OFF":
return True
# 最新の終値が200期間前の終値より(大きい/小さい)を確認
if filter_VER == "A":
if len(last_data) < 200:
return True
if data["close"] > float(last_data[-200]["close"]) and signal["side"] == "Buy":
return True
if data["close"] < float(last_data[-200]["close"]) and signal["side"] == "Sell":
return True
# 最新の終値が最新の200単純移動平均より(大きい/小さい)を確認
if filter_VER == "B":
if len(last_data) < 200:
return True
if data["close"] > calculate_SMA(200) and signal["side"] == "Buy":
return True
if data["close"] < calculate_SMA(200) and signal["side"] == "Sell":
return True
# 最新の20単純移動平均が1つ前の20単純移動平均より(大きい/小さい)を確認
if filter_VER == "C":
if len(last_data) < 20:
return True
if calculate_SMA(20) > calculate_SMA(20,-1) and signal["side"] == "Buy":
return True
if calculate_SMA(20) < calculate_SMA(20,-1) and signal["side"] == "Sell":
return True
# 最新の25EMAが最新の350EMAより(大きい/小さい)を確認
if filter_VER == "D":
if len(last_data) < 350*2:
return True
if calculate_EMA(350) < calculate_EMA(25) and signal["side"] == "Buy":
return True
if calculate_EMA(350) > calculate_EMA(25) and signal["side"] == "Sell":
return True
return False
#--------------------資金管理関数---------------------
#====================平均ボラティリティを計算====================
def calculate_volatility( last_data ):
high_sum = sum(i["high"] for i in last_data[-1 * volatility_term :])
low_sum = sum(i["low"] for i in last_data[-1 * volatility_term :])
volatility = round((high_sum - low_sum) / volatility_term)
flag["records"]["log"].append("現在の{0}期間の平均ボラティリティは{1}$です\n".format( volatility_term, volatility ))
return volatility
#====================注文ロットを計算====================
def calculate_lot(last_data,data,flag ):
# 固定ロットでのテスト時
if LOT_MODE == "fixed":
flag["records"]["log"].append("固定ロットでテスト中のため、$1000を注文します\n")
lot = 1000
volatility = calculate_volatility( last_data,flag )
stop = stop_range * volatility
flag["position"]["ATR"] = round( volatility )
return lot,stop,flag
balance = flag["records"]["funds"] # 残高を取得
# 初回エントリー時
if flag["add-position"]["count"] == 0:
volatility = calculate_volatility( last_data ) # ボラティリティを計算
stop = stop_range * volatility # 損切り値幅を計算
calc_lot = int(( balance * trade_risk / (stop / float(data["close"]) ))) # 許容リスクから逆算したロット
flag["add-position"]["unit-size"] = int( calc_lot / entry_times ) # 1回ごとのポジションサイズ
flag["add-position"]["unit-range"] = round( volatility * entry_range ) # ポジションを分割する値幅
flag["add-position"]["stop"] = stop # 損切り価格
flag["position"]["ATR"] = round( volatility ) # ATRの設定
flag["records"]["log"].append("現在のアカウント残高は{}$です\n".format( round( balance,2 ) ))
flag["records"]["log"].append("許容リスクから購入できる枚数は最大{}$までです\n".format( calc_lot ))
flag["records"]["log"].append("{0}回に分けて{1}$ずつ注文します\n".format( entry_times, flag["add-position"]["unit-size"] ))
# 2回目以降のエントリー
else:
balance = (balance * leverage - flag["position"]["lot"]) # 証拠金から1回目のロットを引く
stop = flag["add-position"]["stop"] # 初回エントリー時の損切り値幅を設定
able_lot = int( balance * leverage ) # 設定可能な最大ロット
lot = min(able_lot,flag["add-position"]["unit-size"]) # 実際に設定するロットは小さい方
if able_lot > flag["add-position"]["unit-size"]:
flag["records"]["log"].append("ロットを{}$にします\n".format(flag["add-position"]["unit-size"]))
else:
flag["records"]["log"].append("ロットを{}$にします\n".format(able_lot))
return lot,stop,flag
#====================増し玉を行う====================
def add_position(data,flag):
# ポジションが無かったら実行しない
if flag["position"]["exist"] == False:
return flag
# 固定ロット(1BTC)でのテスト時は何もしない
if LOT_MODE == "fixed":
return flag
# 最初(1回目)のエントリー価格を記録
if flag["add-position"]["count"] == 0: # ポジションの追加が0回目の時
flag["add-position"]["first-entry-price"] = flag["position"]["price"] # 初回エントリー価格
flag["add-position"]["last-entry-price"] = flag["position"]["price"] # 前回(初回)のエントリー価格
flag["add-position"]["count"] += 1 # ポジションの追加回数を+1
# 以下の場合は、追加ポジションを取らない
if flag["add-position"]["count"] >= entry_times: # ポジションを追加した回数が設定値(entry_times)以上の場合
return flag
# この関数の中で使う変数を用意
first_entry_price = flag["add-position"]["first-entry-price"] # 初回エントリー価格
last_entry_price = flag["add-position"]["last-entry-price"] # 前回のエントリー価格
current_price = float(data["close"]) # 現在の価格
unit_range = flag["add-position"]["unit-range"] # ポジションの分割値幅
should_add_position = False # 増し玉の指示変数(初期化)
# 増し玉の指示を出す
if flag["position"]["side"] == "Buy" and (current_price - last_entry_price) > unit_range:
should_add_position = True
elif flag["position"]["side"] == "Sell" and (last_entry_price - current_price) > unit_range:
should_add_position = True
# 基準レンジ分進んでいれば追加注文を出す
if should_add_position == True:
flag["records"]["log"].append("前回のエントリー価格{0}$からブレイクアウトの方向に{1}ATR({2}$)以上動きました\n".format( last_entry_price, entry_range, round( unit_range ) ))
flag["records"]["log"].append("{0}/{1}回目の追加注文を出します\n".format(flag["add-position"]["count"] + 1, entry_times))
# 注文サイズを計算
lot,stop,flag = calculate_lot( last_data,data,flag )
# 追加注文を出す
if flag["position"]["side"] == "Buy":
# ここに買い注文のコードを入れる
entry_price = first_entry_price + (flag["add-position"]["count"] * unit_range) # バックテスト用
entry_price = round((1 + slippage) * entry_price) # スリッページを考慮
flag["records"]["log"].append("現在のポジションに追加して、{0}$で{1}$の買い注文を出します\n".format(entry_price,lot))
if flag["position"]["side"] == "Sell":
# ここに売り注文のコードを入れる
entry_price = first_entry_price - (flag["add-position"]["count"] * unit_range) # バックテスト用
entry_price = round((1 - slippage) * entry_price) # スリッページを考慮
flag["records"]["log"].append("現在のポジションに追加して、{0}$で{1}$の売り注文を出します\n".format(entry_price,lot))
# ポジション全体の情報を更新する
flag["position"]["stop"] = stop #損切り値幅は初回エントリー時から変えない
flag["position"]["price"] = int(round(( flag["position"]["price"] * flag["position"]["lot"] + entry_price * lot ) / ( flag["position"]["lot"] + lot ))) #平均ポジションを算出
flag["position"]["lot"] = (flag["position"]["lot"] + lot) #合計ロットを算出
if flag["position"]["side"] == "Buy":
flag["records"]["log"].append("{0}$の位置にストップを更新します\n".format(flag["position"]["price"] - stop))
elif flag["position"]["side"] == "Sell":
flag["records"]["log"].append("{0}$の位置にストップを更新します\n".format(flag["position"]["price"] + stop))
flag["records"]["log"].append("現在のポジションの取得単価は{}$です\n".format(flag["position"]["price"]))
flag["records"]["log"].append("現在のポジションサイズは{}$です\n".format(flag["position"]["lot"]))
flag["add-position"]["count"] += 1 #ポジションの追加回数をカウント
flag["add-position"]["last-entry-price"] = entry_price #前回のエントリー価格に、今回のエントリー価格を上書き
return flag
# トレイリングストップの関数
def trail_stop( data,flag ):
# まだ追加ポジションの取得中であれば何もしない
if flag["add-position"]["count"] < entry_times and LOT_MODE != "fixed":
return flag
# 高値/安値がエントリー価格からいくら離れたか計算
if flag["position"]["side"] == "Buy":
moved_range = round( data["high"] - flag["position"]["price"] )
if flag["position"]["side"] == "Sell":
moved_range = round( flag["position"]["price"] - data["low_price"] )
# 最高値・最安値を更新したか調べる
if moved_range < 0 or flag["position"]["stop-EP"] >= moved_range:
return flag
else:
flag["position"]["stop-EP"] = moved_range
# 加速係数に応じて損切りラインを動かす
flag["position"]["stop"] = round(flag["position"]["stop"] - ( moved_range + flag["position"]["stop"] ) * flag["position"]["stop-AF"])
# 加速係数を更新
flag["position"]["stop-AF"] = round( flag["position"]["stop-AF"] + stop_AF_add ,2 )
if flag["position"]["stop-AF"] >= stop_AF_max:
flag["position"]["stop-AF"] = stop_AF_max
# ログ出力
if flag["position"]["side"] == "Buy":
flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}$に動かして、加速係数を{}に更新します\n".format( round(flag["position"]["price"] - flag["position"]["stop"]) , flag["position"]["stop-AF"] ))
else:
flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}$に動かして、加速係数を{}に更新します\n".format( round(flag["position"]["price"] + flag["position"]["stop"]) , flag["position"]["stop-AF"] ))
return flag
#--------------------売買ロジック--------------------
#====================ロジック判定====================
def donchian( data,last_data ):
highest = float(max(i["high"] for i in last_data[(-1*Buy_term) :]))
lowest = float(min(i["low"] for i in last_data[(-1*Sell_term):]))
# data["close"]がhighestを上回ったら買いサイン
if data[judge_price["Buy"]] > highest:
return {"side":"Buy","price" :highest}
# data["close"]がlowestを下回ったら売りサイン
elif data[judge_price["Sell"]] < lowest :
return {"side":"Sell","price":lowest }
else:
return {"side" : None , "price":0}
#====================エントリー====================
def entry_signal(data,last_data,flag ):
signal = donchian( data,last_data )
if signal["side"] == "Buy":
flag["records"]["log"].append("過去{0}足の最高値{1}$を、直近の価格が{2}$でブレイクしました\n".format(Buy_term,signal["price"],data[judge_price["Buy"]]))
# フィルター条件を確認
if filter( signal ) == False:
flag["records"]["log"].append("フィルターのエントリー条件を満たさなかったため、エントリーしません\n")
return flag
lot,stop,flag = calculate_lot( last_data,data,flag )
flag["records"]["log"].append("{0}$あたりに{1}$で買いの成行注文を出します\n".format(data["close"],lot))
#買い注文コード
flag["records"]["log"].append("{0}$にストップを入れます\n".format(data["close"] - stop))
flag["position"]["lot"] = lot
flag["position"]["stop"] = stop
flag["position"]["exist"] = True
flag["position"]["side"] = "Buy"
flag["position"]["price"] = data["close"]
if signal["side"] == "Sell":
lot,stop,flag = calculate_lot( last_data,data,flag )
flag["records"]["log"].append("過去{0}足の最安値{1}$を、直近の価格が{2}$でブレイクしました\n".format(Sell_term,signal["price"],data[judge_price["Sell"]]))
# フィルター条件を確認
if filter( signal ) == False:
flag["records"]["log"].append("フィルターのエントリー条件を満たさなかったため、エントリーしません\n")
return flag
lot,stop,flag = calculate_lot( last_data,data,flag )
flag["records"]["log"].append("{0}$あたりに{1}$で売りの成行注文を出します\n".format(data["close"],lot))
#売り注文コード
flag["records"]["log"].append("{0}$にストップを入れます\n".format(data["close"] + stop))
flag["position"]["lot"] = lot
flag["position"]["stop"] = stop
flag["position"]["exist"] = True
flag["position"]["side"] = "Sell"
flag["position"]["price"] = data["close"]
return flag
#====================成行決済&ドテンエ注文====================
def close_position( data,last_data,flag ):
# ポジションが無ければ何もしない
if flag["position"]["exist"] == False:
return flag
flag["position"]["count"] += 1
signal = donchian( data,last_data )
if flag["position"]["side"] == "Buy" and signal["side"] == "Sell": # 買いポジションかつ売りサインの場合
flag["records"]["log"].append("過去{0}足の最安値{1}$を、直近の価格が{2}$でブレイクしました\n".format(Sell_term,signal["price"],data[judge_price["Sell"]]))
flag["records"]["log"].append("{}$あたりで成行注文を出してポジションを決済します\n".format(str(data["close"])))
# 成行決済注文コードを入れる
records( flag,data,data["close"] )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["position"]["stop-AF"] = stop_AF
flag["position"]["stop-EP"] = 0
flag["add-position"]["count"] = 0
# フィルター条件を確認
if filter( signal ) == False:
flag["records"]["log"].append("フィルターのエントリー条件を満たさなかったため、エントリーしません\n")
return flag
lot,stop,flag = calculate_lot( last_data,data,flag )
flag["records"]["log"].append("さらに{0}$あたりに{1}$の売りの成行注文を入れてドテン注文します\n".format(data["close"],lot))
# 売り指値注文のコードを入れる
flag["records"]["log"].append("{0}$にストップを入れます".format(data["close"] + stop))
flag["order"]["lot"] = lot
flag["order"]["stop"] = stop
flag["order"]["exist"] = True
flag["order"]["side"] = "Sell"
flag["order"]["price"] = data["close"]
if flag["position"]["side"] == "Sell" and signal["side"] == "Buy": # 売りポジション且つ買いサインの場合
flag["records"]["log"].append("過去{0}足の最高値{1}$を、直近の価格が{2}$でブレイクしました\n".format(Buy_term,signal["price"],data[judge_price["Buy"]]))
flag["records"]["log"].append("{}$あたりで成行注文を出してポジションを決済します\n".format(str(data["close"])))
# 成行決済注文コードを入れる
records( flag,data,data["close"] )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["position"]["stop-AF"] = stop_AF
flag["position"]["stop-EP"] = 0
flag["add-position"]["count"] = 0
# フィルター条件を確認
if filter( signal ) == False:
flag["records"]["log"].append("フィルターのエントリー条件を満たさなかったため、エントリーしません\n")
return flag
lot,stop,flag = calculate_lot( last_data,data,flag )
flag["records"]["log"].append("さらに{0}$あたりに{1}$の買いの成行注文を入れてドテン注文します\n".format(data["close"],lot))
# 買い指値注文のコードを入れる
flag["records"]["log"].append("{0}$にストップを入れます\n".format(data["close"] - stop))
flag["order"]["lot"] = lot
flag["order"]["stop"] = stop
flag["order"]["exist"] = True
flag["order"]["side"] = "Buy"
flag["order"]["price"] = data["close"]
return flag
#====================損切確認====================
def stop_position( data,flag ):
# stop_config == "TRAILING"ならトレイリングストップ実行
if stop_config == "TRAILING":
flag = trail_stop( data,flag )
# 買いポジションの時
if flag["position"]["side"] == "Buy":
stop_price = flag["position"]["price"] - flag["position"]["stop"] # 損切り価格を設定
# 損切り価格に引っかかった場合
if data["low"] < stop_price:
flag["records"]["log"].append("{0}$の損切ラインに引っかかりました\n".format( stop_price ))
flag["records"]["log"].append(str(data["low"]) + "$あたりで成行注文を出してポジションを決済します\n")
# 決済の成行注文コード
records( flag,data,stop_price,"STOP" )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["position"]["stop-AF"] = stop_AF
flag["position"]["stop-EP"] = 0
flag["add-position"]["count"] = 0
# 売りポジションの時
if flag["position"]["side"] == "Sell":
stop_price = flag["position"]["price"] + flag["position"]["stop"] # 損切り価格を設定
# 損切り価格に引っかかった場合
if data["high"] > stop_price:
flag["records"]["log"].append("{0}$の損切ラインに引っかかりました\n".format( stop_price ))
flag["records"]["log"].append(str(data["high"]) + "$あたりで成行注文を出してポジションを決済します\n")
# 決済の成行注文コードを入れる
records( flag,data,stop_price,"STOP" )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["position"]["stop-AF"] = stop_AF
flag["position"]["stop-EP"] = 0
flag["add-position"]["count"] = 0
return flag
#--------------------バックテスト関数--------------------
#====================トレードパフォーマンス確認====================
def records(flag,data,exit_price,close_type = None):
entry_price = flag["position"]["price"]
trade_cost = flag["position"]["lot"] * slippage
flag["records"]["slippage"].append(trade_cost)
flag["records"]["log"].append("スリッページ・手数料として " + str(round(trade_cost,1)) + "$を考慮します\n")
# 決済日時,ポジションの保有期間を記録
flag["records"]["date"].append(datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M'))
flag["records"]["holding-periods"].append(flag["position"]["count"])
# 損切りにかかった回数をカウント
if close_type == "STOP":
flag["records"]["stop-count"].append(1)
else:
flag["records"]["stop-count"].append(0)
# 値幅の計算
Buy_Price_range = exit_price - entry_price # 買いエントリー時の獲得値幅
Sell_Price_range = entry_price - exit_price # 売りエントリーン時の獲得値幅
# 利益率の計算
Buy_return = Buy_Price_range/entry_price # 買いエントリー時の獲得リターン
Sell_return = Sell_Price_range/entry_price # 売りエントリー時の獲得リターン
# 買いエントリーの時
if flag["position"]["side"] == "Buy":
flag["records"]["return"].append( Buy_return ) # 獲得リターンを記録
flag["records"]["side"].append( flag["position"]["side"] ) # 買いか売りかを記録
flag["records"]["profit"].append((Buy_return-slippage)*flag["position"]["lot"]) # 獲得利益を記録
flag["records"]["funds"] += (Buy_return-slippage)*flag["position"]["lot"] # 証拠金に獲得利益を加算
if Buy_return > 0:
flag["records"]["log"].append(str(round((Buy_return-slippage)*flag["position"]["lot"],1)) + "$の利益です\n\n")
else:
flag["records"]["log"].append(str(round((Buy_return-slippage)*flag["position"]["lot"],1)) + "$の損失です\n\n")
# 売りエントリーの時
if flag["position"]["side"] == "Sell":
flag["records"]["return"].append( Sell_return ) # 獲得リターンを記録
flag["records"]["side"].append( flag["position"]["side"] ) # 買いか売りかを記録
flag["records"]["profit"].append((Sell_return-slippage)*flag["position"]["lot"]) # 獲得利益を記録
flag["records"]["funds"] += (Sell_return-slippage)*flag["position"]["lot"] # 証拠金に獲得利益を加算
if Sell_return > 0:
flag["records"]["log"].append(str(round((Sell_return-slippage)*flag["position"]["lot"],1)) + "$の利益です\n")
else:
flag["records"]["log"].append(str(round((Sell_return-slippage)*flag["position"]["lot"],1)) + "$の損失です\n")
return flag
#====================バックテストの集計====================
def backtest( flag ):
#チャート表示用df
#//////////////////////////////////////////////////////////////////
chart= pd.DataFrame({
"close_price" : flag["records"]["close_price"],
"open_time" : pd.to_datetime(flag["records"]["open_time"])
})
#//////////////////////////////////////////////////////////////////
# 成績をdfに記録
records = pd.DataFrame({
"Date" : pd.to_datetime(flag["records"]["date"]), # 決済日時
"Side" : flag["records"]["side"], # ポジションの側
"Stop" : flag["records"]["stop-count"], # 損切りを行った回数
"Rate" : flag["records"]["return"], # 獲得レート
"Periods" : flag["records"]["holding-periods"], # ポジション保有期間
"Slippage" : flag["records"]["slippage"], # 手数料等
"Profit" : flag["records"]["profit"], # 獲得損益
})
# recordsに総利益の列を追加
records["Gross"] = records.Profit.cumsum() # その行までのrecords.Profitの総和
# recordsに資産推移の列を追加する
records["Funds"] = records.Gross + start_funds # 初期資金+records.Gross
# recordsにドローダウンの列を追加
records["Drawdown"] = records.Funds.cummax().subtract( records.Funds )
records["DrawdownRate"] = records.Drawdown / records.Funds.cummax() * 100
# 連敗回数をカウントする
consecutive_defeats = [] # 連敗回数を記録する配列
defeats = 0 # 初期化
for r in flag["records"]["return"]: # リターンがマイナスなら連敗回数を+1
if r < 0:
defeats += 1
else: # リターンがプラスなら連敗回数をリセット
consecutive_defeats.append( defeats )
defeats = 0
# recordsから買いエントリーと売りエントリーだけをそれぞれ抽出する
Buy_records = records[records.Side.isin(["Buy"])]
Sell_records = records[records.Side.isin(["Sell"])]
# # 月別のデータを集計する
# records["月別集計"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m')))
# grouped = records.groupby("月別集計")
# monthly_records = pd.DataFrame({
# "Number" : grouped.Profit.count(),
# "Gross" : grouped.Profit.sum(),
# "Funds" : grouped.Funds.last(),
# "Rate" : round(grouped.Rate.mean(),2),
# "Drawdown" : grouped.Drawdown.max(),
# "Periods" : grouped.Periods.mean()
# })
print("\nバックテスト結果")
print("==============================")
print("--------買いエントリ成績--------")
print("トレード回数 : {}回".format(len(Buy_records) ))
print("勝率 : {}%".format(round(len(Buy_records[Buy_records.Profit>0]) / len(Buy_records) * 100,1)))
print("平均リターン : {}%".format(round(Buy_records.Rate.mean()*100,2)))
print("総損益 : {}$".format(round( Buy_records.Profit.sum() ,2)))
print("平均保有期間 : {}足".format(round(Buy_records.Periods.mean(),1) ))
print("損切りの回数 : {}回".format( Buy_records.Stop.sum() ))
print("\n--------売りエントリ成績--------")
print("トレード回数 : {}回".format( len(Sell_records) ))
print("勝率 : {}%".format(round(len(Sell_records[Sell_records.Profit>0]) / len(Sell_records) * 100,1)))
print("平均リターン : {}%".format(round(Sell_records.Rate.mean()*100,2)))
print("総損益 : {}$".format(round( Sell_records.Profit.sum() ,2)))
print("平均保有期間 : {}足".format(round(Sell_records.Periods.mean(),1) ))
print("損切りの回数 : {}回".format( Sell_records.Stop.sum() ))
print("\n------------総合成績--------------")
print("全トレード数 : {}回".format(len(records) ))
print("勝率 : {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1)))
print("平均リターン : {}%".format(round(records.Rate.mean()*100,2)))
print("標準偏差 : {}%".format(round(records.Rate.std()*100,2)))
print("平均利益率 : {}%".format(round(records[records.Profit>0].Rate.mean()*100,2) ))
print("平均損失率 : {}%".format(round(records[records.Profit<0].Rate.mean()*100,2) ))
print("平均保有期間 : {}足".format(round(records.Periods.mean(),1) ))
print("損切りの回数 : {}回".format( records.Stop.sum() ))
print("最大連敗回数 : {}回".format( max(consecutive_defeats) ))
print("最大勝ちトレード : {}$".format((round(records.Profit.max(),2))))
print("最大負けトレード : {}$".format((round(records.Profit.min(),2))))
print("最大ドローダウン : {0}$ / {1}%".format(round(-1 * records.Drawdown.max()), round( records.DrawdownRate.loc[records.Drawdown.idxmax()] )))
print("利益合計 : {}$".format((round(records[records.Profit>0].Profit.sum(),2))))
print("損失合計 : {}$".format(round(records[records.Profit<0].Profit.sum(),2),))
print("手数料合計 : {}$".format(round(-1 * records.Slippage.sum(),1)))
print("最終損益 : {}$\n".format((round(records.Profit.sum()-(records.Slippage.sum()) ,2))))
print("初期資金 : {}$".format( start_funds ))
print("最終資金 : {}$".format( round(records.Funds.iloc[-1] ,2)))
print("運用成績 : {}%".format( round(records.Funds.iloc[-1] / start_funds * 100 ,2) ))
print("-----------------------------------")
print("各成績指標")
print("-----------------------------------")
print("MARレシオ : {}".format(round( (records.Funds.iloc[-1] / start_funds -1)*100 / records.DrawdownRate.max(),2 )))
print("シャープレシオ : {}".format( round(records.Rate.mean()/records.Rate.std(),2) ))
print("プロフィットファクター : {}".format( round(records[records.Profit>0].Profit.sum()/abs(records[records.Profit<0].Profit.sum()),2) ))
print("損益レシオ : {}".format(round( records[records.Profit>0].Rate.mean()/abs(records[records.Profit<0].Rate.mean()) ,2)))
# print("\n--------------月別成績------------")
# for index , row in monthly_records.iterrows():
# print("===================================")
# print( "{0}年{1}月".format( index.year, index.month ) )
# print("-----------------------------------")
# print("トレード数 : {}回".format( row.Number.astype(int) ))
# print("月間損益 : {}$".format( row.Gross.astype(int) ))
# print("平均リターン : {}%".format( round(row.Rate*100 ,2)))
# print("月間最大ドローダウン : {}$".format( -1 * row.Drawdown.astype(int) ))
# print("平均保有期間 : {}足".format( round(row.Periods.astype(float),1) ))
#際立った損益を表示
n = 10
print("-----------------------------------")
print("+{}%を超えるトレードの回数 : {}回".format(n,len(records[records.Rate>(n/100)]) ))
print("-----------------------------------")
for index,row in records[records.Rate>(n/100)].iterrows():
print( "{0} | {1}% | {2}".format(row.Date,round(row.Rate*100,1),row.Side ))
print("-----------------------------------")
print("-{}%を下回るトレードの回数 : {}回".format(n,len(records[records.Rate< (n/-100)]) ))
print("-----------------------------------")
for index,row in records[records.Rate < (n/-100)].iterrows():
print( "{0} | {1}% | {2}".format(row.Date,round(row.Rate*100,1),row.Side ))
print("==============================")
plot(records,chart) # グラフを表示
output_file(records,flag) # ファイルを出力
#====================損益曲線をプロット====================
def plot(records,chart):
#損益グラフ
plt.subplot(2,1,1)
plt.plot( records.Date, records.Funds ) # X軸、Y軸の値を指定
plt.xlabel("Date") # X軸のラベル名
plt.ylabel("Balance") # Y軸のラベル名
plt.xticks(rotation=50) # X軸の目盛りを50度回転
#チャート
plt.subplot(2,1,2)
plt.plot( chart.open_time, chart.close_price ) # X軸、Y軸の値を指定
plt.xlabel("Date") # X軸のラベル名
plt.ylabel("{}".format(symbol)) # Y軸のラベル名
plt.xticks(rotation=50) # X軸の目盛りを50度回転
# リターン分布の相対度数表を作る
# plt.subplot(2,1,2) # X軸、Y軸の値を指定
# plt.hist( records.Rate,50,rwidth=0.9) # ヒストグラムで表示
# plt.axvline( x=0,linestyle="dashed",label="Return = 0" )
# plt.axvline( records.Rate.mean(), color="orange", label="AverageReturn" )
# plt.legend() # 凡例
plt.show() #グラフの表示
#====================ファイルを出力====================
def output_file(records,flag):
file = open("C:\Pydoc\log\donchian-{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
file.writelines(flag["records"]["log"])
#pandasのdfをcsvで出力
records.to_csv("C:\Pydoc\log\donchian-{0}-records.csv".format(datetime.now().strftime("%Y-%m-%d-%H-%M")))
#--------------------メイン処理--------------------
# 価格チャートを取得
price = get_price( chart_min,test_start )
flag = {
"order":{
"exist" : False,
"side" : "",
"price" : 0,
"count" : 0,
"ATR" : 0,
"lot" : 0,
"stop" : 0
},
"position":{
"exist" : False,
"side" : "",
"price": 0,
"stop":0,
"stop-AF": stop_AF,
"stop-EP":0,
"ATR":0,
"lot":0,
"count":0
},
"add-position":{
"count":0,
"first-entry-price":0,
"last-entry-price":0,
"unit-range":0,
"unit-size":0,
"stop":0
},
"records":{
"date":[],
"profit":[],
"return":[],
"side":[],
"stop-count":[],
"funds" : start_funds,
"holding-periods":[],
"slippage":[],
"log":[],
#////////////////////
#チャート表示用
"close_price":[],
"open_time":[]
#///////////////////
}
}
last_data = []
need_term = max(Buy_term,Sell_term,volatility_term)
i = 0
while i < len(price):
# ドンチャンの判定に使う期間分の安値・高値データを準備する
if len(last_data) < need_term:
last_data.append(price[i])
flag = log_price(price[i],flag)
time.sleep(wait)
i += 1
continue
data = price[i]
flag = log_price(data,flag)
# ポジションがある場合
if flag["position"]["exist"]:
if stop_config != "OFF":
flag = stop_position( data,flag )
flag = close_position( data,last_data,flag )
flag = add_position( data,flag )
# ポジションがない場合
else:
flag = entry_signal( data,last_data,flag )
last_data.append( data )
i += 1
time.sleep(wait)
print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(datetime.fromtimestamp(float(price[0]["open_time"]))))
print("終了時点 : " + str(datetime.fromtimestamp(float(price[-1]["open_time"]))))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")
backtest(flag)
・解説
①総当たりコード削除
パラメータ探索ができる関数のままにしていたんですが、コードが見にくくなるのでやめました。探索したいときは総当たりコードにする。
②トレイリングストップ関数を変更
加速係数を使う奴にしました。
固定比率でトレイリングするより、だんだんトレイリング比率を上げていく方がリターンが大きくなります。
↓こちらを見てください(安定の丸投げ)
③メイン関数削除
関数ではなく、メイン処理として直接記述しました。
これで、無駄に引数が多くなることが無くなりました。
④チャートをプロット可能に
使いどころがあるか分からないけど、チャートも表示できるようにしました。plot関数のコメントアウトの部分をいじれば表示できます。
※subplotの行列数はお好みで!plotに関しては↓を参考にするといいです。
他にも細かいところを修正しています。
ファイル出力関数内のパスを相対パス⇒絶対パスにしたり、
価格データを取得する関数で週足にも対応したり。
以上!
◆実行
--------------------------
テスト期間:
開始時点 : 2019-06-01 09:00:00
終了時点 : 2021-05-06 21:00:00
4234件のローソク足データで検証
--------------------------
バックテスト結果
==============================
--------買いエントリ成績--------
トレード回数 : 29回
勝率 : 31.0%
平均リターン : 5.37%
総損益 : 22241.08$
平均保有期間 : 65.5足
損切りの回数 : 17回
--------売りエントリ成績--------
トレード回数 : 19回
勝率 : 15.8%
平均リターン : 0.16%
総損益 : 1172.68$
平均保有期間 : 17.3足
損切りの回数 : 14回
------------総合成績--------------
全トレード数 : 48回
勝率 : 25.0%
平均リターン : 3.31%
標準偏差 : 15.44%
平均利益率 : 23.42%
平均損失率 : -3.39%
平均保有期間 : 46.4足
損切りの回数 : 31回
最大連敗回数 : 7回
最大勝ちトレード : 12875.95$
最大負けトレード : -1397.39$
最大ドローダウン : -4961$ / 17%
利益合計 : 31079.1$
損失合計 : -7665.33$
手数料合計 : -607.1$
最終損益 : 22806.63$
初期資金 : 1000$
最終資金 : 24413.76$
運用成績 : 2441.38%
-----------------------------------
各成績指標
-----------------------------------
MARレシオ : 135.22
シャープレシオ : 0.21
プロフィットファクター : 4.05
損益レシオ : 6.9
-----------------------------------
+10%を超えるトレードの回数 : 9回
-----------------------------------
2019-07-14 17:00:00 | 28.2% | Buy
2019-10-01 09:00:00 | 10.3% | Sell
2020-02-17 17:00:00 | 11.1% | Buy
2020-03-19 17:00:00 | 32.1% | Sell
2020-05-21 21:00:00 | 12.8% | Buy
2020-08-22 05:00:00 | 24.6% | Buy
2020-11-26 17:00:00 | 53.7% | Buy
2021-01-21 17:00:00 | 68.5% | Buy
2021-02-23 17:00:00 | 25.5% | Buy
-----------------------------------
-10%を下回るトレードの回数 : 0回
-----------------------------------
==============================
plotにチャートを表示してみました。
実行結果自体は前回と同じです。
中身スッカスカになってしまったけど、許してください!
今回はここまで。
次回は便利すぎるエディタ:Spyderの紹介か、AWSの登録~使い方の記事を書こうかな~と思ってます。