Pythonを使ってテクニカル分析用の関数作成に挑戦してみた
自作関数
仮想通貨の自動売買botを作るために、テクニカル分析を自作関数に組み込む必要があった。
そこで、マーケットがニュースなどに影響されないサイクルを4分間と判断して、自作関数を作ってみた。
20秒を一単位として、12分間仮想通貨の取引情報であるティッカーを分析し、4分単位で相場の上下を判定している。
テクニカル分析を関数にする
以前、自作関数用に作ったコードは、次の通り。このコードにテクニカル分析の要素を加えていく。
# -*- coding: utf-8 -*-
"""
The god who knows people is beyond the horizon.
"""
import requests
class furture(object):
def core(core_data, core_sell_buy):
"""
データの加工
"""
Check_symbol = ([row[1] for row in core_data])
Check_ltp = ([row[3] for row in core_data])
Check_ask = ([row[4] for row in core_data])
Check_bid = ([row[5] for row in core_data])
Check_ask_size = ([row[6] for row in core_data])
Check_bid_size = ([row[7] for row in core_data])
Check_ask_depth = ([row[8] for row in core_data])
Check_bid_depth = ([row[9] for row in core_data])
Check_volue = ([row[10] for row in core_data])
Check_volue_product = ([row[11] for row in core_data])
print(Check_symbol[0])
print("ltp " + str(Check_ltp[0]))
print("ask " + str(Check_ask[0]))
print("bid " + str(Check_bid[0]))
print("volue " + str(Check_volue[0]))
"""
データ解析
"""
Check_result = furture().decision(Check_ltp, Check_ask, Check_bid,Check_ask_size, Check_bid_size, Check_ask_depth, Check_bid_depth, Check_volue, Check_volue_product)
"""
売買判定
"""
core_sell_buy = Check_result
return
def decision(self, ltp, ask, bid, ask_size, bid_size, ask_depth ,bid_depth, volume, volume_product):
return 'Buy'
まず、テクニカル分析の計算方法をおさらいしよう。RSIの計算方法はこちら。
【ワイルダー氏の計算法】
(1)最初の平均上昇幅=(14日間の上昇幅の合計)÷14
平均上昇幅=(前日までの平均上昇幅x13+直近の上昇幅)÷14
(2)最初の平均下落幅=(14日間の下落幅の合計)÷14
平均下落幅=(前日までの平均下落幅x13+直近の下落幅)÷14
続いて、ストキャスティクスの計算方法はこちら。
ストキャスティクスの計算式は、以下の通りです。
「%K」=(当日終値-過去n日間の最安値)÷(過去n日間の最高値-過去n日間の最安値)×100
「%D」=(当日終値-過去n日間の最安値)のm日間の合計÷(過去n日間の最高値-過去n日間の最安値)のm日間の合計×100
「%K」は一定期間の値幅を100として現在どの水準にいるかという数値であり、一定期間の最高値と最安値、現在の価格を用いて算出します。「%D」は「%K」を移動平均化したものです。
最後にボリンジャーバンドの計算方法はこちら。ボリンジャーバンドは、グラフ上に標準偏差を加減したものだけど、ボクは売買判定をする指標という意味で使っているから用語を正確に使っていないので注意。
上部バンド:単純移動平均線+2標準偏差(σ)
単純移動平均線(MA):過去N日間の移動平均線(通常20日間移動平均線)
下部バンド:単純移動平均線-2標準偏差(σ)
標準偏差(σシグマ:Standard deviation):ボラティリティー(volatility予想変動率)
テクニカル分析の真実
テクニカル分析をPythonで作っていると、ボクは気がついた。これらの指標は、少なくとも4分間という短期でのチャート分析では全く役に立たない。まず、RSIの計算のコードはこちら。一行目で最終取引価格から過去のデータの差を計算方法。二行目と三行目でプラスとマイナスの群に仕分けして、最後でRSIの計算をしている。
RSI_Data1 = list(map(lambda x:x - ltp[0],ltp))
RSI_Data2 = sum(list(filter(lambda x:x > 0,RSI_Data1)))
RSI_Data3 = sum(list(filter(lambda x:x < 0,RSI_Data1)))
RSI = round(RSI_Data3 / (RSI_Data3 - RSI_Data2),3)
return RSI
次にストキャスティクス。最初の行で最小値、2番目の行で最大値を抽出して、3行目で平均を計測している。
STO_DATA1 = ltp[0] - min(ltp)
STO_DATA2 = max(ltp) - min(ltp)
STO = round(STO_DATA1 / STO_DATA2,3)
return STO
最後は、ボリンジャーバンド。まず、単純移動平均を一行目で計算して、2行目で標準偏差を計算。最後の行で、シグマ1で予想最低取引価格を算出した。
EMA = round(statistics.mean(ltp),3)
PST = round(statistics.pstdev(ltp),3)
ASK = EMA - PST
return ASK
でも、テクニカル分析は、マーケットが想定していない、仮想通貨には顕著にみられる乱高下を繰り返す相場に対応ができないからだ。そこで、ボクは、テクニカル分析の計算方法を参考にしつつ、関数を書き直すことにした。まず、RSIは上昇中と下降中かを確認するために使う。
def RSI(self,ltp):
"""
RSI ※集計が降順のため、本来の計算と逆にしてる
"""
RSI_Data1 = list(map(lambda x:x - ltp[0],ltp))
RSI_Data2 = sum(list(filter(lambda x:x < 0,RSI_Data1)))
RSI_Data3 = sum(list(filter(lambda x:x > 0,RSI_Data1)))
if RSI_Data3 - RSI_Data2 > 0:
RSI = round(1 - (RSI_Data2 / (RSI_Data2 - RSI_Data3)),4)
return RSI
elif RSI_Data3 + RSI_Data2 > 0:
RSI = round(RSI_Data3 / (RSI_Data3 - RSI_Data2),4)
return RSI
else:
return 0
ストキャスティクスの関数で利益が出る取引範囲を調べることだけにした。つまり、bitとaskの差が取引所のスプレッド程度以上の値幅がないと、売買をしない。
def STO(self,ltp):
"""
1時間と12分の値幅が0.1%以上になるか計算
"""
STO_DATA1 = 1 - (min(ltp) / ltp[0])
STO_DATA2 = (max(ltp) / ltp[0]) - 1
if STO_DATA1 > STO_DATA2:
return round(STO_DATA1,4)
else:
return round(STO_DATA2,4)
ボリンジャーバンドは、買いと売りのタイミングを判定させることにした。統計上、無駄のない範囲が自由度に黄金比率を乗じたものと考えた。
def PST(self,ltp):
"""
標準偏差の自由の度を黄金比で算出
1時間と12分の取引から売買確率を判定
"""
EMA = round(statistics.mean(ltp),1)
PST = round(statistics.pstdev(ltp),1)
BIT = round(EMA + (1.61803397 * PST) - ltp[0],1)
ASK = round(EMA - (1.61803397 * PST) - ltp[0],1)
if BIT + ASK == 0 or BIT - ASK == 0:
return (0.5, 0.5)
elif BIT > 0 and ASK < 0:
SELL = round(BIT / (BIT - ASK),4)
BUY = ASK / (BIT - ASK)
return (SELL, round(1 - SELL,4))
else:
SELL = round(BIT / (BIT + ASK),4)
BUY = round(ASK / (BIT + ASK),4)
return (SELL, BUY)
売買は、標準偏差の自由度を黄金比にした。この範囲だと約80%程度の確率で売買の判定ができる。これらの関数から計算された数字を判定の材料にして、decisionという自作関数を新たに作り、売りの場合はSell、買いの場合はBuy、他はWaitという文字列を返すようにした。
def decision(RSI1, RSI2, RSI3, STO1, STO2, PST1_ask, PTS1_bid, PTS2_ask, PTS2_bid):
"""
売買判定
"""
if RSI1 == 1 or RSI2 == 1 or RSI3 == 1 or RSI1 == 0 or RSI2 == 0 or RSI3 == 0:
"""
まだ上昇か下降しているため除外
"""
print('RSI')
return 'Wait'
elif STO1 < 0.0005 or STO2 < 0.0005:
"""
利益が見込めないから除外
"""
print('STO')
return 'Wait'
else:
"""
確率低いものは除外
"""
if PTS2_ask > PTS2_bid and PST1_ask < PTS1_bid:
return 'Sell_Check'
elif PTS2_bid > PTS2_ask and PTS1_bid < PST1_ask:
return 'Buy_Check'
else:
print('PTS')
return 'Wait'
この売買判定を使って何回も取引をしていけば、80%の確率で売買のタイミングで判定できると想定して、徐々に利益が出ると考えた。そして、完成した自作関数がこれだ。
# -*- coding: utf-8 -*-
"""
The god who knows people is beyond the horizon.
"""
import requests
import statistics
import math
class furture(object):
def core(core_data):
"""
データの加工
"""
Check_symbol = ([row[1] for row in core_data])
Check_ltp = ([row[3] for row in core_data])
Check_ask = ([row[4] for row in core_data])
Check_bid = ([row[5] for row in core_data])
Check_volue_product = ([row[11] for row in core_data])
print('-----' + Check_symbol[0] + '-----')
print("価格" + str(Check_ltp[0]) + ",買値" + str(Check_ask[0]) + ",売値" + str(Check_bid[0]))
"""
データ解析
"""
if len(Check_symbol) < 36:
print('サンプル不足')
return 'Wait'
else:
Check_ltp_Short = Check_ltp[:36]
Check_ask_Short = Check_ask[:36]
Check_bid_Short = Check_bid[:36]
Check_ltp_Fast = Check_ltp[:12]
Data_RSI1 = furture().RSI(Check_ltp_Short)
Data_RSI2 = furture().RSI(Check_ask_Short)
Data_RSI3 = furture().RSI(Check_bid_Short)
Data_STO1 = furture().STO(Check_ltp_Short)
Data_STO2 = furture().STO(Check_ltp_Fast)
Data_PST1 = []
Data_PST1 = furture().PST(Check_ltp_Short)
Data_PST2 = []
Data_PST2 = furture().PST(Check_ltp_Fast)
print('RSI_ltp ' + str(Data_RSI1) + ' ,RSI_ask ' + str(Data_RSI2) + ' ,RSI_bid ' + str(Data_RSI3))
print('STO36 ' + str(Data_STO1) + ' ,STO12 ' + str(Data_STO2))
print('PST36 Sell ' + str(Data_PST1[0]) + ', Buy ' + str(Data_PST1[1]))
print('PST12 Sell ' + str(Data_PST2[0]) + ', Buy ' + str(Data_PST2[1]))
RESULT = furture.decision(Data_RSI1,Data_RSI2,Data_RSI3,Data_STO1,Data_STO2,Data_PST1[0],Data_PST1[1],Data_PST2[0],Data_PST2[1])
return RESULT
def decision(RSI1, RSI2, RSI3, STO1, STO2, PST1_ask, PTS1_bid, PTS2_ask, PTS2_bid):
"""
売買判定
"""
if RSI1 == 1 or RSI2 == 1 or RSI3 == 1 or RSI1 == 0 or RSI2 == 0 or RSI3 == 0:
"""
まだ上昇か下降しているため除外
"""
print('RSI')
return 'Wait'
elif STO1 < 0.0005 or STO2 < 0.0005:
"""
利益が見込めないから除外
"""
print('STO')
return 'Wait'
else:
"""
確率低いものは除外
"""
if PTS2_ask > PTS2_bid and PST1_ask < PTS1_bid:
return 'Sell_Check'
elif PTS2_bid > PTS2_ask and PTS1_bid < PST1_ask:
return 'Buy_Check'
else:
print('PTS')
return 'Wait'
def RSI(self,ltp):
"""
RSI ※集計が降順のため、本来の計算と逆にしてる
"""
RSI_Data1 = list(map(lambda x:x - ltp[0],ltp))
RSI_Data2 = sum(list(filter(lambda x:x < 0,RSI_Data1)))
RSI_Data3 = sum(list(filter(lambda x:x > 0,RSI_Data1)))
if RSI_Data3 - RSI_Data2 > 0:
RSI = round(1 - (RSI_Data2 / (RSI_Data2 - RSI_Data3)),4)
return RSI
elif RSI_Data3 + RSI_Data2 > 0:
RSI = round(RSI_Data3 / (RSI_Data3 - RSI_Data2),4)
return RSI
else:
return 0
def STO(self,ltp):
"""
1時間と12分の値幅が0.1%以上になるか計算
"""
STO_DATA1 = 1 - (min(ltp) / ltp[0])
STO_DATA2 = (max(ltp) / ltp[0]) - 1
if STO_DATA1 > STO_DATA2:
return round(STO_DATA1,4)
else:
return round(STO_DATA2,4)
def PST(self,ltp):
"""
標準偏差の自由の度を黄金比で算出
1時間と12分の取引から売買確率を判定
"""
EMA = round(statistics.mean(ltp),1)
PST = round(statistics.pstdev(ltp),1)
BIT = round(EMA + (1.61803397 * PST) - ltp[0],1)
ASK = round(EMA - (1.61803397 * PST) - ltp[0],1)
if BIT + ASK == 0 or BIT - ASK == 0:
return (0.5, 0.5)
elif BIT > 0 and ASK < 0:
SELL = round(BIT / (BIT - ASK),4)
BUY = ASK / (BIT - ASK)
return (SELL, round(1 - SELL,4))
else:
SELL = round(BIT / (BIT + ASK),4)
BUY = round(ASK / (BIT + ASK),4)
return (SELL, BUY)
この自作関数、何度もテストしてみたけど、まだ精度としては全然足りない。まだまだ修正の余地はあるけど、とりあえず自動売買botを完成させよう。
目次
次の記事
前の記事