跳到主要内容

数据回调

本节介绍如何处理通过订阅接收到的数据、相关的数据结构以及回调函数。

订阅方式概览

系统支持多种数据订阅方式,以满足不同场景的需求,具体请参考数据源订阅

数据回调函数

订阅数据后,相应的数据会通过 BaseStrategy 中定义的 on_ 系列回调函数推送到策略。您需要根据业务逻辑在策略类中实现这些回调函数。

频道参数类型说明是否私有对应回调
MarkPriceArray(String)标记价格on_mark_price
BboArray(String)最佳买卖价on_bbo
DepthDepthWsParams市场深度on_depth
FundingArray(String)资金费率on_funding
TradeArray(String)逐笔成交on_trade
InstrumentArray(String)币对产品on_instrument
KlineArray(String)K线on_kline
OrderArray(String)订单更新on_order
OrderAndFillArray(String)订单以及成交on_order_and_fill
PositionArray(String)仓位更新on_position
Balance-余额更新on_balance
FundingFeeArray(String)结算资金费率on_funding_fee

回调函数说明

通常情况下,回调函数会包含以下参数:

  • exchange: 交易所名称
  • context: 上下文对象,包含当前策略的上下文信息,主要用于延迟统计,有些回调函数可能不存在。
  • data: 数据对象,包含具体的数据信息,例如bbodepthfundingtradeinstrument等数据,具体数据结构请参考下文

context 数据结构如下:

参数列表

参数名类型描述
latencyObject延迟统计用, python策略端暂时不用管
latency.timerInt计时器
latency.timesObject时间统计
latency.times.strategy_beginInt策略开始时间
latency.times.strategy_endInt策略结束时间
latency.times.ex_command_beginInt执行命令开始时间
latency.times.place_order_endInt下单结束时间
latency.times.amend_endInt修改结束时间
latency.label_timesObject标签时间
latency.label_times.tickerFloat行情数据时间
latency.label_duration_metasArray标签持续时间元数据
request_idInt请求ID, 调用异步指令时可设置为请求ID, 方便回调时识别

示例

{
"latency": {
"timer": 75713614,
"times": {
"strategy_begin": null,
"strategy_end": null,
"ex_command_begin": null,
"place_order_end": null,
"amend_end": null
},
"label_times": {
"ticker": 0.0
},
"label_duration_metas": []
},
"request_id": 12345
}

以下是订阅和对应回调函数和返回值的说明。

订阅MarkPrice(标记价格)

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"MarkPrice": ["BTC_USDT"],
},
]
}
}
]
return subs

回调函数:

def on_mark_price(self, exchange, mark_price):
"""
标记价格更新时触发的方法。

Args:
exchange (str): 交易所名称。
mark_price: 标记价格。
"""
self.trader.log(f"on_mark_price: {exchange} {mark_price}")

数据结构:

mark_price 标记价格数据结构如下:

参数名类型是否必填描述
symbolString交易对
priceFloat标记价格

示例:

{
"symbol": "BTC_USDT",
"price": 28000.0
}

订阅Bbo(最佳买卖价)

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Bbo": ["BTC_USDT"],
},
]
}
}
]
return subs

回调函数:

def on_bbo(self, exchange, context, bbo):
"""
最佳买卖价变动时触发的方法。

Args:
exchange (str): 交易所名称。
context: 上下文对象。
bbo: 最佳买一卖一。
"""
self.trader.log(f"on_bbo: {exchange} {bbo}")

数据结构:

bbo 最佳买卖价数据结构如下:

参数名类型是否必填描述
symbolString交易对
bid_priceFloat最佳买单价格
bid_qtyFloat最佳买单数量
ask_priceFloat最佳卖单价格
ask_qtyFloat最佳卖单数量
timestampInt时间戳

示例:

{
"symbol": "BTC_USDT",
"bid_price": 28000.5,
"bid_qty": 0.1,
"ask_price": 28001.0,
"ask_qty": 0.2,
"timestamp": 1678886400000
}

订阅Depth(市场深度)

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Depth": ["BTC_USDT"],
},
]
}
}
]
return subs

回调函数:

