
Photo by
debupinoko
ペアトレボット
稼働してはいますが、あまり儲かっていないボットを公開します。スケーラビリティーが低く、今後注力する予定も無いので公開することにしました。公開用に整えていないので、かなり雑です。自前で書いたクラスへの依存などがあるので、そのままでは動かないと思います。ロジックのヒントになれば幸いです。
原理
KfEstimatorがコアロジックです。ペアトレできそうな2つの銘柄A, Bを用意。カルマンフィルターでBの対数価格からAの対数価格を予測。ポジションを予測誤差(resid)に比例させてペアトレ。
発注量を計算するためのクラス
import time
import datetime
import threading
import traceback
import pandas as pd
import numpy as np
import copy
from pykalman import KalmanFilter
from concurrent.futures import ThreadPoolExecutor
class KfEstimator:
    """Online Kalman-filter regression used as the pair-trading signal.

    Each observation vector ``x`` holds one value per market.  ``x[0]`` is
    the value being explained; the hidden state is ``[intercept, beta_1,
    ..., beta_{n-1}]`` so the filter models
    ``x[0] ~ intercept + sum(beta_i * x[i])``.  After every update the
    normalised prediction error is folded into ``resid`` with an
    exponential moving average.

    Attributes:
        state_mean: current state estimate (numpy array of length
            ``market_count``).
        state_cov: current state covariance matrix.
        resid: smoothed, normalised residual; starts at ``0.0``.
    """

    # Weight given to the newest residual in the moving average kept in
    # ``resid`` (the original hard-coded 1/12 inside ``update``).
    RESID_EMA_WEIGHT = 1.0 / 12

    def __init__(self, delta=None, delta2=None, market_count=2):
        """Build the underlying pykalman filter.

        Args:
            delta: controls the random-walk noise of the slope states via
                ``delta / (1 - delta)``.  Required despite the ``None``
                default.
            delta2: same, for state 0 (the intercept).  Required.
            market_count: number of markets, i.e. the state dimension.
        """
        n_dim_state = market_count
        trans_cov = delta / (1 - delta) * np.eye(n_dim_state)
        # State 0 is the intercept and gets its own process noise.
        trans_cov[0, 0] = delta2 / (1 - delta2)
        trans_mat = np.eye(n_dim_state)

        self.state_mean = np.zeros(n_dim_state)
        self.state_cov = np.eye(n_dim_state)
        self.kf = KalmanFilter(
            n_dim_obs=1,
            n_dim_state=n_dim_state,
            initial_state_mean=self.state_mean,
            initial_state_covariance=self.state_cov,
            transition_matrices=trans_mat,
            transition_covariance=trans_cov,
            observation_matrices=np.ones((1, n_dim_state)),
            observation_covariance=1.0,
        )
        self.resid = 0.0

    def update(self, x=None):
        """Feed one observation vector and refresh state and ``resid``.

        Args:
            x: 1-D numpy array with one value per market; ``x[0]`` is the
                observed target, ``x[1:]`` are the regressors.
        """
        # Observation row is [1, x[1], ..., x[n-1]]: a constant for the
        # intercept followed by the regressors.
        obs_mat = x.copy()
        obs_mat[0] = 1

        self.state_mean, self.state_cov = self.kf.filter_update(
            self.state_mean,
            self.state_cov,
            observation=x[0:1],
            observation_matrix=obs_mat.reshape(1, -1),
        )

        # Residual against the *updated* state, scaled by the standard
        # deviation of the intercept estimate.
        resid = x[0] - np.sum(self.state_mean * obs_mat)
        resid /= self.state_cov[0, 0] ** 0.5

        weight = self.RESID_EMA_WEIGHT
        self.resid = resid * weight + (1 - weight) * self.resid
