Bitflyerの板情報を作成する

はじめに

このノートは、無償ですべての内容をご覧いただけますので、あえて購入する必要はございません。もしお気に召していただけたなら、購入いただくか、あるいはサポートしていただけるとやる気が出ます。

対象

このノートは、pythonにてbitflyer向けのBOT等を作成している方を対象としております。

注意

ほぼ無償のノートですし、質問やサポートに十分答えられるとも限りませんのでその点、ご了承ください。

コード

#!/usr/bin/python3
#

import time
import sys
import os
import signal
from multiprocessing import Process, Manager
import os
import pandas as pd

VERSION='0.0.0.0'

# ===============================================================#

def process_create_board(name, m, d, q):
    # update boards
    df_asks = pd.DataFrame()
    df_bids = pd.DataFrame()
    try:
        while True:
            if (q.empty()):
                time.sleep(0.1)
                continue
            
            item = q.get(timeout=5)
            
            if ('ticker' in item):
                d['ticker'] = item['ticker']
            
            if (not 'ticker' in d):
                continue
            
            if ('board_snapshot' in item):
                if ('asks' in item['board_snapshot'and len(item['board_snapshot']['asks']) > 0):
                    df_asks = pd.io.json.json_normalize(item['board_snapshot']['asks'])
                if ('bids' in item['board_snapshot'and len(item['board_snapshot']['bids']) > 0):
                    df_bids = pd.io.json.json_normalize(item['board_snapshot']['bids'])
            
            if ('board' in item):
                if ('asks' in item['board'and len(item['board']['asks']) > 0):
                    df = pd.io.json.json_normalize(item['board']['asks'])
                    df_asks = pd.concat([df_asks, df])
                if ('bids' in item['board'and len(item['board']['bids']) > 0):
                    df = pd.io.json.json_normalize(item['board']['bids'])
                    df_bids = pd.concat([df_bids, df])
            
            if (len(df_asks) > 0):
                df_asks = df_asks.groupby('price')['size'].last().reset_index()
                df_asks = df_asks[df_asks['size'] != 0.0]
                df_asks = df_asks.sort_values('price', ascending=False)

            if (len(df_bids) > 0):
                df_bids = df_bids.groupby('price')['size'].last().reset_index()
                df_bids = df_bids[df_bids['size'] != 0.0]
                df_bids = df_bids.sort_values('price', ascending=False)

            # show the board
            if (len(df_asks) > 0 and len(df_bids) > 0):
                #os.system('clear')
                pdf = df_asks[-10:].sort_values('price', ascending=False)
                print(pdf.set_index('price'))
                print ('\nASK :', d['ticker']['best_ask'])
                print ('BID :', d['ticker']['best_bid'], '\n')
                pdf = df_bids[:10].sort_values('price', ascending=False)
                print(pdf.set_index('price'))
            
            time.sleep(0.1)
            
    except Exception as x:
        print('error :', name, x)
        os.kill(os.getpid(), signal.SIGTERM)
    finally:
        os.kill(os.getpid(), signal.SIGTERM)

'''
board_snapshot 板情報
{
  "mid_price": 33320,
  "bids": [
    {
      "price": 30000,
      "size": 0.1
    },
    {
      "price": 25570,
      "size": 3
    }
  ],
  "asks": [
    {
      "price": 36640,
      "size": 5
    },
    {
      "price": 36700,
      "size": 1.2
    }
  ]
}
'''
'''
board 板情報差分。消えた場合はsize0になる
{
  mid_price: 35625,
  bids: [
    {
      price: 33350,
      size: 1
    }
  ],
  asks: []
}
'''
'''
約定
[
  {
    "id": 39361,
    "side": "SELL",
    "price": 35100,
    "size": 0.01,
    "exec_date": "2015-07-07T10:44:33.547",
    "buy_child_order_acceptance_id": "JRF20150707-014356-184990",
    "sell_child_order_acceptance_id": "JRF20150707-104433-186048"
  }
]
'''
'''
Ticker
{
  "product_code": "BTC_JPY",
  "timestamp": "2015-07-08T02:50:59.97",
  "tick_id": 3579,
  "best_bid": 30000,
  "best_ask": 36640,
  "best_bid_size": 0.1,
  "best_ask_size": 5,
  "total_bid_depth": 15.13,
  "total_ask_depth": 20,
  "ltp": 31690,
  "volume": 16819.26,
  "volume_by_product": 6819.26
}
'''
def process_subscribe(name, m, d, q):
    from websocket import create_connection
    import json

    WS_URI  = "wss://ws.lightstream.bitflyer.com/json-rpc"
    channels =  [
                    "lightning_ticker_FX_BTC_JPY",
                    "lightning_executions_FX_BTC_JPY",
                    "lightning_board_snapshot_FX_BTC_JPY",
                    "lightning_board_FX_BTC_JPY"
                ]
    simple_name =   {
                        "lightning_ticker_FX_BTC_JPY"'ticker',
                        "lightning_executions_FX_BTC_JPY"'executions',
                        "lightning_board_snapshot_FX_BTC_JPY"'board_snapshot',
                        "lightning_board_FX_BTC_JPY"'board'
                    }

    # Initialize
    ws  = create_connection(WS_URI)
    
    for c in channels:
        ws_command = {"method""subscribe""params": {"channel": c}}
        ws.send(json.dumps(ws_command))
    
    try:
        while True:
            # receive Websocket data
            try:
                result =  ws.recv()
            except Exception as x:
                break

            # convert response
            j = json.loads(result)
            if (j['params']['channel'in channels):
                r = {}
                r[simple_name[j['params']['channel']]] = j["params"]["message"]
                # push item into queue
                q.put(r)
                
            time.sleep(0.01)
            
    except Exception as x:
        print('error :', name, x)
        os.kill(os.getpid(), signal.SIGTERM)
    finally:
        os.kill(os.getpid(), signal.SIGTERM)

if __name__ == '__main__':
    import multiprocessing as mp

    # Create shared object
    m = mp.Manager()
    d = m.dict()
    q = m.Queue()
    
    try:
        ps = []
        
        pn = 'sunbscribe'
        ps.append(mp.Process(name=pn, target=process_subscribe, args=(pn, m, d, q), daemon=True))
        ps[-1].start()
        
        pn = 'create_board'
        ps.append(mp.Process(name=pn, target=process_create_board, args=(pn, m, d, q), daemon=True))
        ps[-1].start()
       
        while True:
            # check process
            for p in ps:
                if not p.is_alive():
                    break   
 
            time.sleep(1)

        for p in ps:
            p.terminate()

        sys.exit()

    except Exception as x:
        print('error : main', x)
        
        for p in ps:
            p.terminate()
        sys.exit()

    finally:
        for p in ps:
            p.terminate()

        sys.exit()



いいなと思ったら応援しよう!