Bitflyer FXのデータをInfluxDBに格納し続けるプログラム
Bitflyer FXのTicker情報と約定情報をInfluxDBに格納し続けるプログラムです。
bitFlyerがPubNubでの配信を廃止する予定なので、記念にnoteに貼りました。数年前、Pythonの「パ」の字も知らないまま書いたので、とてもぐちゃぐちゃなコードですが、半年前の時点では動いていました。systemdのサービスとして登録しておけば、安心して常時稼働させられるはずです。
# encoding: utf-8
import json
import datetime
import time
import urllib.request
import re
from influxdb import InfluxDBClient
import requests
from pubnub.callbacks import SubscribeCallback
from pubnub.enums import PNStatusCategory
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub_tornado import PubNubTornado
from pubnub.pnconfiguration import PNReconnectionPolicy
from datetime import datetime, timezone, timedelta
# PubNub client configuration used for the realtime subscription below.
config = PNConfiguration()
# Subscribe key for the feed (presumably bitFlyer's public key -- confirm).
config.subscribe_key = 'sub-c-52a9ab50-291b-11e5-baaa-0619f8945a4f'
# Use the library's LINEAR reconnection policy after a lost connection.
config.reconnect_policy = PNReconnectionPolicy.LINEAR
# Tornado-based PubNub client shared by main() and the script entry point.
pubnub = PubNubTornado(config)
from tornado import gen
class influxDB():
    """Small wrapper around ``InfluxDBClient`` that writes points to one database.

    On the first write, the database is created if the server does not list
    it yet, together with a 60-day retention policy named ``rp_60day``.
    """

    def __init__(self, dbName="bitflyer_pubnub", dbname_1s="bitflyer_pubnub_1s"):
        """Create a client for the InfluxDB server at localhost:8086.

        Args:
            dbName: Name of the database that points are written to.
            dbname_1s: Accepted for backward compatibility; currently unused.
        """
        self.user = 'root'
        self.password = 'root'
        self.dbname = dbName
        self.client = InfluxDBClient("localhost", "8086", self.user, self.password, self.dbname)
        # Set to True once the database has been verified/created.
        self._db_ready = False

    def _ensure_database(self):
        """Create the database and its retention policy if the server lacks them."""
        known_dbs = self.client.get_list_database()
        if {'name': self.dbname} in known_dbs:
            return
        self.client.create_database(self.dbname)
        # NOTE(review): the policy is created without DEFAULT, so writes that
        # do not name a retention policy will presumably not use it -- confirm
        # whether "... REPLICATION 1 DEFAULT" was intended.
        rp_query = 'CREATE RETENTION POLICY ' + '"rp_60day" ON "' + self.dbname + '" DURATION 60d REPLICATION 1'
        self.client.query(rp_query)

    def insert_db(self, json_body):
        """Write ``json_body`` (a list of point dicts) to the database.

        The database existence check only needs to happen once, so it is done
        on the first call per instance instead of before every write. If the
        check raises, the flag stays unset and the check is retried on the
        next call.

        Args:
            json_body: Points in the format accepted by
                ``InfluxDBClient.write_points``.
        """
        # getattr keeps this working for instances built without __init__.
        if not getattr(self, "_db_ready", False):
            self._ensure_database()
            self._db_ready = True
        # NOTE: a duplicate-timestamp check was sketched here in the past but
        # was disabled; points are written unconditionally.
        self.client.write_points(json_body)
def convertTimestamp(unixTime):
    """Convert a Unix timestamp to a UTC string like ``2017-07-14T02:40:00Z``.

    Args:
        unixTime: Seconds since the Unix epoch (int or float).

    Returns:
        The UTC date and time joined by ``T`` and suffixed with ``Z``.
        Fractional seconds appear as six digits only when they are non-zero.
    """
    # Imported locally under private aliases: this file does both
    # ``import datetime`` and ``from datetime import datetime``, so the
    # module-level name ``datetime`` is the class and the previous
    # ``datetime.datetime.utcfromtimestamp`` lookup raised AttributeError.
    from datetime import datetime as _datetime, timezone as _timezone

    utc_moment = _datetime.fromtimestamp(unixTime, tz=_timezone.utc)
    # Drop tzinfo so isoformat() does not append "+00:00" before the "Z".
    return utc_moment.replace(tzinfo=None).isoformat() + "Z"
