
websocketでbitFlyerの約定情報を取得し、ローソク足を作成する
くもすけさんが無料で公開されているコードが2022年時点のPandasで動かなかった為、少し修正してみました。主な変更点は下記のとおりです。
・df.ix -> df.ilocに変更。NaNの処理方法も変更
・ファイルのヘッダにopen, high, low, close等を記載
・csvを開いていたりしてプログラムからcsv書き込みができないときに新規ファイルを作成
・データ数50000件以上で新規ファイル作成
# coding: utf-8
import threading
from collections import deque
import websocket
import warnings
import time
import json
import pandas
import dateutil.parser
import numpy as np
from datetime import datetime,timedelta
product = "FX_BTC_JPY"
timescale = 60
sleeptime = timescale *2
write_counter_max = 50000
def get_logfile_name(dt_now):
_log_file_name = 'bit_FX_'+ str(timescale) +'s_'+ dt_now.strftime('%Y-%m-%d-%H-%M-%S') + '.csv'
return _log_file_name
class Websocketexecutions:
def __init__(self, product, timescale):
self.product = product
self.timescale = str(timescale)+"s"
self.executions = deque(maxlen=timescale*500)
self.executionsWebsocket()
warnings.simplefilter(action="ignore", category=FutureWarning)
def updatecandle(self):
tmpExecutions = list(self.executions)
self.raw = pandas.DataFrame([[dateutil.parser.parse(tick["exec_date"].replace('T',' ')[:-1])+timedelta(hours=9),tick["price"],tick["size"],tick["size"] if tick["side"]=='BUY' else 0,tick["size"] if tick["side"]=='SELL' else 0] for tick in tmpExecutions],columns=["date","price","volume","buy","sell"])
self.raw = self.raw.set_index('date')
self.candle = self.raw['price'].resample(self.timescale, label = 'left', closed = 'left').ohlc()
self.candle = self.candle.assign( volume = self.raw['volume'].resample(self.timescale).sum().values)
self.candle = self.candle.assign( buy = self.raw['buy'].resample(self.timescale).sum().values)
self.candle = self.candle.assign( sell = self.raw['sell'].resample(self.timescale).sum().values)
#NaNがある場合の処理 一旦open値で判断
nan_index = self.candle.index[(self.candle['open'].isna())]
if len(nan_index) > 0:
self.candle['close'] = self.candle['close'].fillna(method='ffill')
self.candle['volume'] = self.candle['volume'].fillna(0)
for c in nan_index:
self.candle.at[c, 'open'] = self.candle.at[c, 'close']
self.candle.at[c, 'high'] = self.candle.at[c, 'close']
self.candle.at[c, 'low'] = self.candle.at[c, 'close']
def executionsWebsocket(self):
def on_message(ws, message):
messages = json.loads(message)
recept_data = messages["params"]["message"]
for i in recept_data:
self.executions.append(i)
def on_open(ws):
ws.send(json.dumps({"method": "subscribe", "params": {"channel": "lightning_executions_{}".format(product)}}))
def run(ws):
while True:
ws.run_forever()
time.sleep(3)
ws = websocket.WebSocketApp( "wss://ws.lightstream.bitflyer.com/json-rpc", on_message=on_message )
ws.on_open = on_open
websocketThread = threading.Thread(target=run, args=(ws, ))
websocketThread.start()
if __name__ == "__main__":
print('start bitflyer websocket')
dt_now = datetime.now()
log_file_name = get_logfile_name(dt_now)
websocket = Websocketexecutions( product, timescale )
while not websocket.executions :
time.sleep(1)
lastdate = ""
write_counter = 0
while True:
time.sleep(sleeptime)
dt_now_for_candle = datetime.now()
websocket.updatecandle()
lastpos = 0 if lastdate == "" else websocket.candle.index.get_loc(lastdate)
latestCandle = websocket.candle[lastpos:len(websocket.candle)-1]
if len(latestCandle)>0 :
print(latestCandle)
if write_counter == 0:
latestCandle.to_csv( log_file_name, header=True)
else:
try:
latestCandle.to_csv( log_file_name, header=False, mode='a')
except:
dt_now = datetime.now()
log_file_name = get_logfile_name(dt_now)
latestCandle.to_csv( log_file_name, header=True)
write_counter = 0
write_counter = write_counter + 1
if write_counter >= write_counter_max:
dt_now = datetime.now()
log_file_name = get_logfile_name(dt_now)
write_counter = 0
lastdate = websocket.candle[-1:].index[0]
必要なライブラリはpandas 、websocket-client、numpyくらいだったかと思います。
ローソク足の時間は'timescale'にて設定できます。秒単位なので60と指定すると1分足が出力されます。
ファイル名は「bit_FX_{秒足}s_{ファイル取得開始時間}」で作成されます。
ファイル名を変更されたい方は'_log_file_name'を変更してください。
こちらが元々のくもすけさんのコードです。
有料の部分に記載はありませんが、この記事が役に立った方はコーヒー代として、有料部分を購入していただけると非常に喜びます。
お役に立てば幸いです。
ここから先は
56字
¥ 100
この記事が気に入ったらチップで応援してみませんか?