def on_depth(self, exchange, context, depth):
"""
市场深度更新时触发的方法。

Args:
exchange (str): 交易所名称。
context: 上下文对象。
depth: 市场深度对象。
"""
self.trader.log(f"on_depth: {exchange} {depth}")

数据结构:

depth 市场深度数据结构如下:

参数名类型是否必填描述
symbolString交易对
timestampInt时间戳
bidsArray(DepthEntry)买单深度列表
asksArray(DepthEntry)卖单深度列表

DepthEntry 结构:

参数名类型是否必填描述
priceFloat价格
amountFloat数量

示例:

{
"symbol": "BTC_USDT",
"timestamp": 1678886400000,
"bids": [[28000.5, 0.1], [28000.0, 0.5], [27999.5, 1.2]],
"asks": [[28001.0, 0.2], [28001.5, 0.8], [28002.0, 1.5]]
}

订阅Funding(资金费率)

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Funding": ["BTC_USDT"],
},
]
}
}
]
return subs

回调函数:

def on_funding(self, exchange, fundings):
"""
资金费率更新时触发的方法。

Args:
exchange (str): 交易所名称。
fundings: 资金费率对象列表。
"""
self.trader.log(f"on_funding: {exchange} {fundings}")

数据结构:

funding 资金费率数据结构如下:

参数名类型是否必填描述
symbolString交易对
funding_rateFloat资金费率
next_funding_atInt下次资金费时间戳
funding_intervalInt资金费结算周期(小时)

示例:

{
"symbol": "BTC_USDT",
"funding_rate": 0.0001,
"next_funding_at": 1678915200000,
"funding_interval": 8
}

订阅Trade(逐笔成交)

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Trade": ["BTC_USDT"],
},
]
}
}
]
return subs

回调函数:

def on_trade(self, exchange, context, trade):
"""
成交更新时触发的方法。

Args:
exchange (str): 交易所名称。
context: 上下文对象。
trade: 成交对象。
"""
self.trader.log(f"on_trade: {exchange} {trade}")

数据结构:

trade 逐笔成交数据结构如下:

参数名类型是否必填描述
idString交易所返回的订单ID
symbolString交易对
timestampInt时间戳
priceFloat成交价格
amountFloat成交数量
sideString成交方向,Buy/Sell

示例:

{
"id": "12345678",
"symbol": "BTC_USDT",
"timestamp": 1678886400000,
"price": 28000.0,
"amount": 0.05,
"side": "Buy"
}

订阅Instrument(合约信息)

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Instrument": ["BTC_USDT"],
},
]
}
}
]
return subs

回调函数:

def on_instrument(self, exchange, instruments):
"""
合约信息更新时触发的方法。

Args:
exchange (str): 交易所名称。
instruments: 合约信息对象列表。
"""
self.trader.log(f"on_instrument: {exchange} {instruments}")

数据结构:

instrument 合约信息数据结构如下:

参数名类型是否必填描述
symbolString交易对
stateString合约状态
price_tickFloat价格步长
amount_tickFloat数量步长
price_precisionInt交易所的价格小数位
amount_precisionInt交易所的数量小数位
min_qtyFloat数量下限, 最小下单数量
min_notionalFloat最小名义价值
price_multiplierFloat价格乘数
amount_multiplierFloat数量乘数

示例:

{
"symbol": "BTC_USDT",
"state": "Normal",
"price_tick": 0.1,
"amount_tick": 0.001,
"price_precision": 1,
"amount_precision": 3,
"min_qty": 0.001,
"min_notional": 5.0,
"price_multiplier": 1.0,
"amount_multiplier": 1.0
}

订阅Kline(K线)

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Kline": {
"symbols": ["BTC_USDT"],
"interval": "1m"
}
}
]
}
}
]
return subs

回调函数:

def on_kline(self, exchange, context, kline):
"""
K线更新时触发的方法。

Args:
exchange (str): 交易所名称。
context: 上下文对象。
kline: K线对象。
"""
self.trader.log(f"on_kline: {exchange} {context} {kline}")

数据结构: kline 数据结构如下:

参数名类型是否必填描述
symbolString交易对
intervalString时间间隔
candlesArray(Candle)蜡烛图数据列表

Candle 数据结构:

