見出し画像

[How to] pybotters watch

pybotters v0.13.0 からwatchが使えるようになったので試しにFTXとbinanceのBTCUSDTから約定履歴などをリアルタイムに取得しプリントするだけのコードを書いてみました。
実運用ではprint(msg.data)部分をcsv書き込みやmongoDB insert_many みたいにデータ保存するコードに書き換えればいいと思います。
インデントがずれてたりするけどとりあえず動くからヨシ

import asyncio
import os
import pybotters
from dataclasses import dataclass


@dataclass
class Stores:
    binance = pybotters.BinanceDataStore()
    ftx = pybotters.FTXDataStore()


async def main():
    async with pybotters.Client() as client:
        stores = Stores()
        binance_symbol = 'btcusdt'
        ftx_symbol = 'BTC-PERP'
        streams = '/'.join([
            f'{binance_symbol}@trade',
            f'{binance_symbol}@kline_1m',
        ])
        await asyncio.gather(
            client.ws_connect(
            f'wss://fstream.binance.com/stream?streams={streams}',
            hdlr_json=stores.binance.onmessage,
            ),
            client.ws_connect(
                'wss://ftx.com/ws/',
                send_json=[
                    {'op': 'subscribe', 'channel': 'trades', 'market': ftx_symbol},
                    {'op': 'subscribe', 'channel': 'ticker', 'market': ftx_symbol},
                    ],
                hdlr_json=stores.ftx.onmessage,
                ),
            )

        while not all([
            len(stores.binance.trade),
            len(stores.binance.kline),
            len(stores.ftx.trades),
            len(stores.ftx.ticker),
        ]):
            await stores.binance.wait()
            await stores.ftx.wait()

        watch_trades_b, watch_klines_b, watch_trades_f, watch_ticker_f = await asyncio.gather(
            trades_b(stores),
            klines_b(stores),
            trades_f(stores),
            ticker_f(stores)
            )
        
        await watch_trades_b
        await watch_klines_b
        await watch_trades_f
        await watch_ticker_f


async def trades_b(stores):
    with stores.binance.trade.watch() as trade:
        async for msg in trade:
            print(msg.data)


async def klines_b(stores):
    with stores.binance.kline.watch() as klines:
        async for msg in klines:
            print(msg.data)
            await asyncio.sleep(60)

async def trades_f(stores):
    with stores.ftx.trades.watch() as trade:
        async for msg in trade:
            print(msg.data)


async def ticker_f(stores):
    with stores.ftx.ticker.watch() as ticker:
        async for msg in ticker:
            print(msg.data)
            await asyncio.sleep(1)


if __name__ == '__main__':
    try:
        if os.name == 'nt':
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

いいなと思ったら応援しよう!