【GMOコイン】APIラッパー【python】
こんにちは。ドースー(@dosu0217)です。
自分のBotでも利用しているGMOコインで自動売買などをするためのAPIラッパーを公開します。
GMOコインのAPIドキュメントはこちら
なお、このソースコードはこちらのニッケルメッキ先生が公開しているbitFlyer APIのラッパークラスの記事をとても参考にしています。
とても勉強になるソースですのでよかったら見てみてください。
**** 変更履歴 ****
2020/09/08 ・新規注文(order)、決済注文(closeOrder)、一括決済
注文(closeBulkOrder)にtimeInForceを追加
・記事の一部内容を変更。
2022/02/07 ・取引高情報API追加に対応
********************
基本的な使い方
GMOCoinクラスの各APIメソッドをコールして、GMOCoin.sendメソッドを実行してください。
構成
gmocoin.py [GMOコイン APIのラッパークラス]
sample.py [ラッパークラスを用いたサンプルプログラム]
ソースコード
# gmocoin.py
import aiohttp
import asyncio
import async_timeout
import json
from aiohttp import WSMsgType
import traceback
import time
from datetime import datetime
import hmac
import hashlib
import urllib
from secrets import token_hex
class GMOCoin():
# 定数
TIMEOUT = 3600 # タイムアウト
EXTEND_TOKEN_TIME = 3000 # アクセストークン延長までの時間
SYMBOL = 'BTC_JPY' # 銘柄 BTC_JPY
URLS = {'public': 'https://api.coin.z.com/public',
'private': 'https://api.coin.z.com/private',
'publicWS': 'wss://api.coin.z.com/ws/public/v1',
'privateWS': 'wss://api.coin.z.com/ws/private/v1',
}
PUBLIC_CHANNELS = ['ticker', 'orderbooks', 'trades']
PRIVATE_CHANNELS = ['executionEvents', 'orderEvents', 'positionEvents', 'positionSummaryEvents']
# 変数
api_key = ''
api_secret = ''
session = None # セッション保持
requests = [] # リクエストパラメータ
token = '' # Private Websocket API用トークン
# ------------------------------------------------ #
# init
# ------------------------------------------------ #
def __init__(self, api_key, api_secret):
# APIキー・SECRETをセット
self.api_key = api_key
self.api_secret = api_secret
# ------------------------------------------------ #
# async request for rest api
# ------------------------------------------------ #
def set_request(self, method, access_modifiers, target_path, params):
if access_modifiers == 'public':
url = ''.join([self.URLS['public'], target_path])
if method == 'GET':
headers = ''
self.requests.append({'method': method,
'access_modifiers': access_modifiers,
'target_path': target_path, 'url': url,
'params': params, 'headers':{}})
if method == 'POST':
headers = {'Content-Type': 'application/json'}
self.requests.append({'method': method,
'access_modifiers': access_modifiers,
'target_path': target_path, 'url': url,
'params': params, 'headers':headers})
if access_modifiers == 'private':
url = ''.join([self.URLS['private'], target_path])
path = target_path
timestamp = '{0}000'.format(int(time.mktime(datetime.now().timetuple())))
if method == 'GET':
text = ''.join([timestamp, method, path,])
sign = self.get_sign(text)
headers = self.set_headers_for_private(timestamp=timestamp,
sign=sign)
self.requests.append({'url': url,
'method': method,
'headers': headers,
'params': params,
})
if method == 'POST':
post_data = json.dumps(params)
text = ''.join([timestamp, method, path, post_data])
sign = self.get_sign(text)
headers = self.set_headers_for_private(timestamp=timestamp,
sign=sign)
self.requests.append({'url': url,
'method': method,
'headers': headers,
'params': post_data,
})
if method == 'PUT':
post_data = json.dumps(params)
text = ''.join([timestamp, method, path])
sign = self.get_sign(text)
headers = self.set_headers_for_private(timestamp=timestamp,
sign=sign)
self.requests.append({'url': url,
'method': method,
'headers': headers,
'params': post_data,
})
def set_headers_for_private(self, timestamp, sign):
headers = {'API-KEY': self.api_key,
'API-TIMESTAMP': timestamp,
'API-SIGN': sign}
return headers
def get_sign(self, text):
sign = hmac.new(bytes(self.api_secret.encode('ascii')),
bytes(text.encode('ascii')), hashlib.sha256).hexdigest()
return sign
async def fetch(self, request):
status = 0
content = []
async with async_timeout.timeout(self.TIMEOUT):
try:
if self.session is None:
self.session = await aiohttp.ClientSession().__aenter__()
if request['method'] is 'GET':
async with self.session.get(url=request['url'],
params=request['params'],
headers=request['headers']) as response:
status = response.status
content = await response.read()
if status != 200:
# エラーのログ出力など必要な場合
pass
elif request['method'] is 'POST':
async with self.session.post(url=request['url'],
data=request['params'],
headers=request['headers']) as response:
status = response.status
content = await response.read()
if status != 200:
# エラーのログ出力など必要な場合
pass
elif request['method'] is 'PUT':
async with self.session.put(url=request['url'],
data=request['params'],
headers=request['headers']) as response:
status = response.status
content = await response.read()
if status != 200:
# エラーのログ出力など必要な場合
pass
if len(content) == 0:
result = []
else:
try:
result = json.loads(content.decode('utf-8'))
except Exception as e:
traceback.print_exc()
return result
except Exception as e:
# セッション終了
if self.session is not None:
await self.session.__aexit__(None, None, None)
await asyncio.sleep(0)
self.session = None
traceback.print_exc()
async def send(self):
promises = [self.fetch(req) for req in self.requests]
self.requests.clear()
return await asyncio.gather(*promises)
# ------------------------------------------------ #
# public api
# ------------------------------------------------ #
# 取引所ステータス
# 取引所の稼動状態を取得します。
def status(self):
params = {}
self.set_request(method='GET', access_modifiers='public',
target_path='/v1/status', params=params)
# 最新レート
# 指定した銘柄の最新レートを取得します。
def ticker(self):
params = {'symbol': self.SYMBOL}
self.set_request(method='GET', access_modifiers='public',
target_path='/v1/ticker', params=params)
# 板情報
# 指定した銘柄の板情報(snapshot)を取得します。
def orderbooks(self):
params = {'symbol': self.SYMBOL}
self.set_request(method='GET', access_modifiers='public',
target_path='/v1/orderbooks', params=params)
# 取引履歴
# 指定した銘柄の取引履歴を取得します。
def trades(self, page=1, count=100):
params = {'symbol': self.SYMBOL,
'page': page,
'count': count
}
self.set_request(method='GET', access_modifiers='public',
target_path='/v1/trades', params=params)
# ------------------------------------------------ #
# private api
# ------------------------------------------------ #
# 余力情報を取得
# 余力情報を取得します。
def margin(self):
params = {}
self.set_request(method='GET', access_modifiers='private',
target_path='/v1/account/margin', params=params)
# 資産残高を取得
# 資産残高を取得します。
def assets(self):
params = {}
self.set_request(method='GET', access_modifiers='private',
target_path='/v1/account/assets', params=params)
# 取引高情報を取得
# 取引高情報を取得します。
def tradingVolume(self):
params = {}
self.set_request(method='GET', access_modifiers='private',
target_path='/v1/account/tradingVolume', params=params)
# 注文情報取得
# 指定した注文IDの注文情報を取得します。
def orders(self, orderId):
params = {'orderId': orderId}
self.set_request(method='GET', access_modifiers='private',
target_path='/v1/orders', params=params)
# 有効注文一覧
# 有効注文一覧を取得します。
def activeOrders(self, page=1, count=100):
params = {'symbol': self.SYMBOL,
'page': page,
'count': count
}
self.set_request(method='GET', access_modifiers='private',
target_path='/v1/activeOrders', params=params)
# 約定情報取得
# 約定情報を取得します。
def executions(self, orderId='', executionId=''):
params = {'symbol': self.SYMBOL}
if len(orderId) > 0:
params['orderId'] = orderId
elif len(executionId) > 0:
params['executionId'] = executionId
self.set_request(method='GET', access_modifiers='private',
target_path='/v1/executions', params=params)
# 最新の約定一覧
# 最新約定一覧を取得します。
def latestExecutions(self, page=1, count=100):
params = {'symbol': self.SYMBOL,
'page': page,
'count': count
}
self.set_request(method='GET', access_modifiers='private',
target_path='/v1/latestExecutions', params=params)
# 建玉一覧を取得
# 有効建玉一覧を取得します。
def openPositions(self, page=1, count=100):
params = {'symbol': self.SYMBOL,
'page': page,
'count': count
}
self.set_request(method='GET', access_modifiers='private',
target_path='/v1/openPositions', params=params)
# 建玉サマリーを取得
# 建玉サマリーを取得します。
def positionSummary(self):
params = {'symbol': self.SYMBOL}
self.set_request(method='GET', access_modifiers='private',
target_path='/v1/positionSummary', params=params)
# 新規注文を出す
def order(self, side, executionType, price, size, losscutPrice='', timeInForce=''):
params = {'symbol': self.SYMBOL,
'side': side,
'executionType': executionType,
#'price': price,
'size': size
}
if executionType == 'LIMIT':
params['price'] = price
if len(losscutPrice) > 0:
params['losscutPrice'] = losscutPrice
if len(timeInForce) > 0:
params['timeInForce'] = timeInForce
self.set_request(method='POST', access_modifiers='private',
target_path='/v1/order', params=params)
# 注文変更
# 注文変更をします。
# 対象: 現物取引、レバレッジ取引
def changeOrder(self, orderId, price, losscutPrice=''):
params = {'orderId': orderId,
'price': price
}
if len(losscutPrice) > 0:
params['losscutPrice'] = losscutPrice
self.set_request(method='POST', access_modifiers='private',
target_path='/v1/changeOrder', params=params)
# 注文キャンセル
# 注文取消をします。
# 対象: 現物取引、レバレッジ取引
def cancelOrder(self, orderId):
params = {'orderId': orderId}
self.set_request(method='POST', access_modifiers='private',
target_path='/v1/cancelOrder', params=params)
# 決済注文
# 決済注文をします。
# 対象: レバレッジ取引
def closeOrder(self, side, executionType, price, settlePosition, timeInForce=''):
params = {'symbol': self.SYMBOL,
'side': side,
'executionType': executionType,
#'price': price,
'settlePosition': settlePosition
}
if executionType == 'LIMIT':
params['price'] = price
if len(timeInForce) > 0:
params['timeInForce'] = timeInForce
self.set_request(method='POST', access_modifiers='private',
target_path='/v1/closeOrder', params=params)
# 一括決済注文
# 一括決済注文をします。
# 対象: レバレッジ取引
def closeBulkOrder(self, side, executionType, price, size, timeInForce=''):
params = {'symbol': self.SYMBOL,
'side': side,
'executionType': executionType,
#'price': price,
'size': size
}
if executionType == 'LIMIT':
params['price'] = price
if len(timeInForce) > 0:
params['timeInForce'] = timeInForce
self.set_request(method='POST', access_modifiers='private',
target_path='/v1/closeBulkOrder', params=params)
# ロスカットレート変更
# 建玉のロスカットレート変更をします。
# 対象: レバレッジ取引
def changeLosscutPrice(self, positionId, losscutPrice):
params = {'positionId': positionId,
'losscutPrice': losscutPrice
}
self.set_request(method='POST', access_modifiers='private',
target_path='/v1/changeLosscutPrice', params=params)
# ---------------------------------------- #
# Private WebSocket API
# ---------------------------------------- #
# アクセストークンを取得
# Private WebSocket API用のアクセストークンを取得します。
def post_ws_auth(self):
params = {}
self.set_request(method='POST', access_modifiers='private',
target_path='/v1/ws-auth', params=params)
# アクセストークンを延長
# Private WebSocket API用のアクセストークンを延長します。
def put_ws_auth(self, token):
params = {'token': token}
self.set_request(method='PUT', access_modifiers='private',
target_path='/v1/ws-auth', params=params)
# アクセストークンを削除
# Private WebSocket API用のアクセストークンを削除します。
def delete_ws_auth(self, token):
params = {'token': token}
self.set_request(method='DELETE', access_modifiers='private',
target_path='/v1/ws-auth', params=params)
# ------------------------------------------------ #
# WebSocket
# ------------------------------------------------ #
# Public WebSocket
async def public_ws_run(self, callback):
# 変数
end_point_public = self.URLS['publicWS']
while True:
try:
async with aiohttp.ClientSession() as session:
# Public WebSocket
async with session.ws_connect(end_point_public,
receive_timeout=self.TIMEOUT) as client:
if len(self.PUBLIC_CHANNELS) > 0:
await self.subscribe(client, self.PUBLIC_CHANNELS)
async for response in client:
if response.type != WSMsgType.TEXT:
print('response:' + str(response))
break
elif 'error' in response[1]:
print(response[1])
break
else:
data = json.loads(response[1])
await self.handler(callback, data)
except Exception as e:
print(e)
print(traceback.format_exc().strip())
await asyncio.sleep(10)
# Private WebSocket
async def private_ws_run(self, callback):
while True:
try:
async with aiohttp.ClientSession() as session:
# トークンの取得
if self.token == '':
self.post_ws_auth()
response = await self.send()
if response[0]['status'] != 0:
print(response[0])
await asyncio.sleep(10)
continue
else:
self.token = response[0]['data']
# 変数
end_point_private = ''.join([self.URLS['privateWS'], '/', self.token])
# Private WebSocket
async with session.ws_connect(end_point_private,
receive_timeout=self.TIMEOUT) as client:
if len(self.PRIVATE_CHANNELS) > 0:
await self.subscribe(client, self.PRIVATE_CHANNELS)
async for response in client:
if response.type != WSMsgType.TEXT:
print('response:' + str(response))
break
elif 'error' in response[1]:
print(response[1])
break
else:
data = json.loads(response[1])
await self.handler(callback, data)
except Exception as e:
print(e)
print(traceback.format_exc().strip())
await asyncio.sleep(10)
if self.token != '':
self.token = ''
# 購読
async def subscribe(self, client, channels):
for channel in channels:
if channel == "trades":
params = {"command":"subscribe", "channel":channel, "symbol": self.SYMBOL, "option": "TAKER_ONLY"}
elif channel in ['ticker', 'orderbooks']:
params = {"command":"subscribe", "channel":channel, "symbol": self.SYMBOL}
elif channel == 'positionSummaryEvents':
params = {"command":"subscribe", "channel":channel, "option": "PERIODIC"}
else:
params = {"command":"subscribe", "channel":channel}
await asyncio.wait([client.send_str(json.dumps(params))])
print('---- %s connect ----' %(channel))
await asyncio.sleep(2)
# トークンの延長
async def extend_token(self):
while True:
try:
await asyncio.sleep(self.EXTEND_TOKEN_TIME)
if self.token != '':
# トークンの延長
self.put_ws_auth(self.token)
response = await self.send()
except Exception as e:
print(e)
print(traceback.format_exc().strip())
# UTILS
# コールバック、ハンドラー
async def handler(self, func, *args):
return await func(*args)
# sample.py
import asyncio
from gmocoin import GMOCoin
class Sample():
# ---------------------------------------- #
# init
# ---------------------------------------- #
def __init__(self, api_key, api_secret):
self.gmocoin = GMOCoin(api_key=api_key, api_secret=api_secret)
# タスクの設定およびイベントループの開始
loop = asyncio.get_event_loop()
tasks = [
self.gmocoin.public_ws_run(self.realtime),
self.gmocoin.private_ws_run(self.realtime),
self.gmocoin.extend_token(),
self.run()
]
loop.run_until_complete(asyncio.wait(tasks))
# ---------------------------------------- #
# bot main
# ---------------------------------------- #
async def run(self):
while(True):
await self.main(5)
await asyncio.sleep(0)
async def main(self, interval):
# main処理
# 余力情報を取得
self.gmocoin.margin()
response = await self.gmocoin.send()
print(response[0])
'''
# 買い指値を注文とキャンセル
# 注文
self.gmocoin.order(side='BUY', executionType='LIMIT', price=1000000 , size=0.01)
response = await self.gmocoin.send()
print(response[0])
orderId = response[0]['data']
# 注文キャンセル
self.gmocoin.cancelOrder(orderId=orderId)
response = await self.gmocoin.send()
print(response[0])
await asyncio.sleep(interval)
'''
# リアルタイムデータの受信
async def realtime(self, data):
# ここにWebSocketから配信されるデータが落ちてきますので適宜加工して利用してみてください。
print(data)
await asyncio.sleep(0)
# --------------------------------------- #
# main
# --------------------------------------- #
if __name__ == '__main__':
api_key = ''
api_secret = ''
Sample(api_key=api_key, api_secret=api_secret)
noteのcode記事の仕様(?)で最初のインデントが3スペースになっています。
邪教なので実際に利用する際はスペース修正をお願いできればこれ幸い。
なお、僕はインデントは4スペース教に入信しています。
インデントは宗教戦争
以上となります。
少しでも皆様の開発コストの低減につながれば幸いです。
よろしければサポートをお願いします。 次回記事を公開するための研究開発費として利用させていただきます。