# Module-wide InfluxDB writer used by task(); targets the "bitflyer_pubnub" database.
db_global = influxDB("bitflyer_pubnub")
@gen.coroutine
def main(channels):
    """Register a message listener on the PubNub client and subscribe.

    Args:
        channels: Channel names passed to ``pubnub.subscribe().channels()``.
    """

    class BitflyerSubscriberCallback(SubscribeCallback):
        """Listener that hands every incoming message to ``task``."""

        def presence(self, pubnub, presence):
            # Presence events are ignored.
            return None

        def status(self, pubnub, status):
            # Every status category is currently a no-op; the branches are
            # kept as placeholders for future handling.
            if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory:
                # Connectivity was lost.
                return None
            if status.category == PNStatusCategory.PNConnectedCategory:
                # Subscription is established.
                return None
            if status.category == PNStatusCategory.PNReconnectedCategory:
                # Connectivity was lost and then regained.
                return None
            if status.category == PNStatusCategory.PNDecryptionErrorCategory:
                # A message could not be decrypted.
                return None
            return None

        def message(self, pubnub, message):
            # The payload is in message.message. Failures are printed rather
            # than raised so that one bad message does not stop the feed.
            try:
                task(message.channel, message.message)
            except Exception as err:
                print("error: ", err)

    subscriber = BitflyerSubscriberCallback()
    pubnub.add_listener(subscriber)
    pubnub.subscribe().channels(channels).execute()
def _build_points(record):
    """Translate one feed record into a list of InfluxDB points.

    A record containing ``tick_id`` is treated as a ticker; one containing
    ``buy_child_order_acceptance_id`` is treated as an execution. Any other
    record yields an empty list.
    """
    if 'tick_id' in record:
        best_bid = float(record["best_bid"])
        best_bid_size = float(record["best_bid_size"])
        best_ask = float(record["best_ask"])
        best_ask_size = float(record["best_ask_size"])
        fields = {
            "best_bid": best_bid,
            "best_bid_size": best_bid_size,
            "bid_JPY_volume": best_bid * best_bid_size,
            "best_ask": best_ask,
            "best_ask_size": best_ask_size,
            "ask_JPY_volume": best_ask * best_ask_size,
            'total_bid_depth': record["total_bid_depth"],
            'total_ask_depth': record["total_ask_depth"],
            'ltp': record["ltp"],
            "volume_by_product": record["volume_by_product"],
            "volume": record["volume"],
            "Symbol": "FX_BTC_JPY",
            'timestamp': record['timestamp'],
        }
        measurement = "lightning_ticker_FX_BTC_JPY"
    elif 'buy_child_order_acceptance_id' in record:
        price = float(record["price"])
        size = float(record["size"])
        fields = {
            "side": str(record['side']),
            "price": price,
            "size": size,
            "JPY_volume": price * size,
            "Symbol": "FX_BTC_JPY",
            'timestamp': record['exec_date'],
        }
        measurement = "lightning_executions_FX_BTC_JPY"
    else:
        return []
    # The record's own timestamp is used as the point time and is also kept
    # as a field.
    return [{'measurement': measurement, 'time': fields['timestamp'], 'fields': fields}]


def task(channel, message):
    """Store one PubNub payload in InfluxDB through ``db_global``.

    Args:
        channel: Name of the channel the payload arrived on (not used; the
            record type is detected from its keys).
        message: A single record dict, or a list of record dicts. Each
            recognized record is written with its own ``insert_db`` call.

    Records that are neither tickers nor executions are skipped; previously
    they caused an empty point list to be sent to the database.
    """
    records = message if isinstance(message, list) else [message]
    for record in records:
        points = _build_points(record)
        if not points:
            continue
        db_global.insert_db(points)
if __name__ == "__main__":
    # Subscribe to both the executions channel and the ticker channel for
    # FX_BTC_JPY. To record executions only, pass just
    # ['lightning_executions_FX_BTC_JPY'] instead.
    main(['lightning_executions_FX_BTC_JPY','lightning_ticker_FX_BTC_JPY'])
    # Start the PubNub Tornado client (presumably runs the IO loop and blocks
    # until the process is stopped -- confirm against the PubNub SDK).
    pubnub.start()