PythonとWebsocketを使ってMEXの板情報を高速に扱いたい コーディング篇
はい。 この記事では、僕がBitMEXの板情報(いわゆるOrderBook)を扱うときに困った点や、それを解決した方法について書きます。
追記:色々とバグを直したバージョンを公開しました:
なぜWebsocketを使って板情報を扱うのか
デイトレードレベルの頻度でAPIにアクセスするなら、実装ハードルも低いREST APIを使えばいいのですが、 HFTをしようともなると、いちいちアクセスしていてはAPI Limitが枯渇してしまいます。
そこで、では、僕がBitMEXの板情報(いわゆるOrderBook)を扱うときに困った点や、それを解決した方法について書きます。
なぜWebsocketを使って板情報を扱うのか
デイトレードレベルの頻度でAPIにアクセスするなら、実装ハードルも低いREST APIを使えばいいのですが、 HFTをしようともなると、いちいちアクセスしていてはAPI Limitが枯渇してしまいます。
そこで、Websocketを使うことで、API Limitを消費することなく、リアルタイムにOrderBookを高速に扱うことができるのです。 BitMEX先輩もそう言っています。
BitMEX ではリアルタイムデータを購読可能です。 このアクセスはいったん接続するとレートの制限がなくなるため、プログラムに最新データを取り込む最適な方法です。
https://www.bitmex.com/app/wsAPI
公式コネクタを使ってみる
このページにたどり着く皆さんは、BitMEXの公式コネクタの存在を知っていると思います。 ですが、それと同時に公式コネクタはとても遅いこともご存知だと思います。
それについては、よく検証されたnoteがあるのでそちらを参照してください。
公式コネクタが遅いことがよく分かると思います。
モッチオさんのnoteに貼ってあるものを試す。
モッチオさんのnoteには、公式コネクタをindexを記憶した辞書を使うことによって高速化されたコネクタのコードがあるのですが、 下記noteやコメント欄を見ると分かる通り、完璧なindex管理がなされておらず、時間がたつとorderbookの情報がズレてしまいます。
そこで、上記noteを書いたmatsuoさんのPuppetterというフレームワークはMITライセンスで公開されているため、 WebSocketコネクタのソースコードを覗いてみました。
puppeteer/inmemorydb_bitmex_websocket.py at master · o-matsuo/puppeteer · GitHub
想像以上に行数が多く、他のプログラムとも結合しているために、コピペは断念(そりゃそう)。
自分で作ることにしました。
自分で作る
そもそもなぜ公式コネクタが遅いのかは、配列の検索するメソッドにあります。 それをindex配列を作ることでモッチオさんは高速化をしていましたが、私はOrderBookさえとれればいいので、 OrderBookの情報をPandasのDataFrameとして扱うことによって、ライブラリの力で高速化しました。
import websocket import threading from time import sleep import json import urllib import sys import pandas as pd #おまじない from logging import Formatter, getLogger, StreamHandler, DEBUG class BitMEXWebsocket: max_table_len = 200 def __init__(self, endpoint, symbol): self.formatter = Formatter('%(asctime)-15s - %(levelname)-8s - %(message)s') self.logger = getLogger(__name__) self.handler = StreamHandler(sys.stdout) self.handler.setLevel(20) self.handler.setFormatter(self.formatter) self.logger.setLevel(20) self.logger.addHandler(self.handler) self.endpoint = endpoint self.symbol = symbol self.exited = False self.df = pd.DataFrame() wsurl = self.__get_url() self.logger.info('Connecting to %s' % wsurl) self.__connect(wsurl, symbol) self.logger.info('Connected to WS.') self.__wait_for_symbol(symbol) def __wait_for_symbol(self, symbol): while self.df.empty: sleep(0.1) def __get_url(self): symbolSubs = ["orderBookL2_25"] genericSubs = ["margin"] subscriptions = [sub + ':' + self.symbol for sub in symbolSubs] subscriptions += genericSubs urlparts = list(urllib.parse.urlparse(self.endpoint)) urlparts[0] = urlparts[0].replace('http', 'ws') urlparts[2] = "/realtime?subscribe={}".format(','.join(subscriptions)) return urllib.parse.urlunparse(urlparts) def __connect(self, wsurl, symbol): self.logger.debug('Starting thread') self.ws = websocket.WebSocketApp(wsurl, on_message=self.__on_message, on_close=self.__on_close, on_open=self.__on_open, on_error=self.__on_error) self.wst = threading.Thread(target=lambda: self.ws.run_forever()) self.wst.daemon = True self.wst.start() self.logger.debug('Started thread') sleep(5) def __on_message(self, message): message = json.loads(message) action = message['action'] data = message['data'] if action == 'partial': self.df = pd.io.json.json_normalize(data) self.df.to_csv('orderbook.csv') elif action == 'update': df_data = pd.io.json.json_normalize(data) indicies, Is = self.search_index(self.df, df_data) for i in range(len(indicies)): index = indicies[i] id = self.df.iloc[index]['id'] self.df.iloc[index] = df_data.iloc[Is[i]] self.df['price'].iloc[index] = -0.01 * id + 88000000 elif action == "delete": df_data = pd.io.json.json_normalize(data) indicies, Is = self.search_index(self.df, df_data) self.df.drop(index=self.df.index[indicies], inplace=True) self.df = self.df.sort_values('price', ascending=False) self.df.reset_index(inplace=True, drop=True) elif action == "insert": df_data = pd.io.json.json_normalize(data) self.df = pd.concat([self.df, df_data], sort=True) self.df = self.df.sort_values('price', ascending=False) self.df.reset_index(inplace=True, drop=True) def __on_error(self, error): if not self.exited: self.logger.error("Error : %s" % error) raise websocket.WebSocketException(error) def __on_open(self): self.logger.debug('Websocket Opened') def __on_close(self): self.logger.info('Websocket Closed') def get_orderbook(self): return self.df def search_index(self, a, b): # DataFrame a から DataFrame Bと一致する行を探す indicies = list() Is = list() # self.logger.debug(a) for i in range(len(b)): id = b.iloc[i]['id'] side = b.iloc[i]['side'] index = a.query('id == {} and side == "{}"'.format(id, side)).index[0] indicies.append(index) Is.append(i) return indicies, Is if __name__ == "__main__": bmw = BitMEXWebsocket(endpoint='wss://www.bitmex.com/realtime', symbol='XBTUSD') while True: sleep(1)
このクラスの関数get_orderbook()を呼ぶことでOrderBookを得ることができるのですが、 NaNが入ってしまうというバグに悩まされているため、現在調査中です
To be continued....
次回に続く...
追記:色々とバグを直したバージョンを公開しました: