BitMEXのWebsocket Connectorが遅い! OrderBookのバグを直す篇
この記事では、前回の記事で作ったプログラムで、OrderBookの BestBidとBestAskを取る際に、NaNが入ってしまうバグの原因と対策を考えてきいきます。
目次:
とりあえずLogをファイル出力してみる
前回の記事で書いたinmemmorydb_bitmex_websocket.pyの10行あたりを、
from logging import Formatter, getLogger, StreamHandler, DEBUG, INFO, WARNING, basicConfig
と変えます。
そして、
basicConfig(filename='logfile/logger.log', level=DEBUG)
を追加します。 そして、inmemmorydb_bitmex_websocket.pyがあるディレクトリに、logfileディレクトリを作成します。 すると、いい感じにログがたまると思います。
そうして眺めているうちに、なんとなく「これ、データを受け取って反映するまでの僅かな時間に、無が存在していて、それを参照しているのでは?」 と思いました。
init関数に,
self.wip = False
を追加、 on_message関数の一番初めに
self.wip = True
を追加し、on_message関数の最後に
self.wip = False
を追加します。
そして、get_orderbook関数を
def get_orderbook(self): while True: if not self.wip: return self.df sleep(0.001)
と変更してみます。 私の環境では、数日ほどOrderBookを記録していますが、ズレはないように思えます。 また、get_orderbook関数のsleepの秒数は、実行環境の性能にもだいぶ左右されると思いますので、ログとにらめっこしながら職人技で調整して下さい。 すると、いい感じにキレイにOrderBookをとれるようになったとおもいます。
一応全体ファイルを置くと、
import websocket import threading from time import sleep import json import urllib import sys import pandas as pd #おまじない from logging import Formatter, getLogger, StreamHandler, DEBUG, INFO, WARNING, basicConfig class BitMEXWebsocket: max_table_len = 200 def __init__(self, endpoint, symbol): self.formatter = Formatter('%(asctime)-15s - %(levelname)-8s - %(message)s') self.logger = getLogger(__name__) self.handler = StreamHandler(sys.stdout) self.handler.setLevel(DEBUG) self.handler.setFormatter(self.formatter) self.logger.setLevel(DEBUG) self.logger.addHandler(self.handler) basicConfig(filename='logfile/logger.log', level=DEBUG) self.endpoint = endpoint self.symbol = symbol self.exited = False self.wip = False self.df = pd.DataFrame() wsurl = self.__get_url() self.logger.info('Connecting to %s' % wsurl) self.__connect(wsurl, symbol) self.logger.info('Connected to WS.') self.__wait_for_symbol(symbol) def __wait_for_symbol(self, symbol): while self.df.empty: sleep(0.1) def __get_url(self): symbolSubs = ["orderBookL2_25"] subscriptions = [sub + ':' + self.symbol for sub in symbolSubs] urlparts = list(urllib.parse.urlparse(self.endpoint)) urlparts[0] = urlparts[0].replace('http', 'ws') urlparts[2] = "/realtime?subscribe={}".format(','.join(subscriptions)) return urllib.parse.urlunparse(urlparts) def __connect(self, wsurl, symbol): self.logger.debug('Starting thread') self.ws = websocket.WebSocketApp(wsurl, on_message=self.__on_message, on_close=self.__on_close, on_open=self.__on_open, on_error=self.__on_error) self.wst = threading.Thread(target=lambda: self.ws.run_forever()) self.wst.daemon = True self.wst.start() self.logger.debug('Started thread') sleep(5) def __on_message(self, message): self.wip = True message = json.loads(message) action = message.get('action') data = message.get('data') if action == 'partial': self.df = pd.io.json.json_normalize(data) elif action == 'update': df_data = pd.io.json.json_normalize(data) indicies, Is = self.search_index(self.df, df_data) for i in range(len(indicies)): index = indicies[i] id = self.df.iloc[index]['id'] price = self.df.loc[index, 'price'] self.df.loc[index] = df_data.iloc[Is[i]] self.df.loc[index, 'price'] = price # price = ((100000000 * symbolIdx) - ID) * instrumentTickSize elif action == "delete": df_data = pd.io.json.json_normalize(data) indicies, Is = self.search_index(self.df, df_data) self.df.drop(index=self.df.index[indicies], inplace=True) self.df = self.df.sort_values('price', ascending=False) self.df.reset_index(inplace=True, drop=True) elif action == "insert": df_data = pd.io.json.json_normalize(data) self.df = pd.concat([self.df, df_data], sort=True) self.df = self.df.sort_values('price', ascending=False) self.df.reset_index(inplace=True, drop=True) self.wip = False # print('best bid: {}, best ask: {}'.format(self.df.iloc[25]['price'], self.df.iloc[24]['price'])) # print('best bid size: {}, best ask size: {}'.format(self.df.iloc[25]['size'], self.df.iloc[24]['size'])) def __on_error(self, error): if not self.exited: self.logger.error("Error : %s" % error) raise websocket.WebSocketException(error) def __on_open(self): self.logger.debug('Websocket Opened') def __on_close(self): self.logger.info('Websocket Closed') def get_orderbook(self): while True: if not self.wip: return self.df.copy() sleep(0.001) def get_best_bid(self): return self.get_orderbook().loc[25, 'price'] def get_best_ask(self): return self.get_orderbook().loc[24, 'price'] def search_index(self, a, b): # DataFrame a から DataFrame Bと一致する行を探す indicies = list() Is = list() for i in range(len(b)): id = b.iloc[i]['id'] side = b.iloc[i]['side'] index = a.query('id == {} and side == "{}"'.format(id, side)).index[0] indicies.append(index) Is.append(i) return indicies, Is if __name__ == "__main__": bmw = BitMEXWebsocket(endpoint='wss://testnet.bitmex.com/realtime', symbol='XBTUSD') while True: print(bmw.get_orderbook()) # print('best_bid: {}, best_ask: {}'.format(bmw.get_best_bid(), bmw.get_best_ask())) sleep(1)
となります。 雑な解説でしたが、参考になったら幸いです。 前回版では、websocketに接続した際に返ってくるメッセージを処理する際にエラーが出るバグがありましたが、それは修正いたしました。
いーや、もっと大きなバグがあるね
bot開発のために、いろいろと弄っていたところ、あきらかにおかしな数値を出してくるバグに遭遇しました。
これはtestnetの板のように、スプレッドがあいている故に起こっているバグだと思われます。 近いうちに修正します
2020/1/6 追記
idからpriceを求める際の計算式の違いによるバグのものでした。 現在は修正されています。
もしバグ報告などがあれば、気軽にコメントお願いします。