仮想通貨bot 勉強記録㉓
~Pandasで無双する~
◆前回までのあらすじ
バックテストの結果をグラフに表示することができました。
◆今回やること
・データを集計し、バックテスト結果を分析する
今回はPandasでデータを集計して、好きなだけバックテストを分析していきます。初めてのPandasでかなり苦労したけど、めちゃくちゃ楽しいです!
今回のコードを実行すると、バックテストの結果・logファイル・集計データ・グラフを出力します。
↓こんな感じ(情報過多)
コードはこちら
from datetime import datetime
import pybybit
import time
import json
import matplotlib.pyplot as plt
import pandas as pd
#====================API設定====================
apis = [
'プライベートキー',
'シークレットキー'
]
bybit = pybybit.API(*apis, testnet=True)
#===============================================
#====================バックテスト用の初期設定値====================
lot = 1000 # 1トレードのロット($)
slippage = 0.001 # 手数料やスリッページ(0.075%初期値)
chart_min = 60 # (1 3 5 15 30 60 120 240 360 720 "D" "M" "W")
test_start = 0 # 何番目のデータからテストするか
test_end = int(60*24*365/chart_min) # 何番目のデータまでテストするか
path = "backdata/*********************json" # 読み込むファイルのパス
term = 20 # 過去n期間
wait = 0 # 待機時間
#====================APIから価格データ取得====================
def get_price_from_API(chart_min):
#取得開始時刻の設定
get_start = {
"year" : int(2019), #年
"month" : int(4), #月
"day" : int(1), #日
"hour" : int(00), #時
"minute" : int(00) #分
}
get_start = int(datetime(**get_start).timestamp())
#取得終了時刻の設定
get_end = {
"year" : int(2021), #年
"month" : int(4), #月
"day" : int(1), #日
"hour" : int(00), #時
"minute" : int(00) #分
}
get_end = int(datetime(**get_end).timestamp())
#データを格納する変数
price = []
#tがnowより小さければ実行
while get_start < get_end:
#pybybitでローソク足取得
k = bybit.rest.inverse.public_kline_list(
symbol = "BTCUSD",
interval= chart_min,
from_ = get_start
).json()
#priceに取得したデータを入れる
price += k["result"]
#200本x足の長さ分だけタイムスタンプを進める
get_start += 200*60*chart_min
# //////////新規追加部分//////////
Unnecessary = int((get_start-get_end)/(chart_min*60))
price = price[:-Unnecessary]
return price
#====================ファイルから価格データを読み込む====================
def get_price_from_file(path):
file = open(path,'r',encoding='utf-8')
price = json.load(file)
return price
#====================画面出力====================
def print_price(data):
print( " 時間: " + datetime.fromtimestamp(data['open_time']).strftime('%Y/%m/%d %H:%M')
+ " 始値: " + str(data['open'])
+ " 終値: " + str(data['close']))
#====================時間と始値・終値をログに記録====================
def log_price( data,flag ):
log = "時間: " + datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open"]) + " 終値: " + str(data["close"]) + "\n"
flag["records"]["log"].append(log)
return flag
#====================ロジック判定====================
def donchian( data,last_data ):
highest = max(i["high"] for i in last_data)
if data["high"] > highest:
return {"side":"BUY","price":highest}
lowest = min(i["low"] for i in last_data)
if data["low"] < lowest:
return {"side":"SELL","price":lowest}
return {"side" : None , "price":0}
#====================買い・売り注文====================
def entry_signal( data,last_data,flag ):
signal = donchian( data,last_data )
if signal["side"] == "BUY":
flag["records"]["log"].append("過去{0}足の最高値{1}$を、直近の高値が{2}$でブレイク\n".format(term,signal["price"],data["high"]))
flag["records"]["log"].append(str(data["close"]) + "$で買い指値注文を出します\n")
# ここに買い注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = float(data["close"])
if signal["side"] == "SELL":
flag["records"]["log"].append("過去{0}足の最安値{1}$を、直近の安値が{2}$でブレイク\n".format(term,signal["price"],data["low"]))
flag["records"]["log"].append(str(data["close"]) + "$で売り指値注文を出します\n")
# ここに売り注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"
flag["order"]["price"] = data["close"]
return flag
#====================注文状況確認====================
def check_order( flag ):
"""ここに注文状況確認コード"""
flag["order"]["exist"] = False
flag["order"]["count"] = 0
flag["position"]["exist"] = True
flag["position"]["side"] = flag["order"]["side"]
flag["position"]["price"] = flag["order"]["price"]
return flag
#====================成行決済&ドテン注文====================
def close_position( data,last_data,flag ):
flag["position"]["count"] += 1
signal = donchian( data,last_data )
if flag["position"]["side"] == "BUY":
if signal["side"] == "SELL":
flag["records"]["log"].append("過去{0}足の最安値{1}$を、直近の安値が{2}$でブレイク\n".format(term,signal["price"],data["low"]))
flag["records"]["log"].append(str(data["close"]) + "あたりで成行注文を出してポジションを決済します\n")
# 成行決済注文コードを入れる
records( flag,data )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["records"]["log"].append("さらに" + str(data["close"]) + "$で売りの指値注文を入れてドテンします\n")
# 売り指値注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"
flag["order"]["price"] = float(data["close"])
if flag["position"]["side"] == "SELL":
if signal["side"] == "BUY":
flag["records"]["log"].append("過去{0}足の最高値{1}$を、直近の高値が{2}$でブレイク\n".format(term,signal["price"],data["high"]))
flag["records"]["log"].append(str(data["close"]) + "あたりで成行注文を出してポジションを決済します\n")
# 成行決済注文コードを入れる
records( flag,data )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["records"]["log"].append("さらに" + str(data["close"]) + "$で買いの指値注文を入れてドテンします\n")
# 買い指値注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = float(data["close"])
return flag
#====================トレードパフォーマンス確認====================
def records(flag,data):
entry_price = float(flag["position"]["price"])
exit_price = round(float(data["close"]))
trade_cost = lot * slippage
log ="スリッページ・手数料として" + str(trade_cost) + "$を考慮\n"
flag["records"]["log"].append(log)
flag["records"]["slippage"].append(trade_cost)
# //////////新規追加部分//////////
flag["records"]["date"].append(datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M')) # 決済日時を記録
flag["records"]["holding-periods"].append(flag["position"]["count"]) # ポジションの保有時間を記録
# 値幅の計算
buy_Price_range = exit_price - entry_price
sell_Price_range = entry_price - exit_price
# 利益率の計算
buy_return = buy_Price_range/entry_price
sell_return = sell_Price_range/entry_price
#利益・損失の確認
if flag["position"]["side"] == "BUY":
# //////////新規追加部分//////////
flag["records"]["Price_range"].append( buy_Price_range) #獲得値幅を記録
flag["records"]["return"].append( buy_return ) #獲得リターンを記録
flag["records"]["side"].append( flag["position"]["side"] ) #買いか売りかを記録
if buy_Price_range > 0:
log = str(round(buy_return * lot , 4 )) + "$の利益\n"
flag["records"]["log"].append(log)
else:
log = str(abs(round(buy_return * lot , 4 ))) + "$の損失\n"
flag["records"]["log"].append(log)
if flag["position"]["side"] == "SELL":
# //////////新規追加部分//////////
flag["records"]["Price_range"].append( sell_Price_range ) #獲得値幅を記録
flag["records"]["return"].append( sell_return ) #獲得リターンを記録
flag["records"]["side"].append( flag["position"]["side"] ) #買いか売りかを記録
if sell_Price_range > 0:
log = str(round( sell_return * lot , 4 )) + "$の利益\n"
flag["records"]["log"].append(log)
else:
log = str(abs(round( sell_return * lot , 4 ))) + "$の損失\n"
flag["records"]["log"].append(log)
return flag
#====================損益曲線をプロット====================
# //////////新規追加部分//////////
def plot(records):
plt.plot( records.Date, records.Gross ) #X軸、Y軸の値を指定
plt.xlabel("Date") #X軸のラベル名
plt.ylabel("Balance") #Y軸のラベル名
plt.xticks(rotation=50) # X軸の目盛りを50度回転
plt.show()
#====================ファイルを出力====================
# //////////新規追加部分//////////
def File_output(flag,records):
#logファイルをtxtで出力
file = open("log/donchian-{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
file.writelines(flag["records"]["log"])
#pandasのdfをcsvで出力
records.to_csv("log/donchian-{0}-records.csv".format(datetime.now().strftime("%Y-%m-%d-%H-%M")))
#====================バックテストの集計====================
def backtest(flag):
# //////////新規追加部分//////////
# 成績を記録したpandas DataFrameを作成
records = pd.DataFrame({
"Date" : pd.to_datetime(flag["records"]["date"]),
"Side" : flag["records"]["side"],
"Price_range" : flag["records"]["Price_range"],
"Rate" : flag["records"]["return"],
"Periods" : flag["records"]["holding-periods"],
"Slippage" : flag["records"]["slippage"]
})
# 獲得利益の列を追加
records["Profit"] = records.Rate*lot
# 総利益の列を追加
records["Gross"] = records.Profit.cumsum()
# ドローダウンの列を追加
records["Drawdown"] = records.Gross.cummax().subtract( abs(records.Gross) )
records["DrawdownRate"] = records.Drawdown / records.Gross.cummax() * 100
# 月別集計の列を追加
records["Monthly_Records"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m')))
# dfを月別でグループ分け
grouped = records.groupby("Monthly_Records")
#月別集計データのdfを作成
monthly_records = pd.DataFrame({
"Count" : grouped.Price_range.count(),
"Profit" : grouped.Profit.sum(),
"Rate" : grouped.Rate.mean(),
"Drawdown" : grouped.Drawdown.max(),
"Periods" : grouped.Periods.mean()
})
# 買いエントリーと売りエントリーだけをそれぞれ抽出
buy_records = records[records.Side.isin(["BUY"])]
sell_records = records[records.Side.isin(["SELL"])]
print("\nバックテスト結果")
print("==============================")
print("--------買いエントリの成績--------")
print("トレード回数 : {}回".format(len(buy_records) ))
print("勝率 : {}%".format(round(len(buy_records[buy_records.Profit>0]) / len(buy_records) * 100,1)))
print("平均リターン : {}%".format(round(buy_records.Rate.mean()*100,2)))
print("総損益 : {}$".format(round( buy_records.Rate.sum()*lot ,2)))
print("平均保有期間 : {}足".format(round(buy_records.Periods.mean(),1) ))
print("\n--------売りエントリの成績--------")
print("トレード回数 : {}回".format( len(sell_records) ))
print("勝率 : {}%".format(round(len(sell_records[sell_records.Profit>0]) / len(sell_records) * 100,1)))
print("平均リターン : {}%".format(round(sell_records.Rate.mean()*100,2)))
print("総損益 : {}$".format(round( sell_records.Rate.sum()*lot ,2)))
print("平均保有期間 : {}足".format(round(sell_records.Periods.mean(),1) ))
print("\n------------総合成績--------------")
print("全トレード数 : {}回".format(len(records) ))
print("勝率 : {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1)))
print("平均リターン : {}%".format(round(records.Rate.mean()*100,2)))
print("平均保有期間 : {}足".format(round(records.Periods.mean(),1) ))
print("最大の勝ちトレード : {}$".format((round(records.Rate.max()*lot,2))))
print("最大の負けトレード : {}$".format((round(records.Rate.min()*lot,2))))
print("最大ドローダウン : {0}$ / {1}%".format(round(-1 * records.Drawdown.max()), round( records.DrawdownRate.loc[records.Drawdown.idxmax()] )))
print("利益合計 : {}$".format((round(records[records.Rate>0].Rate.sum()*lot,2))))
print("損失合計 : {}$".format(round(records[records.Rate<0].Rate.sum()*lot,2),))
print("手数料合計 : {}$".format(-1 * records.Slippage.sum() ))
print("最終損益 : {}$".format((round(records.Rate.sum()*lot-(records.Slippage.sum()) ,2))))
print("\n--------------月別成績------------")
for index , row in monthly_records.iterrows():
print("===================================")
print( "{0}年{1}月".format( index.year, index.month ) )
print("-----------------------------------")
print("トレード数 : {}回".format( row.Count.astype(int) ))
print("月間損益 : {}$".format( row.Profit.astype(int) ))
print("平均リターン : {}%".format( round(row.Rate*100 ,2)))
print("月間最大ドローダウン : {}$".format( -1 * row.Drawdown.astype(int) ))
print("平均保有期間 : {}足".format( round(row.Periods.astype(float),1) ))
print("==============================")
#グラフを表示
plot(records)
#log,dfファイルを出力
File_output(flag,records)
#====================メイン====================
def main():
price = get_price_from_API(chart_min) # APIから取得する場合はこちら
# price = get_price_from_file(path) # jsonファイルを読み込む場合はこちら
last_data = []
print("テスト期間 ")
print("==============================")
print("開始時点 : " + str(datetime.fromtimestamp(float(price[test_start]["open_time"]))))
print("終了時点 : " + str(datetime.fromtimestamp(float(price[test_end]["open_time"]))))
print(str(test_end-test_start) + "件のローソク足データで検証")
print("==============================")
flag = {
"order":{
"exist" : False, #オーダーの有り無し
"side" : "", #買いか売りか
"count" : 0, #保有期間
"price" : 0 #オーダー価格
},
"position":{
"exist" : False, #ポジションの有り無し
"side" : "", #買いか売りか
"count" : 0, #保有期間
"price" : 0 #ポジション価格
},
"records":{
# //////////新規追加部分//////////
"date":[], # 決済日時
"Price_range":[], # 全トレードの損益を記録
"side":[], # 買いか売りか
"return":[], # リターン
"holding-periods":[], # ポジション保有期間
"slippage":[], # 各トレードで生じた手数料を記録
"log":[] # あとでテキストファイルに出力したい内容を記録
}
}
i = 0
while i < test_end:
# ドンチャンの判定に使う過去term足分の安値・高値データを準備する
if len(last_data) < term:
last_data.append(price[i])
log_price(price[i],flag)
time.sleep(wait)
i += 1
#if文の条件を満たすまでループ処理
continue
data = price[i]
flag = log_price(data,flag) #iの価格を記
if flag["order"]["exist"]:
flag = check_order( flag )
elif flag["position"]["exist"]:
flag = close_position( data,last_data,flag )
else:
flag = entry_signal( data,last_data,flag )
# 過去データをterm個ピッタリに保つために先頭を削除
del last_data[0]
last_data.append( data )
i += 1
time.sleep(wait)
backtest(flag)
main()
◆解説
前回からの変化点を解説していくぞい
・get_price_from_API(chart_min)
#====================APIから価格データ取得====================
def get_price_from_API(chart_min):
#取得開始時刻の設定
get_start = {
"year" : int(2019), #年
"month" : int(4), #月
"day" : int(1), #日
"hour" : int(00), #時
"minute" : int(00) #分
}
get_start = int(datetime(**get_start).timestamp())
#取得終了時刻の設定
get_end = {
"year" : int(2021), #年
"month" : int(1), #月
"day" : int(4), #日
"hour" : int(00), #時
"minute" : int(00) #分
}
get_end = int(datetime(**get_end).timestamp())
#データを格納する変数
price = []
#tがnowより小さければ実行
while get_start < get_end:
#pybybitでローソク足取得
k = bybit.rest.inverse.public_kline_list(
symbol = "BTCUSD",
interval= chart_min,
from_ = get_start
).json()
#priceに取得したデータを入れる
price += k["result"]
#200本x足の長さ分だけタイムスタンプを進める
get_start += 200*60*chart_min
# //////////新規追加部分//////////
Unnecessary = int((get_start-get_end)/(chart_min*60))
price = price[:-Unnecessary]
return price
最後に不要場部分を削除するコードを入れました。
(200本分ずる時間を進めると、get_endを越えて取得してしまうため、はみ出した部分を削除)
# //////////新規追加部分//////////
Unnecessary = int((get_start-get_end)/(chart_min*60))
price = price[:-Unnecessary]
・close_position( data,last_data,flag )
#====================成行決済&ドテン注文====================
def close_position( data,last_data,flag ):
flag["position"]["count"] += 1
signal = donchian( data,last_data )
if flag["position"]["side"] == "BUY":
if signal["side"] == "SELL":
flag["records"]["log"].append("過去{0}足の最安値{1}$を、直近の安値が{2}$でブレイク\n".format(term,signal["price"],data["low"]))
flag["records"]["log"].append(str(data["close"]) + "あたりで成行注文を出してポジションを決済します\n")
# 成行決済注文コードを入れる
records( flag,data )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["records"]["log"].append("さらに" + str(data["close"]) + "$で売りの指値注文を入れてドテンします\n")
# 売り指値注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"
flag["order"]["price"] = float(data["close"])
if flag["position"]["side"] == "SELL":
if signal["side"] == "BUY":
flag["records"]["log"].append("過去{0}足の最高値{1}$を、直近の高値が{2}$でブレイク\n".format(term,signal["price"],data["high"]))
flag["records"]["log"].append(str(data["close"]) + "あたりで成行注文を出してポジションを決済します\n")
# 成行決済注文コードを入れる
records( flag,data )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["records"]["log"].append("さらに" + str(data["close"]) + "$で買いの指値注文を入れてドテンします\n")
# 買い指値注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = float(data["close"])
return flag
flag["records"]["log"]に書き込む際、一度変数:logに書き込み内容を代入するのをやめて、直接書き込むように変更しました。
if flag["position"]["side"] == "BUY":
if signal["side"] == "SELL":
flag["records"]["log"].append("過去{0}足の最安値{1}$を、直近の安値が{2}$でブレイク\n".format(term,signal["price"],data["low"]))
flag["records"]["log"].append(str(data["close"]) + "あたりで成行注文を出してポジションを決済します\n")
・records(flag,data)
#====================トレードパフォーマンス確認====================
def records(flag,data):
entry_price = float(flag["position"]["price"])
exit_price = round(float(data["close"]))
trade_cost = lot * slippage
log ="スリッページ・手数料として" + str(trade_cost) + "$を考慮\n"
flag["records"]["log"].append(log)
flag["records"]["slippage"].append(trade_cost)
# //////////新規追加部分//////////
flag["records"]["date"].append(datetime.fromtimestamp(data["open_time"])) # 決済日時を記録
flag["records"]["holding-periods"].append(flag["position"]["count"]) # ポジションの保有時間を記録
# 値幅の計算
buy_Price_range = exit_price - entry_price
sell_Price_range = entry_price - exit_price
# 利益率の計算
buy_return = buy_Price_range/entry_price
sell_return = sell_Price_range/entry_price
#利益・損失の確認
if flag["position"]["side"] == "BUY":
# //////////新規追加部分//////////
flag["records"]["Price_range"].append( buy_Price_range) #獲得値幅を記録
flag["records"]["return"].append( buy_return ) #獲得リターンを記録
flag["records"]["side"].append( flag["position"]["side"] ) #買いか売りかを記録
if buy_Price_range > 0:
log = str(round(buy_return * lot , 4 )) + "$の利益\n"
flag["records"]["log"].append(log)
else:
log = str(abs(round(buy_return * lot , 4 ))) + "$の損失\n"
flag["records"]["log"].append(log)
if flag["position"]["side"] == "SELL":
# //////////新規追加部分//////////
flag["records"]["Price_range"].append( sell_Price_range ) #獲得値幅を記録
flag["records"]["return"].append( sell_return ) #獲得リターンを記録
flag["records"]["side"].append( flag["position"]["side"] ) #買いか売りかを記録
if sell_Price_range > 0:
log = str(round( sell_return * lot , 4 )) + "$の利益\n"
flag["records"]["log"].append(log)
else:
log = str(abs(round( sell_return * lot , 4 ))) + "$の損失\n"
flag["records"]["log"].append(log)
return flag
flag["records"]に["date"]・["holding-periods"]の項目を追加し、ポジションの決済日時とポジションを保有していた期間を記録するようにしました。
# //////////新規追加部分//////////
flag["records"]["date"].append(datetime.fromtimestamp(data["open_time"])) # 決済日時を記録
flag["records"]["holding-periods"].append(flag["position"]["count"]) # ポジションの保有時間を記録
# //////////新規追加部分//////////
flag["records"]["Price_range"].append( buy_Price_range) #獲得値幅を記録
flag["records"]["return"].append( buy_return ) #獲得リターンを記録
flag["records"]["side"].append( flag["position"]["side"] ) #買いか売りかを記録
flag["records"]に["Price_range"]・["return"]・[side"]の項目を追加し、獲得値幅・獲得リターン・買い売りのどちらかを記録するようにしました。
・plot(records)
#====================損益曲線をプロット====================
# //////////新規追加部分//////////
def plot(records):
plt.plot( records.Date, records.Gross ) #X軸、Y軸の値を指定
plt.xlabel("Date") #X軸のラベル名
plt.ylabel("Balance") #Y軸のラベル名
plt.xticks(rotation=50) # X軸の目盛りを50度回転
plt.show()
backtest(flag)に入れていたグラフを表示する部分を、関数にしました。
・def File_output(flag,records)
#====================ファイルを出力====================
# //////////新規追加部分//////////
def File_output(flag,records):
#logファイルをtxtで出力
file = open("log/donchian-{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
file.writelines(flag["records"]["log"])
#pandasのdfをcsvで出力
records.to_csv("log/donchian-{0}-records.csv".format(datetime.now().strftime("%Y-%m-%d-%H-%M")))
backtest(flag)に入れていたファイルを出力する部分を、関数にしました。
また、flag["records"]["log"]を出力するだけでなく、Pandasで作成するdf(データフレーム)をcsv形式で出力するようにしました。
・backtest(flag)
#====================バックテストの集計====================
def backtest(flag):
# //////////新規追加部分//////////
# 成績を記録したpandas DataFrameを作成
records = pd.DataFrame({
"Date" : flag["records"]["date"],
"Side" : flag["records"]["side"],
"Price_range" : flag["records"]["Price_range"],
"Rate" : flag["records"]["return"],
"Periods" : flag["records"]["holding-periods"],
"Slippage" : flag["records"]["slippage"]
})
# 獲得利益の列を追加
records["Profit"] = records.Rate*lot
# 総利益の列を追加
records["Gross"] = records.Profit.cumsum()
# ドローダウンの列を追加
records["Drawdown"] = records.Gross.cummax().subtract( abs(records.Gross) )
records["DrawdownRate"] = round(records.Drawdown / records.Gross.cummax() * 100,1)
# 月別のデータを集計
records["Monthly_Records"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m')))
grouped = records.groupby("Monthly_Records")
#月別集計データのdfを作成
monthly_records = pd.DataFrame({
"Count" : grouped.Price_range.count(),
"Profit" : grouped.Profit.sum(),
"Rate" : grouped.Rate.mean(),
"Drawdown" : grouped.Drawdown.max(),
"Periods" : grouped.Periods.mean()
})
# 買いエントリーと売りエントリーだけをそれぞれ抽出
buy_records = records[records.Side.isin(["BUY"])]
sell_records = records[records.Side.isin(["SELL"])]
print("\nバックテスト結果")
print("==============================")
print("--------買いエントリの成績--------")
print("トレード回数 : {}回".format( len(buy_records) ))
print("勝率 : {}%".format(round(len(buy_records[buy_records.Price_range>0]) / len(buy_records) * 100,1)))
print("平均リターン : {}%".format(round(buy_records.Rate.mean()*100,2)))
print("総損益 : {}$".format(round( buy_records.Rate.sum()*lot ,2)))
print("平均保有期間 : {}足".format( round(buy_records.Periods.mean(),1) ))
print("\n--------売りエントリの成績--------")
print("トレード回数 : {}回".format( len(sell_records) ))
print("勝率 : {}%".format(round(len(sell_records[sell_records.Price_range>0]) / len(sell_records) * 100,1)))
print("平均リターン : {}%".format(round(sell_records.Rate.mean()*100,2)))
print("総損益 : {}$".format(round( sell_records.Rate.sum()*lot ,2)))
print("平均保有期間 : {}足".format( round(sell_records.Periods.mean(),1) ))
print("\n------------総合成績--------------")
print("全トレード数 : {}回".format(len(records) ))
print("勝率 : {}%".format(round(len(records[records.Price_range>0]) / len(records) * 100,1)))
print("平均リターン : {}%".format(round(records.Rate.mean()*100,2)))
print("平均保有期間 : {}足".format( round(records.Periods.mean(),1) ))
print("最大の勝ちトレード : {}$".format((round(records.Rate.max()*lot,2))))
print("最大の負けトレード : {}$".format((round(records.Rate.min()*lot,2))))
print("最大ドローダウン : {0}$ / {1}%".format(round(-1 * records.Drawdown.max()), round(100-( records.DrawdownRate.loc[records.Drawdown.idxmax()] ))))
print("利益合計 : {}$".format((round(records[records.Rate>0].Rate.sum()*lot,2))))
print("損失合計 : {}$".format(round(records[records.Rate<0].Rate.sum()*lot,2),))
print("手数料合計 : {}$".format( -1 * records.Slippage.sum() ))
print("最終損益 : {}$".format((round(records.Rate.sum()*lot-(records.Slippage.sum()) ,2))))
print("\n--------------月別成績------------")
for index , row in monthly_records.iterrows():
print("===================================")
print( "{0}年{1}月".format( index.year, index.month ) )
print("-----------------------------------")
print("トレード数 : {}回".format( row.Count.astype(int) ))
print("月間損益 : {}$".format( row.Profit.astype(int) ))
print("平均リターン : {}%".format( round(row.Rate*100 ,2)))
print("月間最大ドローダウン : {}$".format( -1 * row.Drawdown.astype(int) ))
print("平均保有期間 : {}足".format( round(buy_records.Periods.mean(),1) ))
print("==============================")
#グラフを表示
plot(records)
#log,dfファイルを出力
File_output(flag,records)
この関数はPandasを活用するため、大きく変わってます。
# //////////新規追加部分//////////
# 成績を記録したpandas DataFrameを作成
records = pd.DataFrame({
"Date" : flag["records"]["date"],
"Side" : flag["records"]["side"],
"Price_range" : flag["records"]["Price_range"],
"Rate" : flag["records"]["return"],
"Periods" : flag["records"]["holding-periods"],
"Slippage" : flag["records"]["slippage"]
})
まず、pd.dataframeでflag["records"]の各値を使用し、df(データフレーム)を作成します。
# 獲得利益の列を追加
records["Profit"] = records.Rate*lot
# 総利益の列を追加
records["Gross"] = records.Profit.cumsum()
# ドローダウンの列を追加
records["Drawdown"] = records.Gross.cummax().subtract( abs(records.Gross) )
records["DrawdownRate"] = records.Drawdown / records.Gross.cummax() * 100
# 月別集計の列を追加
records["Monthly_Records"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m')))
dfに新しい列を追加します。
records["Profit"] = records.Rate*lot
獲得利益を計算・追加します。
records["Gross"] = records.Profit.cumsum()
その行までのProfitの累積和を計算・追加します。
※.cumsum()でその行までの累積和を計算できる
records["Drawdown"] = records.Gross.cummax().subtract( abs(records.Gross) ) その行までのGrossの最大値-Grossでドローダウンを計算・追加します。
※.cummax()でその行までの最大値を取り出せる
※.subtract()で引き算ができる
records["DrawdownRate"] = records.Drawdown / records.Gross.cummax()
ドローダウン/その行までのGrossの最大値でドローダウンの割合を計算・追加します。
records["Monthly_Records"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m')))
Dateから年/月のみを抽出した列を追加します。
records.Dateをstrftimeで変換して追加してます。
# dfを月別でグループ分け
grouped = records.groupby("Monthly_Records")
#月別集計データのdfを作成
monthly_records = pd.DataFrame({
"Count" : grouped.Price_range.count(),
"Profit" : grouped.Profit.sum(),
"Rate" : grouped.Rate.mean(),
"Drawdown" : grouped.Drawdown.max(),
"Periods" : grouped.Periods.mean()
})
grouped = records.groupby("Monthly_Records")
records.Monthly_Recordsが同じ値のものでグループ分けします。
monthly_records =
Monthly_Recordsでグループ分けしたdfで、月別のdfを作成
# 買いエントリーと売りエントリーだけをそれぞれ抽出
buy_records = records[records.Side.isin(["BUY"])]
sell_records = records[records.Side.isin(["SELL"])]
records.SideがBUYのもの、SELLのものをそれぞれ抽出し、buy_records・sell_recordsに入れておきます。
集計の時に使います。
print("\nバックテスト結果")
print("==============================")
print("--------買いエントリの成績--------")
print("トレード回数 : {}回".format( len(buy_records) ))
print("勝率 : {}%".format(round(len(buy_records[buy_records.Price_range>0]) / len(buy_records) * 100,1)))
print("平均リターン : {}%".format(round(buy_records.Rate.mean()*100,2)))
print("総損益 : {}$".format(round( buy_records.Rate.sum()*lot ,2)))
print("平均保有期間 : {}足".format( round(buy_records.Periods.mean(),1) ))
ここから画面出力ゾーンです。
先ほど作ったPandas_dfを使って算出していきます。
print("トレード回数 : {}回".format( len(buy_records) ))
買いエントリの回数は、buy_recordsの個数で求めます。
※len()でサイズが分かる。
print("勝率 : {}%".format(round(len(buy_records[buy_records.Price_range>0]) / len(buy_records) * 100,1)))
勝率は、buy.recordsでPrice_rangeがプラスのもの/buy.recordsの総数で求めます。(Price_rangeの部分はProfitでもRateでもいいです。)
※df名[df名.要素>0]で、[]内の条件を満たしているものだけを抽出できる
print("平均リターン : {}%".format(round(buy_records.Rate.mean()*100,2)))
平均リターンは、buy_records.Rate.mean()で求めます。
※.mean()で平均値が分かる。
print("総損益 : {}$".format(round( buy_records.Rate.sum()*lot ,2)))
総損益は、buy_records.Rate.sum()*lotで求めます。
※.sum()で総和が分かる。
print("平均保有期間 : {}足".format( round(buy_records.Periods.mean(),1) ))
平均保有期間は、buy_records.Periods.mean()で求めます。
売りエントリの成績・総合成績も基本的に同じ方法で求めます。
print("\n------------総合成績--------------")
#中略
print("最大ドローダウン : {0}$ / {1}%".format(round(-1 * records.Drawdown.max()), round( records.DrawdownRate.loc[records.Drawdown.idxmax()] )))
records.DrawdownRate.loc[records.Drawdown.idxmax()]
この部分の意味は、"DrawdownRate列の中から、records.Drawdownが最大になっている行の要素を取り出す"です。
※.loc[]で[]内の条件を満たしている行を取り出せる
※.idxmax()で最大値を検索できる
print("\n--------------月別成績------------")
for index , row in monthly_records.iterrows():
print("===================================")
print( "{0}年{1}月".format( index.year, index.month ) )
print("-----------------------------------")
print("トレード数 : {}回".format( row.Count.astype(int) ))
print("月間損益 : {}$".format( row.Profit.astype(int) ))
print("平均リターン : {}%".format( round(row.Rate*100 ,2)))
print("月間最大ドローダウン : {}$".format( -1 * row.Drawdown.astype(int) ))
print("平均保有期間 : {}足".format( round(row.Periods.astype(float),1) ))
print("==============================")
for index , row in monthly_records.iterrows()で、monthly_recordsデータフレームを1行ずつfor文処理します
monthly_recordsのdfは以下の様にまとめられているので、これを1行ずつ処理しているわけです。
print( "{0}年{1}月".format( index.year, index.month ) )
index.yearで、0列目(index)から、yearを取り出しています。
indexはpd.to_datetimeによりでDate型になってるので、Yearやmonthを取り出すことができます。
print("トレード数 : {}回".format( row.Count.astype(int) ))
row(行).Countの数値をastype(int)でint型に変換しています。
※行を指定するときはrow
※astype()で型を変換できる
他の部分は%で確認するために*100したり、必要に応じてroundで四捨五入してます。
#グラフを表示
plot(records)
#log,dfファイルを出力
File_output(flag,records)
最後にグラフ表示の関数、ファイル出力の関数を呼び出します。
・main()
flag = {
"order":{
"exist" : False, #オーダーの有り無し
"side" : "", #買いか売りか
"count" : 0, #保有期間
"price" : 0 #オーダー価格
},
"position":{
"exist" : False, #ポジションの有り無し
"side" : "", #買いか売りか
"count" : 0, #保有期間
"price" : 0 #ポジション価格
},
"records":{
# //////////新規追加部分//////////
"date":[], # 決済日時
"Price_range":[], # 全トレードの損益を記録
"side":[], # 買いか売りか
"return":[], # リターン
"holding-periods":[], # ポジション保有期間
"slippage":[], # 各トレードで生じた手数料を記録
"log":[] # あとでテキストファイルに出力したい内容を記録
}
}
main()の変化点は、backtest(flag)で使うために、flag[records]に項目を追加したくらいです。
解説終わり!疲れた!
◆実行結果
※もしコピペして実行するときは、.pyファイルがあるフォルダの中に、"log"という名前のフォルダを作ってください。
コードを実行すると
①コンソール出力
②グラフ出力
③ログファイル出力
④recordsのcxvファイル出力
の4つの出力を行います。
③、④はlogフォルダに生まれます。
①コンソール出力
結果が一覧で確認できます。いいっすね~最高だぜ!!
②グラフ出力
これは前回と同じ
③ログファイル出力
これも前回と同じ
④csvファイル出力
今回の目玉と言っても過言ではないcsv出力です!
※開幕から爆損していてDrawdownRateがおかしな値になってますが、、、
Excelならほとんどの人が使った経験あると思うので、もしPandasで詰まったら、”Excelでやるならどういう処理をすればいいか”を考えるといいと思います。
例えば”買いエントリーのみを確認するなら、Sideの列でフィルターを掛ける”とか、勝ちトレードを確認するならProfitの列の値が0より大きい行のみを表示させるとか。
いやーPandasめっちゃ便利ですね。
シンプルにデータ処理が面白くて、今日本屋でPandasの本を眺めてきました(笑)
↓これが一番読みやすかったです。でも簡単な分、物足りない感じもしたかも。
↓こちらは索引ができて便利でした。
突然のアフィタイム終わり。
今回はここまで!
次回はパラメータの最適化をやりたい。