クリプトHFTとか競プロとか

競技プログラミングや仮想通貨に関することを中心にブログを書いていきます.

PythonとWebsocketを使ってMEXの板情報を高速に扱いたい コーディング篇

はい。 この記事では、僕がBitMEXの板情報(いわゆるOrderBook)を扱うときに困った点や、それを解決した方法について書きます。

追記:色々とバグを直したバージョンを公開しました:

kabukimining.hateblo.jp

なぜWebsocketを使って板情報を扱うのか

デイトレードレベルの頻度でAPIにアクセスするなら、実装ハードルも低いREST APIを使えばいいのですが、 HFTをしようともなると、いちいちアクセスしていてはAPI Limitが枯渇してしまいます。

www.bitmex.com

そこで、では、僕がBitMEXの板情報(いわゆるOrderBook)を扱うときに困った点や、それを解決した方法について書きます。

なぜWebsocketを使って板情報を扱うのか

デイトレードレベルの頻度でAPIにアクセスするなら、実装ハードルも低いREST APIを使えばいいのですが、 HFTをしようともなると、いちいちアクセスしていてはAPI Limitが枯渇してしまいます。

www.bitmex.com

そこで、Websocketを使うことで、API Limitを消費することなく、リアルタイムにOrderBookを高速に扱うことができるのです。 BitMEX先輩もそう言っています。

BitMEX ではリアルタイムデータを購読可能です。 このアクセスはいったん接続するとレートの制限がなくなるため、プログラムに最新データを取り込む最適な方法です。

https://www.bitmex.com/app/wsAPI

公式コネクタを使ってみる

このページにたどり着く皆さんは、BitMEXの公式コネクタの存在を知っていると思います。 ですが、それと同時に公式コネクタはとても遅いこともご存知だと思います。

それについては、よく検証されたnoteがあるのでそちらを参照してください。

note.com

公式コネクタが遅いことがよく分かると思います。

モッチオさんのnoteに貼ってあるものを試す。

モッチオさんのnoteには、公式コネクタをindexを記憶した辞書を使うことによって高速化されたコネクタのコードがあるのですが、 下記noteやコメント欄を見ると分かる通り、完璧なindex管理がなされておらず、時間がたつとorderbookの情報がズレてしまいます。

note.com

そこで、上記noteを書いたmatsuoさんのPuppetterというフレームワークはMITライセンスで公開されているため、 WebSocketコネクタのソースコードを覗いてみました。

puppeteer/inmemorydb_bitmex_websocket.py at master · o-matsuo/puppeteer · GitHub

想像以上に行数が多く、他のプログラムとも結合しているために、コピペは断念(そりゃそう)。

自分で作ることにしました。

自分で作る

そもそもなぜ公式コネクタが遅いのかは、配列の検索するメソッドにあります。 それをindex配列を作ることでモッチオさんは高速化をしていましたが、私はOrderBookさえとれればいいので、 OrderBookの情報をPandasのDataFrameとして扱うことによって、ライブラリの力で高速化しました。

import websocket
import threading
from time import sleep
import json
import urllib
import sys
import pandas as pd

#おまじない
from logging import Formatter, getLogger, StreamHandler, DEBUG

class BitMEXWebsocket:
    max_table_len = 200

    def __init__(self, endpoint, symbol):
        self.formatter = Formatter('%(asctime)-15s - %(levelname)-8s - %(message)s')
        self.logger = getLogger(__name__)
        self.handler = StreamHandler(sys.stdout)
        self.handler.setLevel(20)
        self.handler.setFormatter(self.formatter)
        self.logger.setLevel(20)
        self.logger.addHandler(self.handler)

        self.endpoint = endpoint
        self.symbol = symbol

        self.exited = False

        self.df = pd.DataFrame()
        wsurl = self.__get_url()
        self.logger.info('Connecting to %s' % wsurl)
        self.__connect(wsurl, symbol)  
        self.logger.info('Connected to WS.')
        self.__wait_for_symbol(symbol)

    def __wait_for_symbol(self, symbol):
        while self.df.empty:
            sleep(0.1)
    
    def __get_url(self):
        symbolSubs = ["orderBookL2_25"]
        genericSubs = ["margin"]

        subscriptions = [sub + ':' + self.symbol for sub in symbolSubs]
        subscriptions += genericSubs

        urlparts = list(urllib.parse.urlparse(self.endpoint))
        urlparts[0] = urlparts[0].replace('http', 'ws')
        urlparts[2] = "/realtime?subscribe={}".format(','.join(subscriptions)) 

        return urllib.parse.urlunparse(urlparts)

    
    def __connect(self, wsurl, symbol):
        self.logger.debug('Starting thread')
        self.ws = websocket.WebSocketApp(wsurl, on_message=self.__on_message, 
        on_close=self.__on_close, on_open=self.__on_open, on_error=self.__on_error)

        self.wst = threading.Thread(target=lambda: self.ws.run_forever())
        self.wst.daemon = True
        self.wst.start()
        self.logger.debug('Started thread')
        sleep(5)

    def __on_message(self, message):
        message = json.loads(message)
        action = message['action']
        data = message['data']
        if action == 'partial':
            self.df = pd.io.json.json_normalize(data)
            self.df.to_csv('orderbook.csv')

        elif action == 'update':
            df_data = pd.io.json.json_normalize(data)
            indicies, Is = self.search_index(self.df, df_data)            
            
            for i in range(len(indicies)):
                index = indicies[i]
                id = self.df.iloc[index]['id']

                self.df.iloc[index] = df_data.iloc[Is[i]]
                self.df['price'].iloc[index] = -0.01 * id + 88000000

        elif action == "delete":
            df_data = pd.io.json.json_normalize(data)
            indicies, Is = self.search_index(self.df, df_data)
            

            self.df.drop(index=self.df.index[indicies], inplace=True)
            self.df = self.df.sort_values('price', ascending=False)
            self.df.reset_index(inplace=True, drop=True)

        elif action == "insert":
            df_data = pd.io.json.json_normalize(data)


            self.df = pd.concat([self.df, df_data], sort=True)
            self.df = self.df.sort_values('price', ascending=False)
            self.df.reset_index(inplace=True, drop=True)

    def __on_error(self, error):
        if not self.exited:
            self.logger.error("Error : %s" % error)
            raise websocket.WebSocketException(error)
    
    def __on_open(self):
        self.logger.debug('Websocket Opened')
    
    def __on_close(self):
        self.logger.info('Websocket Closed')

    def get_orderbook(self):
        return self.df

    def search_index(self, a, b):
        # DataFrame a から DataFrame Bと一致する行を探す
        indicies = list()
        Is = list()
        # self.logger.debug(a)
        for i in range(len(b)):
            id = b.iloc[i]['id']
            side = b.iloc[i]['side']

            index = a.query('id == {} and side == "{}"'.format(id, side)).index[0]
            
            indicies.append(index)
            Is.append(i)
        
        return indicies, Is
    
    
if __name__ == "__main__":
    bmw = BitMEXWebsocket(endpoint='wss://www.bitmex.com/realtime', symbol='XBTUSD')
    while True:
        sleep(1)

このクラスの関数get_orderbook()を呼ぶことでOrderBookを得ることができるのですが、 NaNが入ってしまうというバグに悩まされているため、現在調査中です

To be continued....

次回に続く...

追記:色々とバグを直したバージョンを公開しました:

kabukimining.hateblo.jp