[Python Websocket JSON-RPC] bitFlyer Realtime API の受信データを記録し続けて、圧縮後Google Driveにアップロードする
これはなに?
bitFlyer の Realtime API の情報を延々と記録し続けるだけのコード
・定期メンテナンス(04:00〜04:10)までひたすらファイルにロギング
・定期メンテナンス中
- 前日のログファイル圧縮、圧縮前ログファイルの削除
- GoogleDriveへのログファイルのアップロード、アップロードしたログファイルの削除
ダウンロードするやつ
確認環境
Mac/Ubuntu で動作確認
※websocket-clientは0.49以降だとうまく受信できないかもしれないので注意
MacBookPro
$ sw_vers
ProductName: Mac OS X
ProductVersion: 10.14
BuildVersion: 18A391
$ python --version
Python 3.6.2 :: Anaconda, Inc.
$ pip --version
pypip 18.0
$ pip show websocket-client PyDrive google-api-python-client
Name: websocket-client
Version: 0.48.0
Name: PyDrive
Version: 1.3.1
Name: google-api-python-client
Version: 1.7.4
Ubuntu
$ cat /etc/os-release
NAME="Ubuntu"
VERSION="16.04.4 LTS (Xenial Xerus)"
$ python --version
Python 3.6.6
$ pip --version
pip 18.0
$ pip show websocket-client PyDrive google-api-python-client
Name: websocket-client
Version: 0.48.0
Name: PyDrive
Version: 1.3.1
Name: google-api-python-client
Version: 1.7.4
コード
適当なので気になる箇所あれば各自で修正してください
# -*- Coding: utf-8 -*-
import os
import pathlib
import traceback
import websocket
import json
from datetime import datetime, timedelta
import zipfile
from zipfile import ZIP_DEFLATED, ZIP_BZIP2, ZIP_LZMA
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
import logging
from logging import getLogger, StreamHandler, handlers, Formatter, INFO
UPLOAD_GDRIVE = True
GDRIVE_FOLDER_ID = 'GoogleDriveFolderId'
LOG_FMT_TH = '%(asctime)s\t%(levelname)s\t%(message)s'
LOG_FMT_DATA = '%(asctime)s\t%(created).6f\t%(message)s'
SYMBOL_BTC_FX = 'FX_BTC_JPY'
SYMBOL_BTC_SPOT = 'BTC_JPY'
WSS_URL = "wss://ws.lightstream.bitflyer.com/json-rpc"
WSS_BOARD_SS_BASE = "lightning_board_snapshot_"
WSS_BOARD_SS_FX = WSS_BOARD_SS_BASE + SYMBOL_BTC_FX
WSS_BOARD_SS_SPOT = WSS_BOARD_SS_BASE + SYMBOL_BTC_SPOT
WSS_BOARD_BASE = "lightning_board_"
WSS_BOARD_FX = WSS_BOARD_BASE + SYMBOL_BTC_FX
WSS_BOARD_SPOT = WSS_BOARD_BASE + SYMBOL_BTC_SPOT
WSS_EXCE_BASE = "lightning_executions_"
WSS_EXEC_FX = WSS_EXCE_BASE + SYMBOL_BTC_FX
WSS_EXEC_SPOT = WSS_EXCE_BASE + SYMBOL_BTC_SPOT
WSS_TICKER_BASE = "lightning_ticker_"
WSS_TICKER_FX = WSS_TICKER_BASE + SYMBOL_BTC_FX
WSS_TICKER_SPOT = WSS_TICKER_BASE + SYMBOL_BTC_SPOT
MNT_START_HOUR = 4
MNT_START_MIN = 0
MNT_START_MARGIN = 1
ITV_MNT = 10
def getMyLogger(loggerName=__name__, file=None, fmt=LOG_FMT_TH):
if not file:
return None
# Logger 生成
logger = getLogger(loggerName)
logger.setLevel(INFO)
# ディレクトリがない場合は作成
dir = pathlib.Path(file).parent
if dir and not dir.exists():
dir.mkdir()
# ログ取得開始日が違っていればリネーム
if pathlib.Path(file).exists():
l = ""
with open(file) as f:
l = f.readline()
if l and l[:10] != datetime.now().strftime("%Y-%m-%d"):
pathlib.Path(file).rename(file + '.' + l[:10])
# Handler 生成
handler = handlers.TimedRotatingFileHandler(file, when="midnight", interval=1, backupCount=30)
handler.setLevel(INFO)
# ログフォーマット設定
formatter = Formatter(fmt)
formatter.default_msec_format = '%s.%03d'
handler.setFormatter(formatter)
# Handler 登録
logger.addHandler(handler)
return logger
loggerTh = getMyLogger(__name__ + "Th", file="log/realtime_api_thread.log")
class ReailtimeApiReceiver(object):
def __init__(self, rcvChannels, oneFile=False):
self.loop = True
self.itvWssReconnect = 0.5
self.rcvChannels = rcvChannels
if oneFile:
logger = getMyLogger(__name__ + "All", file="log/realtime_api_all.log", fmt=LOG_FMT_DATA)
self.loggers = {}
for ch, isRcv in rcvChannels.items():
if not isRcv: continue
if not oneFile: logger = getMyLogger(__name__ + ch, file="log/realtime_api_" + ch + ".log", fmt=LOG_FMT_DATA)
self.loggers[ch] = logger
#Define Websocket
self.ws = websocket.WebSocketApp(WSS_URL, header=None, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
websocket.enableTrace(True)
loggerTh.info("Initialized Realtime API Receiver.")
def run(self):
loggerTh.info("Start Realtime API Receiver.")
while self.loop:
self.ws.run_forever()
loggerTh.info('Web Socket process ended. Retrying reconnect.')
sleep(self.itvWssReconnect)
loggerTh.info("Stop Realtime API Receiver.")
"""
Below are callback functions of websocket.
"""
# when we get message
def on_message(self, ws, message):
ch = json.loads(message)["params"]["channel"]
self.loggers[ch].info(message)
# when error occurs
def on_error(self, ws, error):
loggerTh.error(error)
# when websocket closed.
def on_close(self, ws):
loggerTh.info('disconnected streaming server')
# when websocket opened.
def on_open(self, ws):
loggerTh.info('connected streaming server')
for ch, isRcv in self.rcvChannels.items():
if isRcv:
ws.send(json.dumps({"method": "subscribe", "params": {"channel": ch}}))
def is_alive(self):
return self.ws.keep_running
def exitLoop(self):
self.loop = False
self.ws.close()
def calcMntTime():
now = datetime.now()
mntStart = datetime(now.year, now.month, now.day, MNT_START_HOUR, MNT_START_MIN + MNT_START_MARGIN)
if mntStart + timedelta(minutes=ITV_MNT) < now:
mntStart += timedelta(days=1)
return mntStart
def compressLogs(yesterday, compression=ZIP_LZMA):
logFiles = pathlib.Path().glob("realtime_api_*.log." + yesterday)
for logFile in logFiles:
loggerTh.info("compress {}".format(logFile))
with zipfile.ZipFile(str(logFile) + ".zip", "w", compression=ZIP_DEFLATED) as logZip:
logZip.write(logFile)
logFile.unlink()
def upload2gdrive(yesterday, gauth, drive):
zippedFiles = pathlib.Path().glob("realtime_api_*.log." + yesterday + ".zip")
for zippedFile in zippedFiles:
loggerTh.info("upload {}".format(zippedFile))
metadata = {
'title': str(zippedFile),
}
if GDRIVE_FOLDER_ID:
metadata['parents'] = [{'id': GDRIVE_FOLDER_ID}]
driveFile = drive.CreateFile(metadata)
driveFile.SetContentFile(str(zippedFile))
driveFile.Upload()
zippedFile.unlink()
def compressAndUpload():
if UPLOAD_GDRIVE:
gauth = GoogleAuth()
gauth.CommandLineAuth()
drive = GoogleDrive(gauth)
while True:
nextMnt = calcMntTime()
loggerTh.info("sleep start, until {}".format(nextMnt))
while nextMnt > datetime.now():
sleep(1)
loggerTh.info("sleep finish")
os.chdir("log")
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
compressLogs(yesterday)
if UPLOAD_GDRIVE:
upload2gdrive(yesterday, gauth, drive)
os.chdir("..")
sleep(ITV_MNT * 60)
if __name__ == "__main__":
rcvChannels = {
WSS_EXEC_FX: True,
WSS_BOARD_SS_FX: True,
WSS_BOARD_FX: True,
WSS_TICKER_FX: True,
WSS_EXEC_SPOT: True,
WSS_BOARD_SS_SPOT: True,
WSS_BOARD_SPOT: True,
WSS_TICKER_SPOT: True,
}
receiver = ReailtimeApiReceiver(rcvChannels, False)
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
executor.submit(receiver.run)
executor.submit(compressAndUpload)
from time import sleep
try:
while True:
sleep(1)
except (KeyboardInterrupt, Exception) as e:
loggerTh.error(e)
loggerTh.error(traceback.format_exc())
receiver.exitLoop()
executor.shutdown(wait=True)
動作について
ReailtimeApiReceiverの引数
- rcvChannels: {channel: isRcv} でどのチャンネルを購読するかを決定
- oneFile: ログをチャンネル別に分けるか、1つのファイルにまとめるか
ログファイル
- フォーマットは <時刻(YYYY-MM-DD hh:mm:ss.f)><タブ><タイムスタンプ><タブ><受信データ>
<タイムスタンプ> を出力しているのは usec を取得するため
サンプル: 2018-11-03 09:55:15.240 1541206515.240989 {"jsonrpc":"2.0","method":"channelMessage","params":{"channel":"lightning_ticker_FX_BTC_JPY","message":{"product_code":"FX_BTC_JPY","timestamp":"2018-11-03T00:55:15.1120093Z","tick_id":12899309,"best_bid":730944,"best_ask":730961,"best_bid_size":0.03,"best_ask_size":0.04086046,"total_bid_depth":13427.18752245,"total_ask_depth":10912.80026445,"ltp":730961,"volume":172394.54656461,"volume_by_product":172394.54656461}}}
- oneFile = True の場合 log/realtime_api_all.log に出力
- oneFile = False の場合 log/realtime_api_<channel_name>.log に出力
- (システム時刻における)日付変更でローテーション
- 定期メンテナンス(04:00〜04:10)になったら、前日のログファイルを圧縮して削除する
Google Driveへのアップロード
- UPLOAD_GDRIVE = True でアップロード
- GDRIVE_FOLDER_ID に folder id を設定(値は URL の末尾部分、詳しくは参考リンク参照)
注意事項
・ディスクフル(空き容量ゼロ)にはご注意
・Google Driveの空き容量にはご注意
参考サイト
おわりに
有料(¥100)にしてるけど、これで内容は全部です。募金してくれる人がいれば、ジュース代としていただけると嬉しい。コードは、インデントくずれが起きたりするようなので、コピペ時には注意してください。
マガジン
コメント用note(未購入者向け)
干し芋
ここから先は
0字
¥ 100
期間限定!Amazon Payで支払うと抽選で
Amazonギフトカード5,000円分が当たる
Amazonギフトカード5,000円分が当たる
サポート頂けると励みになります BTC,BCH: 39kcicufyycWVf8gcGxgsFn2B8Nd7reNUA LTC: LUFGHgdx1qqashDw4WxDcSYQPzd9w9f3iL MONA: MJXExiB7T7FFXKYf9SLqykrtGYDFn3gnaM