Bybitの板情報をPythonで扱いたい
みなさんこんにちは.
みなさんはBitMEXクローンの仮想通貨取引所は数多あることをご存知だと思います.
今回はあのMAXも使っているBybitで板情報をwebsocketからとって扱おうとして苦労したことなどについて書いていきます
Bybit公式のWebsocketコネクタを使ってみる
BybitはBitMEXと同じように,公式でPythonコネクタが用意されています.
試しにOrderBook情報を扱おうとこんなコードを書いてみます.
bybitws = BybitWebsocket(wsURL="wss://stream-testnet.bybit.com/realtime", api_key=None, api_secret=None, symbol='BTCUSD') bybitws.subscribe_orderBookL2() while True: sleep(5) print(bybitws.get_data('orderBookL2_25.BTCUSD'))
すると返ってくるのは...
'delete': [{'price': '6774.00', 'symbol': 'BTCUSD', 'id': 67740000, 'side': 'Buy'}], 'update': [], 'insert': [{'price': '6773.50', 'symbol': 'BTCUSD', 'id': 67735000, 'side': 'Buy', 'size': 1633}], 'transactTimeE6': 0} {'delete': [], 'update': [{'price': '6772.00', 'symbol': 'BTCUSD', 'id': 67720000, 'side': 'Buy', 'size': 35784}], 'insert': [], 'transactTimeE6': 0} {'delete': [{'price': '6776.50', 'symbol': 'BTCUSD', 'id': 67765000, 'side': 'Buy'}, {'price': '6776.00', 'symbol': 'BTCUSD', 'id': 67760000, 'side': 'Buy'}, {'price': '6777.00', 'symbol': 'BTCUSD', 'id': 67770000, 'side': 'Buy'}], 'update': [{'price': '6787.00', 'symbol': 'BTCUSD', 'id': 67870000, 'side': 'Sell', 'size': 17609}, {'price': '6786.50', 'symbol': 'BTCUSD', 'id': 67865000, 'side': 'Sell', 'size': 21198}, {'price': '6785.00', 'symbol': 'BTCUSD', 'id': 67850000, 'side': 'Sell', 'size': 23983}, {'price': '6775.00', 'symbol': 'BTCUSD', 'id': 67750000, 'side': 'Buy', 'size': 2266}], 'insert': [{'price': '6762.00', 'symbol': 'BTCUSD', 'id': 67620000, 'side': 'Buy', 'size': 8432}, {'price': '6775.50', 'symbol': 'BTCUSD', 'id': 67755000, 'side': 'Buy', 'size': 23871}, {'price': '6777.50', 'symbol': 'BTCUSD', 'id': 67775000, 'side': 'Buy', 'size': 45646}], 'transactTimeE6': 0} {'delete': [{'price': '6798.50', 'symbol': 'BTCUSD', 'id': 67985000, 'side': 'Sell'}, {'price': '6765.50', 'symbol': 'BTCUSD', 'id': 67655000, 'side': 'Buy'}, {'price': '6765.00', 'symbol': 'BTCUSD', 'id': 67650000, 'side': 'Buy'}], 'update': [{'price': '6778.00', 'symbol': 'BTCUSD', 'id': 67780000, 'side': 'Buy', 'size': 22444}, {'price': '6778.50', 'symbol': 'BTCUSD', 'id': 67785000, 'side': 'Buy', 'size': 37785}, {'price': '6779.00', 'symbol': 'BTCUSD', 'id': 67790000, 'side': 'Buy', 'size': 45131}, {'price': '6786.50', 'symbol': 'BTCUSD', 'id': 67865000, 'side': 'Sell', 'size': 27567}], 'insert': [{'price': '6780.50', 'symbol': 'BTCUSD', 'id': 67805000, 'side': 'Buy', 'size': 40484}, {'price': '6780.00', 'symbol': 'BTCUSD', 'id': 67800000, 'side': 'Buy', 'size': 43470}, {'price': '6787.50', 'symbol': 'BTCUSD', 'id': 67875000, 'side': 'Sell', 'size': 17584}], 'transactTimeE6': 0}
...
...
...
...
...
....
...
そう. 受け取った情報をそのまま流してくるのである!
これを扱いたいBotterは,削除やアップデートの各クエリに対してのプログラムを書かなくてはいけない. BitMEXとは大違いである.
仕方ないので自分で書く
なので,自分の記事 を参考に,板情報をpandasのDataFrame型で返す関数を追加したBybitWebsocketクラスを書いた.
import sys import websocket import threading import traceback import pandas as pd from time import sleep import json import logging import urllib import math import time import hmac from logging import Formatter, getLogger, StreamHandler, DEBUG, INFO, WARNING, basicConfig # This is a simple adapters for connecting to Bybit's websocket API. # You could use methods whose names begin with “subscribe”, and get result by "get_data" method. # All the provided websocket APIs are implemented, includes public and private topic. # Private methods are only available after authorization, but public methods do not. # If you have any questions during use, please contact us vim the e-mail "support@bybit.com". class BybitWebsocket: #User can ues MAX_DATA_CAPACITY to control memory usage. MAX_DATA_CAPACITY = 200 PRIVATE_TOPIC = ['position', 'execution', 'order'] def __init__(self, wsURL, symbol, api_key, api_secret): self.df = pd.DataFrame() self.formatter = Formatter('%(asctime)-15s - %(levelname)-8s - %(message)s') self.logger = getLogger(__name__) self.handler = StreamHandler(sys.stdout) self.handler.setLevel(DEBUG) self.handler.setFormatter(self.formatter) self.logger.setLevel(DEBUG) self.logger.addHandler(self.handler) self.logger.debug("Initializing WebSocket.") self.symbol = symbol if api_key is not None and api_secret is None: raise ValueError('api_secret is required if api_key is provided') if api_key is None and api_secret is not None: raise ValueError('api_key is required if api_secret is provided') self.api_key = api_key self.api_secret = api_secret self.data = {} self.exited = False self.auth = False # We can subscribe right in the connection querystring, so let's build that. # Subscribe to all pertinent endpoints self.logger.info("Connecting to %s" % wsURL) self.__connect(wsURL) def exit(self): '''Call this to exit - will close websocket.''' self.exited = True self.ws.close() def __connect(self, wsURL): '''Connect to the websocket in a thread.''' self.logger.debug("Starting thread") self.ws = websocket.WebSocketApp(wsURL, on_message=self.__on_message, on_close=self.__on_close, on_open=self.__on_open, on_error=self.__on_error, keep_running=True) self.wst = threading.Thread(target=lambda: self.ws.run_forever()) self.wst.daemon = True self.wst.start() self.logger.debug("Started thread") # Wait for connect before continuing retry_times = 5 while not self.ws.sock or not self.ws.sock.connected and retry_times: sleep(1) retry_times -= 1 if retry_times == 0 and not self.ws.sock.connected: self.logger.error("Couldn't connect to WebSocket! Exiting.") self.exit() raise websocket.WebSocketTimeoutException('Error!Couldn not connect to WebSocket!.') if self.api_key and self.api_secret: self.__do_auth() def generate_signature(self, expires): """Generate a request signature.""" _val = 'GET/realtime' + expires return str(hmac.new(bytes(self.api_secret, "utf-8"), bytes(_val, "utf-8"), digestmod="sha256").hexdigest()) def __do_auth(self): expires = str(int(round(time.time())+1))+"000" signature = self.generate_signature(expires) auth = {} auth["op"] = "auth" auth["args"] = [self.api_key, expires, signature] args = json.dumps(auth) self.ws.send(args) def __on_message(self, message): '''Handler for parsing WS messages.''' message = json.loads(message) if 'success' in message and message["success"]: if 'request' in message and message["request"]["op"] == 'auth': self.auth = True self.logger.info("Authentication success.") if 'ret_msg' in message and message["ret_msg"] == 'pong': self.data["pong"].append("PING success") if 'topic' in message: self.data[message["topic"]].append(message["data"]) if message['topic'] == 'orderBookL2_25.{}'.format(self.symbol): typ = message['type'] data = message['data'] if typ == 'snapshot': self.df = pd.io.json.json_normalize(message['data']) elif typ == 'delta': # self.logger.debug(data) for key, value in data.items(): if key == 'transactTimeE6': continue elif key == 'update': df_data = pd.io.json.json_normalize(value) indicies, Is = self.search_index(self.df, df_data) for i in range(len(indicies)): index = indicies[i] id = self.df.iloc[index]['id'] price = self.df.loc[index, 'price'] self.df.loc[index] = df_data.iloc[Is[i]] self.df.loc[index, 'price'] = price elif key == 'delete': df_data = pd.io.json.json_normalize(value) indicies, Is = self.search_index(self.df, df_data) self.df.drop(index=self.df.index[indicies], inplace=True) self.df = self.df.sort_values('price', ascending=False) self.df.reset_index(inplace=True, drop=True) elif key == "insert": df_data = pd.io.json.json_normalize(value) self.df = pd.concat([self.df, df_data], sort=True) self.df = self.df.sort_values('price', ascending=False) self.df.reset_index(inplace=True, drop=True) if len(self.data[message["topic"]]) > BybitWebsocket.MAX_DATA_CAPACITY: self.data[message["topic"]] = self.data[message["topic"]][BybitWebsocket.MAX_DATA_CAPACITY//2:] def __on_error(self, error): '''Called on fatal websocket errors. We exit on these.''' if not self.exited: self.logger.error("Error : %s" % error) raise websocket.WebSocketException(error) def __on_open(self): '''Called when the WS opens.''' self.logger.debug("Websocket Opened.") def __on_close(self): '''Called on websocket close.''' self.logger.info('Websocket Closed') def ping(self): self.ws.send('{"op":"ping"}') if 'pong' not in self.data: self.data['pong'] = [] def subscribe_kline(self, interval:str): param = {} param['op'] = 'subscribe' param['args'] = ['kline.' + self.symbol + '.' + interval] self.ws.send(json.dumps(param)) if 'kline.' + self.symbol + '.' + interval not in self.data: self.data['kline.' + self.symbol + '.' + interval] = [] def subscribe_trade(self): self.ws.send('{"op":"subscribe","args":["trade"]}') if "trade.BTCUSD" not in self.data: self.data["trade.BTCUSD"] = [] self.data["trade.ETHUSD"] = [] self.data["trade.EOSUSD"] = [] self.data["trade.XRPUSD"] = [] def subscribe_insurance(self): self.ws.send('{"op":"subscribe","args":["insurance"]}') if 'insurance.BTC' not in self.data: self.data['insurance.BTC'] = [] self.data['insurance.XRP'] = [] self.data['insurance.EOS'] = [] self.data['insurance.ETH'] = [] def subscribe_orderBookL2(self): param = {} param['op'] = 'subscribe' param['args'] = ['orderBookL2_25.' + self.symbol] self.ws.send(json.dumps(param)) if 'orderBookL2_25.' + self.symbol not in self.data: self.data['orderBookL2_25.' + self.symbol] = [] def subscribe_instrument_info(self): param = {} param['op'] = 'subscribe' param['args'] = ['instrument_info.100ms.' + self.symbol] self.ws.send(json.dumps(param)) if 'instrument_info.100ms.' + self.symbol not in self.data: self.data['instrument_info.100ms.' + self.symbol] = [] def subscribe_position(self): self.ws.send('{"op":"subscribe","args":["position"]}') if 'position' not in self.data: self.data['position'] = [] def subscribe_execution(self): self.ws.send('{"op":"subscribe","args":["execution"]}') if 'execution' not in self.data: self.data['execution'] = [] def subscribe_order(self): self.ws.send('{"op":"subscribe","args":["order"]}') if 'order' not in self.data: self.data['order'] = [] def get_data(self, topic): if topic not in self.data: self.logger.info(" The topic %s is not subscribed." % topic) return [] if topic.split('.')[0] in BybitWebsocket.PRIVATE_TOPIC and not self.auth: self.logger.info("Authentication failed. Please check your api_key and api_secret. Topic: %s" % topic) return [] else: if len(self.data[topic]) == 0: # self.logger.info(" The topic %s is empty." % topic) return [] # while len(self.data[topic]) == 0 : # sleep(0.1) return self.data[topic].pop() def get_df(self): return self.df def search_index(self, a, b): # DataFrame a から DataFrame Bと一致する行を探す indicies = list() Is = list() for i in range(len(b)): id = b.iloc[i]['id'] side = b.iloc[i]['side'] index = a.query('id == {} and side == "{}"'.format(id, side)).index[0] indicies.append(index) Is.append(i) return indicies, Is if __name__ == "__main__": bybitws = BybitWebsocket(wsURL="wss://stream-testnet.bybit.com/realtime", api_key=None, api_secret=None, symbol='BTCUSD') bybitws.subscribe_orderBookL2() while True: sleep(5) print(bybitws.get_data('orderBookL2_25.BTCUSD'))
使い方
もとのコネクタと同じように
bybitws = BybitWebsocket(wsURL="wss://stream-testnet.bybit.com/realtime", api_key=None, api_secret=None, symbol='BTCUSD')
として,get_df
関数を呼ぶことで,OrderBook情報がpandasのDataFrame型として返ってくる.
bybitws.get_df()
id price side size symbol 0 67980000 6798.00 Sell 4168 BTCUSD 1 67975000 6797.50 Sell 16294 BTCUSD 2 67970000 6797.00 Sell 342 BTCUSD 3 67965000 6796.50 Sell 17166 BTCUSD 4 67960000 6796.00 Sell 10858 BTCUSD 5 67955000 6795.50 Sell 8516 BTCUSD 6 67950000 6795.00 Sell 27528 BTCUSD 7 67945000 6794.50 Sell 26413 BTCUSD 8 67940000 6794.00 Sell 23465 BTCUSD 9 67935000 6793.50 Sell 21457 BTCUSD 10 67930000 6793.00 Sell 6215 BTCUSD 11 67925000 6792.50 Sell 18226 BTCUSD 12 67920000 6792.00 Sell 4577 BTCUSD 13 67915000 6791.50 Sell 12060 BTCUSD 14 67910000 6791.00 Sell 16535 BTCUSD 15 67905000 6790.50 Sell 22345 BTCUSD 16 67900000 6790.00 Sell 7112 BTCUSD 17 67895000 6789.50 Sell 23136 BTCUSD 18 67890000 6789.00 Sell 18644 BTCUSD 19 67885000 6788.50 Sell 8303 BTCUSD 20 67880000 6788.00 Sell 322 BTCUSD 21 67865000 6786.50 Sell 8690 BTCUSD 22 67860000 6786.00 Sell 15285 BTCUSD 23 67855000 6785.50 Sell 14570 BTCUSD 24 67850000 6785.00 Sell 10832 BTCUSD 25 67805000 6780.50 Buy 9987 BTCUSD 26 67800000 6780.00 Buy 14040 BTCUSD 27 67795000 6779.50 Buy 3923 BTCUSD 28 67790000 6779.00 Buy 9540 BTCUSD 29 67780000 6778.00 Buy 513 BTCUSD 30 67775000 6777.50 Buy 30129 BTCUSD 31 67770000 6777.00 Buy 29851 BTCUSD 32 67765000 6776.50 Buy 11973 BTCUSD 33 67760000 6776.00 Buy 11096 BTCUSD 34 67755000 6775.50 Buy 8102 BTCUSD 35 67745000 6774.50 Buy 16637 BTCUSD 36 67740000 6774.00 Buy 31585 BTCUSD 37 67735000 6773.50 Buy 10602 BTCUSD 38 67730000 6773.00 Buy 14331 BTCUSD 39 67725000 6772.50 Buy 31113 BTCUSD 40 67720000 6772.00 Buy 26608 BTCUSD 41 67715000 6771.50 Buy 5231 BTCUSD 42 67710000 6771.00 Buy 44615 BTCUSD 43 67705000 6770.50 Buy 13054 BTCUSD 44 67700000 6770.00 Buy 28312 BTCUSD 45 67695000 6769.50 Buy 3325 BTCUSD 46 67690000 6769.00 Buy 16470 BTCUSD 47 67685000 6768.50 Buy 13675 BTCUSD 48 67680000 6768.00 Buy 22811 BTCUSD 49 67675000 6767.50 Buy 16363 BTCUSD
おわりに
過去のコードがほとんどそのまま使えたので助かった.
もしなにかこのコードにバグや不明な点などがあったら気軽にコメントお願いします.
では.