字段类型是否必须描述
timestampNumber时间戳,毫秒
openNumber开盘价
highNumber最高价
lowNumber最低价
closeNumber收盘价
volumeNumber成交量(基础货币)
quote_volumeNumber成交额(报价货币)
tradesNumber成交笔数
taker_buy_volumeNumbertaker_buy成交量(基础货币)
taker_buy_quote_volumeNumbertaker_buy成交量(报价货币)
confirmBooleanK线状态:false 代表 K 线未完结,true 代表 K 线已完结

示例:

{
"symbol": "BTC_USDT",
"interval": "1m",
"candles": [
{
"timestamp": 1672531200000,
"open": 10000.0,
"high": 10000.0,
"low": 10000.0,
"close": 10000.0,
"volume": 1000.0,
"quote_volume": 10000000.0,
"trades": 100,
"taker_buy_volume": 1000.0,
"taker_buy_quote_volume": 10000000.0,
"confirm": true
}
]
}

订阅Order(订单更新)

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Order": ["BTC_USDT"],
},
]
}
}
]
return subs

回调函数:

def on_order(self, account_id, context, order):
"""
订单状态更新时触发的方法。

Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order (dict): 订单对象。
"""
self.trader.log(f"on_order: {account_id} {context} {order}")

def on_order_submitted(self, account_id, context, order_id_result, order):
"""
处理订单提交事件回调

当订单提交到交易所后,处理交易所返回的确认信息

参数:
account_id: 账户ID
context: 上下文信息
order_id_result: 订单ID结果
order: 订单信息
"""
pass


def on_batch_order_submitted(self, account_id, context, order_ids_result):
"""
批量订单提交成功时触发的方法。

Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order_ids (list): 订单 ID 列表。
"""
pass

def on_order_canceled(self, account_id, context, result, id, symbol):
"""
订单取消成功时触发的方法。

Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order_id (str): 订单 ID。
id (str): 撤单时传入的订单id/订单cid。
symbol(str): 撤单时传入的symbol。
"""
pass

def on_batch_order_canceled(self, account_id, context, order_ids_result):
"""
批量订单取消成功时触发的方法。

Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order_ids (list): 订单 ID 列表。
"""
pass

def on_batch_order_canceled_by_ids(self, account_id, context, order_ids_result):
"""
批量订单取消成功时触发的方法。

Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order_ids (list): 订单 ID 列表。
"""
pass

def on_order_amended(self, account_id, context, result, order):
"""
订单修改成功时触发的方法。

Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order_id (str): 订单 ID。
order(dict): 修改订单时传入的订单信息
"""
pass

数据结构:

order 订单更新数据结构如下:

参数列表

参数名类型是否必填描述
idString订单ID
cidNull客户ID
timestampInt时间戳
statusString订单状态,参考OrderStatus枚举
symbolString交易对
order_typeString订单类型,参考OrderType枚举
sideString买卖方向 参考OrderSide枚举
pos_sideString仓位方向, 单向持仓下非必填,当仅减仓时才需要,参考PositionSide枚举
time_in_forceString有效期,参考TimeInForce枚举
priceNull价格 , 市价单可为None
amountFloat数量
filledFloat已成交数量
filled_avg_priceFloat平均成交价格
sourceString订单来源, 参考OrderSource枚举

OrderStatus 枚举值

  • Open: 未成交
  • PartiallyFilled: 部分成交
  • Filled: 全部成交
  • Canceled: 已取消

OrderType 枚举值

  • Limit: 限价单
  • Market: 市价单

OrderSide 枚举值

  • Buy: 买入
  • Sell: 卖出

PositionSide 枚举值

  • Long: 多头
  • Short: 空头

TimeInForce 枚举值

  • GTC: 当前有效
  • IOC: 立即成交否则取消
  • FOK: 立即完全成交否则取消
  • PostOnly: 仅Maker

OrderSource 枚举值

  • Order: 订单
  • UserTrade: 用户成交

示例

{
"id": "",
"cid": null,
"timestamp": 0,
"status": "Open",
"symbol": "",
"order_type": "Limit",
"side": "Buy",
"pos_side": "Long",
"time_in_force": "GTC",
"price": null,
"amount": 0.0,
"filled": 0.0,
"filled_avg_price": 0.0,
"source": "Order"
}

订阅OrderAndFill(订单以及成交)

信息

