見出し画像

仮想通貨bot 勉強記録㉔

~作成したロジックの最適パラメータを探してみる~

◆前回までのあらすじ

Pandasを使って月別の成績を確認したり、CSVファイルを出力できるようになりました。

◆今回やること

・総当たりテストで最適パラメータを探してみる

今回はめちゃくちゃ難しかった。
分からな過ぎて何度か放心状態になったけど、なんとかパラメータ探索ができるようになりました。

作成したコードはこちら↓

from datetime import datetime
import pybybit
import time
import json
import matplotlib.pyplot as plt
import pandas as pd

#====================API設定====================
apis = [
'プライベートキー',
'シークレットキー'
]

bybit = pybybit.API(*apis, testnet=True)
#===============================================

#====================バックテストの初期設定値====================
lot         = 1000                                 # 1トレードのロット($)
slippage    = 0.001                                # 手数料やスリッページ(0.075%初期値)
wait       =  0                                    # 待機時間
start = '2019/06/01 09:00'                         # ローソク足取得開始時刻
get_start = int(datetime.strptime(start, '%Y/%m/%d %H:%M').timestamp()) #タイムスタンプ変換
n = 50                                             # ローソク足取得リクエスト回数
path = "log/donchian-2021-04-14-18-16-price_list.json"

#====================バックテストのパラメーター設定====================
chart_min_list  = [ 30,60,120,240,720,"D" ] # テストに使う時間軸
buy_term_list   = [ 20,25,30,35,40,45 ]     # テストに使う上値ブレイクアウトの期間
sell_term_list  = [ 20,25,30,35,40,45 ]     # テストに使う下値ブレイクアウトの期間
judge_price_list = [
   {"BUY":"close","SELL":"close"},         # ブレイクアウト判定に終値を使用
   {"BUY":"high","SELL":"low"}             # ブレイクアウト判定に高値・安値を使用
]

#====================APIから価格データ取得(ローソク足の本数指定)====================
def get_price_from_API(chart_min,get_start,n):
   price = []

   #200*n本のローソク足を取得して、price[]に入れる
   for o in range(n):
       #pybybitでローソク足取得
       k = bybit.rest.inverse.public_kline_list(
             symbol = "BTCUSD",
             interval= chart_min,
             from_ = get_start
             ).json()

       #priceに取得したデータを入れる
       price += k["result"]
       #200本x足の長さ分だけタイムスタンプを進める
       if chart_min =="D":
           get_start += 200*60*1440
       else:
           get_start += 200*60*chart_min

   get_start = int(datetime.strptime(start, '%Y/%m/%d %H:%M').timestamp())

   return price

#====================パラメータぶんのローソク足をリスト化する====================
def get_price_amount(chart_min_list):
   price_list = {}                                                       #ローソク足を入れる変数
   for chart_min in chart_min_list:                                      #for文(chart_min_listの数だけ処理を行う)
       print("{0}分足取得中".format([chart_min]))
       price_list[chart_min] = get_price_from_API(chart_min,get_start,n) #chart_min分足のローソク足取得リクエストをn回行う

   return price_list


#====================ファイルから価格データを読み込む====================
def get_price_from_file(path):
   file = open(path,'r',encoding='utf-8')
   price = json.load(file)

   print(price)
   return price


