第9章 ボラティリティ予測と統計的裁定取引 第7節: ペアトレーディングのバックテスト
9章最後になります。
今回はBacktraderを使ってバックテストして、結果をpyfolioで評価したいと思います(*'ω'*)
backtraderのgithubはこちらになります。
インポートと設定
import warnings
warnings.filterwarnings('ignore')
import csv
from collections import defaultdict
from dataclasses import dataclass, asdict
from datetime import date
from pathlib import Path
from time import time
import numpy as np
import pandas as pd
import pandas_datareader.data as web
import matplotlib.pyplot as plt
import seaborn as sns
import backtrader as bt
from backtrader.feeds import PandasData
import pyfolio as pf
sns.set_style('dark')
pd.set_option('display.float_format', lambda x: f'{x:,.2f}')
idx = pd.IndexSlice
STORE = 'backtest.h5'
def format_time(t):
m_, s = divmod(t, 60)
h, m = divmod(m_, 60)
return f'{h:>02.0f}:{m:>02.0f}:{s:>02.0f}'
ペアトレーディングのバックテスト
ペアのデータクラス
@dataclass
class Pair:
period: int
s1: str
s2: str
size1: float
size2: float
long: bool
hr: float
p1: float
p2: float
pos1: float
pos2: float
exec1: bool = False
exec2: bool = False
active: bool = False
entry_date: date = None
exit_date: date = None
entry_spread: float = np.nan
exit_spread: float = np.nan
def executed(self):
return self.exec1 and self.exec2
def get_constituent(self, name):
if name == self.s1:
return 1
elif name == self.s2:
return 2
else:
return 0
def compute_spread(self, p1, p2):
return p1 * self.size1 + p2 * self.size2
def compute_spread_return(self, p1, p2):
current_spread = self.compute_spread(p1, p2)
delta = self.entry_spread - current_spread
return (delta / (np.sign(self.entry_spread) *
self.entry_spread))
Pandasデータの定義
class CustomData(PandasData):
"""
Define pandas DataFrame structure
"""
cols = ['open', 'high', 'low', 'close', 'volume']
# create lines
lines = tuple(cols)
# define parameters
params = {c: -1 for c in cols}
params.update({'datetime': None})
params = tuple(params.items())
戦略の定義
class StatisticalArbitrageCointegration(bt.Strategy):
params = (('trades', None),
('risk_limit', -.2),
('verbose', True),
('log_file', 'backtest.csv'))
def __init__(self):
self.active_pairs = {}
self.closing_pairs = {}
self.exposure = []
self.metrics = []
self.last_close = {}
self.cnt = 0
self.today = None
self.clear_log()
self.order_status = dict(enumerate(['Created', 'Submitted', 'Accepted',
'Partial', 'Completed', 'Canceled',
'Expired', 'Margin', 'Rejected']))
def clear_log(self):
if Path(self.p.log_file).exists():
Path(self.p.log_file).unlink()
with Path(self.p.log_file).open('a') as f:
log_writer = csv.writer(f)
log_writer.writerow(
['Date', 'Pair', 'Symbol', 'Order #', 'Reason',
'Status', 'Long', 'Price', 'Size', 'Position'])
def log(self, txt, dt=None):
""" Logger for the strategy"""
dt = dt or self.datas[0].datetime.datetime(0)
with Path(self.p.log_file).open('a') as f:
log_writer = csv.writer(f)
log_writer.writerow([dt.date()] + txt.split(','))
def get_pair_id(self, s1, s2, period):
return f'{s1}.{s2}.{period}'
def check_risk_limit(self):
for pair_id, pair in list(self.active_pairs.items()):
if pair.active:
p1 = self.last_close.get(pair.s1)
p2 = self.last_close.get(pair.s2)
ret = pair.compute_spread_return(p1, p2)
if ret < self.p.risk_limit:
self.log(f'{pair_id},{pair.s1},{pair.s2},Risk Limit,{ret},')
del self.active_pairs[pair_id]
self.sell_pair(pair_id, pair)
def sell_pair(self, pair_id, pair, reason='close'):
info = {'pair': pair_id, 'type': reason}
if pair.long:
o1 = self.sell(data=pair.s1, size=abs(pair.size1), info=info)
o2 = self.buy(data=pair.s2, size=abs(pair.size2), info=info)
else:
o1 = self.buy(data=pair.s1, size=abs(pair.size1), info=info)
o2 = self.sell(data=pair.s2, size=abs(pair.size2), info=info)
pair.active = False
pair.exec1 = pair.exec2 = False
self.closing_pairs[pair_id] = pair
self.log(f'{pair_id},{pair.s1},{o1.ref},{reason},Created,{pair.long},,{pair.size1},')
self.log(f'{pair_id},{pair.s2},{o2.ref},{reason},Created,{pair.long},,{pair.size2},')
def notify_order(self, order):
symbol = order.data._name
if order.status in [order.Submitted, order.Accepted]:
return
if order.status in [order.Completed]:
p = order.executed.price
s = order.executed.size
order_type = order.info.info['type']
if order_type in ['open', 'close']:
pair_id = order.info.info['pair']
if order_type == 'open':
pair = self.active_pairs.get(pair_id)
else:
pair = self.closing_pairs.get(pair_id)
if pair is None:
self.log(f'{pair_id},{symbol},{order.ref},{order_type},Completed (missing),,{p},{s},{p * s}')
return
component = pair.get_constituent(symbol)
if component == 1:
pair.p1 = p
pair.exec1 = True
elif component == 2:
pair.p2 = p
pair.exec2 = True
if pair.executed():
pair.exec1 = False
pair.exec2 = False
if order_type == 'open':
pair.entry_spread = pair.compute_spread(p1=pair.p1, p2=pair.p2)
pair.entry_date = self.today
pair.active = True
elif order_type == 'close':
pair.exit_spread = pair.compute_spread(p1=pair.p1, p2=pair.p2)
pair.exit_date = self.today
pair.active = False
self.closing_pairs.pop(pair_id)
self.log(f'{pair_id},{symbol},{order.ref},{order_type},Completed,{pair.long},{p},{s},{p * s}')
else:
self.log(f',{symbol},{order.ref},{order_type},Completed,,{p},{s},{p * s}')
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
order_type = order.info.info['type']
self.log(f',{symbol},{order.ref},{order_type},{self.order_status[order.status]},,,,')
def enter_pairs(self, df, long=True):
for s1, s2, hr, period in zip(df.s1, df.s2, df.hedge_ratio, df.period):
pair_id = self.get_pair_id(s1, s2, period)
if self.active_pairs.get(pair_id):
continue
p1 = self.last_close[s1]
p2 = self.last_close[s2]
if long:
size1 = self.target_value / p1
size2 = hr * size1
else:
size2 = self.target_value / p2
size1 = 1 / hr * size2
pair = Pair(s1=s1, s2=s2, period=period, size1=size1, size2=size2,
pos1=p1 * size1, pos2=p2 * size2,
hr=hr, long=long, p1=p1, p2=p2, entry_date=self.today)
info = {'pair': pair_id, 'type': 'open'}
if long:
o1 = self.buy(data=s1, size=size1, info=info)
o2 = self.sell(data=s2, size=abs(size2), info=info)
else:
o1 = self.sell(data=pair.s1, size=abs(pair.size1), info=info)
o2 = self.buy(data=pair.s2, size=abs(pair.size2), info=info)
self.active_pairs[pair_id] = pair
self.log(f'{pair_id},{s1},{o1.ref},Open,Created,{long},{p1},{size1},{pair.pos1}')
self.log(f'{pair_id},{s2},{o2.ref},Open,Created,{long},{p2},{size2},{pair.pos2}')
def adjust_pairs(self):
orders = defaultdict(float)
pairs = defaultdict(list)
for pair_id, pair in self.active_pairs.items():
p1, p2 = self.last_close[pair.s1], self.last_close[pair.s2]
pos1, pos2 = pair.size1 * p1, pair.size2 * p2
if pair.long:
target_size1 = self.target_value / p1
orders[pair.s1] += target_size1 - pair.size1
target_size2 = pos2 / pos1 * self.target_value / p2
orders[pair.s2] += target_size2 - pair.size2
else:
target_size2 = self.target_value / p2
orders[pair.s2] += target_size2 - pair.size2
target_size1 = pos1 / pos2 * self.target_value / p1
orders[pair.s1] += target_size1 - pair.size1
pair.size1 = target_size1
pair.size2 = target_size2
pairs[pair.s1].append(pair_id)
pairs[pair.s2].append(pair_id)
for symbol, size in orders.items():
info = {'pairs': pairs[symbol], 'type': 'adjust'}
if size > 0:
order = self.buy(symbol, size=size, info=info)
elif size < 0:
order = self.sell(symbol, size=abs(size), info=info)
else:
continue
self.log(f',{symbol},{order.ref},Adjust,Created,{size}')
def prenext(self):
self.next()
def next(self):
self.today = self.datas[0].datetime.date()
if self.today not in self.p.trades.index:
return
self.cnt += 1
pf = self.broker.get_value()
cash = self.broker.get_cash()
exp = {d._name: pos.size for d, pos in self.getpositions().items() if pos}
self.last_close = {d._name: d.close[0] for d in self.datas}
exposure = pd.DataFrame({'price' : pd.Series(self.last_close),
'position': pd.Series(exp)}).replace(0, np.nan).dropna()
exposure['value'] = exposure.price * exposure.position
positions = exposure.value.to_dict()
positions['date'] = self.today
positions['cash'] = cash
if not exposure.empty:
self.exposure.append(positions)
long_pos = exposure[exposure.value > 0].value.sum()
short_pos = exposure[exposure.value < 0].value.sum()
for symbol, row in exposure.iterrows():
self.log(f',{symbol},,Positions,Log,,{row.price},{row.position},{row.value}')
else:
long_pos = short_pos = 0
trades = self.p.trades.loc[self.today]
if isinstance(trades, pd.Series):
trades = trades.to_frame().T
close = trades[trades.side == 0].sort_values('period')
for s1, s2, period in zip(close.s1, close.s2, close.period):
pair_id = self.get_pair_id(s1, s2, period)
pair = self.active_pairs.pop(pair_id, None)
if pair is None:
self.log(f'{pair_id},,,Close Attempt,Failed,,,,')
continue
self.sell_pair(pair_id, pair)
if len(self.active_pairs) > 0:
self.check_risk_limit()
long = trades[trades.side == 1]
short = trades[trades.side == -1]
if long.empty and short.empty: return
target = 1 / (len(long) + len(short) + len(self.active_pairs))
self.target_value = pf * target
metrics = [self.today, pf, pf - cash, cash, len(exposure), len(self.active_pairs), long_pos, short_pos,
target, self.target_value, len(long), len(short), len(close)]
self.metrics.append(metrics)
if self.cnt % 21 == 0:
holdings = pf - cash
msg = f'PF: {pf:11,.0f} | Net: {holdings: 11,.0f} | # Pos: {len(exposure):3,.0f} | # Pairs: {len(self.active_pairs):3,.0f} | '
msg += f'Long: {long_pos: 10,.0f} | Short: {short_pos: 10,.0f}'
print(self.today, msg)
self.adjust_pairs()
if not long.empty:
self.enter_pairs(long, long=True)
if not short.empty:
self.enter_pairs(short, long=False)
取引履歴を読み込む
trades = pd.read_hdf(STORE, 'pair_trades').sort_index()
trades.info()
'''
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 135490 entries, 2017-01-03 to 2019-12-18
Data columns (total 6 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 s1 135490 non-null object
1 s2 135490 non-null object
2 hedge_ratio 135490 non-null float64
3 period 135490 non-null int64
4 pair 135490 non-null int64
5 side 135490 non-null int64
dtypes: float64(1), int64(3), object(2)
memory usage: 7.2+ MB
'''
trade_dates = np.unique(trades.index)
start = trade_dates.min()
end = trade_dates.max()
traded_symbols = trades.s1.append(trades.s2).unique()
価格を読み込む
prices = (pd.read_hdf(STORE, 'prices')
.sort_index()
.loc[idx[traded_symbols, str(start):str(end)], :])
Cerebroの設定
cerebro = bt.Cerebro()
cash = 1000000
cerebro.broker.setcash(cash)
データの追加
for symbol in traded_symbols:
df = prices.loc[idx[symbol, :], :].droplevel('ticker', axis=0)
df.index.name = 'datetime'
bt_data = CustomData(dataname=df)
cerebro.adddata(bt_data, name=symbol)
戦略とアナライザーの追加
cerebro.addstrategy(StatisticalArbitrageCointegration,
trades=trades, verbose=True,
log_file='bt_log.csv')
cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')
戦略の実行
start = time()
results = cerebro.run()
ending_value = cerebro.broker.getvalue()
duration = time() - start
print(f'Final Portfolio Value: {ending_value:,.2f} | Duration: {format_time(duration)}')
Pyfolioのインプットを計算
pyfolio_analyzer = results[0].analyzers.getbyname('pyfolio')
returns, positions, transactions, gross_lev = pyfolio_analyzer.get_pf_items()
returns.to_hdf(STORE, 'returns')
positions.to_hdf(STORE, 'positions')
transactions.to_hdf(STORE, 'transactions/')
gross_lev.to_hdf(STORE, 'gross_lev')
ポジションの取得
traded_pairs = pd.DataFrame(results[0].exposure)
traded_pairs.date = pd.to_datetime(traded_pairs.date)
traded_pairs = traded_pairs.set_index('date').tz_localize('UTC')
traded_pairs.to_hdf(STORE, 'traded_pairs')
traded_pairs.info()
評価基準の取得
metrics = pd.DataFrame(results[0].metrics,
columns=['date', 'pf', 'net_holdings', 'cash',
'npositions', 'npairs', 'nlong_pos', 'nshort_pos',
'target', 'target_val', 'nlong_trades',
'nshort_trades', 'nclose_trades'])
metrics.to_hdf(STORE, 'metrics')
Pyfolioで分析
returns = pd.read_hdf(STORE, 'returns')
transactions = pd.read_hdf(STORE, 'transactions/')
gross_lev = pd.read_hdf(STORE, 'gross_lev')
metrics = pd.read_hdf(STORE, 'metrics').set_index('date')
metrics.info()
'''
<class 'pandas.core.frame.DataFrame'>
Index: 676 entries, 2017-01-03 to 2019-09-30
Data columns (total 12 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 pf 676 non-null float64
1 net_holdings 676 non-null float64
2 cash 676 non-null float64
3 npositions 676 non-null int64
4 npairs 676 non-null int64
5 nlong_pos 676 non-null float64
6 nshort_pos 676 non-null float64
7 target 676 non-null float64
8 target_val 676 non-null float64
9 nlong_trades 676 non-null int64
10 nshort_trades 676 non-null int64
11 nclose_trades 676 non-null int64
dtypes: float64(7), int64(5)
memory usage: 68.7+ KB
'''
metrics[['nlong_pos', 'nshort_pos']].plot(figsize=(12, 4));
ベンチマーク取得
S&P500をベンチマークとします
start = str(returns.index.min().year)
end = str(returns.index.max().year + 1)
benchmark = web.DataReader('SP500', 'fred',
start=start,
end=end).squeeze()
benchmark = benchmark.pct_change().tz_localize('UTC')
fig, axes = plt.subplots(ncols=3, figsize=(20,5))
pf.plotting.plot_rolling_returns( returns, factor_returns=benchmark, ax=axes[0])
axes[0].set_title('Cumulative Returns')
pf.plotting.plot_rolling_sharpe(returns, ax=axes[1])
pf.plotting.plot_rolling_beta(returns, benchmark, ax=axes[2])
sns.despine()
fig.tight_layout();
tearsheet
pf.create_full_tear_sheet(returns,
positions=positions,
benchmark_rets=benchmark.dropna(),
estimate_intraday=False)
この記事が気に入ったらサポートをしてみませんか?