bitFlyerの価格をリアルタイム取得する、ほぼ最小のPython実装
Websocketの小さな実用例が欲しい方や、Bot開発のお供にどうぞ。
組み込んでの再配布など、ご自由にお使いください。
価値を感じたら、サポート・投げ銭・購入などで応援をお願いします。
(記事自体は無料部分で全てです)
概要:
・現在価格・最寄り板の価格、Buy/Sellボリューム等を取得できます
・APIリクエスト制限に関わらず問い合わせできます
・スレッドを使い、メインの処理を止めません
・ログイン情報不要、エラー時は自動で再接続します
下準備:
$ pip install websocket
$ pip install websocket-client==0.46.0
実装&動作サンプル:
#!/usr/bin/python3
# coding: utf-8
#----------------------------------------------------------
# BTC tip先:36ds4QZByQ1PdA2MXtQ8EGg4doci5w5hky
#----------------------------------------------------------
import datetime
import threading
import json
import websocket
import time
# websocketを使ってtickerをリアルタイム取得
class BfRealtimeTicker(object):
def __init__(self, symbol):
self.symbol = symbol
self.ticker = None
self.connect()
def connect(self):
self.ws = websocket.WebSocketApp(
'wss://ws.lightstream.bitflyer.com/json-rpc', header=None,
on_open = self.on_open, on_message = self.on_message,
on_error = self.on_error, on_close = self.on_close)
self.ws.keep_running = True
self.thread = threading.Thread(target=lambda: self.ws.run_forever())
self.thread.daemon = True
self.thread.start()
def is_connected(self):
return self.ws.sock and self.ws.sock.connected
def disconnect(self):
self.ws.keep_running = False
self.ws.close()
def get(self):
return self.ticker
def on_message(self, ws, message):
message = json.loads(message)['params']
self.ticker = message['message']
def on_error(self, ws, error):
self.disconnect()
time.sleep(0.5)
self.connect()
def on_close(self, ws):
print('Websocket disconnected')
def on_open(self, ws):
ws.send(json.dumps( {'method':'subscribe',
'params':{'channel':'lightning_ticker_' + self.symbol}} ))
print('Websocket connected')
# 使用例
if __name__ == '__main__':
bfrt = BfRealtimeTicker('FX_BTC_JPY')
while True:
print(bfrt.get())
time.sleep(0.5)
ご武運を!
ここから先は
0字
¥ 200
期間限定!PayPayで支払うと抽選でお得
この記事が気に入ったらチップで応援してみませんか?