同时订阅订单频道和用户私有成交频道时,系统会优先推送速度更快的数据(通常私有成交频道的推送速度优于订单频道)。需要注意的是,用户私有成交频道仅包含当前成交数量和当前成交均价信息,可能缺少订单类型、买卖方向等完整字段。因此,系统通过order结构体中的source字段来区分数据来源是订单频道还是私有成交频道。这两个频道可以同时订阅,互不影响。

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"OrderAndFill": ["BTC_USDT"],
},
]
}
}
]
return subs

回调函数:

def on_order_and_fill(self, account_id, context, order):
"""
订单以及成交更新时触发的方法。

Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order (dict): 订单对象。
"""
self.trader.log(f"on_order_and_fill: {account_id} {context} {order}")

数据结构:

order 订单以及成交数据结构同订阅Order(订单更新)

示例:

{
"id": "12312345",
"cid": null,
"timestamp": 0,
"status": "Filled",
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"pos_side": "Long",
"time_in_force": "GTC",
"price": 100000.0,
"amount": 0.05,
"filled": 0.05,
"filled_avg_price": 100000.0,
"source": "UserTrade"
}

订阅Position(持仓更新)

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Position": ["BTC_USDT"],
},
]
}
}
]
return subs

回调函数:

def on_position(self, account_id, positions):
"""
持仓更新时触发的方法。

Args:
account_id (str): 账户 ID。
positions: 持仓对象列表。
"""
self.trader.log(f"on_position: {account_id} {positions}")

数据结构:

position 持仓更新数据结构如下:

字段类型是否必填描述
symbolString交易对名称
timestampNumber时间戳
margin_modeString保证金模式: Cross/Isolated
sideString持仓方向: Long/Short
leverageNumber杠杆倍数
amountNumber持仓数量
entry_priceNumber开仓均价
unrealized_pnlNumber未实现盈亏

示例:

{
"symbol": "BTC_USDT",
"timestamp": 1678886400000,
"margin_mode": "Cross",
"side": "Long",
"leverage": 20,
"amount": 0.5,
"entry_price": 28500.0,
"unrealized_pnl": -75.0
}

订阅Balance(余额更新)

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 1,
"nanos": 0
},
"rest_type": "Balance" # 每秒更新账户余额
}
}
}
]
return subs

回调函数:

def on_balance(self, account_id, balances):
"""
处理余额更新事件

当账户余额变化时,更新本地余额记录

参数:
account_id: 账户ID
balances: 更新后的余额信息
"""
self.trader.log(f"on_balance: {account_id} {balances}")

数据结构:

balance 余额更新数据结构如下:

字段类型是否必填描述
assetString资产名称
balanceNumber余额
available_balanceNumber可用余额
unrealized_pnlNumber未实现盈亏

示例:

{
"asset": "USDT",
"balance": 10000.0,
"available_balance": 8000.0,
"unrealized_pnl": 150.0
}

订阅FundingFee(结算资金费率)

注意

因各个交易所的推送数据格式不一致,暂时先停止使用该频道。

订阅示例:

def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"FundingFee": ["BTC_USDT"]
}
],
}
}
]
return subs

回调函数:

def on_funding_fee(self, account_id, context, funding_fee):
"""
资金费结算时触发的方法。

Args:
account_id (str): 账户 ID。
context: 上下文对象。
funding_fee: 资金费对象。
"""
self.trader.log(f"on_funding_fee: {account_id} {context} {funding_fee}")

数据结构:

funding_fee 资金费结算数据结构如下:

字段类型是否必填描述
symbolString交易对
funding_feeNumber结算资金费率
timestampNumber时间戳

示例:

{
"symbol": "BTC_USDT",
"funding_fee": 0.0001,
"timestamp": 1678886400000
}

最佳实践

  1. 数据订阅

    • 根据策略需求选择合适的订阅方式
    • 避免重复订阅相同数据
    • 及时取消不需要的订阅
  2. 数据处理

    • 进行数据格式验证
    • 处理异常值和缺失值
    • 实现数据清洗和标准化
  3. 数据存储

    • 选择合适的存储方式
    • 实现数据压缩
    • 定期清理过期数据
  4. 性能优化

    • 使用批量处理
    • 实现数据缓存
    • 优化数据结构