
Photo by
piromoneyorigami
[Python]Bybit websocket
こんな記事にたどり着いてしまう変態仮想通貨botterの皆さんこんにちは
年中自粛botterのOneです。
仮想通貨取引所BybitのwebsocketAPIのコードを張るだけの記事です。
BITMEX以外の海外取引所のサンプルコードって少ない気がするので時短のためにコピーしちゃう人用です。
まちゅけんさんのコードにかなり影響を受けています
Bybitだけでなく他のコードもすごく読みやすくていつも参考にさせて頂いています。自分のコードはカスタマイズしやすいようにベーシックなところしか書いていません。使いやすいようにアレンジしてください。
動くとは思いますが変なところがあればごめんなさい!
注意点ですが、positionを購読するのであれば最初にhttpで取得するか、強引にinputで突っ込むかした方がいいと思います。(自分は使わないので放置しています)
それでは早速コードです。ありがとうございました!
# -*- coding: utf-8 -*-
import hmac
import hashlib
import json
import logging
import websocket
import threading
from time import time, sleep
from collections import deque
class Bybit(object):
def __init__(self, api_key, secret, symbol='BTCUSD'):
self.api_key = api_key
self.secret = secret
self.symbol = symbol
self.endpoint = 'wss://stream.bybit.com/realtime'
# 不要なチャンネルは行ごと消してください
self.channel_list = ['trade.' + self.symbol,
'instrument.' + self.symbol,
'orderBookL2_25.' + self.symbol,
'position',
'execution',
'order'
]
self.logger = logging.getLogger(__name__)
self.logger.debug("Initializing WebSocket.")
self.data = {
'connection':False,
'last_price':None,
'timestamp':{},
'execution':deque(maxlen=200),
'instrument':None,
'board_snapshot':{
'asks':[],
'bids':[]
},
'position':{},
'my_execution':deque(maxlen=50),
'my_order':deque(maxlen=50)
}
for i in self.channel_list:
self.data['timestamp'][i] = None
self.board_snapshot = {}
self.board_snapshot_bids_dict = {}
self.board_snapshot_asks_dict = {}
self.__connect(self.endpoint)
self.ping_th = threading.Thread(target=self.send_ping)
self.ping_th.daemon = True
self.ping_th.start()
def __connect(self, endpoint):
while not self.data['connection']:
self.ws = websocket.WebSocketApp(endpoint,
on_message=self.__on_message,
on_close=self.__on_close,
on_open=self.__on_open,
on_error=self.__on_error,
header=None)
self.ws_th = threading.Thread(target=lambda: self.ws.run_forever())
self.ws_th.daemon = True
self.ws_th.start()
sleep(10)
self.__wait_first_data
self.logger.info("WebSocket Opend.")
def __exit(self):
self.ws.close()
self.data['connection'] = False
for i in self.data['timestamp']:
self.data['timestamp'][i] = None
def __wait_first_data(self):
for i in self.data['timestamp']:
# 実際に使う際はポジションは取得してからの方がいいです
if i in ['position', 'execution', 'order']:
continue
while not self.data['timestamp'][i]:
sleep(0.1)
def __on_open(self):
if 'position' in self.channel_list or 'order' in self.channel_list or 'execution' in self.channel_list:
# timestamp足してあげないとAuthエラーが出る
timestamp = int((time() + 10.0) * 1000)
param_str = 'GET/realtime' + str(timestamp)
sign = hmac.new(self.secret.encode('utf-8'),
param_str.encode('utf-8'), hashlib.sha256).hexdigest()
self.ws.send(json.dumps(
{'op': 'auth', 'args': [self.api_key, timestamp, sign]}))
self.ws.send(json.dumps(
{'op': 'subscribe', 'args': self.channel_list}))
def __on_close(self):
self.logger.info('Websocket Closed.')
def __on_error(self, error):
self.logger.error("Error : %s" % error)
self.__exit()
self.__connect(self.endpoint)
def __on_message(self, message):
try:
message = json.loads(message)
topic = message.get('topic')
data = message.get('data')
ret_msg = message.get('ret_msg')
self.data['timestamp'][topic] = time()
if topic == 'trade.' + self.symbol:
for d in data:
self.data['last_price'] = d['price']
self.data['execution'].append(d)
elif topic == 'instrument.' + self.symbol:
self.data['instrument'] = data[0]
elif topic == 'orderBookL2_25.' + self.symbol:
if message['type'] == 'snapshot':
self.board_snapshot_bids_dict.clear()
self.board_snapshot_asks_dict.clear()
for d in data:
if d['side'] == 'Buy':
self.board_snapshot_bids_dict[float(d['price'])] = [float(d['price']), float(d['size'])]
elif d['side'] == 'Sell':
self.board_snapshot_asks_dict[float(d['price'])] = [float(d['price']), float(d['size'])]
else:
if data['delete']:
for d in data['delete']:
if d['side'] == 'Buy':
del self.board_snapshot_bids_dict[float(d['price'])]
elif d['side'] == 'Sell':
del self.board_snapshot_asks_dict[float(d['price'])]
if data['insert']:
for i in data['insert']:
if i['side'] == 'Buy':
self.board_snapshot_bids_dict[float(i['price'])] = [float(i['price']), float(i['size'])]
elif i['side'] == 'Sell':
self.board_snapshot_asks_dict[float(i['price'])] = [float(i['price']), float(i['size'])]
if data['update']:
for u in data['update']:
if u['side'] == 'Buy':
self.board_snapshot_bids_dict[float(u['price'])] = [float(u['price']), float(u['size'])]
elif u['side'] == 'Sell':
self.board_snapshot_asks_dict[float(u['price'])] = [float(u['price']), float(u['size'])]
self.data['board_snapshot']['bids'] = [i[1] for i in sorted(self.board_snapshot_bids_dict.items(), key=lambda bid: bid[1][0],reverse=True)]
self.data['board_snapshot']['asks'] = [i[1] for i in sorted(self.board_snapshot_asks_dict.items(), key=lambda ask: ask[1][0],reverse=False)]
elif topic == 'position':
if data[0]['symbol'] == self.symbol:
self.data['position'] = data[0]
elif topic == 'execution':
for d in data:
if d['symbol'] == self.symbol:
self.data['my_execution'].append(d)
elif topic == 'order':
for d in data:
if d['symbol'] == self.symbol:
self.data['my_order'].append(d)
elif 'success' in message.keys():
if message['success'] == True:
if ret_msg == 'pong':
pass
elif len(message['request']['args']) == len(self.channel_list):
self.data['connection'] = True
else:
raise Exception("Connection failed: %s" % message)
else:
raise Exception("Unknown message: %s" % message)
except Exception as e:
self.logger.error(e)
def send_ping(self):
# 30~60秒ごとにピンポンした方が良いらしい
while True:
self.ws.send('{"op":"ping"}')
sleep(30)
def reconnect(self):
self.__exit()
self.__connect(self.endpoint)
if __name__ == '__main__':
from pprint import pprint
symbol = 'BTCUSD'
api_key = ''
secret = ''
bybit = Bybit(api_key, secret, symbol=symbol)
while True:
pprint(bybit.data)
sleep(30)