数据回调
本节介绍如何处理通过订阅接收到的数据、相关的数据结构以及回调函数。
订阅方式概览
系统支持多种数据订阅方式,以满足不同场景的需求,具体请参考数据源订阅。
数据回调函数
订阅数据后,相应的数据会通过 BaseStrategy 中定义的 on_ 系列回调函数推送到策略。您需要根据业务逻辑在策略类中实现这些回调函数。
| 频道 | 参数类型 | 说明 | 是否私有 | 对应回调 |
|---|---|---|---|---|
MarkPrice | Array(String) | 标记价格 | 否 | on_mark_price |
Bbo | Array(String) | 最佳买卖价 | 否 | on_bbo |
Depth | DepthWsParams | 市场深度 | 否 | on_depth |
Funding | Array(String) | 资金费率 | 否 | on_funding |
Trade | Array(String) | 逐笔成交 | 否 | on_trade |
Instrument | Array(String) | 币对产品 | 否 | on_instrument |
Kline | Array(String) | K线 | 否 | on_kline |
Order | Array(String) | 订单更新 | 是 | on_order |
OrderAndFill | Array(String) | 订单以及成交 | 是 | on_order_and_fill |
Position | Array(String) | 仓位更新 | 是 | on_position |
Balance | - | 余额更新 | 是 | on_balance |
FundingFee | Array(String) | 结算资金费率 | 是 | on_funding_fee |
回调函数说明
通常情况下,回调函数会包含以下参数:
exchange: 交易所名称context: 上下文对象,包含当前策略的上下文信息,主要用于延迟统计,有些回调函数可能不存在。data: 数据对象,包含具体的数据信息,例如bbo、depth、funding、trade、instrument等数据,具体数据结构请参考下文。
context 数据结构如下:
参数列表
| 参数名 | 类型 | 描述 |
|---|---|---|
| latency | Object | 延迟统计用, python策略端暂时不用管 |
| latency.timer | Int | 计时器 |
| latency.times | Object | 时间统计 |
| latency.times.strategy_begin | Int | 策略开始时间 |
| latency.times.strategy_end | Int | 策略结束时间 |
| latency.times.ex_command_begin | Int | 执行命令开始时间 |
| latency.times.place_order_end | Int | 下单结束时间 |
| latency.times.amend_end | Int | 修改结束时间 |
| latency.label_times | Object | 标签时间 |
| latency.label_times.ticker | Float | 行情数据时间 |
| latency.label_duration_metas | Array | 标签持续时间元数据 |
| request_id | Int | 请求ID, 调用异步指令时可设置为请求ID, 方便回调时识别 |
示例
{
"latency": {
"timer": 75713614,
"times": {
"strategy_begin": null,
"strategy_end": null,
"ex_command_begin": null,
"place_order_end": null,
"amend_end": null
},
"label_times": {
"ticker": 0.0
},
"label_duration_metas": []
},
"request_id": 12345
}
以下是订阅和对应回调函数和返回值的说明。
订阅MarkPrice(标记价格)
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"MarkPrice": ["BTC_USDT"],
},
]
}
}
]
return subs
回调函数:
def on_mark_price(self, exchange, mark_price):
"""
标记价格更新时触发的方法。
Args:
exchange (str): 交易所名称。
mark_price: 标记价格。
"""
self.trader.log(f"on_mark_price: {exchange} {mark_price}")
数据结构:
mark_price 标记价格数据结构如下:
| 参数名 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| symbol | String | 是 | 交易对 |
| price | Float | 是 | 标记价格 |
示例:
{
"symbol": "BTC_USDT",
"price": 28000.0
}
订阅Bbo(最佳买卖价)
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Bbo": ["BTC_USDT"],
},
]
}
}
]
return subs
回调函数:
def on_bbo(self, exchange, context, bbo):
"""
最佳买卖价变动时触发的方法。
Args:
exchange (str): 交易所名称。
context: 上下文对象。
bbo: 最佳买一卖一。
"""
self.trader.log(f"on_bbo: {exchange} {bbo}")
数据结构:
bbo 最佳买卖价数据结构如下:
| 参数名 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| symbol | String | 是 | 交易对 |
| bid_price | Float | 是 | 最佳买单价格 |
| bid_qty | Float | 是 | 最佳买单数量 |
| ask_price | Float | 是 | 最佳卖单价格 |
| ask_qty | Float | 是 | 最佳卖单数量 |
| timestamp | Int | 是 | 时间戳 |
示例:
{
"symbol": "BTC_USDT",
"bid_price": 28000.5,
"bid_qty": 0.1,
"ask_price": 28001.0,
"ask_qty": 0.2,
"timestamp": 1678886400000
}
订阅Depth(市场深度)
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Depth": ["BTC_USDT"],
},
]
}
}
]
return subs
回调函数:
def on_depth(self, exchange, context, depth):
"""
市场深度更新时触发的方法。
Args:
exchange (str): 交易所名称。
context: 上下文对象。
depth: 市场深度对象。
"""
self.trader.log(f"on_depth: {exchange} {depth}")
数据结构:
depth 市场深度数据结构如下:
| 参数名 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| symbol | String | 是 | 交易对 |
| timestamp | Int | 是 | 时间戳 |
| bids | Array(DepthEntry) | 是 | 买单深度列表 |
| asks | Array(DepthEntry) | 是 | 卖单深度列表 |
DepthEntry 结构:
| 参数名 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| price | Float | 是 | 价格 |
| amount | Float | 是 | 数量 |
示例:
{
"symbol": "BTC_USDT",
"timestamp": 1678886400000,
"bids": [[28000.5, 0.1], [28000.0, 0.5], [27999.5, 1.2]],
"asks": [[28001.0, 0.2], [28001.5, 0.8], [28002.0, 1.5]]
}
订阅Funding(资金费率)
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Funding": ["BTC_USDT"],
},
]
}
}
]
return subs
回调函数:
def on_funding(self, exchange, fundings):
"""
资金费率更新时触发的方法。
Args:
exchange (str): 交易所名称。
fundings: 资金费率对象列表。
"""
self.trader.log(f"on_funding: {exchange} {fundings}")
数据结构:
funding 资金费率数据结构如下:
| 参数名 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| symbol | String | 是 | 交易对 |
| funding_rate | Float | 是 | 资金费率 |
| next_funding_at | Int | 是 | 下次资金费时间戳 |
| funding_interval | Int | 否 | 资金费结算周期(小时) |
示例:
{
"symbol": "BTC_USDT",
"funding_rate": 0.0001,
"next_funding_at": 1678915200000,
"funding_interval": 8
}
订阅Trade(逐笔成交)
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Trade": ["BTC_USDT"],
},
]
}
}
]
return subs
回调函数:
def on_trade(self, exchange, context, trade):
"""
成交更新时触发的方法。
Args:
exchange (str): 交易所名称。
context: 上下文对象。
trade: 成交对象。
"""
self.trader.log(f"on_trade: {exchange} {trade}")
数据结构:
trade 逐笔成交数据结构如下:
| 参数名 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| id | String | 是 | 交易所返回的订单ID |
| symbol | String | 是 | 交易对 |
| timestamp | Int | 是 | 时间戳 |
| price | Float | 是 | 成交价格 |
| amount | Float | 是 | 成交数量 |
| side | String | 是 | 成交方向,Buy/Sell |
示例:
{
"id": "12345678",
"symbol": "BTC_USDT",
"timestamp": 1678886400000,
"price": 28000.0,
"amount": 0.05,
"side": "Buy"
}
订阅Instrument(合约信息)
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Instrument": ["BTC_USDT"],
},
]
}
}
]
return subs
回调函数:
def on_instrument(self, exchange, instruments):
"""
合约信息更新时触发的方法。
Args:
exchange (str): 交易所名称。
instruments: 合约信息对象列表。
"""
self.trader.log(f"on_instrument: {exchange} {instruments}")
数据结构:
instrument 合约信息数据结构如下:
| 参数名 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| symbol | String | 是 | 交易对 |
| state | String | 是 | 合约状态 |
| price_tick | Float | 是 | 价格步长 |
| amount_tick | Float | 是 | 数量步长 |
| price_precision | Int | 是 | 交易所的价格小数位 |
| amount_precision | Int | 是 | 交易所的数量小数位 |
| min_qty | Float | 是 | 数量下限, 最小下单数量 |
| min_notional | Float | 是 | 最小名义价值 |
| price_multiplier | Float | 是 | 价格乘数 |
| amount_multiplier | Float | 是 | 数量乘数 |
示例:
{
"symbol": "BTC_USDT",
"state": "Normal",
"price_tick": 0.1,
"amount_tick": 0.001,
"price_precision": 1,
"amount_precision": 3,
"min_qty": 0.001,
"min_notional": 5.0,
"price_multiplier": 1.0,
"amount_multiplier": 1.0
}
订阅Kline(K线)
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Kline": {
"symbols": ["BTC_USDT"],
"interval": "1m"
}
}
]
}
}
]
return subs
回调函数:
def on_kline(self, exchange, context, kline):
"""
K线更新时触发的方法。
Args:
exchange (str): 交易所名称。
context: 上下文对象。
kline: K线对象。
"""
self.trader.log(f"on_kline: {exchange} {context} {kline}")
数据结构: kline 数据结构如下:
| 参数名 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| symbol | String | 是 | 交易对 |
| interval | String | 是 | 时间间隔 |
| candles | Array(Candle) | 是 | 蜡烛图数据列表 |
Candle 数据结构:
| 字段 | 类型 | 是否必须 | 描述 |
|---|---|---|---|
| timestamp | Number | 是 | 时间戳,毫秒 |
| open | Number | 是 | 开盘价 |
| high | Number | 是 | 最高价 |
| low | Number | 是 | 最低价 |
| close | Number | 是 | 收盘价 |
| volume | Number | 是 | 成交量(基础货币) |
| quote_volume | Number | 是 | 成交额(报价货币) |
| trades | Number | 否 | 成交笔数 |
| taker_buy_volume | Number | 否 | taker_buy成交量(基础货币) |
| taker_buy_quote_volume | Number | 否 | taker_buy成交量(报价货币) |
| confirm | Boolean | 是 | K线状态:false 代表 K 线未完结,true 代表 K 线已完结 |
示例:
{
"symbol": "BTC_USDT",
"interval": "1m",
"candles": [
{
"timestamp": 1672531200000,
"open": 10000.0,
"high": 10000.0,
"low": 10000.0,
"close": 10000.0,
"volume": 1000.0,
"quote_volume": 10000000.0,
"trades": 100,
"taker_buy_volume": 1000.0,
"taker_buy_quote_volume": 10000000.0,
"confirm": true
}
]
}
订阅Order(订单更新)
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Order": ["BTC_USDT"],
},
]
}
}
]
return subs
回调函数:
def on_order(self, account_id, context, order):
"""
订单状态更新时触发的方法。
Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order (dict): 订单对象。
"""
self.trader.log(f"on_order: {account_id} {context} {order}")
def on_order_submitted(self, account_id, context, order_id_result, order):
"""
处理订单提交事件回调
当订单提交到交易所后,处理交易所返回的确认信息
参数:
account_id: 账户ID
context: 上下文信息
order_id_result: 订单ID结果
order: 订单信息
"""
pass
def on_batch_order_submitted(self, account_id, context, order_ids_result):
"""
批量订单提交成功时触发的方法。
Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order_ids (list): 订单 ID 列表。
"""
pass
def on_order_canceled(self, account_id, context, result, id, symbol):
"""
订单取消成功时触发的方法。
Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order_id (str): 订单 ID。
id (str): 撤单时传入的订单id/订单cid。
symbol(str): 撤单时传入的symbol。
"""
pass
def on_batch_order_canceled(self, account_id, context, order_ids_result):
"""
批量订单取消成功时触发的方法。
Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order_ids (list): 订单 ID 列表。
"""
pass
def on_batch_order_canceled_by_ids(self, account_id, context, order_ids_result):
"""
批量订单取消成功时触发的方法。
Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order_ids (list): 订单 ID 列表。
"""
pass
def on_order_amended(self, account_id, context, result, order):
"""
订单修改成功时触发的方法。
Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order_id (str): 订单 ID。
order(dict): 修改订单时传入的订单信息
"""
pass
数据结构:
order 订单更新数据结构如下:
参数列表
| 参数名 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| id | String | 是 | 订单ID |
| cid | Null | 否 | 客户ID |
| timestamp | Int | 是 | 时间戳 |
| status | String | 是 | 订单状态,参考OrderStatus枚举 |
| symbol | String | 是 | 交易对 |
| order_type | String | 是 | 订单类型,参考OrderType枚举 |
| side | String | 是 | 买卖方向 参考OrderSide枚举 |
| pos_side | String | 否 | 仓位方向, 单向持仓下非必填,当仅减仓时才需要,参考PositionSide枚举 |
| time_in_force | String | 是 | 有效期,参考TimeInForce枚举 |
| price | Null | 否 | 价格 , 市价单可为None |
| amount | Float | 是 | 数量 |
| filled | Float | 是 | 已成交数量 |
| filled_avg_price | Float | 是 | 平均成交价格 |
| source | String | 是 | 订单来源, 参考OrderSource枚举 |
OrderStatus 枚举值
Open: 未成交PartiallyFilled: 部分成交Filled: 全部成交Canceled: 已取消
OrderType 枚举值
Limit: 限价单Market: 市价单
OrderSide 枚举值
Buy: 买入Sell: 卖出
PositionSide 枚举值
Long: 多头Short: 空头
TimeInForce 枚举值
GTC: 当前有效IOC: 立即成交否则取消FOK: 立即完全成交否则取消PostOnly: 仅Maker
OrderSource 枚举值
Order: 订单UserTrade: 用户成交
示例
{
"id": "",
"cid": null,
"timestamp": 0,
"status": "Open",
"symbol": "",
"order_type": "Limit",
"side": "Buy",
"pos_side": "Long",
"time_in_force": "GTC",
"price": null,
"amount": 0.0,
"filled": 0.0,
"filled_avg_price": 0.0,
"source": "Order"
}
订阅OrderAndFill(订单以及成交)
同时订阅订单频道和用户私有成交频道时,系统会优先推送速度更快的数据(通常私有成交频道的推送速度优于订单频道)。需要注意的是,用户私有成交频道仅包含当前成交数量和当前成交均价信息,可能缺少订单类型、买卖方向等完整字段。因此,系统通过order结构体中的source字段来区分数据来源是订单频道还是私有成交频道。这两个频道可以同时订阅,互不影响。
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"OrderAndFill": ["BTC_USDT"],
},
]
}
}
]
return subs
回调函数:
def on_order_and_fill(self, account_id, context, order):
"""
订单以及成交更新时触发的方法。
Args:
account_id (str): 账户 ID。
context (dict): 上下文对象。
order (dict): 订单对象。
"""
self.trader.log(f"on_order_and_fill: {account_id} {context} {order}")
数据结构:
order 订单以及成交数据结构同订阅Order(订单更新)。
示例:
{
"id": "12312345",
"cid": null,
"timestamp": 0,
"status": "Filled",
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"pos_side": "Long",
"time_in_force": "GTC",
"price": 100000.0,
"amount": 0.05,
"filled": 0.05,
"filled_avg_price": 100000.0,
"source": "UserTrade"
}
订阅Position(持仓更新)
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Position": ["BTC_USDT"],
},
]
}
}
]
return subs
回调函数:
def on_position(self, account_id, positions):
"""
持仓更新时触发的方法。
Args:
account_id (str): 账户 ID。
positions: 持仓对象列表。
"""
self.trader.log(f"on_position: {account_id} {positions}")
数据结构:
position 持仓更新数据结构如下:
| 字段 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| symbol | String | 是 | 交易对名称 |
| timestamp | Number | 是 | 时间戳 |
| margin_mode | String | 是 | 保证金模式: Cross/Isolated |
| side | String | 是 | 持仓方向: Long/Short |
| leverage | Number | 是 | 杠杆倍数 |
| amount | Number | 是 | 持仓数量 |
| entry_price | Number | 是 | 开仓均价 |
| unrealized_pnl | Number | 是 | 未实现盈亏 |
示例:
{
"symbol": "BTC_USDT",
"timestamp": 1678886400000,
"margin_mode": "Cross",
"side": "Long",
"leverage": 20,
"amount": 0.5,
"entry_price": 28500.0,
"unrealized_pnl": -75.0
}
订阅Balance(余额更新)
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 1,
"nanos": 0
},
"rest_type": "Balance" # 每秒更新账户余额
}
}
}
]
return subs
回调函数:
def on_balance(self, account_id, balances):
"""
处理余额更新事件
当账户余额变化时,更新本地余额记录
参数:
account_id: 账户ID
balances: 更新后的余额信息
"""
self.trader.log(f"on_balance: {account_id} {balances}")
数据结构:
balance 余额更新数据结构如下:
| 字段 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| asset | String | 是 | 资产名称 |
| balance | Number | 是 | 余额 |
| available_balance | Number | 是 | 可用余额 |
| unrealized_pnl | Number | 是 | 未实现盈亏 |
示例:
{
"asset": "USDT",
"balance": 10000.0,
"available_balance": 8000.0,
"unrealized_pnl": 150.0
}
订阅FundingFee(结算资金费率)
因各个交易所的推送数据格式不一致,暂时先停止使用该频道。
订阅示例:
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"FundingFee": ["BTC_USDT"]
}
],
}
}
]
return subs
回调函数:
def on_funding_fee(self, account_id, context, funding_fee):
"""
资金费结算时触发的方法。
Args:
account_id (str): 账户 ID。
context: 上下文对象。
funding_fee: 资金费对象。
"""
self.trader.log(f"on_funding_fee: {account_id} {context} {funding_fee}")
数据结构:
funding_fee 资金费结算数据结构如下:
| 字段 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| symbol | String | 是 | 交易对 |
| funding_fee | Number | 是 | 结算资金费率 |
| timestamp | Number | 是 | 时间戳 |
示例:
{
"symbol": "BTC_USDT",
"funding_fee": 0.0001,
"timestamp": 1678886400000
}
最佳实践
-
数据订阅
- 根据策略需求选择合适的订阅方式
- 避免重复订阅相同数据
- 及时取消不需要的订阅
-
数据处理
- 进行数据格式验证
- 处理异常值和缺失值
- 实现数据清洗和标准化
-
数据存储
- 选择合适的存储方式
- 实现数据压缩
- 定期清理过期数据
-
性能优化
- 使用批量处理
- 实现数据缓存
- 优化数据结构