Bitflyerの板情報を作成する
はじめに
このノートは、無償ですべての内容をご覧いただけますので、あえて購入する必要はございません。もしお気に召していただけたなら、購入いただくか、あるいはサポートしていただけるとやる気が出ます。
対象
このノートは、pythonにてbitflyer向けのBOT等を作成している方を対象としております。
注意
ほぼ無償のノートですし、質問やサポートに十分答えられるとも限りませんのでその点、ご了承ください。
コード
#!/usr/bin/python3
#
import time
import sys
import os
import signal
from multiprocessing import Process, Manager
import os
import pandas as pd
VERSION='0.0.0.0'
# ===============================================================#
def process_create_board(name, m, d, q):
# update boards
df_asks = pd.DataFrame()
df_bids = pd.DataFrame()
try:
while True:
if (q.empty()):
time.sleep(0.1)
continue
item = q.get(timeout=5)
if ('ticker' in item):
d['ticker'] = item['ticker']
if (not 'ticker' in d):
continue
if ('board_snapshot' in item):
if ('asks' in item['board_snapshot'] and len(item['board_snapshot']['asks']) > 0):
df_asks = pd.io.json.json_normalize(item['board_snapshot']['asks'])
if ('bids' in item['board_snapshot'] and len(item['board_snapshot']['bids']) > 0):
df_bids = pd.io.json.json_normalize(item['board_snapshot']['bids'])
if ('board' in item):
if ('asks' in item['board'] and len(item['board']['asks']) > 0):
df = pd.io.json.json_normalize(item['board']['asks'])
df_asks = pd.concat([df_asks, df])
if ('bids' in item['board'] and len(item['board']['bids']) > 0):
df = pd.io.json.json_normalize(item['board']['bids'])
df_bids = pd.concat([df_bids, df])
if (len(df_asks) > 0):
df_asks = df_asks.groupby('price')['size'].last().reset_index()
df_asks = df_asks[df_asks['size'] != 0.0]
df_asks = df_asks.sort_values('price', ascending=False)
if (len(df_bids) > 0):
df_bids = df_bids.groupby('price')['size'].last().reset_index()
df_bids = df_bids[df_bids['size'] != 0.0]
df_bids = df_bids.sort_values('price', ascending=False)
# show the board
if (len(df_asks) > 0 and len(df_bids) > 0):
#os.system('clear')
pdf = df_asks[-10:].sort_values('price', ascending=False)
print(pdf.set_index('price'))
print ('\nASK :', d['ticker']['best_ask'])
print ('BID :', d['ticker']['best_bid'], '\n')
pdf = df_bids[:10].sort_values('price', ascending=False)
print(pdf.set_index('price'))
time.sleep(0.1)
except Exception as x:
print('error :', name, x)
os.kill(os.getpid(), signal.SIGTERM)
finally:
os.kill(os.getpid(), signal.SIGTERM)
'''
board_snapshot 板情報
{
"mid_price": 33320,
"bids": [
{
"price": 30000,
"size": 0.1
},
{
"price": 25570,
"size": 3
}
],
"asks": [
{
"price": 36640,
"size": 5
},
{
"price": 36700,
"size": 1.2
}
]
}
'''
'''
board 板情報差分。消えた場合はsize0になる
{
mid_price: 35625,
bids: [
{
price: 33350,
size: 1
}
],
asks: []
}
'''
'''
約定
[
{
"id": 39361,
"side": "SELL",
"price": 35100,
"size": 0.01,
"exec_date": "2015-07-07T10:44:33.547",
"buy_child_order_acceptance_id": "JRF20150707-014356-184990",
"sell_child_order_acceptance_id": "JRF20150707-104433-186048"
}
]
'''
'''
Ticker
{
"product_code": "BTC_JPY",
"timestamp": "2015-07-08T02:50:59.97",
"tick_id": 3579,
"best_bid": 30000,
"best_ask": 36640,
"best_bid_size": 0.1,
"best_ask_size": 5,
"total_bid_depth": 15.13,
"total_ask_depth": 20,
"ltp": 31690,
"volume": 16819.26,
"volume_by_product": 6819.26
}
'''
def process_subscribe(name, m, d, q):
from websocket import create_connection
import json
WS_URI = "wss://ws.lightstream.bitflyer.com/json-rpc"
channels = [
"lightning_ticker_FX_BTC_JPY",
"lightning_executions_FX_BTC_JPY",
"lightning_board_snapshot_FX_BTC_JPY",
"lightning_board_FX_BTC_JPY"
]
simple_name = {
"lightning_ticker_FX_BTC_JPY": 'ticker',
"lightning_executions_FX_BTC_JPY": 'executions',
"lightning_board_snapshot_FX_BTC_JPY": 'board_snapshot',
"lightning_board_FX_BTC_JPY": 'board'
}
# Initialize
ws = create_connection(WS_URI)
for c in channels:
ws_command = {"method": "subscribe", "params": {"channel": c}}
ws.send(json.dumps(ws_command))
try:
while True:
# receive Websocket data
try:
result = ws.recv()
except Exception as x:
break
# convert response
j = json.loads(result)
if (j['params']['channel'] in channels):
r = {}
r[simple_name[j['params']['channel']]] = j["params"]["message"]
# push item into queue
q.put(r)
time.sleep(0.01)
except Exception as x:
print('error :', name, x)
os.kill(os.getpid(), signal.SIGTERM)
finally:
os.kill(os.getpid(), signal.SIGTERM)
if __name__ == '__main__':
import multiprocessing as mp
# Create shared object
m = mp.Manager()
d = m.dict()
q = m.Queue()
try:
ps = []
pn = 'sunbscribe'
ps.append(mp.Process(name=pn, target=process_subscribe, args=(pn, m, d, q), daemon=True))
ps[-1].start()
pn = 'create_board'
ps.append(mp.Process(name=pn, target=process_create_board, args=(pn, m, d, q), daemon=True))
ps[-1].start()
while True:
# check process
for p in ps:
if not p.is_alive():
break
time.sleep(1)
for p in ps:
p.terminate()
sys.exit()
except Exception as x:
print('error : main', x)
for p in ps:
p.terminate()
sys.exit()
finally:
for p in ps:
p.terminate()
sys.exit()