クリプトHFTとか競プロとか

競技プログラミングや仮想通貨に関することを中心にブログを書いていきます.

BitMEXのWebsocket Connectorが遅い! OrderBookのバグを直す篇

この記事では、前回の記事で作ったプログラムで、OrderBookの BestBidとBestAskを取る際に、NaNが入ってしまうバグの原因と対策を考えてきいきます。

f:id:KabukiMining:20200103225308p:plain
BitMEXの板の画像

目次:

とりあえずLogをファイル出力してみる

前回の記事で書いたinmemmorydb_bitmex_websocket.pyの10行あたりを、

from logging import Formatter, getLogger, StreamHandler, DEBUG, INFO, WARNING, basicConfig

と変えます。

そして、

basicConfig(filename='logfile/logger.log', level=DEBUG)

を追加します。 そして、inmemmorydb_bitmex_websocket.pyがあるディレクトリに、logfileディレクトリを作成します。 すると、いい感じにログがたまると思います。

そうして眺めているうちに、なんとなく「これ、データを受け取って反映するまでの僅かな時間に、無が存在していて、それを参照しているのでは?」 と思いました。

init関数に,

self.wip = False

を追加、 on_message関数の一番初めに

self.wip = True

を追加し、on_message関数の最後に

self.wip = False

を追加します。

そして、get_orderbook関数を

def get_orderbook(self):
        while True:
            if not self.wip:
                return self.df
            sleep(0.001)

と変更してみます。 私の環境では、数日ほどOrderBookを記録していますが、ズレはないように思えます。 また、get_orderbook関数のsleepの秒数は、実行環境の性能にもだいぶ左右されると思いますので、ログとにらめっこしながら職人技で調整して下さい。 すると、いい感じにキレイにOrderBookをとれるようになったとおもいます。

一応全体ファイルを置くと、

import websocket
import threading
from time import sleep
import json
import urllib
import sys
import pandas as pd

#おまじない
from logging import Formatter, getLogger, StreamHandler, DEBUG, INFO, WARNING, basicConfig

class BitMEXWebsocket:
    max_table_len = 200

    def __init__(self, endpoint, symbol):
        self.formatter = Formatter('%(asctime)-15s - %(levelname)-8s - %(message)s')
        self.logger = getLogger(__name__)
        self.handler = StreamHandler(sys.stdout)
        self.handler.setLevel(DEBUG)
        self.handler.setFormatter(self.formatter)
        self.logger.setLevel(DEBUG)
        self.logger.addHandler(self.handler)
        basicConfig(filename='logfile/logger.log', level=DEBUG)

        self.endpoint = endpoint
        self.symbol = symbol

        self.exited = False
        self.wip = False
        self.df = pd.DataFrame()
        wsurl = self.__get_url()
        self.logger.info('Connecting to %s' % wsurl)
        self.__connect(wsurl, symbol)  
        self.logger.info('Connected to WS.')
        self.__wait_for_symbol(symbol)

    def __wait_for_symbol(self, symbol):
        while self.df.empty:
            sleep(0.1)
    
    def __get_url(self):
        symbolSubs = ["orderBookL2_25"]

        subscriptions = [sub + ':' + self.symbol for sub in symbolSubs]

        urlparts = list(urllib.parse.urlparse(self.endpoint))
        urlparts[0] = urlparts[0].replace('http', 'ws')
        urlparts[2] = "/realtime?subscribe={}".format(','.join(subscriptions)) 
        return urllib.parse.urlunparse(urlparts)


    
    def __connect(self, wsurl, symbol):
        self.logger.debug('Starting thread')
        self.ws = websocket.WebSocketApp(wsurl, on_message=self.__on_message, 
        on_close=self.__on_close, on_open=self.__on_open, on_error=self.__on_error)

        self.wst = threading.Thread(target=lambda: self.ws.run_forever())
        self.wst.daemon = True
        self.wst.start()
        self.logger.debug('Started thread')
        sleep(5)

    def __on_message(self, message):
        self.wip = True
        message = json.loads(message)
        action = message.get('action')
        
        data = message.get('data')
        if action == 'partial':
            self.df = pd.io.json.json_normalize(data)

        elif action == 'update':
            df_data = pd.io.json.json_normalize(data)
            indicies, Is = self.search_index(self.df, df_data)            
            
            for i in range(len(indicies)):
                index = indicies[i]
                id = self.df.iloc[index]['id']
                price = self.df.loc[index, 'price']

                self.df.loc[index] = df_data.iloc[Is[i]]
                self.df.loc[index, 'price'] = price
                
                # price = ((100000000 * symbolIdx) - ID) * instrumentTickSize

        elif action == "delete":
            df_data = pd.io.json.json_normalize(data)
            indicies, Is = self.search_index(self.df, df_data)
        
            self.df.drop(index=self.df.index[indicies], inplace=True)
            self.df = self.df.sort_values('price', ascending=False)
            self.df.reset_index(inplace=True, drop=True)

        elif action == "insert":
            df_data = pd.io.json.json_normalize(data)

            self.df = pd.concat([self.df, df_data], sort=True)
            self.df = self.df.sort_values('price', ascending=False)
            self.df.reset_index(inplace=True, drop=True)

        self.wip = False
        # print('best bid: {}, best ask: {}'.format(self.df.iloc[25]['price'], self.df.iloc[24]['price']))
        # print('best bid size: {}, best ask size: {}'.format(self.df.iloc[25]['size'], self.df.iloc[24]['size']))
        
    def __on_error(self, error):
        if not self.exited:
            self.logger.error("Error : %s" % error)
            raise websocket.WebSocketException(error)
    
    def __on_open(self):
        self.logger.debug('Websocket Opened')
    
    def __on_close(self):
        self.logger.info('Websocket Closed')

    def get_orderbook(self):
        while True:
            if not self.wip:
                return self.df.copy()
            sleep(0.001)

    def get_best_bid(self):
        return self.get_orderbook().loc[25, 'price']
    def get_best_ask(self):
        return self.get_orderbook().loc[24, 'price']
    
    def search_index(self, a, b):
        # DataFrame a から DataFrame Bと一致する行を探す
        indicies = list()
        Is = list()

        for i in range(len(b)):
            id = b.iloc[i]['id']
            side = b.iloc[i]['side']

            index = a.query('id == {} and side == "{}"'.format(id, side)).index[0]
            indicies.append(index)
            Is.append(i)

        return indicies, Is
        
    
    
if __name__ == "__main__":
    bmw = BitMEXWebsocket(endpoint='wss://testnet.bitmex.com/realtime', symbol='XBTUSD')
    while True:
        print(bmw.get_orderbook())
        # print('best_bid: {}, best_ask: {}'.format(bmw.get_best_bid(), bmw.get_best_ask()))
        sleep(1)

となります。 雑な解説でしたが、参考になったら幸いです。 前回版では、websocketに接続した際に返ってくるメッセージを処理する際にエラーが出るバグがありましたが、それは修正いたしました。

いーや、もっと大きなバグがあるね

bot開発のために、いろいろと弄っていたところ、あきらかにおかしな数値を出してくるバグに遭遇しました。

f:id:KabukiMining:20200105223855p:plain
バグってる板の画像

これはtestnetの板のように、スプレッドがあいている故に起こっているバグだと思われます。 近いうちに修正します

2020/1/6 追記

idからpriceを求める際の計算式の違いによるバグのものでした。 現在は修正されています。

もしバグ報告などがあれば、気軽にコメントお願いします。