pe氏pybottersinagoflyer読み解き殴り書きメモ5
続き
前回はAbstractInagoBotの中身を見ていった。
今回はそれの子クラスのBitflyerInagoBotを見ていく
まずは初期処理から
def __init__(
self,
client: pybotters.Client,
store: pybotters.bitFlyerDataStore,
bar_l: BitflyerTimeBar,
bar_s: BitflyerTimeBar,
*,
lower_threshold: float,
upper_threshold: float,
entry_patience_seconds: int,
entry_price_change: int,
trail_margin: int,
symbol: str = "FX_BTC_JPY",
size: float = 0.01,
side: str = "BOTH",
**kwargs,
):
super(BitflyerInagoBot, self).__init__(client, **kwargs)
self._store = store
self._bar_l = bar_l
self._bar_s = bar_s
self._lower_threshold = lower_threshold
self._upper_threshold = upper_threshold
self._entry_patience_seconds = entry_patience_seconds
self._entry_price_change = entry_price_change
self._trail_margin = trail_margin
self._symbol = symbol
self._size = size
self._entry_side = side
self._entry_order_info = None
self._exit_order_info = None
self._asks, self._bids = None, None
asyncio.create_task(self.auto_ask_bid_update())
基本的にはここまでで作成されてるbarと、argsの引数がここに渡される
また、ここでもcreate_taskを用いてる。auto_ask_bid_updateの中身をみる
async def auto_ask_bid_update(self):
"""板情報の自動更新タスク"""
while True:
await self._store.board.wait()
self._asks, self._bids = self._store.board.sorted().values()
板情報が返ってくるまでwait,その返ってきた板情報をsortedしてる。
このソートはpybottersのもので、ask,bidに板の順番通りに分けてくれる
async def on_loop_end(self):
"""トレードログ"""
assert self._entry_order_info is not None
assert self._exit_order_info is not None
pnl = self._exit_order_info["price"] - self._entry_order_info["price"]
if self._entry_order_info["side"] == "SELL":
pnl *= -1
pnl *= self._entry_order_info["size"]
self._logger.debug(f"[LOOP FINISH] pnl={pnl}")
self._entry_order_info = None
self._exit_order_info = None
ログの出力。pnlを計算して出力してるっぽい
async def inago_stream(self):
with self._store.executions.watch() as stream:
async for msg in stream:
yield msg.data
約定データのwatch。オーバーライド。
async def is_inago_start(self) -> tuple[bool, str]:
"""イナゴ検知ロジック。
2段階で検知する。
(1)閾値判定:短期足(秒足)でのボリュームが閾値をクリア
(2)経過判定:n秒(``self._entry_patience_seconds``)後にイナゴ方向に値動き(``self._entry_price_change``)があるか否か
"""
d = self._bar_s.d
if len(d) == 0:
self._logger.warning(f"[INFORMATION IS EMPTY] {d}")
return False, None
self._logger.debug(f"[WAITING INAGO] {d}")
async def _primary_check():
"""閾値判定"""
if (
self._entry_side in ("BUY", "BOTH")
and d["sv_log"]
< self._lower_threshold
< d["bv_log"]
< self._upper_threshold
):
self._logger.debug("[PRIMARY CHECK] YES BUY")
return "BUY"
elif (
self._entry_side in ("SELL", "BOTH")
and d["bv_log"]
< self._lower_threshold
< d["sv_log"]
< self._upper_threshold
):
self._logger.debug("[PRIMARY CHECK] YES SELL")
return "SELL"
else:
return None
async def _secondary_check(s):
"""時間経過判定"""
# 仲値を値動きの参照値に使う
mark_price_start = int(self.mid)
self._logger.debug(f"[SECONDARY CHECK] mark_price={mark_price_start}")
while True:
mark_price = int(self.mid)
price_change = mark_price - mark_price_start
if s == "SELL":
price_change *= -1
print(
f"\r\033[31m>>> [SECONDARY CHECK] {mark_price_start}/{mark_price}/{price_change:+.0f}\033[0m",
end="",
)
if price_change > self._entry_price_change:
# イナゴ方向への値動きがあった
break
await asyncio.sleep(0.1)
# 閾値判定
side = await _primary_check()
if side:
try:
# 経過判定
await asyncio.wait_for(
_secondary_check(side), timeout=self._entry_patience_seconds
)
# carriage return調整してるだけ
print()
# イナゴ検知
return True, side
except asyncio.TimeoutError as e:
# carriage return調整してるだけ
print()
# 指定秒数以内に値動きがみられなかったのでスルー
self._logger.debug(f"[CANCEL] mark_price={self.mid}")
return False, None
else:
return False, None
ロジックは以下二つ
閾値判定:短期足(秒足)でのボリュームが閾値をクリア
経過判定:n秒(``self._entry_patience_seconds``)後にイナゴ方向に値動き(``self._entry_price_change``)があるか否か
d = self._bar_s.d
if len(d) == 0:
self._logger.warning(f"[INFORMATION IS EMPTY] {d}")
return False, None
self._logger.debug(f"[WAITING INAGO] {d}")
スタート処理
async def _primary_check():
"""閾値判定"""
if (
self._entry_side in ("BUY", "BOTH")
and d["sv_log"]
< self._lower_threshold
< d["bv_log"]
< self._upper_threshold
):
self._logger.debug("[PRIMARY CHECK] YES BUY")
return "BUY"
elif (
self._entry_side in ("SELL", "BOTH")
and d["bv_log"]
< self._lower_threshold
< d["sv_log"]
< self._upper_threshold
):
self._logger.debug("[PRIMARY CHECK] YES SELL")
return "SELL"
else:
return None
ここはそのまま理解
async def _secondary_check(s):
"""時間経過判定"""
# 仲値を値動きの参照値に使う
mark_price_start = int(self.mid)
self._logger.debug(f"[SECONDARY CHECK] mark_price={mark_price_start}")
while True:
mark_price = int(self.mid)
price_change = mark_price - mark_price_start
if s == "SELL":
price_change *= -1
print(
f"\r\033[31m>>> [SECONDARY CHECK] {mark_price_start}/{mark_price}/{price_change:+.0f}\033[0m",
end="",
)
if price_change > self._entry_price_change:
# イナゴ方向への値動きがあった
break
await asyncio.sleep(0.1)
midpriceがどう動いたかで判定している
ここもそのまま読めばわかる
# 閾値判定
side = await _primary_check()
if side:
try:
# 経過判定
await asyncio.wait_for(
_secondary_check(side), timeout=self._entry_patience_seconds
)
# carriage return調整してるだけ
print()
# イナゴ検知
return True, side
except asyncio.TimeoutError as e:
# carriage return調整してるだけ
print()
# 指定秒数以内に値動きがみられなかったのでスルー
self._logger.debug(f"[CANCEL] mark_price={self.mid}")
return False, None
else:
return False, None
閾値を超えれた場合、返ってくるのは、"BUY","SELL"のどちらか
もしsideの中にどちらかがあればif文の処理
if文では時間経過の値動きの関数を起動していて、閾値を超えればTrue,asyncio.TimeoutErrorで、指定した時間を超えても帰ってこない場合はFalseとしている
async def on_loop(self):
"""ロジック
- 色々とhookを用意したものの、ロジック的に当てはめられなかったのでon_loop丸ごとオーバーライドしている(爆)
- 「約定情報を参照して決済注文を出す」といったロジックであれば以下のように分けて実装できると思う(元々はそう考えていた)
- ``on_on_loop_begin``で新規注文
- ``is_inago_endo``で終了判定
- ``on_on_loop_end``で決済注文
"""
# 新規注文
order_id = await market_order(self.client, self._symbol, self.side, self._size)
self._entry_order_info = await watch_execution(
self.store.childorderevents, order_id
)
self._logger.debug(f"[ENTRY ORDER] {self._entry_order_info}")
entry_price = self._entry_order_info["price"]
実際のロジック。market_orderをみていく
# 注文ヘルパー
async def market_order(client, symbol, side, size):
res = await client.post(
"/v1/me/sendchildorder",
data={
"product_code": symbol,
"side": side,
"size": f"{size:.8f}",
"child_order_type": "MARKET",
},
)
data = await res.json()
if res.status != 200:
raise RuntimeError(f"Invalid request: {data}")
else:
return data["child_order_acceptance_id"]
成行注文のヘルパー。idを返してる
async def watch_execution(execution: pybotters.models.bitflyer.ChildOrders, order_id):
with execution.watch() as stream:
async for msg in stream:
if (
msg.operation == "insert"
and msg.data["child_order_acceptance_id"] == order_id
and msg.data["event_type"] == "EXECUTION"
):
return msg.data
約定したかどうかをwatch機能を用いて確認してる
成行のidを参照して、それがwatchで返ってきたデータのidと一致した場合、その注文のデータを返す
# 建値±``_trail_margin`` を初期ストップ値としてトレイルスタート
if self.side == "BUY":
stop_price = entry_price - self._trail_margin
else:
stop_price = entry_price + self._trail_margin
trailer = BarBasedPriceTrailer(
stop_price, self.side, self._bar_l, self._trail_margin, self._logger
)
self._logger.debug(f"[TRAIL START] entry={entry_price} stop={stop_price}")
BarBasedPriceTrailerに関してみていく
class AbstractPriceTrailer:
"""面倒になったのでここは箱だけ..."""
class BarBasedPriceTrailer(AbstractPriceTrailer):
"""足が確定するたびに最後の足のclose ± marginのところにstopを置き直す(ストップ値が悪化(?)する場合は更新なし)"""
def __init__(
self,
price: int,
side: str,
bar: AbstractTimeBar,
margin: int,
logger=loguru.logger,
):
check_side(side)
self._price = price
self._side = side
self._bar = bar
self._margin = margin
self._logger = logger
self._task = asyncio.create_task(self.auto_trail())
def __del__(self):
# gcの機嫌次第でいつ呼ばれるか(はたまた本当に呼ばれるのか)わからないが一応オブジェクト破棄時に
# Taskをキャンセルをするようにする。
self.cancel()
async def auto_trail(self):
while True:
# 最新足確定まで待機
bar = await self._bar.get_bar_at_settled()
last_close = bar[-1, 3]
if self._side == "BUY":
new_price = last_close - self._margin
if new_price > self._price:
self._price = new_price
else:
new_price = last_close + self._margin
if new_price < self._price:
self._price = new_price
self._logger.debug(
f"[TRAIL] {self._price:.0f} (last_close={last_close:.0f})"
)
def cancel(self, msg=None):
return self._task.cancel(msg)
@property
def price(self):
return self._price
親クラスはめんどくさくなったらしい
でもここまで読んで最初は箱だけの意味すらわからなかったが、今ならわかるようになってる。うれしい
引数はそのまま
delの中身を見るとgcがなんたらとかいてある
gcに関して調べる
GCが行われるときに、taskとして残っていたらそれを破棄するものっぽい
auto_trailは最新足をもらったあとに自動的にtrail価格を計算してくれる
それをtaskとしておいとくことで、並列化してる
while True:
await asyncio.sleep(0.1)
# 最良気配値がストップ値を割ったら決済
# ストップ値はtrailerが長期足(e.g., 1分足)の確定毎に更新
if self.side == "BUY":
mark_price = self.best_bid
pnl = mark_price - entry_price
if mark_price <= trailer.price:
break
else:
mark_price = self.best_ask
pnl = entry_price - mark_price
if mark_price >= trailer.price:
break
print(
f"\r\033[31m>>> [TRAILING] entry={entry_price:.0f} stop={trailer.price:.0f} mark={mark_price:.0f} pnl={pnl:+.0f}\033[0m",
end="",
)
ここはコメント通りっぽい
# trailタスクが回り続けてしまうので明示的にキャンセルする
# asyncio.Taskはオブジェクトがスコープを外れて破壊されてもキャンセルされない
trailer.cancel()
side = "SELL" if self._entry_order_info["side"] == "BUY" else "BUY"
order_id = await market_order(self.client, self._symbol, side, self._size)
self._exit_order_info = await watch_execution(
self.store.childorderevents, order_id
)
self._logger.debug(f"[EXIT ORDER] {self._exit_order_info}")
決済の指令がでてるのにtrailが回り続けないようにタスクをキャンセルしてる
成行で決済注文して、ちゃんと決済されたかをwatchしてる
@property
def store(self):
return self._store
@property
def best_ask(self):
if self._asks is None:
return -1
return self._asks[0]["price"]
@property
def best_bid(self):
if self._bids is None:
return -1
return self._bids[0]["price"]
@property
def mid(self):
return (self.best_ask + self.best_bid) / 2
ここは見たまま
これで一通りの流れがわかった。
まとめると
高頻度のローソク足を作成するために、約定データをwatchして、使いやすいように加工する工程
自炊したローソク足を用いてロジックを動かす(inagoを検知する)工程
inagoを検知したら成成で処理する工程
これを、非同期処理を用いて作成してる
自分だけだと到底思いつかない処理の方法が詰め込まれててためになった。
これを完全に自分に落とし込むために次はFTXで動かせないか試してみます。
メモは気が向いたら乗せる