Crypto HFT, Competitive Programming, and More

A blog mainly about competitive programming and cryptocurrency.

Working with Bybit's order book in Python

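Hello, everyone.

As you probably know, there are many cryptocurrency exchanges that are BitMEX clones.

This post is about the trouble I ran into when pulling order book data over WebSocket from Bybit, the exchange that even MAX uses.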

Trying Bybit's official WebSocket connector

Like BitMEX, Bybit provides an official Python connector.

As a test, I wrote the following code to work with the OrderBook data.

bybitws = BybitWebsocket(wsURL="wss://stream-testnet.bybit.com/realtime", api_key=None, api_secret=None, symbol='BTCUSD')
bybitws.subscribe_orderBookL2()

while True:
    sleep(5)
    print(bybitws.get_data('orderBookL2_25.BTCUSD'))

And what comes back is...

{'delete': [{'price': '6774.00', 'symbol': 'BTCUSD', 'id': 67740000, 'side': 'Buy'}], 'update': [], 'insert': [{'price': '6773.50', 'symbol': 'BTCUSD', 'id': 67735000, 'side': 'Buy', 'size': 1633}], 'transactTimeE6': 0}
{'delete': [], 'update': [{'price': '6772.00', 'symbol': 'BTCUSD', 'id': 67720000, 'side': 'Buy', 'size': 35784}], 'insert': [], 'transactTimeE6': 0}
{'delete': [{'price': '6776.50', 'symbol': 'BTCUSD', 'id': 67765000, 'side': 'Buy'}, {'price': '6776.00', 'symbol': 'BTCUSD', 'id': 67760000, 'side': 'Buy'}, {'price': '6777.00', 'symbol': 'BTCUSD', 'id': 67770000, 'side': 'Buy'}], 'update': [{'price': '6787.00', 'symbol': 'BTCUSD', 'id': 67870000, 'side': 'Sell', 'size': 17609}, {'price': '6786.50', 'symbol': 'BTCUSD', 'id': 67865000, 'side': 'Sell', 'size': 21198}, {'price': '6785.00', 'symbol': 'BTCUSD', 'id': 67850000, 'side': 'Sell', 'size': 23983}, {'price': '6775.00', 'symbol': 'BTCUSD', 'id': 67750000, 'side': 'Buy', 'size': 2266}], 'insert': [{'price': '6762.00', 'symbol': 'BTCUSD', 'id': 67620000, 'side': 'Buy', 'size': 8432}, {'price': '6775.50', 'symbol': 'BTCUSD', 'id': 67755000, 'side': 'Buy', 'size': 23871}, {'price': '6777.50', 'symbol': 'BTCUSD', 'id': 67775000, 'side': 'Buy', 'size': 45646}], 'transactTimeE6': 0}
{'delete': [{'price': '6798.50', 'symbol': 'BTCUSD', 'id': 67985000, 'side': 'Sell'}, {'price': '6765.50', 'symbol': 'BTCUSD', 'id': 67655000, 'side': 'Buy'}, {'price': '6765.00', 'symbol': 'BTCUSD', 'id': 67650000, 'side': 'Buy'}], 'update': [{'price': '6778.00', 'symbol': 'BTCUSD', 'id': 67780000, 'side': 'Buy', 'size': 22444}, {'price': '6778.50', 'symbol': 'BTCUSD', 'id': 67785000, 'side': 'Buy', 'size': 37785}, {'price': '6779.00', 'symbol': 'BTCUSD', 'id': 67790000, 'side': 'Buy', 'size': 45131}, {'price': '6786.50', 'symbol': 'BTCUSD', 'id': 67865000, 'side': 'Sell', 'size': 27567}], 'insert': [{'price': '6780.50', 'symbol': 'BTCUSD', 'id': 67805000, 'side': 'Buy', 'size': 40484}, {'price': '6780.00', 'symbol': 'BTCUSD', 'id': 67800000, 'side': 'Buy', 'size': 43470}, {'price': '6787.50', 'symbol': 'BTCUSD', 'id': 67875000, 'side': 'Sell', 'size': 17584}], 'transactTimeE6': 0}

...

...

...

...

...

....

...

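That's right. It simply passes along the received messages as they are!

Any botter who wants to use this has to write code that handles each delete, update, and insert query themselves. That is a far cry from BitMEX.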
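To make that bookkeeping concrete, here is a minimal sketch of applying one delta to a locally held book. It assumes the message shape seen in the output above (`delete`, `update`, and `insert` lists whose rows carry `id` and `side`). The function name `apply_delta`, the dict-based `book`, and the starting size of 100 are hypothetical, chosen only for illustration; none of them come from the Bybit connector.

```python
def apply_delta(book, delta):
    """Apply one orderBookL2_25 delta to `book` in place.

    `book` maps (id, side) -> row dict. Keying on (id, side) is an
    assumption based on the fields visible in the sample output.
    """
    for row in delta.get('delete', []):
        # Deleted rows carry no size; drop the level if we hold it.
        book.pop((row['id'], row['side']), None)
    for row in delta.get('update', []):
        # An update replaces the level stored under the same key.
        book[(row['id'], row['side'])] = dict(row)
    for row in delta.get('insert', []):
        # An insert adds a level that was not in the book before.
        book[(row['id'], row['side'])] = dict(row)
    return book


# Starting book with one level (the size 100 is made up for this example).
book = {
    (67740000, 'Buy'): {'price': '6774.00', 'symbol': 'BTCUSD',
                        'id': 67740000, 'side': 'Buy', 'size': 100},
}
# First delta from the output above.
delta = {
    'delete': [{'price': '6774.00', 'symbol': 'BTCUSD', 'id': 67740000, 'side': 'Buy'}],
    'update': [],
    'insert': [{'price': '6773.50', 'symbol': 'BTCUSD', 'id': 67735000,
                'side': 'Buy', 'size': 1633}],
    'transactTimeE6': 0,
}
apply_delta(book, delta)
print(sorted(book))  # [(67735000, 'Buy')]
```

A plain dict keeps each operation to a single lookup; the class below does the same job on a pandas DataFrame instead, so that the result is easy to sort and inspect.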

So I wrote it myself

So, drawing on my earlier article, I wrote a BybitWebsocket class with an added function that returns the order book as a pandas DataFrame.

import sys
import websocket
import threading
import traceback
import pandas as pd
from time import sleep
import json
import logging
import urllib
import math
import time
import hmac
from logging import Formatter, getLogger, StreamHandler, DEBUG, INFO, WARNING, basicConfig

# This is a simple adapter for connecting to Bybit's websocket API.
# You can use the methods whose names begin with "subscribe", and get results with the "get_data" method.
# All the provided websocket APIs are implemented, including public and private topics.
# Private methods are only available after authorization; public methods need none.
# If you have any questions during use, please contact us via the e-mail "support@bybit.com".


class BybitWebsocket:

    # Users can use MAX_DATA_CAPACITY to control memory usage.
    MAX_DATA_CAPACITY = 200
    PRIVATE_TOPIC = ['position', 'execution', 'order']
    def __init__(self, wsURL, symbol, api_key, api_secret):
        
        self.df = pd.DataFrame()
        self.formatter = Formatter('%(asctime)-15s - %(levelname)-8s - %(message)s')
        self.logger = getLogger(__name__)
        self.handler = StreamHandler(sys.stdout)
        self.handler.setLevel(DEBUG)
        self.handler.setFormatter(self.formatter)
        self.logger.setLevel(DEBUG)
        self.logger.addHandler(self.handler)
        self.logger.debug("Initializing WebSocket.")
        self.symbol = symbol

        if api_key is not None and api_secret is None:
            raise ValueError('api_secret is required if api_key is provided')
        if api_key is None and api_secret is not None:
            raise ValueError('api_key is required if api_secret is provided')

        self.api_key = api_key
        self.api_secret = api_secret

        self.data = {}
        self.exited = False
        self.auth = False
        # We can subscribe right in the connection querystring, so let's build that.
        # Subscribe to all pertinent endpoints
        self.logger.info("Connecting to %s" % wsURL)
        self.__connect(wsURL)
    def exit(self):
        '''Call this to exit - will close websocket.'''
        self.exited = True
        self.ws.close()

    def __connect(self, wsURL):
        '''Connect to the websocket in a thread.'''
        self.logger.debug("Starting thread")

        self.ws = websocket.WebSocketApp(wsURL,
                                         on_message=self.__on_message,
                                         on_close=self.__on_close,
                                         on_open=self.__on_open,
                                         on_error=self.__on_error,
                                         keep_running=True)

        self.wst = threading.Thread(target=lambda: self.ws.run_forever())
        self.wst.daemon = True
        self.wst.start()
        self.logger.debug("Started thread")

        # Wait for connect before continuing
        retry_times = 5
        # Parenthesize the "not connected" test so the retry budget bounds the loop
        # even while self.ws.sock is still None.
        while (not self.ws.sock or not self.ws.sock.connected) and retry_times:
            sleep(1)
            retry_times -= 1
        if not self.ws.sock or not self.ws.sock.connected:
            self.logger.error("Couldn't connect to WebSocket! Exiting.")
            self.exit()
            raise websocket.WebSocketTimeoutException('Error! Could not connect to WebSocket!')

        if self.api_key and self.api_secret:
            self.__do_auth()

    def generate_signature(self, expires):
        """Generate a request signature."""
        _val = 'GET/realtime' + expires
        return str(hmac.new(bytes(self.api_secret, "utf-8"), bytes(_val, "utf-8"), digestmod="sha256").hexdigest())

    def __do_auth(self):

        expires = str(int(round(time.time())+1))+"000"
        signature = self.generate_signature(expires)

        auth = {}
        auth["op"] = "auth"
        auth["args"] = [self.api_key, expires, signature]

        args = json.dumps(auth)

        self.ws.send(args)

    def __on_message(self, message):
        '''Handler for parsing WS messages.'''
        message = json.loads(message)

        if 'success' in message and message["success"]:   
            if 'request' in message and message["request"]["op"] == 'auth':
                self.auth = True
                self.logger.info("Authentication success.")
            if 'ret_msg' in message and message["ret_msg"] == 'pong':
                self.data["pong"].append("PING success")

        if 'topic' in message:
            self.data[message["topic"]].append(message["data"])

            if message['topic'] == 'orderBookL2_25.{}'.format(self.symbol):
                typ = message['type']
                data = message['data']
                if typ == 'snapshot':
                    self.df = pd.io.json.json_normalize(message['data'])
                elif typ == 'delta':
                    # self.logger.debug(data)
                    for key, value in data.items():                
                        if key == 'transactTimeE6':
                            continue
                        elif key == 'update':
                            df_data = pd.io.json.json_normalize(value)
                            indicies, Is = self.search_index(self.df, df_data)            

                            for i in range(len(indicies)):
                                index = indicies[i]
                                id = self.df.iloc[index]['id']
                                price = self.df.loc[index, 'price']

                                self.df.loc[index] = df_data.iloc[Is[i]]
                                self.df.loc[index, 'price'] = price

                        elif key == 'delete':
                            df_data = pd.io.json.json_normalize(value)
                            
                            indicies, Is = self.search_index(self.df, df_data)
                            self.df.drop(index=self.df.index[indicies], inplace=True)
                            self.df = self.df.sort_values('price', ascending=False)
                            self.df.reset_index(inplace=True, drop=True)
        
                        elif key == "insert":
                            df_data = pd.io.json.json_normalize(value)
                            self.df = pd.concat([self.df, df_data], sort=True)
                            self.df = self.df.sort_values('price', ascending=False)
                            self.df.reset_index(inplace=True, drop=True)

                        
                            
            if len(self.data[message["topic"]]) > BybitWebsocket.MAX_DATA_CAPACITY:
                self.data[message["topic"]] = self.data[message["topic"]][BybitWebsocket.MAX_DATA_CAPACITY//2:]

    def __on_error(self, error):
        '''Called on fatal websocket errors. We exit on these.'''
        if not self.exited:
            self.logger.error("Error : %s" % error)
            raise websocket.WebSocketException(error)

    def __on_open(self):
        '''Called when the WS opens.'''
        self.logger.debug("Websocket Opened.")

    def __on_close(self):
        '''Called on websocket close.'''
        self.logger.info('Websocket Closed')

    def ping(self):
        self.ws.send('{"op":"ping"}')
        if 'pong' not in self.data:
            self.data['pong'] = []

    def subscribe_kline(self, interval:str):
        param = {}
        param['op'] = 'subscribe'
        param['args'] = ['kline.' + self.symbol + '.' + interval]
        self.ws.send(json.dumps(param))
        if 'kline.' + self.symbol + '.' + interval not in self.data:
            self.data['kline.' + self.symbol + '.' + interval] = []

    def subscribe_trade(self):
        self.ws.send('{"op":"subscribe","args":["trade"]}')
        if "trade.BTCUSD" not in self.data:
            self.data["trade.BTCUSD"] = []
            self.data["trade.ETHUSD"] = []
            self.data["trade.EOSUSD"] = []
            self.data["trade.XRPUSD"] = []

    def subscribe_insurance(self):
        self.ws.send('{"op":"subscribe","args":["insurance"]}')
        if 'insurance.BTC' not in self.data:
            self.data['insurance.BTC'] = []
            self.data['insurance.XRP'] = []
            self.data['insurance.EOS'] = []
            self.data['insurance.ETH'] = []

    def subscribe_orderBookL2(self):
        param = {}
        param['op'] = 'subscribe'
        param['args'] = ['orderBookL2_25.' + self.symbol]
        self.ws.send(json.dumps(param))
        if 'orderBookL2_25.' + self.symbol not in self.data:
            self.data['orderBookL2_25.' + self.symbol] = []

    def subscribe_instrument_info(self):
        param = {}
        param['op'] = 'subscribe'
        param['args'] = ['instrument_info.100ms.' + self.symbol]
        self.ws.send(json.dumps(param))
        if 'instrument_info.100ms.' + self.symbol not in self.data:
            self.data['instrument_info.100ms.' + self.symbol] = []

    def subscribe_position(self):
        self.ws.send('{"op":"subscribe","args":["position"]}')
        if 'position' not in self.data:
            self.data['position'] = []

    def subscribe_execution(self):
        self.ws.send('{"op":"subscribe","args":["execution"]}')
        if 'execution' not in self.data:
            self.data['execution'] = []

    def subscribe_order(self):
        self.ws.send('{"op":"subscribe","args":["order"]}')
        if 'order' not in self.data:
            self.data['order'] = []

    def get_data(self, topic):
        if topic not in self.data:
            self.logger.info(" The topic %s is not subscribed." % topic)
            return []
        if topic.split('.')[0] in BybitWebsocket.PRIVATE_TOPIC and not self.auth:
            self.logger.info("Authentication failed. Please check your api_key and api_secret. Topic: %s" % topic)
            return []
        else:
            if len(self.data[topic]) == 0:
                # self.logger.info(" The topic %s is empty." % topic)
                return []
            # while len(self.data[topic]) == 0 :
            #     sleep(0.1)
            return self.data[topic].pop()

    def get_df(self):
        return self.df

    def search_index(self, a, b):
        # Find the rows of DataFrame a that match the rows of DataFrame b
        indicies = list()
        Is = list()

        for i in range(len(b)):
            id = b.iloc[i]['id']
            side = b.iloc[i]['side']

            index = a.query('id == {} and side == "{}"'.format(id, side)).index[0]
            indicies.append(index)
            Is.append(i)

        return indicies, Is

if __name__ == "__main__":
    bybitws = BybitWebsocket(wsURL="wss://stream-testnet.bybit.com/realtime", api_key=None, api_secret=None, symbol='BTCUSD')
    bybitws.subscribe_orderBookL2()

    while True:
        sleep(5)
        print(bybitws.get_data('orderBookL2_25.BTCUSD'))

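Usage

As with the original connector, create the instance with bybitws = BybitWebsocket(wsURL="wss://stream-testnet.bybit.com/realtime", api_key=None, api_secret=None, symbol='BTCUSD') and subscribe with subscribe_orderBookL2(). Calling the get_df function then returns the OrderBook data as a pandas DataFrame.

bybitws.get_df()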

         id    price  side   size  symbol
0   67980000  6798.00  Sell   4168  BTCUSD
1   67975000  6797.50  Sell  16294  BTCUSD
2   67970000  6797.00  Sell    342  BTCUSD
3   67965000  6796.50  Sell  17166  BTCUSD
4   67960000  6796.00  Sell  10858  BTCUSD
5   67955000  6795.50  Sell   8516  BTCUSD
6   67950000  6795.00  Sell  27528  BTCUSD
7   67945000  6794.50  Sell  26413  BTCUSD
8   67940000  6794.00  Sell  23465  BTCUSD
9   67935000  6793.50  Sell  21457  BTCUSD
10  67930000  6793.00  Sell   6215  BTCUSD
11  67925000  6792.50  Sell  18226  BTCUSD
12  67920000  6792.00  Sell   4577  BTCUSD
13  67915000  6791.50  Sell  12060  BTCUSD
14  67910000  6791.00  Sell  16535  BTCUSD
15  67905000  6790.50  Sell  22345  BTCUSD
16  67900000  6790.00  Sell   7112  BTCUSD
17  67895000  6789.50  Sell  23136  BTCUSD
18  67890000  6789.00  Sell  18644  BTCUSD
19  67885000  6788.50  Sell   8303  BTCUSD
20  67880000  6788.00  Sell    322  BTCUSD
21  67865000  6786.50  Sell   8690  BTCUSD
22  67860000  6786.00  Sell  15285  BTCUSD
23  67855000  6785.50  Sell  14570  BTCUSD
24  67850000  6785.00  Sell  10832  BTCUSD
25  67805000  6780.50   Buy   9987  BTCUSD
26  67800000  6780.00   Buy  14040  BTCUSD
27  67795000  6779.50   Buy   3923  BTCUSD
28  67790000  6779.00   Buy   9540  BTCUSD
29  67780000  6778.00   Buy    513  BTCUSD
30  67775000  6777.50   Buy  30129  BTCUSD
31  67770000  6777.00   Buy  29851  BTCUSD
32  67765000  6776.50   Buy  11973  BTCUSD
33  67760000  6776.00   Buy  11096  BTCUSD
34  67755000  6775.50   Buy   8102  BTCUSD
35  67745000  6774.50   Buy  16637  BTCUSD
36  67740000  6774.00   Buy  31585  BTCUSD
37  67735000  6773.50   Buy  10602  BTCUSD
38  67730000  6773.00   Buy  14331  BTCUSD
39  67725000  6772.50   Buy  31113  BTCUSD
40  67720000  6772.00   Buy  26608  BTCUSD
41  67715000  6771.50   Buy   5231  BTCUSD
42  67710000  6771.00   Buy  44615  BTCUSD
43  67705000  6770.50   Buy  13054  BTCUSD
44  67700000  6770.00   Buy  28312  BTCUSD
45  67695000  6769.50   Buy   3325  BTCUSD
46  67690000  6769.00   Buy  16470  BTCUSD
47  67685000  6768.50   Buy  13675  BTCUSD
48  67680000  6768.00   Buy  22811  BTCUSD
49  67675000  6767.50   Buy  16363  BTCUSD
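As one example of what the DataFrame makes easy, the sketch below reads the best bid and best ask from it. It assumes the columns shown in the table above, with `price` held as a string and `side` as 'Buy' or 'Sell'. The helper name `best_bid_ask` is hypothetical, and a few rows copied from the table stand in for the result of `bybitws.get_df()`.

```python
import pandas as pd


def best_bid_ask(df):
    """Return (best_bid, best_ask) as floats from an order-book DataFrame.

    Assumes a 'price' column (string or numeric) and a 'side' column
    holding 'Buy' or 'Sell', as in the table above.
    """
    prices = df['price'].astype(float)
    best_bid = float(prices[df['side'] == 'Buy'].max())   # highest buy price
    best_ask = float(prices[df['side'] == 'Sell'].min())  # lowest sell price
    return best_bid, best_ask


# Rows copied from the table above, standing in for bybitws.get_df().
df = pd.DataFrame([
    {'id': 67855000, 'price': '6785.50', 'side': 'Sell', 'size': 14570, 'symbol': 'BTCUSD'},
    {'id': 67850000, 'price': '6785.00', 'side': 'Sell', 'size': 10832, 'symbol': 'BTCUSD'},
    {'id': 67805000, 'price': '6780.50', 'side': 'Buy', 'size': 9987, 'symbol': 'BTCUSD'},
    {'id': 67800000, 'price': '6780.00', 'side': 'Buy', 'size': 14040, 'symbol': 'BTCUSD'},
])
bid, ask = best_bid_ask(df)
print(bid, ask, ask - bid)  # 6780.5 6785.0 4.5
```

Converting `price` to float before comparing avoids relying on string ordering, which only matches numeric ordering while all prices have the same number of digits.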

Closing

I was glad that I could reuse my earlier code almost as is.

If you find any bugs or anything unclear in this code, feel free to leave a comment.

See you.