【楽天ウォレット】APIラッパー【Python】
こんにちは。ドースー(@dosu0217)です。
楽天ウォレットの証拠金取引で自動売買などをするためのAPIラッパーを公開します。
全て動作確認しているわけではないので、引数入れることでエラーになるかもしれません。なんかあったらこっそり教えてください。
楽天ウォレットのAPIドキュメント
なお、今回もこのソースコードはこちらのニッケルメッキ先生が公開しているbitFlyer APIのラッパークラスの記事をとても参考にしています。
基本的な使い方
rakutenクラスの各APIメソッドをコールして、rakuten.sendメソッドを実行してください。
構成
rakuten.py [楽天ウォレット APIのラッパークラス]
sample.py [ラッパークラスを用いたサンプルプログラム]
注意点
・動作未確認のリクエストがあります。未確認って書いてます。
・公開しているソースコードは利益などを保証する物ではありません。
・損失について製作者は一切責任を負わないものとします。
・今後実装されるAPIについて適宜サポートする訳ではありません。
**** 変更履歴 ****
2022/11/27 記事公開
********************
ソースコード
# rakuten.py
import aiohttp
import asyncio
import async_timeout
import json
from aiohttp import WSMsgType
import traceback
from datetime import datetime
import time
import hashlib
import hmac
import urllib
from requests import Request
class Rakuten():
MARKET = 7 # 銘柄
# 7:BTC_JPY
# 8:ETH_JPY
# 9:BCH_JPY
# 10:LTC_JPY
# 11:XRP_JPY
# 14:ADA_JPY
# 15:DOT_JPY
# 16:XLM_JPY
# 17:XTZ_JPY
URLS = {'REST': 'https://exchange.rakuten-wallet.co.jp'}
# 定数
TIMEOUT = 3600 # タイムアウト
# 変数
api_key = ''
api_secret = ''
session = None # セッション保持
requests = [] # リクエストパラメータ
# ------------------------------------------------ #
# init
# ------------------------------------------------ #
def __init__(self, api_key, api_secret):
# APIキー・SECRETをセット
self.api_key = api_key
self.api_secret = api_secret
# ------------------------------------------------ #
# async request for rest api
# ------------------------------------------------ #
def set_request(self, method, access_modifiers, target_path, params):
if access_modifiers == 'public':
url = ''.join([self.URLS['REST'], target_path])
if method == 'GET':
headers = ''
self.requests.append({'method': method,
'access_modifiers': access_modifiers,
'target_path': target_path, 'url': url,
'params': params, 'headers':{}})
if method == 'POST':
headers = {'Content-Type': 'application/json'}
self.requests.append({'method': method,
'access_modifiers': access_modifiers,
'target_path': target_path, 'url': url,
'params': params, 'headers':headers})
if access_modifiers == 'private':
url = ''.join([self.URLS['REST'], target_path])
path = target_path
nonce = str(int(time.time() * 1000))
if method in {'GET', 'DELETE'}:
signature_payload = self.get_payload(nonce, method, url, params)
signature = self.get_sign(signature_payload)
headers = self.set_headers_for_private(nonce=str(nonce),
sign=signature, params=params)
self.requests.append({'url': url,
'method': method,
'headers': headers,
'params': params,
})
if method in {'POST', 'PUT'}:
post_data = json.dumps(params)
signature_payload = self.get_payload(nonce, method, url, params)
signature = self.get_sign(signature_payload)
headers = self.set_headers_for_private(nonce=str(nonce),
sign=signature, params=params)
self.requests.append({'url': url,
'method': method,
'headers': headers,
'params': post_data,
})
def set_headers_for_private(self, nonce, sign, params):
headers = {'API-KEY': self.api_key,
'NONCE': nonce,
'SIGNATURE': sign,
}
if len(params) > 0:
headers['Content-Type'] = 'application/json'
return headers
def get_payload(self, nonce, method, url, params):
request = Request(method, url)
prepared = request.prepare()
if method in {"GET", "DELETE"}:
if len(params) > 0:
signature_payload = ''.join([str(nonce), prepared.path_url, '?{}'.format(urllib.parse.urlencode(params))]).encode()
else:
signature_payload = ''.join([str(nonce), prepared.path_url]).encode()
else:
signature_payload = ''.join([str(nonce), '{}'.format(json.dumps(params))]).encode()
return signature_payload
def get_sign(self, signature_payload):
signature = hmac.new(self.api_secret.encode(), signature_payload, hashlib.sha256).hexdigest()
return signature
async def fetch(self, request):
status = 0
content = []
async with async_timeout.timeout(self.TIMEOUT):
try:
if self.session is None:
self.session = await aiohttp.ClientSession().__aenter__()
if request['method'] is 'GET':
async with self.session.get(url=request['url'],
params=request['params'],
headers=request['headers']) as response:
status = response.status
content = await response.read()
if status != 200:
# エラーのログ出力など必要な場合
pass
elif request['method'] is 'POST':
async with self.session.post(url=request['url'],
data=request['params'],
headers=request['headers']) as response:
status = response.status
content = await response.read()
if status != 200:
# エラーのログ出力など必要な場合
pass
elif request['method'] is 'PUT':
async with self.session.put(url=request['url'],
data=request['params'],
headers=request['headers']) as response:
status = response.status
content = await response.read()
if status != 200:
# エラーのログ出力など必要な場合
pass
elif request['method'] is 'DELETE':
async with self.session.delete(url=request['url'],
params=request['params'],
headers=request['headers']) as response:
status = response.status
content = await response.read()
if status != 200:
# エラーのログ出力など必要な場合
pass
if len(content) == 0:
result = []
else:
try:
result = json.loads(content.decode('utf-8'))
except Exception as e:
traceback.print_exc()
return result
except Exception as e:
# セッション終了
if self.session is not None:
await self.session.__aexit__(None, None, None)
await asyncio.sleep(0)
self.session = None
traceback.print_exc()
async def send(self):
promises = [self.fetch(req) for req in self.requests]
self.requests.clear()
return await asyncio.gather(*promises)
# ------------------------------------------------ #
# REST API(Public)
# ------------------------------------------------ #
# 銘柄一覧取得
def symbol(self, authority="PERSONAL"):
params = {'authority': authority}
self.set_request(method='GET', access_modifiers='public',
target_path='/api/v1/cfd/symbol', params=params)
# ローソク足取得
def candlestick(self, candlestickType="PT1M", dateFrom="", dateTo="" ):
params = {
'symbolId': self.MARKET,
'candlestickType': candlestickType
}
if len(str(dateFrom)) > 0:
params['dateFrom'] = dateFrom
if len(str(dateTo)) > 0:
params['dateTo'] = dateTo
self.set_request(method='GET', access_modifiers='public',
target_path='/api/v1/candlestick', params=params)
# 板取得
def orderbook(self):
params = {
'symbolId': self.MARKET,
}
self.set_request(method='GET', access_modifiers='public',
target_path='/api/v1/orderbook', params=params)
# ティッカー取得
def ticker(self):
params = {
'symbolId': self.MARKET,
}
self.set_request(method='GET', access_modifiers='public',
target_path='/api/v1/ticker', params=params)
# 歩み値取得
def trades(self):
params = {
'symbolId': self.MARKET,
}
self.set_request(method='GET', access_modifiers='public',
target_path='/api/v1/trades', params=params)
# ------------------------------------------------ #
# REST API(Private)
# ------------------------------------------------ #
# 残高一覧取得
def asset(self):
params = {}
self.set_request(method='GET', access_modifiers='private',
target_path='/api/v1/asset', params=params)
# 証拠金関連項目取得
def equitydata(self):
params = {}
self.set_request(method='GET', access_modifiers='private',
target_path='/api/v1/cfd/equitydata', params=params)
# 注文一覧取得
def get_order(self, id="", dateFrom="", dateTo="", orderBehavior="", orderSide="", orderPattern="", orderType="", closeBehavior="", orderStatus="", postOnly="", size=""):
params = {
'symbolId': self.MARKET,
}
if len(str(dateFrom)) > 0:
params['dateFrom'] = dateFrom
if len(str(dateTo)) > 0:
params['dateTo'] = dateTo
if len(str(orderBehavior)) > 0:
params['orderBehavior'] = orderBehavior
if len(str(orderSide)) > 0:
params['orderSide'] = orderSide
if len(str(orderPattern)) > 0:
params['orderPattern'] = orderPattern
if len(str(orderType)) > 0:
params['orderType'] = orderType
if len(str(closeBehavior)) > 0:
params['closeBehavior'] = closeBehavior
if len(str(orderStatus)) > 0:
params['orderStatus'] = orderStatus
if len(str(postOnly)) > 0:
params['postOnly'] = postOnly
if len(str(size)) > 0:
params['size'] = size
self.set_request(method='GET', access_modifiers='private',
target_path='/api/v1/cfd/order', params=params)
# 注文(NORMAL)
def post_order(self, orderPattern="NORMAL", orderBehavior="OPEN", positionId="", orderSide="", orderType="", price="", amount="", orderExpire="", leverage="", closeBehavior="FIFO", postOnly=""):
orderData = {}
params = {
'symbolId': self.MARKET,
'orderPattern': orderPattern
}
if len(str(orderBehavior)) > 0:
orderData['orderBehavior'] = orderBehavior
if len(str(positionId)) > 0:
orderData['positionId'] = positionId
if len(str(orderSide)) > 0:
orderData['orderSide'] = orderSide
if len(str(orderType)) > 0:
orderData['orderType'] = orderType
if len(str(price)) > 0:
orderData['price'] = price
if len(str(amount)) > 0:
orderData['amount'] = amount
if len(str(orderExpire)) > 0:
orderData['orderExpire'] = orderExpire
if len(str(leverage)) > 0:
orderData['leverage'] = leverage
if len(str(closeBehavior)) > 0:
orderData['closeBehavior'] = closeBehavior
if len(str(postOnly)) > 0:
orderData['postOnly'] = postOnly
if len(orderData) > 0:
params['orderData'] = orderData
self.set_request(method='POST', access_modifiers='private',
target_path='/api/v1/cfd/order', params=params)
# 注文訂正
def update_order(self, orderPattern="NORMAL", orderId="", orderType="", price="", amount=""):
orderData = {}
params = {
'symbolId': self.MARKET,
'orderPattern': orderPattern
}
if len(str(orderId)) > 0:
orderData['orderId'] = orderId
if len(str(orderType)) > 0:
orderData['orderType'] = orderType
if len(str(price)) > 0:
orderData['price'] = price
if len(str(amount)) > 0:
orderData['amount'] = amount
if len(orderData) > 0:
params['orderData'] = orderData
self.set_request(method='PUT', access_modifiers='private',
target_path='/api/v1/cfd/order', params=params)
# 注文取消
def cancel_order(self, id=""):
params = {
'symbolId': self.MARKET,
}
if len(str(id)) > 0:
params['id'] = id
self.set_request(method='DELETE', access_modifiers='private',
target_path='/api/v1/cfd/order', params=params)
# 約定一覧取得
def trade(self, id="", dateFrom="", dateTo="", orderBehavior="", tradeBehavior="", orderSide="", orderPattern="", orderType="", tradeAction="", orderId="", positionId="", size=""):
params = {
'symbolId': self.MARKET,
}
if len(str(id)) > 0:
params['id'] = id
if len(str(dateFrom)) > 0:
params['dateFrom'] = dateFrom
if len(str(dateTo)) > 0:
params['dateTo'] = dateTo
if len(str(orderBehavior)) > 0:
params['orderBehavior'] = orderBehavior
if len(str(tradeBehavior)) > 0:
params['tradeBehavior'] = tradeBehavior
if len(str(orderSide)) > 0:
params['orderSide'] = orderSide
if len(str(orderPattern)) > 0:
params['orderPattern'] = orderPattern
if len(str(orderType)) > 0:
params['orderType'] = orderType
if len(str(tradeAction)) > 0:
params['tradeAction'] = tradeAction
if len(str(orderId)) > 0:
params['orderId'] = orderId
if len(str(positionId)) > 0:
params['positionId'] = positionId
if len(str(size)) > 0:
params['size'] = size
self.set_request(method='GET', access_modifiers='private',
target_path='/api/v1/cfd/trade', params=params)
# 建玉一覧取得
def position(self, id="", dateFrom="", dateTo="", positionStatus="", orderSide="", size=""):
params = {
'symbolId': self.MARKET,
}
if len(str(id)) > 0:
params['id'] = id
if len(str(dateFrom)) > 0:
params['dateFrom'] = dateFrom
if len(str(dateTo)) > 0:
params['dateTo'] = dateTo
if len(str(positionStatus)) > 0:
params['positionStatus'] = positionStatus
if len(str(orderSide)) > 0:
params['orderSide'] = orderSide
if len(str(size)) > 0:
params['size'] = size
self.set_request(method='GET', access_modifiers='private',
target_path='/api/v1/cfd/position', params=params)
# sample.py
import asyncio
import traceback
import numpy as np
import json
import requests
import asyncio
from time import time
from datetime import datetime, timedelta
from collections import deque
from rakuten import Rakuten
class RakutenBot():
# ---------------------------------------- #
# init
# ---------------------------------------- #
def __init__(self, api_key, api_secret):
self.rakuten = Rakuten(api_key=api_key, api_secret=api_secret)
self.rakuten.MARKET = 10
# タスクの設定およびイベントループの開始
loop = asyncio.get_event_loop()
tasks = [
self.run()
]
loop.run_until_complete(asyncio.wait(tasks))
# ---------------------------------------- #
# bot main
# ---------------------------------------- #
async def run(self):
while(True):
await self.main()
await asyncio.sleep(60)
async def main(self):
try:
# 銘柄一覧取得
self.rakuten.symbol()
response = await self.rakuten.send()
print("銘柄一覧取得")
print(response)
await asyncio.sleep(5)
# ローソク足取得
self.rakuten.candlestick()
response = await self.rakuten.send()
print("ローソク足取得")
print(response)
await asyncio.sleep(5)
# 板取得
self.rakuten.orderbook()
response = await self.rakuten.send()
print("板取得")
print(response)
await asyncio.sleep(5)
# ティッカー取得
self.rakuten.ticker()
response = await self.rakuten.send()
print("ティッカー取得")
print(response)
await asyncio.sleep(5)
# 歩み値取得
self.rakuten.trades()
response = await self.rakuten.send()
print("歩み値取得")
print(response)
await asyncio.sleep(5)
# 残高一覧取得
self.rakuten.asset()
response = await self.rakuten.send()
print("残高一覧取得")
print(response)
await asyncio.sleep(5)
# 証拠金関連項目取得
self.rakuten.equitydata()
response = await self.rakuten.send()
print("証拠金関連項目取得")
print(response)
await asyncio.sleep(5)
# 注文一覧取得
self.rakuten.get_order()
response = await self.rakuten.send()
print("注文一覧取得")
print(response)
await asyncio.sleep(5)
# 注文
self.rakuten.post_order(orderSide="BUY", orderType="LIMIT", price=10000, amount=0.2 )
response = await self.rakuten.send()
print("注文")
print(response)
id = response[0][0]["id"]
print("id:",id)
await asyncio.sleep(5)
# 注文訂正
self.rakuten.update_order(orderId=id, orderType="LIMIT", price=9999, amount=0.1 )
response = await self.rakuten.send()
print("注文訂正")
print(response)
await asyncio.sleep(5)
# 注文取消
self.rakuten.cancel_order(id=id)
response = await self.rakuten.send()
print("注文取消")
print(response)
await asyncio.sleep(5)
# 約定一覧取得
self.rakuten.trade()
response = await self.rakuten.send()
print("約定一覧取得")
print(response)
await asyncio.sleep(5)
# 建玉一覧取得
self.rakuten.position()
response = await self.rakuten.send()
print("建玉一覧取得")
print(response)
await asyncio.sleep(5)
except Exception as e:
print(e)
print(traceback.format_exc().strip())
# --------------------------------------- #
# main
# --------------------------------------- #
if __name__ == '__main__':
api_key = "API_KEY"
api_secret = "API_SECRET"
RakutenBot(api_key=api_key, api_secret=api_secret)
ファイル
最後に
これを公開した理由としてはあまりにも出来高が少なくて約定機会がないので、出来高を増やしたいという思いがあります。
少しでも皆様の開発コストの低減につながれば幸いです。
下記サポートボタンでランチ代を奢っていただければこれ幸いです。
よろしければサポートをお願いします。 次回記事を公開するための研究開発費として利用させていただきます。