Pythonとpybitflyerで自動売買botを作ることに挑戦してみた
暫定的完成
Pythonとpybitflyerを使ったbitflyer社の自動売買botを実行するプログラムを作った。
でも、まだ売買テストをしていない。だから、きちんと動作するか保障しない。
動作を確認するには、口座を作って取引するしかないからだ。今のボクには小遣いがないから、動作確認ができない。
だから、実際に資産運用してみて利益が出るようなら修正したコードをアップしようと思う。
今回記事に書くのは、自動売買botを動かすための二つのpyファイルだ。
Cryptcurrency.py
これがメインのCryptcurrency.pyだ。2つのシンボルコードを入力して、BALANCESというリストから[]の中にシンボルに対応する数字を入れることで残高を取得できる。対応する数字の表は、過去の記事に記載している。ちなみに、API.iniというファイルを別に作ってあげないと動作しない。
# -*- coding: utf-8 -*- #
"""
We crush capitalism.
"""
import pybitflyer
import csv
import time
import sqlite3
import furture
import configparser
"""
初期設定
"""
Distance = furture.furture
"""
iniファイルの読み込み
"""
config = configparser.ConfigParser()
config.read('API.ini')
API_KEY = config['API']['api_key']
API_SECRET = config['API']['api_secret']
MIN_JPN = config.getfloat('Investment', 'minimum_jpn')
MAX_JPN = config.getfloat('Investment', 'maximum_jpn')
"""
価格の設定
"""
Ticker_symbol = ["ETH", "XRP"]
AMOUNT = [0, 0, 0]
RATE = [1, 0, 0]
Max_SIZE = [0, 0, 0]
BALANCES = pybitflyer.API(api_key=API_KEY, api_secret=API_SECRET).getbalance()
AMOUNT[0] = (BALANCES[0])["amount"]
AMOUNT[1] = (BALANCES[3])["amount"]
AMOUNT[2] = (BALANCES[8])["amount"]
RATE[1] = pybitflyer.API().ticker(product_code='ETH_JPY')["ltp"]
RATE[2] = pybitflyer.API().ticker(product_code='XRP_JPY')["ltp"]
Max_SIZE[0] = AMOUNT[0]
if RATE[1] == 0 and RATE[2] == 0:
Max_SIZE[1] = (AMOUNT[0] - MIN_JPN)/ 2
Max_SIZE[2] = (AMOUNT[0] - MIN_JPN)/ 2
elif RATE[1] == 0 and RATE[2] > 0:
Max_SIZE[1] = (AMOUNT[0] - MIN_JPN)
elif RATE[2] == 0 and RATE[1] > 0:
Max_SIZE[2] = (AMOUNT[0] - MAX_JPN)
"""
データベース設定
"""
Data_Base = sqlite3.connect("furture.db")
Data_cur = Data_Base.cursor()
Data_cur.execute('CREATE TABLE IF NOT EXISTS furture(ID INTEGER PRIMARY KEY, symbol text, timestamp datatime, lpt real, ask real, bid real, ask_size real, bid_size real, ask_depth real, bid_depth real, volume_product real, volume real)')
Data_Base.commit()
Data_Base.close()
print('==========初期設定終了==========')
Start_time = time.perf_counter()
while True:
"""
初期化
"""
print(Start_time)
print(time.perf_counter())
Count_Int = 0
Sell_Buy = 'Wait'
"""
残高取得
"""
BALANCES = pybitflyer.API(api_key=API_KEY, api_secret=API_SECRET).getbalance()
AMOUNT[0] = (BALANCES[0])["amount"]
AMOUNT[1] = (BALANCES[3])["amount"]
AMOUNT[2] = (BALANCES[8])["amount"]
if AMOUNT[1] < 0 and AMOUNT[2] < 0:
Max_SIZE[0] = (AMOUNT[0] - MIN_JPN) /2
else:
Max_SIZE[0] = AMOUNT[0] - MAX_JPN
print('JPN' + str(AMOUNT[0]) + Ticker_symbol[0] + str(AMOUNT[1]) + Ticker_symbol[1] + str(AMOUNT[2]))
"""
ティッカー操作処理
"""
for SYMBOL in Ticker_symbol:
Count_Int += 1
"""
ティッカーからデータ取得
"""
CODE = SYMBOL + "_JPY"
tick = pybitflyer.API().ticker(product_code=CODE)
timestamp = tick["timestamp"]
ltp = tick["ltp"]
ask = tick["best_ask"]
bid = tick["best_bid"]
ask_size = tick["best_ask_size"]
bid_size = tick["best_bid_size"]
ask_depth = tick["total_ask_depth"]
bid_depth = tick["total_ask_depth"]
volume_product = tick["volume_by_product"]
volume = tick["volume"]
"""
データベースへ格納
"""
Data_Base = sqlite3.connect("furture.db")
Data_cur = Data_Base.cursor()
Data_cur.execute('INSERT INTO furture(symbol, timestamp, lpt, ask, bid, ask_size, bid_size, ask_depth, bid_depth, volume_product, volume) values (?,?,?,?,?,?,?,?,?,?,?)',[SYMBOL, timestamp, ltp, ask, bid, ask_size, bid_size, ask_depth, bid_depth, volume_product, volume])
Data_Base.commit()
"""
データを抽出してリストに入れる
"""
Data_Sort = 'select * from furture where symbol = ? ORDER BY ID DESC'
Data_cur.execute(Data_Sort, (SYMBOL,))
NewData = Data_cur.fetchmany(360)
Data_Base.close()
"""
売買判定
売り命令 SELL、資金あり、前回購入価格より高い
買い命令
"""
Sell_Buy = Distance.core(NewData)
if Sell_Buy is 'Sell' and AMOUNT[Count_Int] > MIN_JPN and Max_SIZE[Count_Int] < bid * AMOUNT[Count_Int]:
SIZE = AMOUNT[Count_Int]
pass
elif Sell_Buy is 'Buy' and AMOUNT[Count_Int] > 0 and RATE[Count_Int] < bid:
Sell_Buy = 'Wait'
RATE[Count_Int] = ask
Max_SIZE[Count_Int] = ask * AMOUNT[Count_Int]
print('下落傾向損切')
pass
elif Sell_Buy is 'Buy' and sum(Max_SIZE) / 2 < AMOUNT[0] and RATE[Count_Int] > ask:
if Ticker_symbol is 'BTC' or Ticker_symbol is 'ETH':
SIZE = float(int((Max_SIZE[0]* 100) / ask) * 0.01)
else:
SIZE = float(int((Max_SIZE[0]* 10) / ask) * 0.1)
pass
elif Sell_Buy is 'Sell_Check' or Sell_Buy is 'Buy_Check':
PRICE = ltp
SIZE = 0
pass
else:
Sell_Buy = 'Wait'
SIZE = 0
print('最大購入単位' + str(SIZE))
"""
売買判定2
if sum(AMOUNT) == 0:
Sell_Buy = 'Wait'
print('資金なし')
pass
"""
"""
売買の数値を引数に代入
売りの場合 bid×所持仮想通貨
買いの場合 所持金合計-最小取引額 ÷4
"""
TYPE = 'LIMIT'
EXPIRE = 1
FORCE = 'FOK'
if Sell_Buy is 'Sell':
PRICE = bid
RATE[Count_Int] = bid
Max_SIZE[Count_Int] = AMOUNT[Count_Int] * RATE[Count_Int]
pass
elif Sell_Buy is 'Buy':
PRICE = ask
RATE[Count_Int] = ask
Max_SIZE[Count_Int] = (sum(Max_SIZE) - MIN_JPN) / 4
pass
"""
売買処理
"""
print(SYMBOL + ' '+ Sell_Buy)
if Sell_Buy is 'Wait':
print('売買待機')
pass
elif Sell_Buy is 'Sell_Check' or Sell_Buy is 'Buy_Check':
print('テスト')
with open('Sell_Buy.csv','a',newline='') as files:
writer = csv.writer(files)
writer.writerow([CODE,str(timestamp),TYPE,Sell_Buy,str(SIZE),str(PRICE),str(EXPIRE),EXPIRE])
pass
else:
pybitflyer.API(api_key=API_KEY, api_secret=API_SECRET).sendchildorder(product_code=CODE,child_order_type=TYPE,side=Sell_Buy,size=SIZE,price=PRICE,minute_to_expire=EXPIRE, time_in_force=FORCE)
with open('Sell_Buy.csv','a',newline='') as files:
writer = csv.writer(files)
writer.writerow([CODE,str(timestamp),TYPE,Sell_Buy,str(SIZE),str(PRICE),str(EXPIRE),EXPIRE])
"""
CSVへ書込み
"""
with open('furture.csv','a',newline='') as files:
writer = csv.writer(files)
writer.writerow([SYMBOL,str(timestamp),str(ltp),str(ask),str(bid),str(ask_size),str(bid_size),str(ask_depth),str(bid_depth),str(volume_product),str(volume)])
"""
20秒タイマー
"""
print('==========20秒待機=========')
time.sleep((20 + Start_time ) - time.perf_counter())
Start_time = Start_time + 20
furture.py
二つ目のファイルfurture.pyは、売買判定をするための自作関数だ。ボクとしてはまだまだ修正の余地があるし、メインのコードも急拵えなどで、独学だからか書き方が汚い感じがする。
# -*- coding: utf-8 -*-
"""
The god who knows people is beyond the horizon.
"""
import requests
import statistics
import math
class furture(object):
def core(core_data):
"""
データの加工
"""
Check_symbol = ([row[1] for row in core_data])
Check_ltp = ([row[3] for row in core_data])
Check_ask = ([row[4] for row in core_data])
Check_bid = ([row[5] for row in core_data])
Check_volue_product = ([row[11] for row in core_data])
print('-----' + Check_symbol[0] + '-----')
print("価格" + str(Check_ltp[0]) + ",買値" + str(Check_ask[0]) + ",売値" + str(Check_bid[0]))
"""
データ解析
"""
if len(Check_symbol) < 36:
print('サンプル不足')
return 'Wait'
else:
Check_ltp_Short = Check_ltp[:36]
Check_ask_Short = Check_ask[:36]
Check_bid_Short = Check_bid[:36]
Check_ltp_Fast = Check_ltp[:12]
Data_RSI1 = furture().RSI(Check_ltp_Short)
Data_RSI2 = furture().RSI(Check_ask_Short)
Data_RSI3 = furture().RSI(Check_bid_Short)
Data_STO1 = furture().STO(Check_ltp_Short)
Data_STO2 = furture().STO(Check_ltp_Fast)
Data_PST1 = []
Data_PST1 = furture().PST(Check_ltp_Short)
Data_PST2 = []
Data_PST2 = furture().PST(Check_ltp_Fast)
print('RSI_ltp ' + str(Data_RSI1) + ' ,RSI_ask ' + str(Data_RSI2) + ' ,RSI_bid ' + str(Data_RSI3))
print('STO36 ' + str(Data_STO1) + ' ,STO12 ' + str(Data_STO2))
print('PST36 Sell ' + str(Data_PST1[0]) + ', Buy ' + str(Data_PST1[1]))
print('PST12 Sell ' + str(Data_PST2[0]) + ', Buy ' + str(Data_PST2[1]))
RESULT = furture.decision(Data_RSI1,Data_RSI2,Data_RSI3,Data_STO1,Data_STO2,Data_PST1[0],Data_PST1[1],Data_PST2[0],Data_PST2[1])
return RESULT
def decision(RSI1, RSI2, RSI3, STO1, STO2, PST1_ask, PTS1_bid, PTS2_ask, PTS2_bid):
"""
売買判定
"""
if RSI1 == 1 or RSI2 == 1 or RSI3 == 1 or RSI1 == 0 or RSI2 == 0 or RSI3 == 0:
"""
まだ上昇か下降しているため除外
"""
print('RSI')
return 'Wait'
elif STO1 < 0.0005 or STO2 < 0.0005:
"""
利益が見込めないから除外
"""
print('STO')
return 'Wait'
else:
"""
確率低いものは除外
"""
if PTS2_ask > PTS2_bid and PST1_ask < PTS1_bid:
return 'Sell_Check'
elif PTS2_bid > PTS2_ask and PTS1_bid < PST1_ask:
return 'Buy_Check'
else:
print('PTS')
return 'Wait'
def RSI(self,ltp):
"""
RSI ※集計が降順のため、本来の計算と逆にしてる
"""
RSI_Data1 = list(map(lambda x:x - ltp[0],ltp))
RSI_Data2 = sum(list(filter(lambda x:x < 0,RSI_Data1)))
RSI_Data3 = sum(list(filter(lambda x:x > 0,RSI_Data1)))
if RSI_Data3 - RSI_Data2 > 0:
RSI = round(1 - (RSI_Data2 / (RSI_Data2 - RSI_Data3)),4)
return RSI
elif RSI_Data3 + RSI_Data2 > 0:
RSI = round(RSI_Data3 / (RSI_Data3 - RSI_Data2),4)
return RSI
else:
return 0
def STO(self,ltp):
"""
1時間と12分の値幅が0.1%以上になるか計算
"""
STO_DATA1 = 1 - (min(ltp) / ltp[0])
STO_DATA2 = (max(ltp) / ltp[0]) - 1
if STO_DATA1 > STO_DATA2:
return round(STO_DATA1,4)
else:
return round(STO_DATA2,4)
def PST(self,ltp):
"""
標準偏差の自由の度を黄金比で算出
1時間と12分の取引から売買確率を判定
"""
EMA = round(statistics.mean(ltp),1)
PST = round(statistics.pstdev(ltp),1)
BIT = round(EMA + (1.61803397 * PST) - ltp[0],1)
ASK = round(EMA - (1.61803397 * PST) - ltp[0],1)
if BIT + ASK == 0 or BIT - ASK == 0:
return (0.5, 0.5)
elif BIT > 0 and ASK < 0:
SELL = round(BIT / (BIT - ASK),4)
BUY = ASK / (BIT - ASK)
return (SELL, round(1 - SELL,4))
else:
SELL = round(BIT / (BIT + ASK),4)
BUY = round(ASK / (BIT + ASK),4)
return (SELL, BUY)
仮想通貨自動売買のアプリは、携帯アプリでも作れないことはないけど、高性能のPCを使わないと動作確認が難しいので、今回使ったプログラムを修正しながら地道に資金を増やそうと思う。ボクは、残り2つの記事をnoteにアップして休載する。記事を再開したなら、新しくPCが買えるほどの利益が出たと思って欲しい。
目次
前の記事