class FtxPairPlugin:
    """Order-sizing plugin for a Kalman-filter pair trade on FTX futures.

    A background thread keeps a candle history per symbol, feeds de-meaned
    log closes into a ``KfEstimator`` and turns the smoothed residual into
    per-symbol weights.  The trader polls ``get_order_info`` and
    ``get_status`` (the "plugin interface").
    """

    def __init__(self, jsonl_logger=None, prod=False, symbols=None, leverage=20, max_weight=0.25):
        """Store configuration and create the estimator.

        Args:
            jsonl_logger: object with a ``write(dict)`` method used by
                ``loop`` for structured logs.
            prod: stored on the instance; not read by this class.
            symbols: market names; ``symbols[0]`` is the regression target.
                ``None`` means an empty list.
            leverage: multiplier applied to collateral when sizing
                ``max_positions``.
            max_weight: absolute clip applied to every weight.
        """
        self.jsonl_logger = jsonl_logger
        self.prod = prod
        # ``None`` default instead of a shared mutable ``[]``.
        self.symbols = symbols if symbols is not None else []
        self.lock = threading.Lock()
        # About two months are needed (to build x_diff and to warm up the
        # Kalman filter).
        self.history_period = datetime.timedelta(hours=24 * 30 * 2)
        self.last_loop_at = None
        self.start_at = time.time()
        self.leverage = leverage
        self.max_weight = max_weight
        self.max_positions = {}
        self.max_position_update_interval = 5 * 60
        # Defined up front so loop() cannot hit an AttributeError; 0 forces
        # a refresh on the first loop if update_max_position() never ran.
        self.max_position_updated_at = 0
        self.kf_timestamp = pd.to_datetime('2000-01-01 00:00:00Z')
        self.weights = {symbol: 0 for symbol in self.symbols}
        # Candle width in seconds.  The estimator deltas and the rolling
        # window below were chosen alongside this 300 s setting.
        self.time_resolution = 300
        self.kf_estimator = KfEstimator(
            delta=1e-2 / 12 ** 2,
            delta2=1e-2 / 12 ** 2,
            market_count=len(self.symbols),
        )

    def allocate_ftx_client(self):
        """Borrow an exchange client from the trader's pool (context manager)."""
        return self.trader.allocate_ftx_client()

    def initialize(self, trader=None):
        """Bind to the trader, load initial data and start the worker thread.

        Args:
            trader: owner providing ``logger``, ``rate_limiter`` and
                ``allocate_ftx_client()``.
        """
        self.trader = trader
        self.logger = trader.logger
        self.rate_limiter = trader.rate_limiter
        self.df_ftx = pd.DataFrame()
        self.fetch_new_candles()
        self.update_max_position()
        self.logger.info('plugin thread start')
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

    # plugin interface
    def get_order_info(self, positions=None, size_units=None, price_units=None):
        """Return desired buy/sell amounts and prices per symbol.

        Args:
            positions: current signed position per symbol.
            size_units: order size increment per symbol.
            price_units: price increment per symbol.

        Returns:
            ``[]`` until the first ``loop`` has completed; afterwards a dict
            ``symbol -> {'buy_amount', 'sell_amount', 'buy_price',
            'sell_price', 'raw_target_pos'}``.
        """
        if self.last_loop_at is None:
            return []

        with self.lock:
            pinned_weights = copy.copy(self.weights)
            pinned_max_positions = copy.copy(self.max_positions)
            # ``.last()`` is keyed by market on every pandas version, whereas
            # ``groupby.nth(-1)`` keeps the original row index since pandas
            # 2.0.  Note ``.last()`` takes the last non-null close.
            pinned_cls = self.df_ftx.groupby(level='market')['cl'].last().to_dict()

        result = {}
        for symbol in self.symbols:
            target_pos = pinned_weights[symbol] * pinned_max_positions[symbol]
            size_unit = size_units[symbol]
            min_amount = 10 * size_unit  # stay clear of the minimum order size

            buy_amount = target_pos - positions[symbol]
            buy_amount = round(buy_amount / size_unit) * size_unit
            if buy_amount < min_amount:
                buy_amount = 0

            sell_amount = positions[symbol] - target_pos
            sell_amount = round(sell_amount / size_unit) * size_unit
            if sell_amount < min_amount:
                sell_amount = 0

            result[symbol] = {
                'buy_amount': buy_amount,
                'sell_amount': sell_amount,
                # Quote one tick either side of the last confirmed close.
                'buy_price': pinned_cls[symbol] - price_units[symbol],
                'sell_price': pinned_cls[symbol] + price_units[symbol],
                'raw_target_pos': target_pos
            }
        return result

    # plugin interface
    def get_status(self):
        """Report weights, position caps and a liveness flag.

        ``live`` is true while the last completed loop is younger than 1.5 h
        and the last Kalman update is younger than 2.5 h (start-up time is
        used while either timestamp is still ``None``).
        """
        if self.last_loop_at is None:
            live = time.time() - self.start_at < 1.5 * 60 * 60
        else:
            live = time.time() - self.last_loop_at.timestamp() < 1.5 * 60 * 60

        if self.kf_timestamp is None:
            live = live and time.time() - self.start_at < 1.5 * 60 * 60
        else:
            live = live and time.time() - self.kf_timestamp.timestamp() < 2.5 * 60 * 60

        return {
            'weights': self.weights,
            'max_positions': self.max_positions,
            'live': live
        }

    def run(self):
        """Worker-thread body: call ``loop`` forever, backing off on errors."""
        while True:
            try:
                self.loop()
            except Exception:
                # KeyboardInterrupt/SystemExit are not Exception subclasses
                # and therefore still propagate.
                self.logger.error('exception ' + traceback.format_exc())
                # Many API errors in a short time can get the key throttled.
                time.sleep(20)
            time.sleep(0.1)

    def update_max_position(self):
        """Recompute ``max_positions`` from collateral, mark price and leverage."""
        with self.allocate_ftx_client() as ftx:
            self.rate_limiter.rate_limit(tags=['all'])
            account = ftx.privateGetAccount()['result']
            self.rate_limiter.rate_limit(tags=['all'])
            futures = ftx.publicGetFutures()['result']

        collateral = account['collateral']
        with self.lock:
            for future in futures:
                symbol = future['name']
                if symbol not in self.symbols:
                    continue
                mark_price = future['mark']
                self.max_positions[symbol] = collateral / mark_price * self.leverage
                self.logger.info('max_position updated {} {} collateral {} mark_price {} leverage {}'.format(
                    symbol, self.max_positions[symbol], collateral, mark_price, self.leverage))
        self.max_position_updated_at = time.time()

    def loop(self):
        """Run one iteration; does the heavy work once per new time frame."""
        latest_tf = latest_time_frame(self.time_resolution)

        if self.max_position_updated_at < time.time() - self.max_position_update_interval:
            self.update_max_position()

        if self.last_loop_at is not None and self.last_loop_at >= latest_tf:
            return  # this frame was already processed

        self.logger.debug('plugin loop')

        start = time.time()
        fetch_ftx_sec = self.wait_for_ftx() - start

        start = time.time()
        self.update_ml_states()
        update_ml_sec = time.time() - start

        log = {
            'timestamp': str(now()),
            'strategy_id': 'mm_ftx_portfolio_plugin',
            'weights': self.weights,
            'max_position': self.max_positions,
            'ftx_size': self.df_ftx.shape[0],
            'ftx_min_of_max_time': str(self.df_ftx.groupby('market')['timestamp_col'].max().min()),
            'ftx_max_time': str(self.df_ftx['timestamp_col'].max()),
            'ftx_min_time': str(self.df_ftx['timestamp_col'].min()),
            'ftx_sec': fetch_ftx_sec,
            'update_ml_sec': update_ml_sec,
        }
        self.jsonl_logger.write(log)

        # Drop candles older than the history window.
        min_index = latest_tf - self.history_period
        self.df_ftx = self.df_ftx[self.df_ftx['timestamp_col'] >= min_index]

        self.last_loop_at = latest_tf

    def wait_for_ftx(self):
        """Poll the REST API until every market has the newest closed candle.

        Returns:
            ``time.time()`` at the moment the data became complete.
        """
        latest_tf = latest_time_frame(self.time_resolution)
        self.logger.debug('wait_for_ftx latest_tf {} df_ftx.shape {}'.format(latest_tf, self.df_ftx.shape))

        # REST version
        newest_closed = latest_tf - datetime.timedelta(seconds=self.time_resolution)
        first = True
        while self.df_ftx.groupby('market')['timestamp_col'].max().min() < newest_closed:
            self.fetch_new_candles()
            if not first:
                time.sleep(1.0)
            first = False
        return time.time()

    def fetch_new_candles(self):
        """Download candles for every symbol and merge them into ``df_ftx``.

        With existing history only the latest 10 candles are requested;
        otherwise enough 5000-candle pages are fetched to cover
        ``history_period``.
        """
        have_history = bool(self.df_ftx.shape[0])
        limit = 10 if have_history else 5000

        def do_fetch_new_candles(params):
            symbol = params['symbol']
            end_time = params['end_time']

            self.rate_limiter.rate_limit(tags=['all'])
            with self.allocate_ftx_client() as ftx:
                data = ftx.publicGetMarketsMarketNameCandles({
                    'market_name': symbol,
                    'end_time': end_time,  # also needed to defeat caching
                    'resolution': self.time_resolution,
                    'limit': limit,
                })['result']
            if not data:
                return None

            df = pd.DataFrame(data)
            df = df.rename(columns={
                'open': 'op',
                'high': 'hi',
                'low': 'lo',
                'close': 'cl',
                'startTime': 'timestamp',
            })[['timestamp', 'op', 'hi', 'lo', 'cl', 'volume']]
            df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)
            # The last candle is still forming, so drop it.
            df = df[df['timestamp'] != df['timestamp'].max()].copy()
            df['market'] = symbol
            df = df.set_index(['market', 'timestamp'], drop=False)
            df = df.rename(columns={
                'market': 'market_col',
                'timestamp': 'timestamp_col',
            })
            return df

        # A future end_time returns nothing, hence the small backoff.
        # (A float would be accepted too.)
        end_time_base = int(np.floor(time.time() - 0.1))

        if have_history:
            pages = 1
        else:
            # Bars needed for the whole window at the configured resolution;
            # equals the former hard-coded 12 * 24 * 30 * 2 for 300 s candles.
            bars_needed = int(self.history_period.total_seconds() // self.time_resolution)
            pages = bars_needed // limit + 1

        params = []
        for symbol in self.symbols:
            for i in range(pages):
                params.append({
                    'symbol': symbol,
                    # "- 10" makes consecutive pages overlap.
                    'end_time': end_time_base - self.time_resolution * (limit - 10) * i,
                })

        with ThreadPoolExecutor() as executor:
            dfs = list(executor.map(do_fetch_new_candles, params))

        for df in dfs:
            if df is not None:
                self.df_ftx = smart_append(self.df_ftx, df)

    def update_ml_states(self):
        """Feed new candles to the Kalman filter and refresh ``weights``."""
        start = time.time()
        self.logger.debug('df_ftx.shape {}'.format(self.df_ftx.shape))

        df = self.df_ftx.reset_index().copy()
        df['x'] = np.log(df['cl'])
        # Log close minus its rolling mean over 24 * 30 * 12 bars.  The
        # positional ``.values`` assignment relies on rows being ordered by
        # (market, timestamp), which smart_append's sort_index provides.
        df['x_diff'] = df['x'] - df.groupby('market')['x'].rolling(24 * 30 * 12, min_periods=1).mean().values
        df = df.dropna()

        df_x_diff = df.pivot(index='timestamp', columns='market', values='x_diff')
        df_x_diff = df_x_diff.ffill().dropna()
        df_x_diff = df_x_diff[self.symbols]
        # Only rows the filter has not seen yet.
        df_x_diff = df_x_diff[self.kf_timestamp < df_x_diff.index]
        if df_x_diff.shape[0] == 0:
            return

        for i in range(df_x_diff.shape[0]):
            self.kf_estimator.update(df_x_diff.iloc[i].values)

        self.kf_timestamp = df_x_diff.index[-1]
        self.logger.info('kf_timestamp updated {}'.format(self.kf_timestamp))

        # Trade against the residual: the target symbol gets ``pos`` and each
        # other symbol the opposite side scaled by its regression coefficient.
        pos = -self.kf_estimator.resid
        new_weights = {self.symbols[0]: pos}
        for i, symbol in enumerate(self.symbols[1:], start=1):
            new_weights[symbol] = -pos * self.kf_estimator.state_mean[i]
        # clip
        for symbol in new_weights:
            new_weights[symbol] = np.clip(new_weights[symbol], -self.max_weight, self.max_weight)

        with self.lock:
            self.weights = new_weights

        self.logger.debug('update_ml_states elapsed sec {}'.format(time.time() - start))
# https://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object/10854034#10854034
def floor_time(dt=None, roundTo=60):
    """Floor a datetime to a multiple of ``roundTo`` seconds within its day.

    dt : datetime.datetime object, default now (naive local time).
    roundTo : size of the time step in seconds, default 1 minute.

    The result keeps ``dt``'s tzinfo and has zero microseconds.  Only the
    seconds elapsed since midnight are considered, so steps are counted from
    the start of the day.

    Author: Thierry Husson 2012 - Use it as you want but don't blame me.
    """
    if dt is None:
        dt = datetime.datetime.now()
    # ``.seconds`` is the time-of-day part of the offset from datetime.min.
    seconds = (dt.replace(tzinfo=None) - dt.min).seconds
    rounding = seconds // roundTo * roundTo
    return dt + datetime.timedelta(0, rounding - seconds, -dt.microsecond)
def now():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(datetime.timezone.utc)
def latest_time_frame(resolution=None):
    """Return the start of the current time frame in UTC.

    Args:
        resolution: frame length in seconds.  Required despite the ``None``
            default, because it is passed straight to ``floor_time``.
    """
    return floor_time(now(), resolution)
def smart_append(df, other):
    """Merge ``other`` into ``df``, letting ``other`` win on duplicate index.

    ``DataFrame.append`` was removed in pandas 2.0, so the frames are joined
    with ``pd.concat``.  Duplicates are dropped *before* sorting so that
    "keep the newest row" does not depend on the sort being stable.

    Args:
        df: existing frame (may be empty).
        other: frame with newer rows.

    Returns:
        A new frame sorted by index with unique index values.
    """
    # Skip empty frames so they cannot affect the result's dtypes or index.
    frames = [frame for frame in (df, other) if len(frame)]
    if not frames:
        return df.copy()
    merged = pd.concat(frames) if len(frames) > 1 else frames[0]
    # https://stackoverflow.com/questions/13035764/remove-rows-with-duplicate-indices-pandas-dataframe-and-timeseries
    merged = merged[~merged.index.duplicated(keep='last')]
    return merged.sort_index()
発注するクラス
import time
import threading
import logging
import pandas as pd
import ccxt
from functools import reduce, partial
from concurrent.futures import ThreadPoolExecutor
from ccxt.base.errors import BadRequest, OrderNotFound, ExchangeError, ExchangeNotAvailable
from sync_executor import SyncExecutor
from panic_manager import PanicManager
from instance_pool import InstancePool
import ccxt_rate_limiter
class DefaultPlugin:
    """Fallback plugin that quotes both sides of XTZ-PERP with a fixed cap."""

    def __init__(self):
        """Nothing to configure."""

    def initialize(self, trader=None):
        """No start-up work; present to satisfy the plugin interface."""

    def get_order_info(self, positions=None, size_units=None, price_units=None):
        """Return buy/sell amounts that keep |position| within 40 size units.

        Args:
            positions: current signed position per symbol.
            size_units: order size increment per symbol.
            price_units: unused.

        Returns:
            ``{'XTZ-PERP': {'buy_amount': ..., 'sell_amount': ...}}``.
            Amounts below 20 size units are reported as 0.
        """
        position = positions['XTZ-PERP']
        size_unit = size_units['XTZ-PERP']

        max_position = 40 * size_unit
        min_amount = 20 * size_unit

        # Room left before reaching +max_position / -max_position.
        buy_amount = round(max(0, max_position - position) / size_unit) * size_unit
        if buy_amount < min_amount:
            buy_amount = 0

        sell_amount = round(max(0, max_position + position) / size_unit) * size_unit
        if sell_amount < min_amount:
            sell_amount = 0

        return {
            'XTZ-PERP': {
                'buy_amount': buy_amount,
                'sell_amount': sell_amount
            }
        }

    def get_status(self):
        """Always report the plugin as live."""
        return {'live': True}
class MmFtxPortfolioTrader:
    """Keeps at most one post-only limit order per side and symbol on FTX.

    Each ``trade()`` call syncs resting orders, asks the plugin for desired
    amounts/prices, and creates or cancels orders to match.  Positions are
    tracked locally from fills and periodically re-read from the exchange.
    """

    def __init__(self, api_key=None, api_secret=None, prod=False,
                 ftx_ws=None, logger=None, plugin=None, symbols=None, subaccount=None):
        """Create clients, cancel leftover orders and start the plugin.

        Args:
            api_key: exchange API key.
            api_secret: exchange API secret.
            prod: when true use ``subaccount``; otherwise the ``'bottest'``
                subaccount.
            ftx_ws: unused.
            logger: ``logging.Logger``-like object.
            plugin: object implementing ``initialize``, ``get_order_info``
                and ``get_status``; defaults to ``DefaultPlugin()``.
            symbols: markets to trade; ``None`` means ``['XTZ-PERP']``.
            subaccount: subaccount name used when ``prod`` is true.
        """
        # ``None`` default instead of a shared mutable list.
        if symbols is None:
            symbols = ['XTZ-PERP']

        self.panic_manager = PanicManager(logger=logger)
        self.panic_manager.register(tag='trader', start_time=5 * 60, interval=60)

        rate_limits = ccxt_rate_limiter.ftx.ftx_limits()
        rate_limits = ccxt_rate_limiter.scale_limits(rate_limits, 0.75)
        self.rate_limiter = ccxt_rate_limiter.rate_limiter_group.RateLimiterGroup(limits=rate_limits)

        def create_ftx():
            headers = {}
            if prod:
                headers['FTX-SUBACCOUNT'] = subaccount
            else:
                headers['FTX-SUBACCOUNT'] = 'bottest'
            ftx = ccxt.ftx({
                'apiKey': api_key,
                'secret': api_secret,
                'enableRateLimit': False,
                'headers': headers,
            })
            ccxt_rate_limiter.wrap_object(
                ftx,
                rate_limiter_group=self.rate_limiter,
                wrap_defs=ccxt_rate_limiter.ftx.ftx_wrap_defs()
            )
            return ftx

        self.ftx_pool = InstancePool(create_fn=create_ftx)
        self.logger = logger
        self.symbols = symbols
        self.orders = []
        self.prev_log_status = time.time()
        self.lock = threading.Lock()
        self.plugin = plugin or DefaultPlugin()
        self.exchange_error_at = 0
        self.current_positions = {symbol: 0.0 for symbol in self.symbols}
        self.last_position_updated_at = time.time()
        self.position_update_interval = 10
        self.last_order_updated_at = time.time()
        self.order_update_interval = 1
        self.replace_order_at = {}

        with self.allocate_ftx_client() as ftx:
            # Start from a clean slate: cancel whatever is still resting.
            ftx.privateDeleteOrders({})
            markets = ftx.publicGetMarkets()['result']

        self.price_units = {}
        self.size_units = {}
        for market in markets:
            if market['name'] in self.symbols:
                self.price_units[market['name']] = market['priceIncrement']
                self.size_units[market['name']] = market['sizeIncrement']
                self.logger.info(market)

        self.update_position()
        self.plugin.initialize(trader=self)

    def allocate_ftx_client(self):
        """Borrow an exchange client from the pool (context manager)."""
        return self.ftx_pool.allocate()

    def trade(self):
        """Run one trading iteration, backing off briefly on exchange outage."""
        try:
            self.do_trade()
        except ExchangeNotAvailable as e:
            self.exchange_error_at = time.time()
            self.logger.error('ExchangeNotAvailable occured. hard circuit break enabled. retrying after sleep {}'.format(e))
            # Reference carried over from another bot:
            # https://www.bitmex.com/app/restAPI#Overload
            time.sleep(0.5)

    def update_position(self):
        """Overwrite ``current_positions`` with the exchange's view."""
        with self.allocate_ftx_client() as ftx:
            positions = ftx.privateGetPositions()['result']

        for position in positions:
            if position['future'] not in self.symbols:
                continue
            pos = side_to_int(position['side']) * position['size']
            if self.current_positions[position['future']] != pos:
                self.logger.info('current_position force updated {} {} -> {}'.format(position['future'], self.current_positions[position['future']], pos))
                self.current_positions[position['future']] = pos

    def reserve_update_orders(self):
        """Force ``update_orders`` to run on the next ``do_trade``.

        The order-changing endpoints do not return the current order state,
        so it is re-synchronised on the following loop.
        """
        self.last_order_updated_at = 1

    def order_updated(self, old_order=None, new_order=None):
        """Apply the fill delta between two snapshots of one order.

        Args:
            old_order: previously known order dict.
            new_order: freshly fetched order dict.

        Returns:
            ``new_order`` while it is still open, ``None`` once its status
            is ``'closed'`` (fully filled or cancelled).
        """
        with self.lock:
            symbol = old_order['market']
            old_filled_size = old_order.get('filledSize', 0)
            executed_size = new_order.get('filledSize', 0) - old_filled_size
            signed_executed_size = executed_size * side_to_int(new_order['side'])

            if executed_size:
                self.logger.info('{} {} order executed {} signed_executed_size {}'.format(symbol, new_order['side'], new_order['id'], signed_executed_size))
                pos = self.current_positions[symbol] + signed_executed_size
                self.logger.info('current_position updated by execution {} {} -> {}'.format(symbol, self.current_positions[symbol], pos))
                self.current_positions[symbol] = pos

            status = new_order['status']
            if status == 'closed':
                if new_order['size'] == new_order.get('filledSize', 0):
                    self.logger.info('order fully executed {}'.format(new_order))
                else:
                    self.logger.info('order canceled {}'.format(new_order))
                new_order = None
            return new_order

    def update_orders(self):
        """Re-fetch every tracked order and drop the closed ones."""
        def update_order(order):
            with self.allocate_ftx_client() as ftx:
                new_order = ftx.privateGetOrdersOrderId({
                    'order_id': order['id'],
                })['result']
            return self.order_updated(old_order=order, new_order=new_order)

        if len(self.orders) <= 1:
            results = list(map(update_order, self.orders))
        else:
            with ThreadPoolExecutor() as executor:
                # Materialise inside the ``with`` so results do not depend on
                # a lazy iterator outliving the executor.
                results = list(executor.map(update_order, self.orders))

        self.orders = [x for x in results if x is not None]
        self.logger.debug('order force updated')

    def fetch_quote(self, symbol=None):
        """Return best bid/ask price and size for ``symbol``."""
        with self.allocate_ftx_client() as ftx:
            ob = ftx.publicGetMarketsMarketNameOrderbook({
                'market_name': symbol,
                'depth': 1,
            })['result']
        return {
            'ask_price': ob['asks'][0][0],
            'ask_size': ob['asks'][0][1],
            'bid_price': ob['bids'][0][0],
            'bid_size': ob['bids'][0][1],
        }

    def do_trade(self):
        """One full iteration: sync, ask the plugin, reconcile orders, log."""
        self.logger.debug('trade loop')

        if self.last_order_updated_at < time.time() - self.order_update_interval:
            self.update_orders()
            self.last_order_updated_at = time.time()

        # Refresh positions from the exchange only after a quiet period with
        # no resting orders (avoids racing with fills).
        if self.orders:
            self.last_position_updated_at = time.time()
        elif self.last_position_updated_at < time.time() - self.position_update_interval:
            self.update_position()
            self.last_position_updated_at = time.time()

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug('orders {}'.format(self.orders))

        order_infos = self.plugin.get_order_info(
            positions=self.current_positions,
            size_units=self.size_units,
            price_units=self.price_units,
        )

        # A symbol is active if the plugin wants to trade it or if an order
        # for it is already resting.
        active_symbols = []
        for symbol in order_infos:
            order_info = order_infos[symbol]
            if (order_info['buy_amount'] or order_info['sell_amount']) and symbol not in active_symbols:
                active_symbols.append(symbol)
        for order in self.orders:
            if order['market'] not in active_symbols:
                active_symbols.append(order['market'])

        with self.create_executor(sync=len(active_symbols) <= 1) as executor:
            order_results = []
            for symbol in active_symbols:
                # A symbol can be active only because of a resting order; if
                # the plugin reports nothing for it, treat that as "want
                # nothing" so the order gets cancelled instead of raising.
                if symbol in order_infos:
                    order_info = order_infos[symbol]
                else:
                    order_info = {'buy_amount': 0, 'sell_amount': 0}
                buy_amount = order_info['buy_amount']
                sell_amount = order_info['sell_amount']
                buy_price = order_info.get('buy_price')
                sell_price = order_info.get('sell_price')

                if self.logger.isEnabledFor(logging.DEBUG):
                    self.logger.debug('buy_amount {}'.format(buy_amount))
                    self.logger.debug('sell_amount {}'.format(sell_amount))
                    self.logger.debug('buy_price {}'.format(buy_price))
                    self.logger.debug('sell_price {}'.format(sell_price))

                # Oldest open buy order for this symbol, if any.
                target_buy_order = reduce(partial(target_buy, symbol), self.orders, None)
                if target_buy_order:
                    self.orders.remove(target_buy_order)
                order_future = executor.submit(self.update_limit_order, order=target_buy_order, side='buy', amount=buy_amount, symbol=symbol, price=buy_price)
                order_results.append({
                    'order_future': order_future,
                    'original_order': target_buy_order,
                })

                target_sell_order = reduce(partial(target_sell, symbol), self.orders, None)
                if target_sell_order:
                    self.orders.remove(target_sell_order)
                order_future = executor.submit(self.update_limit_order, order=target_sell_order, side='sell', amount=sell_amount, symbol=symbol, price=sell_price)
                order_results.append({
                    'order_future': order_future,
                    'original_order': target_sell_order,
                })

            # Whatever is left in self.orders is surplus: cancel it.
            cancel_results = []
            for order in self.orders:
                cancel_results.append(executor.submit(self.cancel_order, order))

            # cancel_results is index-aligned with self.orders at this point.
            for idx, result in enumerate(cancel_results):
                try:
                    result.result()
                    self.orders[idx] = None
                except Exception as e:
                    self.logger.error('error during cancel {}'.format(e))

            for result in order_results:
                try:
                    self.orders.append(result['order_future'].result())
                except Exception as e:
                    self.logger.error('error during ordering {}'.format(e))
                    # Keep tracking the original order so it is re-synced.
                    self.orders.append(result['original_order'])

            self.orders = [x for x in self.orders if x]

        now = time.time()
        if now - self.prev_log_status > 10:
            plugin_status = self.plugin.get_status()
            self.logger.info('api/min {} position {} order_info {} plugin {}'.format(self.rate_limiter.status_info(), self.current_positions, order_infos, plugin_status))
            self.prev_log_status = now

            # plugin_status only exists inside this branch, so the liveness
            # ping must be issued here as well.
            if plugin_status['live']:
                self.panic_manager.ping('trader')

        time.sleep(0.1)

    def create_executor(self, sync=False):
        """Return a ``SyncExecutor`` when ``sync`` else a ``ThreadPoolExecutor``."""
        if sync:
            return SyncExecutor()
        return ThreadPoolExecutor()

    def cancel_order(self, order=None):
        """Cancel ``order`` on the exchange and schedule an order re-sync."""
        self.logger.info('cancel_order {} {}'.format(order['market'], order['id']))
        with self.allocate_ftx_client() as ftx:
            self.reserve_update_orders()
            ftx.privateDeleteOrdersOrderId({
                'order_id': order['id'],
            })
        self.logger.info('cancel_order done')

    def is_size_equal(self, a, b, symbol):
        """True if ``a`` and ``b`` round to the same number of size increments."""
        return round(a / self.size_units[symbol]) == round(b / self.size_units[symbol])

    def is_price_equal(self, a, b, symbol):
        """True if ``a`` and ``b`` round to the same number of price increments."""
        return round(a / self.price_units[symbol]) == round(b / self.price_units[symbol])

    def update_limit_order(self, order=None, side=None, amount=None, symbol=None, price=None):
        """Bring one side of one symbol in line with the desired amount/price.

        Args:
            order: existing open order on this side, or ``None``.
            side: ``'buy'`` or ``'sell'``.
            amount: desired remaining size; 0 means "no order wanted".
            symbol: market name.
            price: desired price, or ``None`` to derive it from the book.

        Returns:
            The order to keep tracking: the unchanged/cancel-pending
            existing order, a newly created order, or ``None``.
        """
        quote = self.fetch_quote(symbol=symbol)
        price_unit = self.price_units[symbol]
        is_buy = side.lower() == 'buy'

        if price is None:
            # old logic: follow the book
            if order:
                if is_buy:
                    price = max([order['price'], quote['bid_price']])
                else:
                    price = min([order['price'], quote['ask_price']])
            else:
                if is_buy:
                    price = min([quote['ask_price'] - price_unit, quote['bid_price'] + price_unit])
                else:
                    price = max([quote['bid_price'] + price_unit, quote['ask_price'] - price_unit])
        else:
            # Never cross the spread: stay one tick inside the opposite side.
            if is_buy:
                price = min([quote['ask_price'] - price_unit, price])
            else:
                price = max([quote['bid_price'] + price_unit, price])

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug('update_limit_order {} side {} amount {} price {}'.format(symbol, side, amount, price))

        if order:
            if self.is_size_equal(amount, order['remainingSize'], symbol) and self.is_price_equal(price, order['price'], symbol):
                self.logger.debug('same order. do nothing')
                return order
            # A change is implemented as cancel + re-create.  Cancel now and
            # keep returning the old order so a fill that lands before the
            # cancel completes is still accounted for; the replacement is
            # placed on a later loop.  (An in-place modify path via
            # privatePostOrdersOrderIdModify used to live here, disabled.)
            self.cancel_order(order=order)
            return order

        if amount == 0:
            self.logger.debug('amount 0. do nothing')
            return None

        self.logger.info('create_order {} limit side {} amount {} price {}'.format(symbol, side, amount, price))
        params = {
            'market': symbol,
            'type': "limit",
            'side': side,
            'size': amount,
            'price': price,
            'postOnly': True
        }
        with self.allocate_ftx_client() as ftx:
            order = ftx.privatePostOrders(params)['result']
        self.logger.info('create_order done')
        return order
def target_buy(market, existing, target):
    """Reducer that selects the oldest open buy order for ``market``.

    Intended for ``reduce(partial(target_buy, market), orders, None)``.

    Args:
        market: market name to match.
        existing: best candidate so far, or ``None``.
        target: order dict being considered.

    Returns:
        ``target`` if it qualifies and is older than ``existing`` (or there
        is no ``existing``), otherwise ``existing``.
    """
    # Mandatory conditions
    if target['side'].lower() != 'buy':
        return existing
    if target['status'] == 'closed':
        return existing
    if target['market'] != market:
        return existing

    if not existing:
        return target

    def created(order):
        # Raw exchange order payloads use camelCase ``createdAt`` (like
        # ``filledSize``/``remainingSize``); the legacy ``created_at`` key is
        # still accepted.
        key = 'createdAt' if 'createdAt' in order else 'created_at'
        return pd.to_datetime(order[key])

    # Ordering: prefer the older order
    if created(target) < created(existing):
        return target
    return existing
def target_sell(market, existing, target):
    """Reducer that selects the oldest open sell order for ``market``.

    Intended for ``reduce(partial(target_sell, market), orders, None)``.

    Args:
        market: market name to match.
        existing: best candidate so far, or ``None``.
        target: order dict being considered.

    Returns:
        ``target`` if it qualifies and is older than ``existing`` (or there
        is no ``existing``), otherwise ``existing``.
    """
    # Mandatory conditions
    if target['side'].lower() != 'sell':
        return existing
    if target['status'] == 'closed':
        return existing
    if target['market'] != market:
        return existing

    if not existing:
        return target

    def created(order):
        # Raw exchange order payloads use camelCase ``createdAt`` (like
        # ``filledSize``/``remainingSize``); the legacy ``created_at`` key is
        # still accepted.
        key = 'createdAt' if 'createdAt' in order else 'created_at'
        return pd.to_datetime(order[key])

    # Ordering: prefer the older order
    if created(target) < created(existing):
        return target
    return existing
def side_to_int(side):
    """Map an order side to a sign: ``-1`` for sell (any case), else ``1``."""
    return -1 if side.lower() == 'sell' else 1
セットアップ
# Wire the pair-trading plugin into the order-placing trader.
# ``ftx_config``, ``args``, ``jsonl_logger`` and ``logger`` are not defined in
# this excerpt and must be provided by the surrounding script.
trader = MmFtxPortfolioTrader(
    api_key=ftx_config()['key'],
    api_secret=ftx_config()['secret'],
    prod=args.prod,
    plugin=FtxPairPlugin(
        jsonl_logger=jsonl_logger,
        prod=args.prod,
        # symbols[0] is the regression target of the Kalman filter.
        symbols=['BSV-PERP', 'BCH-PERP'],
        max_weight=0.05,
        leverage=20.0,
    ),
    # Same symbol list as the plugin above.
    symbols=['BSV-PERP', 'BCH-PERP'],
    logger=logger,
    # Only used when prod is true; otherwise the trader uses 'bottest'.
    subaccount='my_subaccount1',
)
ライセンス
CC0
Q and A
Q. SyncExecutorって?
A. ThreadPoolExecutorと同じインターフェースで普通にシングルスレッドで同期で実行するものです。パフォーマンスが重要なボットを作っていたときに、マルチスレッドコストを消すために作ったもののなごりです。コピペで作っているので、他ボットのなごりがかなり含まれています。
Q. InstancePoolって必要?
A. InstancePoolは以下のクラスです。使う理由は2つあります。
・ccxtのインスタンスはスレッドセーフではない(注意して使えば使えるかもしれませんが)。
・たしか、ccxtのインスタンスを使い回すとコネクションも使い回せた気がする(調べて確認した気がしますが、正確には忘れました)。これもパフォーマンスが重要なボットのなごりです。
import threading
class InstanceAllocation:
    """Context manager that lends one pooled instance and returns it on exit."""

    def __init__(self, pool, instance):
        """
        Args:
            pool: object with a ``free(instance)`` method.
            instance: the borrowed object.
        """
        self.pool = pool
        self.instance = instance

    def __enter__(self):
        """Hand out the borrowed instance."""
        return self.instance

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Give the instance back to the pool; exceptions are not suppressed."""
        self.pool.free(self.instance)
class InstancePool:
    """Thread-safe pool that reuses instances and creates them on demand."""

    def __init__(self, create_fn=None):
        """
        Args:
            create_fn: zero-argument callable that builds a new instance
                when the pool is empty.
        """
        self.lock = threading.Lock()
        self.instances = []
        self.create_fn = create_fn

    def allocate(self):
        """Return an ``InstanceAllocation`` wrapping an idle or new instance.

        Use it as ``with pool.allocate() as instance: ...``; the instance
        goes back to the pool when the block exits.
        """
        with self.lock:
            instance = self.instances.pop() if self.instances else None

        # Build outside the lock so a slow constructor does not block others.
        if instance is None:
            instance = self.create_fn()

        return InstanceAllocation(self, instance)

    def free(self, instance):
        """Put ``instance`` back so a later ``allocate`` can reuse it."""
        with self.lock:
            self.instances.append(instance)