pe氏pybottersinagoflyer読み解き殴り書きメモ4
続き
引き続きmain関数をみていく
# time barを約定履歴で初期化
resp = await client.get(
"/v1/getexecutions", params={"producet_code": args.symbol, "count": 500}
)
data = await resp.json()
await bar_l.init(data[::-1])
await bar_s.init(data[::-1])
RESTAPIをつかって、約定履歴を取得し、そのデータを用いて初期化してる
[::-1]というのはすべての要素を逆に返す
[1,2,3,4] → [4,3,2,1]みたいな
それをbar_l,sのオーバーライドしたinitにいれて処理してる。
# web socketに接続
await client.ws_connect(
"wss://ws.lightstream.bitflyer.com/json-rpc",
send_json=[
{
"method": "subscribe",
"params": {"channel": f"lightning_board_snapshot_{args.symbol}"},
"id": 1,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_board_{args.symbol}"},
"id": 2,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_executions_{args.symbol}"},
"id": 3,
},
{
"method": "subscribe",
"params": {"channel": "child_order_events"},
"id": 4,
},
],
hdlr_json=store.onmessage,
)
while not all([len(w) for w in [store.board, store.executions]]):
logger.debug("[WAITING SOCKET RESPONSE]")
await store.wait()
pybottersおなじみの書き方。
websocketに接続してそれをstoreの中に入れていく。
なお最後の3行はstoreのなかに欲しいデータが入ってくるまで、waitして次のプログラムが動かないようにしている。
await BitflyerInagoBot(
client,
store,
bar_l,
bar_s,
lower_threshold=args.lower_threshold,
upper_threshold=args.upper_threshold,
entry_patience_seconds=args.entry_patience_seconds,
entry_price_change=args.entry_price_change,
trail_margin=args.trail_margin,
symbol=args.symbol,
size=args.size,
side=args.side,
logger=logger,
).loop()
最後にBitflyerInagoBotに最初に指定した引数、今まで作ってきたbar等を入れて、ループ処理
なお、bar等は勝手に更新されていくようになってるのでいちいち呼びだす必要がない
クラスBitflyerInagoBotをみていく
class BitflyerInagoBot(AbstractInagoBot):
AbstractInagoBotの子クラス。AbstractInagoBotをみていく
class AbstractInagoBot:
"""イナゴBOT用の抽象クラス(仮)"""
def __init__(
self,
client: pybotters.Client,
is_inago_start_fn: Callable[[AbstractInagoBot], tuple[bool, str]] = None,
is_inago_end_fn: Callable[[AbstractInagoBot], bool] = None,
off_loop_interval: float = 0.5,
logger=loguru.logger,
):
self._client = client
self._is_inago_start_fn = is_inago_start_fn
self._is_inago_end_fn = is_inago_end_fn
self._off_loop_interval = off_loop_interval
self._logger = logger
self._side = None
# 要オーバーライド
async def inago_stream(self) -> Generator[dict]:
""" "イナゴ"(主に約定情報になると思う)を流す"""
raise NotImplementedError
async def on_inago(self, inago):
"""各イナゴに対する処理"""
pass
async def is_inago_start(self) -> tuple[bool, str]:
"""イナゴ到来判定"""
if self._is_inago_start_fn is None:
raise NotImplementedError
if asyncio.iscoroutinefunction(self._is_inago_start_fn):
return await self._is_inago_start_fn(self)
else:
return self._is_inago_start_fn(self)
async def is_inago_end(self) -> bool:
"""イナゴ終了判定"""
if self._is_inago_end_fn is None:
raise NotImplementedError
if asyncio.iscoroutinefunction(self._is_inago_end_fn):
return await self._is_inago_end_fn(self)
else:
return self._is_inago_end_fn(self)
async def loop(self):
"""イナゴBOTのメインループ
- off_loop(): イナゴ待機ループ(検知)
- on_loop(): イナゴ処理ループ(トレード)
この二つを繰り返す。
- *_begin(), *_end(): 各関数の前後で呼ばれるhook
"""
while True:
self._logger.debug("BEGIN LOOP")
await self.on_loop_begin()
self._logger.debug("BEGIN OFF LOOP")
await self.off_loop()
self._logger.debug(f"END OFF LOOP")
self._logger.debug(f"BEGIN ON LOOP: {self._side}")
await self.on_loop()
self._logger.debug("END ON LOOP")
await self.on_loop_end()
self._logger.debug("END LOOP")
async def off_loop(self):
"""イナゴオフ時のループ=イナゴ検知"""
await self.on_off_loop_begin()
while True:
is_start, side = await self.is_inago_start()
if is_start:
self._side = side
break
await asyncio.sleep(self._off_loop_interval)
await self.on_off_loop_end()
async def on_loop(self):
"""イナゴオン時のループ=トレード"""
await self.on_on_loop_begin()
async for inago in self.inago_stream():
await self.on_inago_begin(inago)
await self.on_inago(inago)
is_end = await self.is_inago_end()
if is_end:
self._side = None
break
await self.on_inago_end(inago)
await self.on_on_loop_end()
async def on_loop_begin(self):
pass
async def on_loop_end(self):
pass
async def on_off_loop_begin(self):
pass
async def on_off_loop_end(self):
pass
async def on_on_loop_begin(self):
pass
async def on_on_loop_end(self):
pass
async def on_inago_begin(self, inago):
pass
async def on_inago_end(self, inago):
pass
@property
def client(self):
return self._client
@property
def side(self):
return self._side
イナゴBOT用の抽象クラスとある。おそらくこれをもとにほかの取引所等でもプログラムがつくれそう。
初期処理から見てく
def __init__(
self,
client: pybotters.Client,
is_inago_start_fn: Callable[[AbstractInagoBot], tuple[bool, str]] = None,
is_inago_end_fn: Callable[[AbstractInagoBot], bool] = None,
off_loop_interval: float = 0.5,
logger=loguru.logger,
):
self._client = client
self._is_inago_start_fn = is_inago_start_fn
self._is_inago_end_fn = is_inago_end_fn
self._off_loop_interval = off_loop_interval
self._logger = logger
self._side = None
引数はいつなににつかうのかこのままだとわからないが、とりあえず複雑なとこがないから飛ばす
inago_streamとon_inagoは書いてある説明で完結
async def is_inago_start(self) -> tuple[bool, str]:
"""イナゴ到来判定"""
if self._is_inago_start_fn is None:
raise NotImplementedError
if asyncio.iscoroutinefunction(self._is_inago_start_fn):
return await self._is_inago_start_fn(self)
else:
return self._is_inago_start_fn(self)
引数をもらってイナゴの判定をおこなう。
asyncio.iscoroutinefunctionに関して調べる
つまり、self._is_inago_end_fnがコルーチン関数だったらawait処理、そうじゃないとそのままreturnで値を返す
終了判定も同じなので省略
async def loop(self):
"""イナゴBOTのメインループ
- off_loop(): イナゴ待機ループ(検知)
- on_loop(): イナゴ処理ループ(トレード)
この二つを繰り返す。
- *_begin(), *_end(): 各関数の前後で呼ばれるhook
"""
while True:
self._logger.debug("BEGIN LOOP")
await self.on_loop_begin()
self._logger.debug("BEGIN OFF LOOP")
await self.off_loop()
self._logger.debug(f"END OFF LOOP")
self._logger.debug(f"BEGIN ON LOOP: {self._side}")
await self.on_loop()
self._logger.debug("END ON LOOP")
await self.on_loop_end()
self._logger.debug("END LOOP")
ここは説明通りっぽい
スタート→待機→トレード→終了みないな流れ
async def off_loop(self):
"""イナゴオフ時のループ=イナゴ検知"""
await self.on_off_loop_begin()
while True:
is_start, side = await self.is_inago_start()
if is_start:
self._side = side
break
await asyncio.sleep(self._off_loop_interval)
await self.on_off_loop_end()
async def on_loop(self):
"""イナゴオン時のループ=トレード"""
await self.on_on_loop_begin()
async for inago in self.inago_stream():
await self.on_inago_begin(inago)
await self.on_inago(inago)
is_end = await self.is_inago_end()
if is_end:
self._side = None
break
await self.on_inago_end(inago)
await self.on_on_loop_end()
今まで見てきたものを元にイナゴの検知とトレードを行ってるっぽい
その他、関数に関しては省略
今回は本当に殴り書きが多かった。というか純粋にもう書いてあるコメントよめば理解ができた。
実際に動かした際の流れは何となく理解できた。
次の記事でおそらく最後になると思う
この記事が気に入ったらサポートをしてみませんか?