FTXの取引履歴を一括で全取得してCSVファイルを作成するプログラム

こんにちは、JINです。Twitter IDはこちら

今回は、FTX(日本人は登録禁止)の取引履歴を一括で全部取得してcsvファイルを生成するプログラムを公開します。私が必要だったのでnoteで探してみたら以下の記事を見つけました。

で、動かしてみたんですが、なんか上手くいかなったのでソースを読んでみたけど、自分があまりやらない書き方だったので自分なりに改良したコードが以下です。基本的な接続に関する関数などはそのまま利用させて頂きました(参考になりました。Yuckeyさんありがとうございます。)が、get_fillsでデータ取得後の整形部分以下はオリジナルです。コードは以下になります。これは5000行以上の取引履歴がある人のみ対象で、そんな人はbotterのみだと思うので細かい環境構築や説明はなしとします。以上で終わりです。バグあるかもしれないしコードが雑ですがお許しをw

import os
import datetime
import pprint
import time
import urllib.parse
from typing import Optional, Dict, Any, List
from requests import Request, Session, Response
from collections import deque
import hmac
"""
【要設定】
 _api_key:取得したいアカウントのAPI Key
 _api_secret:API Keyに対応したAPI Secret
 _subaccount_name:サブアカウントの名称(空文字はメイン)
"""
_api_key = ''
_api_secret = ''
_subaccount_name = ''
"""
【要設定】
 _start:()内にカンマ区切りで年,月,日,時,分,秒
 (設定不要)_end: 上と同じフォーマットで自動で今日の日付を代入
 _start <= 取得対象 <= _end で取得する
一度に取得する行数制限(最大5000)
"""
_start = datetime.datetime(2021, 1, 1).timestamp()
_now = datetime.datetime.now()
_end = datetime.datetime(_now.year, _now.month, _now.day).timestamp()
_limit_row = 5000
"""
【要設定】
 出力するファイル名を設定
 同名のファイルが存在する場合は上書き
 相対パス可。このまま実行するとこのプログラムと同じ場所に出力する。
"""
_csv_file = 'ftx_fills_data_2021.csv'
_session = Session()
_ENDPOINT = 'https://ftx.com/api/'

def _get(path, params = None):
  return _request('GET', path, params=params)
def _request(method, path, **kwargs):
  request = Request(method, _ENDPOINT + path, **kwargs)
  _sign_request(request)
  response = _session.send(request.prepare())
  return _process_response(response)
def _sign_request(request):
  ts = int(time.time() * 1000)
  prepared = request.prepare()
  signature_payload = f'{ts}{prepared.method}{prepared.path_url}'.encode()
  if prepared.body:
      signature_payload += prepared.body
  signature = hmac.new(_api_secret.encode(), signature_payload, 'sha256').hexdigest()
  request.headers['FTX-KEY'] = _api_key
  request.headers['FTX-SIGN'] = signature
  request.headers['FTX-TS'] = str(ts)
  if _subaccount_name:
      request.headers['FTX-SUBACCOUNT'] = urllib.parse.quote(_subaccount_name)
def _process_response(response):
  try:
      data = response.json()
  except ValueError:
      response.raise_for_status()
      raise
  else:
      if not data['success']:
          raise Exception(data['error'])
      return data['result']
def get_fills(market = None, limit = None, start_time = None, end_time = None, order = None, orderId = None):
  return _get(f'fills', {'market': market, 'limit': limit, 'start_time': start_time, 'end_time': end_time, 'order': order, 'orderId': orderId})
def make_list(res):
 global combine_list
 i=1
 for market in res:
   if i == _limit_row:
     combine_list.append(market)
     replace_time = market['time'][:10]+' '+market['time'][11:19]
     end_time = datetime.datetime.strptime(replace_time, '%Y-%m-%d %H:%M:%S')+datetime.timedelta(hours=9)
     return end_time
      
   else:
     combine_list.append(market)
     replace_time = market['time'][:10]+' '+market['time'][11:19]
     jst_time = datetime.datetime.strptime(replace_time, '%Y-%m-%d %H:%M:%S')+datetime.timedelta(hours=9)
   i=i+1
def has_duplicates(seq):
 seen = []
 unique_list = [x for x in seq if x not in seen and not seen.append(x)]
 return len(seq) != len(unique_list)
def get_unique_list(seq):
   seen = []
   return [x for x in seq if x not in seen and not seen.append(x)]
combine_list = []
while True:
 res = get_fills(limit=_limit_row, start_time=_start, end_time=_end)
 end_time = make_list(res)
 if end_time is None:
   break
 else:
   _next_end = end_time+datetime.timedelta(days=1)
   _next_start = _next_end+datetime.timedelta(days=-90)
   _start = datetime.datetime(_next_start.year, _next_start.month, _next_start.day).timestamp()
   _end = datetime.datetime(_next_end.year, _next_end.month, _next_end.day).timestamp()
combine_list2 = get_unique_list(combine_list)
os.remove(_csv_file)
with open(_csv_file, 'a') as f:
 f.write("id,time,market,side,type,size,price,fee,feeCurrency\n")
 i = 0
 for market in combine_list2:
   replace_time = market['time'][:10]+' '+market['time'][11:19]
   tmp_time = datetime.datetime.strptime(replace_time, '%Y-%m-%d %H:%M:%S')
   jst_time = str(tmp_time).replace(' ', 'T')+'.000000+00:00'
   if market["liquidity"]=="maker":
     f.write(str(market['id'])+','+str(jst_time)+','+str(market['market'])+','+str(market['side'])+','+str('Limit')+','+str(market['size'])+','+str(market['price'])+','+str(market['fee'])+','+str(market['feeCurrency'])+'\n')
   else:
     f.write(str(market['id'])+','+str(jst_time)+','+str(market['market'])+','+str(market['side'])+','+str('Market')+','+str(market['size'])+','+str(market['price'])+','+str(market['fee'])+','+str(market['feeCurrency'])+'\n')
   i = i + 1
 f.close()
print('check duplicates.')
print('duplicate: '+str(has_duplicates(combine_list)))
print('before: '+str(len(combine_list)))
print('after: '+str(len(combine_list2)))

この記事が気に入ったらサポートをしてみませんか?