
仮想通貨bot 勉強記録⑮
~エラー対策コードを書く~
◆前回までのあらすじ
試作コードを作ったけど、ZeroDivisionErrorというエラーが出てしまいました。
◆今回やること
・エラーが出たときに、想定内のエラーなら自動で対処できるようにする!
例外処理というらしいです。
今回はZeroDivisionErrorが出る原因解説と、エラー時に対処できるようにコードを修正します。
修正後のコードはこちら
from datetime import datetime,timedelta
import time
from rich import print as pp
import pybybit
apis = [
'プライベートキー',
'シークレットキー'
]
bybit = pybybit.API(*apis, testnet=True)
#最新ローソク足を取得する関数
def get_price(min,i):
t = int((datetime.now() - timedelta(hours=1)).timestamp())#1時間前の時刻取得&変換
d = bybit.rest.inverse.public_kline_list(
symbol = "BTCUSD",
interval=min,
from_=t).json()
data = d['result'][i]
#返り値取得
return { "close_time" : data['open_time'],
"open_price" : float(data['open']),
"high_price" : float(data['high']),
"low_price" : float(data['low']),
"close_price" : float(data['close'])}
#画面出力
def print_price(data):
pp( " 時間: " + datetime.fromtimestamp(data['close_time']).strftime('%Y/%m/%d %H:%M')
+ " 始値: " + str(data['open_price'])
+ " 終値: " + str(data['close_price']))
#ローソク足の条件判別
def check_candle(data,side):
try:
#実体割合の算出
realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"])
except ZeroDivisionError:
realbody_rate = 0
try:
#実体の大きさの算出
increase_rate = (data["close_price"] / data["open_price"]) - 1
except ZeroDivisionError:
increase_rate = 0
if side == "buy":
if data["close_price"] < data["open_price"] : return False #ローソク足が赤だったらFalse
elif increase_rate < 0.0001 : return False #実体の大きさが現在価格の0.01%未満ならFalse
elif realbody_rate < 0.1 : return False #実体の割合がローソク足の10%未満ならFalse
else : return True #上記すべて条件が当てはまらなければTrue
if side == "sell":
if data["close_price"] > data["open_price"] : return False #ローソク足が緑だったらFalse
elif increase_rate > -0.0001 : return False #実体の大きさが現在価格の0.01%未満ならFalse
elif realbody_rate < 0.1 : return False #実体の割合がローソク足の10%未満ならFalse
else : return True #上記すべて条件が当てはまらなければTrue
#ローソク足の連続上昇の判別
def check_ascend( data,last_data ):
#今回の始値が前回の始値を上回っている且つ今回の終値が前回の終値を上回っていればTure
if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]:
return True
else: return False
#ローソク足の連続下降の判別
def check_descend( data,last_data ):
#今回の始値が前回の始値を下回っている且つ今回の終値が前回の終値を下回っていればTure
if data["open_price"] < last_data["open_price"] and data["close_price"] < last_data["close_price"]:
return True
else: return False
#買いサイン・注文を出す関数
def buy_signal( data,last_data,flag ):
if flag["buy_signal"] == 0 and check_candle( data,"buy" ):#陽線1本目
flag["buy_signal"] = 1
elif flag["buy_signal"] == 1 and check_candle( data,"buy" ) and check_ascend( data,last_data ):#陽線2本目
flag["buy_signal"] = 2
elif flag["buy_signal"] == 2 and check_candle( data,"buy" ) and check_ascend( data,last_data ):#陽線3本目
pp("3本連続で陽線 なので" + str(data["close_price"]) + "で買い指値")
flag["buy_signal"] = 3
#指値買い注文コード
order = bybit.rest.inverse.private_order_create(
side='Buy',
symbol='BTCUSD',
order_type='Limit',
qty=1,
price=data["close_price"],
time_in_force='GoodTillTime')
order
flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
#陽線が途切れたらシグナルリセット
else:
flag["buy_signal"] = 0
return flag
#売りサイン・注文を出す関数
def sell_signal( data,last_data,flag ):
if flag["sell_signal"] == 0 and check_candle( data,"sell" ):#陰線1本目
flag["sell_signal"] = 1
elif flag["sell_signal"] == 1 and check_candle( data,"sell" ) and check_descend( data,last_data ):#陰線2本目
flag["sell_signal"] = 2
elif flag["sell_signal"] == 2 and check_candle( data,"sell" ) and check_descend( data,last_data ):#陰線3本目
pp("3本連続で陰線 なので" + str(data["close_price"]) + "で売り指値")
#指値売り注文コード
order = bybit.rest.inverse.private_order_create(
side='Sell',
symbol='BTCUSD',
order_type='Limit',
qty=1,
price=data["close_price"],
time_in_force='GoodTillTime')
order
flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"
#陰線が途切れたらシグナルリセット
else:
flag["sell_signal"] = 0
return flag
#決済用の関数
def close_position( data,last_data,flag ):
if flag["position"]["side"] == "BUY":
if data["close_price"] < last_data["close_price"]:
pp("前回の終値を下回ったので" + str(data["close_price"]) + "あたりで成行で決済します")
#成行売り注文コード
order = bybit.rest.inverse.private_order_create(
side='Sell',
symbol='BTCUSD',
order_type='Market',
qty=1,
price=data["close_price"],
time_in_force='GoodTillTime')
order
flag["position"]["exist"] = False
if flag["position"]["side"] == "SELL":
if data["close_price"] > last_data["close_price"]:
pp("前回の終値を上回ったので" + str(data["close_price"]) + "あたりで成行で決済します")
#成行買い注文コード
order = bybit.rest.inverse.private_order_create(
side='Buy',
symbol='BTCUSD',
order_type='Market',
qty=1,
price=data["close_price"],
time_in_force='GoodTillTime')
order
flag["position"]["exist"] = False
return flag
#注文状況確認用の関数
def check_order( flag ):
position = bybit.rest.inverse.private_position_list(symbol = 'BTCUSD').json()
position = position['result']['side']
orders = bybit.rest.inverse.private_order_list(symbol = 'BTCUSD').json()
orders = orders['result']['data']
if position != 'None':
pp("注文が約定しました!")
flag["order"]["exist"] = False
flag["order"]["count"] = 0
flag["position"]["exist"] = True
flag["position"]["side"] = flag["order"]["side"]
else:
if orders[0]['order_status'] == "New":
pp("まだ未約定の注文があります")
for o in orders:
if o['order_status'] == "New":
pp(o['order_id'])
flag["order"]["count"] += 1
if flag["order"]["count"] > 6:
flag = cancel_order( orders,flag )
else:
pp("注文が遅延しているようです")
return flag
#注文をキャンセルする関数
def cancel_order( orders,flag ):
orders = bybit.rest.inverse.private_order_list(symbol = 'BTCUSD').json()
orders = orders['result']['data']
if orders[0]['order_status'] == "New":
for o in orders:
if o['order_status'] == "New":
bybit.rest.inverse.private_order_cancel(
symbol = 'BTCUSD',
order_id = o['order_id'])
pp("約定していない注文をキャンセルしました")
flag["order"]["count"] = 0
flag["order"]["exist"] = False
time.sleep(20)
position = bybit.rest.inverse.private_position_list(symbol = 'BTCUSD').json()
position = position['result']['side']
if not position == 'None':
pp("現在、未決済の建玉はありません")
else:
pp
("現在、まだ未決済の建玉があります")
flag["position"]["exist"] = True
flag["position"]["side"] = position
return flag
#メイン関数
def main():
last_data = get_price(1,-2)
print_price( last_data )
flag = {
"buy_signal":0,
"sell_signal":0,
"order":{
"exist" : False,
"side" : "",
"count" : 0},
"position":{
"exist" : False,
"side" : ""}}
time.sleep(10)
while True:
if flag["order"]["exist"]:
flag = check_order( flag )
data = get_price(1,-2)
if data["close_time"] != last_data["close_time"]:
print_price( data )
if flag["position"]["exist"]:
flag = close_position( data,last_data,flag )
else:
flag = buy_signal( data,last_data,flag )
flag = sell_signal( data,last_data,flag )
last_data["close_time"] = data["close_time"]
last_data["open_price"] = data["open_price"]
last_data["close_price"] = data["close_price"]
time.sleep(10)
main()
前回からほぼ変わってないです。
◆解説
・ZeroDivisionErrorについて
ZeroDivisionErrorというのは、ある値を0あるいは限りなく0に近い値で割ろうとしたときに出るエラーのようです。
コンピューターに計算させると、x/0=∞となってしまい、エラーが出るらしい。
・ZeroDivisionErrorが出る場所
エラーログを見ると、realbody_rateを算出するときにZeroDivisionErrorが出ています。
何回かbotを動かしてみたら、increase_rateを算出するときにも同じエラーで止まったことがありました。
・対策
エラー対策として、”エラーが出た時に行う処理”を事前時に準備しておくことで、エラー時にbotが停止してしまうことを回避します。
try:
#処理
except エラー名:
#エラーが返ってきたときの処理
pythonでは、上記のコードを使うことで例外処理を行います。
対策をした場所を確認してみます。
def check_candle(data,side):
try:
#実体割合の算出
realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"])
except ZeroDivisionError:
realbody_rate = 0
try:
#実体の大きさの算出
increase_rate = (data["close_price"] / data["open_price"]) - 1
except ZeroDivisionError:
increase_rate = 0
実体割合の算出の場合を見ていきます。
まずtry:で処理を行います。
処理を行って帰ってきた結果がZeroDivisionErrorだった場合、increase_rateに0を代入します。
これで、ZeroDivisionErrorを回避することができます。
※increase_rateも同様です。
◆結果
例外処理無しだと割とすぐに止まってしまいましたが、いい感じです。
もし他にもエラーが出るようなら、その都度例外処理を追加していきます。
やっとbybitで動くbotができたぞ~!!
次からはバックテスト用のデータ集めなどをやっていこうと思ってます。