#====================ロジック判定====================
def donchian( data,last_data,buy_term,sell_term,judge_price ):

   highest = max(i["high"for i in last_data[(-1*buy_term):])
   if data[ judge_price["BUY"]] > highest:
       return {"side":"BUY","price":highest}

   lowest = min(i["low"for i in last_data[(-1*sell_term):])
   if data[judge_price["SELL"]] < lowest:
       return {"side":"SELL","price":lowest}

   return {"side" : None , "price":0}


#====================買い・売り注文====================
def entry_signal( data,last_data,flag,buy_term,sell_term,judge_price ):

   signal = donchian( data,last_data,buy_term,sell_term,judge_price )

   if signal["side"] == "BUY":

       # ここに買い注文のコードを入れる

       flag["order"]["exist"] = True
       flag["order"]["side"] = "BUY"
       flag["order"]["price"] = float(data["close"])

   if signal["side"] == "SELL":

       # ここに売り注文のコードを入れる

       flag["order"]["exist"] = True
       flag["order"]["side"] = "SELL"
       flag["order"]["price"] = data["close"]

   return flag


#====================注文状況確認====================
def check_order( flag ):

   """ここに注文状況確認コード"""

   flag["order"]["exist"] = False
   flag["order"]["count"] = 0

   flag["position"]["exist"] = True
   flag["position"]["side"] = flag["order"]["side"]
   flag["position"]["price"] = flag["order"]["price"]

   return flag


#====================成行決済&ドテン注文====================
def close_position( data,last_data,flag,buy_term,sell_term,judge_price ):

   flag["position"]["count"] += 1
   signal = donchian( data,last_data,buy_term,sell_term,judge_price )

   if flag["position"]["side"] == "BUY":
       if signal["side"] == "SELL":

           # 成行決済注文コードを入れる

           records( flag,data )
           flag["position"]["exist"] = False
           flag["position"]["count"] = 0

           # 売り指値注文のコードを入れる

           flag["order"]["exist"] = True
           flag["order"]["side"] = "SELL"
           flag["order"]["price"] = float(data["close"])


   if flag["position"]["side"] == "SELL":
       if signal["side"] == "BUY":

           # 成行決済注文コードを入れる

           records( flag,data )
           flag["position"]["exist"] = False
           flag["position"]["count"] = 0

           # 買い指値注文のコードを入れる

           flag["order"]["exist"] = True
           flag["order"]["side"] = "BUY"
           flag["order"]["price"] = float(data["close"])

   return flag

#====================トレードパフォーマンス確認====================
def records(flag,data):
   #手数料等の計算
   entry_price = float(flag["position"]["price"])
   exit_price = round(float(data["close"]))
   trade_cost  = lot * slippage

   flag["records"]["slippage"].append(trade_cost)

   # 決済日時,ポジションの保有期間を記録
   flag["records"]["date"].append(datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M'))
   flag["records"]["holding-periods"].append(flag["position"]["count"])

   # 値幅の計算
   buy_Price_range = exit_price - entry_price
   sell_Price_range = entry_price - exit_price

   # 利益率の計算
   buy_return = buy_Price_range/entry_price
   sell_return = sell_Price_range/entry_price

   #利益・損失の確認
   if flag["position"]["side"] == "BUY":
       flag["records"]["return"].append( buy_return )              #獲得リターンを記録
       flag["records"]["side"].append( flag["position"]["side"] )  #買いか売りかを記録


   if flag["position"]["side"] == "SELL":
       flag["records"]["return"].append( sell_return )              #獲得リターンを記録
       flag["records"]["side"].append( flag["position"]["side"] )   #買いか売りかを記録


   return flag


#====================損益曲線をプロット====================
def plot(records):

   plt.plot( records.Date, records.Gross )  #X軸、Y軸の値を指定
   plt.xlabel("Date")                       #X軸のラベル名
   plt.ylabel("Balance")                    #Y軸のラベル名
   plt.xticks(rotation=50)                  # X軸の目盛りを50度回転

   plt.show()                               #グラフの表示


#====================ファイルを出力====================
def File_output(df):

   #pandasのdfをcsvで出力
   df.to_csv("log/donchian-{0}-records.csv".format(datetime.now().strftime("%Y-%m-%d-%H-%M")))


#====================バックテストの集計====================
def backtest(flag):

   # 成績を記録したpandas DataFrameを作成
   records = pd.DataFrame({
       "Date"          :  pd.to_datetime(flag["records"]["date"]),#決済日時
       "Side"          :  flag["records"]["side"],                #ポジションの側
       "Rate"          :  flag["records"]["return"],              #獲得レート
       "Periods"       :  flag["records"]["holding-periods"],     #ポジション保有期間
       "Slippage"      :  flag["records"]["slippage"]             #手数料等
   })

   # 獲得利益の列を追加
   records["Profit"] = records.Rate*lot

   # 総利益の列を追加
   records["Gross"] = records.Profit.cumsum()

   # ドローダウンの列を追加
   records["Drawdown"] = records.Gross.cummax().subtract( abs(records.Gross) )
   records["DrawdownRate"] = records.Drawdown / records.Gross.cummax() * 100

   print("\nバックテスト結果")
   print("==============================")
   print("\n------------総合成績--------------")
   print("全トレード数       :  {}回".format(len(records) ))
   print("勝率            :  {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1)))
   print("平均リターン       :  {}%".format(round(records.Rate.mean()*100,2)))
   print("平均保有期間     :  {}足".format(round(records.Periods.mean(),1) ))
   print("最大の勝ちトレード  :  {}$".format((round(records.Rate.max()*lot,2))))
   print("最大の負けトレード  :  {}$".format((round(records.Rate.min()*lot,2))))
   print("最大ドローダウン    :  {0}$ / {1}%".format(round(-1 * records.Drawdown.max()), round( records.DrawdownRate.loc[records.Drawdown.idxmax()] )))
   print("利益合計         :  {}$".format((round(records[records.Rate>0].Rate.sum()*lot,2))))
   print("損失合計         :  {}$".format(round(records[records.Rate<0].Rate.sum()*lot,2),))
   print("手数料合計       :  {}$".format(-1 * records.Slippage.sum() ))
   print("最終損益         :  {}$".format((round(records.Rate.sum()*lot-(records.Slippage.sum()) ,2))))
   print("==============================")

   result = {
       "Trade-count"  : len(records),                                                #トレード回数
       "Win-rate"     : round(len(records[records.Profit>0]) / len(records) * 100,1),#勝率
       "Return-ave"   : round(records.Rate.mean(),2),                                #平均リターン
       "DD-rate-max"  : -1 * records.DrawdownRate.max(),                             #最大ドローダウンレート
       "Gross"        : records.Profit.sum(),                                        #最終損益
       "PF"           : round( -1 * (records[records.Profit>0].Profit.sum() / records[records.Profit<0].Profit.sum()) ,2)#プロットファクター
   }

   return result


#====================テスト&集計====================
def aggregate():
   # chart_min_listのローソク足リストを取得
   price_list = get_price_amount(chart_min_list)

   # テストごとの各パラメーターの組み合わせと結果を記録する配列を準備
   param = {
       "buy_term"    : [],
       "sell_term"   : [],
       "chart_min"   : [],
       "judge_price" : []
       }

   all_result = {
       "count" : [],
       "winRate" : [],
       "returnRate" : [],
       "Drawdown" : [],
       "ProfitFactor" : [],
       "Gross" : []
       }


   # 総当たりのためのfor文の準備
   combinations = [(chart_min, buy_term, sell_term, judge_price)
       for chart_min in chart_min_list
       for buy_term  in buy_term_list
       for sell_term in sell_term_list
       for judge_price in judge_price_list]

   # 総当たり処理
   for chart_min, buy_term, sell_term, judge_price in combinations:
       price = price_list[ chart_min ]
       last_data = []
       i = 0

       # フラッグ変数の初期化
       flag = {
           "order":{
               "exist" : False,
               "side" : "",
               "price" : 0,
               "count" : 0
           },
           "position":{
               "exist" : False,
               "side" : "",
               "price"0,
               "count":0
           },
           "records":{
               "date":[],
               "profit":[],
               "return":[],
               "side":[],
               "holding-periods":[],
               "slippage":[]

           }
       }

       # price全数でバックテストを行う(ローソク足を6000本取得していたら6000回)
       while i < len(price):

           # ドンチャンの判定に使う期間分の安値・高値データを準備する
           if len(last_data) < buy_term or len(last_data) < sell_term:
               last_data.append(price[i])
               time.sleep(wait)
               i += 1
               continue

           data = price[i]

           # バックテスト実施
           if flag["order"]["exist"]:
               flag = check_order( flag )
           elif flag["position"]["exist"]:
               flag = close_position( data,last_data,flag,buy_term,sell_term,judge_price )
           else:
               flag = entry_signal( data,last_data,flag,buy_term,sell_term,judge_price )

           last_data.append( data )
           i += 1
           time.sleep(wait)


       print("テスト期間 ")
       print("==============================")
       print("開始時点  : " + str(datetime.fromtimestamp(float(price[0]["open_time"]))))
       print("終了時点  : " + str(datetime.fromtimestamp(float(price[-1]["open_time"]))))
       print("パラメータ1 : " + str(buy_term)  + "期間 / 買い" )
       print("パラメータ2 : " + str(sell_term) + "期間 / 売り" )
       print("足       : {0}".format(chart_min))
       print(str(len(price)) + "件のローソク足データで検証")
       print("==============================")

       result = backtest( flag )

       # 今回のループで使ったパラメータの組み合わせを配列に記録する
       param["buy_term"].append( buy_term )
       param["sell_term"].append( sell_term )
       param["chart_min"].append( chart_min )

       if judge_price["BUY"] == "high":
           param["judge_price"].append( "high/low" )
       else:
           param["judge_price"].append( "open/close" )

       # 今回のループのバックテスト結果を配列に記録する
       all_result["count"].append( result["Trade-count"] )
       all_result["winRate"].append( result["Win-rate"] )
       all_result["returnRate"].append( result["Return-ave" ] )
       all_result["Drawdown"].append( result["DD-rate-max"] )
       all_result["ProfitFactor"].append( result["PF"] )
       all_result["Gross"].append( result["Gross"] )

   return param,all_result

#====================表にまとめて、出力====================
def pandas():
   param,all_result = aggregate()

   # 全てのパラメータによるバックテスト結果をPandasで1つの表にする
   df = pd.DataFrame({
   "Interval"      : param["chart_min"],
   "Buy_term"      : param["buy_term"],
   "Sell_term"     : param["sell_term"],
   "Judge_price"   : param["judge_price"],
   "Trade-count"   : all_result["count"],
   "Win-Rate"      : all_result["winRate"],
   "Reture-Ave"    : all_result["returnRate"],
   "DrawDownRate"  : all_result["Drawdown"],
   "PF"            : all_result["ProfitFactor"],
   "Gross"         : all_result["Gross"]
   })

   # トレード回数が100に満たない記録は消す
   df.drop( df[ df["Trade-count"] < 100].index, inplace=True )

   print(df)
   File_output(df)

pandas()

◆解説

・初期値設定

#====================バックテストの初期設定値====================
lot         = 1000                                 # 1トレードのロット($)
slippage    = 0.001                                # 手数料やスリッページ(0.075%初期値)
wait       =  0                                    # 待機時間
start = '2019/06/01 09:00'                         # ローソク足取得開始時刻
get_start = int(datetime.strptime(start, '%Y/%m/%d %H:%M').timestamp()) #タイムスタンプ変換
n = 50                                             # ローソク足取得リクエスト回数

#====================バックテストのパラメーター設定====================
chart_min_list  = [ 30,60,120,240,720,"D" ] # テストに使う時間軸
buy_term_list   = [ 20,25,30,35,40,45 ]     # テストに使う上値ブレイクアウトの期間
sell_term_list  = [ 20,25,30,35,40,45 ]     # テストに使う下値ブレイクアウトの期間
judge_price_list = [
   {"BUY":"close","SELL":"close"},         # ブレイクアウト判定に終値を使用
   {"BUY":"high","SELL":"low"}             # ブレイクアウト判定に高値・安値を使用
]

今回はローソク足の取得を本数で指定します。(前回までは期間で指定してた)
そのため、変数:nを設定しています。初期値は50で、n*200本のローソク足を取得します。

パラメータ設定について
今回調べるパラメータは4種類です。

①時間軸(足の長さ)
②上値ブレイクアウトの期間
③下値ブレイクアウトの期間
④ブレイクアウト判定に使用する価格

これをそれぞれリストに入れています。
テストは総当たりで行うので、①×②×③×④回のバックテスト結果を取得します。

・get_price_from_API(chart_min,get_start,n)

#====================APIから価格データ取得(ローソク足の本数指定)====================
def get_price_from_API(chart_min,get_start,n):
   price = []

   #200*n本のローソク足を取得して、price[]に入れる
   for o in range(n):
       #pybybitでローソク足取得
       k = bybit.rest.inverse.public_kline_list(
             symbol = "BTCUSD",
             interval= chart_min,
             from_ = get_start
             ).json()

       #priceに取得したデータを入れる
       price += k["result"]
       
       #200本x足の長さ分だけタイムスタンプを進める
       if chart_min =="D":
           get_start += 200*60*1440
       else:
           get_start += 200*60*chart_min

   get_start = int(datetime.strptime(start, '%Y/%m/%d %H:%M').timestamp())

   return price

ローソク足取得関数を前回までと変えてます。
以下の流れです。

price=[]を用意
②ローソク足取得・ローソク足をpriceに入れる・タイムスタンプを進めるをn回行う
③タイムスタンプをリセット
priceを返す

足の長さが"D"だとタイムスタンプを進める計算ができないので、
chart_min =="D"の時だけ1440分(24時間)ぶんのタイムスタンプを進めるようにしました。

・get_price_amount(chart_min_list)

#====================パラメータぶんのローソク足をリスト化する====================
def get_price_amount(chart_min_list):
   price_list = {}                                                       #ローソク足を入れる変数
   for chart_min in chart_min_list:                                      #for文(chart_min_listの数だけ処理を行う)
       print("{0}分足取得中".format([chart_min]))
       price_list[chart_min] = get_price_from_API(chart_min,get_start,n) #chart_min分足のローソク足取得リクエストをn回行う

   return price_list

get_price_from_API(chart_min,get_start,n)を使って、パラメータ:chart_min_listのローソク足をすべて取得・リスト化する関数です。
以下の流れです。

price_list={}を用意
②for文でパラメータ:chart_min_listのローソク足を取得し、price_listに入れる。
price_listを返す。

・donchian( data,last_data,buy_term,sell_term,judge_price )

#====================ロジック判定====================
def donchian( data,last_data,buy_term,sell_term,judge_price ):

   highest = max(i["high"for i in last_data[(-1*buy_term):])
   if data[ judge_price["BUY"]] > highest:
       return {"side":"BUY","price":highest}

   lowest = min(i["low"for i in last_data[(-1*sell_term):])
   if data[judge_price["SELL"]] < lowest:
       return {"side":"SELL","price":lowest}

   return {"side" : None , "price":0}

ロジック判定の部分ですが、前回と変わってます。
また、引数が増えてます。

 highest = max(i["high"for i in last_data[(-1*buy_term):])

highestを”last_dataの0~-buy_term番目までの中で、最も高い["high"]の値”にしてます。(ややこしい)

 if data[ judge_price["BUY"]] > highest:
      return {"side":"BUY","price":highest}

また、data[ judge_price["BUY"]] > highestを条件に{"side":"BUY","price":highest}を返します。
 judge_price["BUY"]というのはパラメータの一つで、判断条件(終値か高値/安値か)が入ります。こっちもややこしい。

ここ理解するの結構大変だと思います。(時間かかった)

lowestも同じです。

・entry_signal( data,last_data,flag,buy_term,sell_term,judge_price )

#====================買い・売り注文====================
def entry_signal( data,last_data,flag,buy_term,sell_term,judge_price ):

   signal = donchian( data,last_data,buy_term,sell_term,judge_price )

   if signal["side"] == "BUY":

       # ここに買い注文のコードを入れる

       flag["order"]["exist"] = True
       flag["order"]["side"] = "BUY"
       flag["order"]["price"] = float(data["close"])

   if signal["side"] == "SELL":

       # ここに売り注文のコードを入れる

       flag["order"]["exist"] = True
       flag["order"]["side"] = "SELL"
       flag["order"]["price"] = data["close"]

   return flag

引数が増えてますが、中身は前回と同じなので割愛。

・check_order( flag )

#====================注文状況確認====================
def check_order( flag ):

   """ここに注文状況確認コード"""

   flag["order"]["exist"] = False
   flag["order"]["count"] = 0

   flag["position"]["exist"] = True
   flag["position"]["side"] = flag["order"]["side"]
   flag["position"]["price"] = flag["order"]["price"]

   return flag

割愛。

・close_position( data,last_data,flag,buy_term,sell_term,judge_price )

#====================成行決済&ドテン注文====================
def close_position( data,last_data,flag,buy_term,sell_term,judge_price ):

   flag["position"]["count"] += 1
   signal = donchian( data,last_data,buy_term,sell_term,judge_price )

   if flag["position"]["side"] == "BUY":
       if signal["side"] == "SELL":

           # 成行決済注文コードを入れる

           records( flag,data )
           flag["position"]["exist"] = False
           flag["position"]["count"] = 0

           # 売り指値注文のコードを入れる

           flag["order"]["exist"] = True
           flag["order"]["side"] = "SELL"
           flag["order"]["price"] = float(data["close"])


   if flag["position"]["side"] == "SELL":
       if signal["side"] == "BUY":

           # 成行決済注文コードを入れる

           records( flag,data )
           flag["position"]["exist"] = False
           flag["position"]["count"] = 0

           # 買い指値注文のコードを入れる

           flag["order"]["exist"] = True
           flag["order"]["side"] = "BUY"
           flag["order"]["price"] = float(data["close"])

   return flag

割愛。

・records(flag,data)

#====================トレードパフォーマンス確認====================
def records(flag,data):
   #手数料等の計算
   entry_price = float(flag["position"]["price"])
   exit_price = round(float(data["close"]))
   trade_cost  = lot * slippage

   flag["records"]["slippage"].append(trade_cost)

   # 決済日時,ポジションの保有期間を記録
   flag["records"]["date"].append(datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M'))
   flag["records"]["holding-periods"].append(flag["position"]["count"])

   # 値幅の計算
   buy_Price_range = exit_price - entry_price
   sell_Price_range = entry_price - exit_price

   # 利益率の計算
   buy_return = buy_Price_range/entry_price
   sell_return = sell_Price_range/entry_price

   #利益・損失の確認
   if flag["position"]["side"] == "BUY":
       flag["records"]["return"].append( buy_return )              #獲得リターンを記録
       flag["records"]["side"].append( flag["position"]["side"] )  #買いか売りかを記録


   if flag["position"]["side"] == "SELL":
       flag["records"]["return"].append( sell_return )              #獲得リターンを記録
       flag["records"]["side"].append( flag["position"]["side"] )   #買いか売りかを記録


   return flag

割愛。

・File_output(df)

#====================ファイルを出力====================
def File_output(df):

   #pandasのdfをcsvで出力
   df.to_csv("log/donchian-{0}-records.csv".format(datetime.now().strftime("%Y-%m-%d-%H-%M")))

前回と同じですが、今回はlogを出力しないので、txtファイルを出力する部分は消してます。

・backtest(flag)

#====================バックテストの集計====================
def backtest(flag):

   # 成績を記録したpandas DataFrameを作成
   records = pd.DataFrame({
       "Date"          :  pd.to_datetime(flag["records"]["date"]),#決済日時
       "Side"          :  flag["records"]["side"],                #ポジションの側
       "Rate"          :  flag["records"]["return"],              #獲得レート
       "Periods"       :  flag["records"]["holding-periods"],     #ポジション保有期間
       "Slippage"      :  flag["records"]["slippage"]             #手数料等
   })

   # 獲得利益の列を追加
   records["Profit"] = records.Rate*lot

   # 総利益の列を追加
   records["Gross"] = records.Profit.cumsum()

   # ドローダウンの列を追加
   records["Drawdown"] = records.Gross.cummax().subtract( abs(records.Gross) )
   records["DrawdownRate"] = records.Drawdown / records.Gross.cummax() * 100

   print("\nバックテスト結果")
   print("==============================")
   print("\n------------総合成績--------------")
   print("全トレード数       :  {}回".format(len(records) ))
   print("勝率            :  {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1)))
   print("平均リターン       :  {}%".format(round(records.Rate.mean()*100,2)))
   print("平均保有期間     :  {}足".format(round(records.Periods.mean(),1) ))
   print("最大の勝ちトレード  :  {}$".format((round(records.Rate.max()*lot,2))))
   print("最大の負けトレード  :  {}$".format((round(records.Rate.min()*lot,2))))
   print("最大ドローダウン    :  {0}$ / {1}%".format(round(-1 * records.Drawdown.max()), round( records.DrawdownRate.loc[records.Drawdown.idxmax()] )))
   print("利益合計         :  {}$".format((round(records[records.Rate>0].Rate.sum()*lot,2))))
   print("損失合計         :  {}$".format(round(records[records.Rate<0].Rate.sum()*lot,2),))
   print("手数料合計       :  {}$".format(-1 * records.Slippage.sum() ))
   print("最終損益         :  {}$".format((round(records.Rate.sum()*lot-(records.Slippage.sum()) ,2))))
   print("==============================")

   result = {
       "Trade-count"  : len(records),                                                #トレード回数
       "Win-rate"     : round(len(records[records.Profit>0]) / len(records) * 100,1),#勝率
       "Return-ave"   : round(records.Rate.mean(),2),                                #平均リターン
       "DD-rate-max"  : -1 * records.DrawdownRate.max(),                             #最大ドローダウンレート
       "Gross"        : records.Profit.sum(),                                        #最終損益
       "PF"           : round( -1 * (records[records.Profit>0].Profit.sum() / records[records.Profit<0].Profit.sum()) ,2)#プロットファクター
   }
   
   return result

recordsから"Price_range"の項目を削除しました。
また、今回は月別の集計は行わないので、それに関する部分も削除しています。

大きな変化点として、配列:resultを追加し、最後にresultを返しています。

   result = {
       "Trade-count"  : len(records),                                                #トレード回数
       "Win-rate"     : round(len(records[records.Profit>0]) / len(records) * 100,1),#勝率
       "Return-ave"   : round(records.Rate.mean(),2),                                #平均リターン
       "DD-rate-max"  : -1 * records.DrawdownRate.max(),                             #最大ドローダウンレート
       "Gross"        : records.Profit.sum(),                                        #最終損益
       "PF"           : round( -1 * (records[records.Profit>0].Profit.sum() / records[records.Profit<0].Profit.sum()) ,2)#プロットファクター
   }
   
   return result

resultは、”今回のパラメータでのテスト結果”を記録するための変数です。
後ですべてのテスト結果をまとめる際に使います。
それぞれの数字はrecordsから計算・代入してます。

"Trade-count" :トレード回数
 "Win-rate"    :勝率
"Return-ave"   :平均リターン
"DD-rate-max"  :最大ドローダウンレート
"Gross"      :最終損益
"PF"      :プロフィットファクター

・aggregate()

#====================テスト&集計====================
def aggregate():
   # chart_min_listのローソク足リストを取得
   price_list = get_price_amount(chart_min_list)

   # テストごとの各パラメーターの組み合わせと結果を記録する配列を準備
   param = {
       "buy_term"    : [],
       "sell_term"   : [],
       "chart_min"   : [],
       "judge_price" : []
       }

   all_result = {
       "count" : [],
       "winRate" : [],
       "returnRate" : [],
       "Drawdown" : [],
       "ProfitFactor" : [],
       "Gross" : []
       }


   # 総当たりのためのfor文の準備
   combinations = [(chart_min, buy_term, sell_term, judge_price)
       for chart_min in chart_min_list
       for buy_term  in buy_term_list
       for sell_term in sell_term_list
       for judge_price in judge_price_list]

   # 総当たり処理
   for chart_min, buy_term, sell_term, judge_price in combinations:
       price = price_list[ chart_min ]
       last_data = []
       i = 0

       # フラッグ変数の初期化
       flag = {
           "order":{
               "exist" : False,
               "side" : "",
               "price" : 0,
               "count" : 0
           },
           "position":{
               "exist" : False,
               "side" : "",
               "price"0,
               "count":0
           },
           "records":{
               "date":[],
               "profit":[],
               "return":[],
               "side":[],
               "holding-periods":[],
               "slippage":[]

           }
       }

       # price全数でバックテストを行う(ローソク足を6000本取得していたら6000回)
       while i < len(price):

           # ドンチャンの判定に使う期間分の安値・高値データを準備する
           if len(last_data) < buy_term or len(last_data) < sell_term:
               last_data.append(price[i])
               time.sleep(wait)
               i += 1
               continue

           data = price[i]

           # バックテスト実施
           if flag["order"]["exist"]:
               flag = check_order( flag )
           elif flag["position"]["exist"]:
               flag = close_position( data,last_data,flag,buy_term,sell_term,judge_price )
           else:
               flag = entry_signal( data,last_data,flag,buy_term,sell_term,judge_price )

           last_data.append( data )
           i += 1
           time.sleep(wait)


       print("テスト期間 ")
       print("==============================")
       print("開始時点  : " + str(datetime.fromtimestamp(float(price[0]["open_time"]))))
       print("終了時点  : " + str(datetime.fromtimestamp(float(price[-1]["open_time"]))))
       print("パラメータ1 : " + str(buy_term)  + "期間 / 買い" )
       print("パラメータ2 : " + str(sell_term) + "期間 / 売り" )
       print("足       : {0}".format(chart_min))
       print(str(len(price)) + "件のローソク足データで検証")
       print("==============================")

       result = backtest( flag )

       # 今回のループで使ったパラメータの組み合わせを配列に記録する
       param["buy_term"].append( buy_term )
       param["sell_term"].append( sell_term )
       param["chart_min"].append( chart_min )

       if judge_price["BUY"] == "high":
           param["judge_price"].append( "high/low" )
       else:
           param["judge_price"].append( "open/close" )

       # 今回のループのバックテスト結果を配列に記録する
       all_result["count"].append( result["Trade-count"] )
       all_result["winRate"].append( result["Win-rate"] )
       all_result["returnRate"].append( result["Return-ave" ] )
       all_result["Drawdown"].append( result["DD-rate-max"] )
       all_result["ProfitFactor"].append( result["PF"] )
       all_result["Gross"].append( result["Gross"] )

   return param,all_result

全組み合わせのテストを行う&集計を行う関数です。

   # chart_min_listのローソク足リストを取得
   price_list = get_price_amount(chart_min_list)

   # テストごとの各パラメーターの組み合わせと結果を記録する配列を準備
   param = {
       "buy_term"    : [],
       "sell_term"   : [],
       "chart_min"   : [],
       "judge_price" : []
       }

   all_result = {
       "count" : [],
       "winRate" : [],
       "returnRate" : [],
       "Drawdown" : [],
       "ProfitFactor" : [],
       "Gross" : []
       }

準備ゾーン①です。

①price_list:各chart_minのローソク足をすべて取得
②param :使用したパラメータを記録するリストを作成
③all_result:各テスト結果を追加していくリストを作成

    # 総当たりのためのfor文の準備
   combinations = [(chart_min, buy_term, sell_term, judge_price)
       for chart_min in chart_min_list
       for buy_term  in buy_term_list
       for sell_term in sell_term_list
       for judge_price in judge_price_list]

準備ゾーン②です。

総当たりの処理をするとき、Pythonでは以下の様に記述します。

combinations = [(a, b, c, d)
	for a in paramA
	for b in paramB
	for c in paramC
	for d in paramD]

これを当てはめて、準備コードを書きます。
こちらを参考にしています↓

   # 総当たり処理
   for chart_min, buy_term, sell_term, judge_price in combinations:
       price = price_list[ chart_min ]
       last_data = []
       i = 0

繰り返し処理の部分です。

以下の記述で、総当たりのバックテストを行います。

for a,b,c,d in combinations:
	# 各組合せで実行したい処理

priceにはchart_min足のローソク足データを代入し、last_dataに過去データを入れる配列を設定、while文のカウントに使うi0に設定しておきます。

        # フラッグ変数の初期化
       flag = {
           "order":{
               "exist" : False,
               "side" : "",
               "price" : 0,
               "count" : 0
           },
           "position":{
               "exist" : False,
               "side" : "",
               "price"0,
               "count":0
           },
           "records":{
               "date":[],
               "profit":[],
               "return":[],
               "side":[],
               "holding-periods":[],
               "slippage":[]

           }
       }

各バックテストで使うflagを初期化します。
これをやらないと、パラーメータ組み合わせ①で使ったflagのまま組み合わせ②に進んでしまい、テストができません。

        # price全数でバックテストを行う(ローソク足を6000本取得していたら6000回)
       while i < len(price):

           # ドンチャンの判定に使う期間分の安値・高値データを準備する
           if len(last_data) < buy_term or len(last_data) < sell_term:
               last_data.append(price[i])
               time.sleep(wait)
               i += 1
               continue

           data = price[i]

           # バックテスト実施
           if flag["order"]["exist"]:
               flag = check_order( flag )
           elif flag["position"]["exist"]:
               flag = close_position( data,last_data,flag,buy_term,sell_term,judge_price )
           else:
               flag = entry_signal( data,last_data,flag,buy_term,sell_term,judge_price )

           last_data.append( data )
           i += 1
           time.sleep(wait)

ローソク足を使って売買シミュレーションを行う部分です。
ここは前回と同じなので割愛。

       print("テスト期間 ")
       print("==============================")
       print("開始時点  : " + str(datetime.fromtimestamp(float(price[0]["open_time"]))))
       print("終了時点  : " + str(datetime.fromtimestamp(float(price[-1]["open_time"]))))
       print("パラメータ1 : " + str(buy_term)  + "期間 / 買い" )
       print("パラメータ2 : " + str(sell_term) + "期間 / 売り" )
       print("足       : {0}".format(chart_min))
       print(str(len(price)) + "件のローソク足データで検証")
       print("==============================")

       result = backtest( flag )

今回の組み合わせテストで使ったパラメータやローソク足の長さなどを表示し、backtest( flag )を呼び出します。
同時にresultにbacktest( flag )の返り値を入れます。

        # 今回のループで使ったパラメータの組み合わせを配列に記録する
       param["buy_term"].append( buy_term )
       param["sell_term"].append( sell_term )
       param["chart_min"].append( chart_min )

       if judge_price["BUY"] == "high":
           param["judge_price"].append"high/low" )
       else:
           param["judge_price"].append"open/close" )

       # 今回のループのバックテスト結果を配列に記録する
       all_result["count"].append( result["Trade-count"] )
       all_result["winRate"].append( result["Win-rate"] )
       all_result["returnRate"].append( result["Return-ave" ] )
       all_result["Drawdown"].append( result["DD-rate-max"] )
       all_result["ProfitFactor"].append( result["PF"] )
       all_result["Gross"].append( result["Gross"] )

   return param,all_result

最後に今回使用したパラメータを配列:paramに追加し、今回のバックテスト結果を配列:all_resultに追加して、param,all_resultを返します。

・pandas()

#====================表にまとめて、出力====================
def pandas():
   param,all_result = aggregate()

   # 全てのパラメータによるバックテスト結果をPandasで1つの表にする
   df = pd.DataFrame({
   "Interval"      : param["chart_min"],
   "Buy_term"      : param["buy_term"],
   "Sell_term"     : param["sell_term"],
   "Judge_price"   : param["judge_price"],
   "Trade-count"   : all_result["count"],
   "Win-Rate"      : all_result["winRate"],
   "Reture-Ave"    : all_result["returnRate"],
   "DrawDownRate"  : all_result["Drawdown"],
   "PF"            : all_result["ProfitFactor"],
   "Gross"         : all_result["Gross"]
   })

   # トレード回数が100に満たない記録は消す
   df.drop( df[ df["Trade-count"] < 100].index, inplace=True )

   print(df)
   # File_output(flag,records)
   File_output(df)

   param,all_result = aggregate()

   # 全てのパラメータによるバックテスト結果をPandasで1つの表にする
   df = pd.DataFrame({
   "Interval"      : param["chart_min"],
   "Buy_term"      : param["buy_term"],
   "Sell_term"     : param["sell_term"],
   "Judge_price"   : param["judge_price"],
   "Trade-count"   : all_result["count"],
   "Win-Rate"      : all_result["winRate"],
   "Reture-Ave"    : all_result["returnRate"],
   "DrawDownRate"  : all_result["Drawdown"],
   "PF"            : all_result["ProfitFactor"],
   "Gross"         : all_result["Gross"]
   })

Pandasデータフレームにまとめる関数です。
aggregate()からparam,all_resultを受け取り、dfにまとめます。

   # トレード回数が100に満たない記録は消す
   df.drop( df[ df["Trade-count"] < 100].index, inplace=True )

バックテストにて、トレード回数が100回未満の行を削除します。
トレード回数が少ないと結果が偏っている可能性がある(テストの信頼性が低い)ので、排除します。

df名.drop(条件)でdfから指定の行を削除できます。
inplce=Trueは、現在の表データに結果をそのまま上書きする、という意味だそうです。

   print(df)
   File_output(df)

最後にdfを表示し、File_output(df)を呼び出してCSV形式のファイルを生成します。
dfは別に表示しなくてもいいです。

以上!

あ、pandas()の実行を忘れずに。

◆実行結果

実行すると、logフォルダにCSVデータが出力されます。

※PFの列を降順に並び替えてます。

PFの意味の説明忘れてました。

「 総利益 / 総損失 」で表される指標のことをプロフィットファクター(PF)といいます。PFが大きければ大きいほど、そのシステムは安定していてリスクが小さいと評価できます。

参考元

とのことです。(全力他力本願)
データを見た感じ、60分足と120分足がPFがよさそうですね。
ここから更にパラメータを絞り込みたければ、コードの最初で設定したパラメータリストをいじってみるといいです。

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

これで、作成したロジックの最適パラメータを探すことができるようになりました。
かなり成長したんちゃうか!?
そろそろオリジナルロジック作りとか、精度の高い指標探しとか、アイデアをもとにバックテストしまくるとか、いろいろやりたいですね。

疲れたので今回はここまで!

いいなと思ったら応援しよう!