Pybottersを使おう WebSocket編
前回の続きで、Pybottersを使ってWebSocketに接続してみます。
1.インストール
PyPIに登録してあるので、pip installだけでインストール可能です。
pip install pybotters
2.使用方法
2-1.RestAPI
2-2.WebSocket
bybitで試してみましょう。
import pybotters
import asyncio
from rich import print
import pandas as pd
#windowsのみ
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
apis = {
'bybit' :['', ''],
'bybit_testnet' :['', ''],
'binance' :['', ''],
'binance_testnet':['', ''],
'ftx' :['', ''],
'bitflyer' :['', ''],
'gmocoin' :['', ''],
'liquid' :['', ''],
'bitbank' :['', '']
}
RestAPI_url = {
'bybit' :'https://api.bybit.com',
'bybit_testnet' :'https://api-testnet.bybit.com',
'binance' :'https://fapi.binance.com',
'binance_testnet':'https://testnet.binancefuture.com',
'ftx' :'https://ftx.com/api/',
'bitflyer' :'https://api.bitflyer.com/v1/',
'gmocoin_public' :'https://api.coin.z.com/public',
'gmocoin_private':'https://api.coin.z.com/private',
'liquid' :'https://api.liquid.com',
'bitbank_public' :'https://public.bitbank.cc',
'bitbank_private':'https://api.bitbank.cc/v1'
}
wss_url = {
'bybit' :'wss://stream.bybit.com/realtime',
'bybit_testnet' :'wss://stream-testnet.bybit.com/realtime',
'binance' :'wss://fstream.binance.com',
'binance_testnet':'wss://stream.binancefuture.com',
'ftx' :'wss://ftx.com/ws/',
'gmocoin' :''
}
async def bybit_ws():
async with pybotters.Client(apis=apis) as client:
# データストアのインスタンスを生成する
store = pybotters.BybitDataStore()
# WebSocket接続
wstask = await client.ws_connect(
wss_url['bybit'],
send_json={'op': 'subscribe',
'args': [
"orderBook_200.100ms.BTCUSD",
'trade.BTCUSD',
'instrument_info.100ms.BTCUSD',
]
},
hdlr_json=store.onmessage,
)
# WebSocketでデータを受信するまで待機
while not all([
len(store.orderbook),
len(store.instrument)
]):
await store.wait()
# メインループ
while True:
# データ参照
data = dict(
orderbook =store.orderbook.sorted(),
instrument=store.instrument.get({"symbol":"BTCUSD"}),
position =store.position_inverse.find(),
trade =store.trade.find(),
execution =store.execution.find(),
order =store.order.find()
)
print(data['instrument'])
await asyncio.sleep(1)
if __name__ == '__main__':
try:
asyncio.run(bybit_ws())
except KeyboardInterrupt:
pass
メイン関数の解説をしていきます。
メイン関数前の辞書リストは前回の記事で説明しています。
# データストアのインスタンスを生成する
store = pybotters.BybitDataStore()
まずはBybit専用のデータストアを作成します。
データストアはWSで取得したデータを保存しておく場所で、取り出す際に必要に応じてソートや最新のものを取り出す等ができます。
# WebSocket接続
wstask = await client.ws_connect(
wss_url['bybit'],
send_json={'op': 'subscribe',
'args': [
"orderBook_200.100ms.BTCUSD",
'trade.BTCUSD',
'instrument_info.100ms.BTCUSD',
]
},
hdlr_json=store.onmessage,
)
WebSocketに接続します。
send_json()の中に、購読したいトピックを記入して、WS接続を行います。
hdlr_jsonは受け取ったjson形式のデータをどう処理するか決める部分で、store.onmessageを指定するとデータストアにどんどんデータが突っ込まれていきます。
pybotters.print_handlerにすると文字列でコンソールに出力されます。
一度WSに接続したら、再接続処理等は不要です。
pybottersの仕様で自動でやってくれるらしいです(神か、、、)
# WebSocketでデータを受信するまで待機
while not all(
[
len(store.orderbook),
len(store.instrument)
]
):
await store.wait()
地味に重要な部分です。
WS接続を行うと、トピックごとに最初のデータを受信する時間が違います。
ここでほしいデータを受信したことを確認してからメインの処理に移ります。
# メインループ
while True:
# データ参照
data = dict(
orderbook =store.orderbook.sorted(),
instrument=store.instrument.get({"symbol":"BTCUSD"}),
position =store.position_inverse.find(),
trade =store.trade.find(),
execution =store.execution.find(),
order =store.order.find()
)
print(data['instrument'])
await asyncio.sleep(1)
続いてメインループです。
dataに辞書形式でデータストアから取り出した値を格納しています。
※dict(a=b)で{'a':'b'}と同じ扱いになります。
ループの待機時間にはtime.sleep()ではなく、await asyncio.sleep()を使います。こうしないとsleepが同期処理として扱われるので、全体の処理がスリープしていしまいます。
if __name__ == '__main__':
try:
asyncio.run(bybit_ws())
except KeyboardInterrupt:
pass
最後にメイン関数の実行部分です。
前回説明したように、呼び出し時はasyncio.run()を使います。
・・・・・
とりあえずここまでにします。
冒頭のコードを動かすと、1秒ごとにコンソールにティッカー情報が表示されますので、ぜひ動かしてみてください。