八、数据源订阅
八、数据源订阅
8.1 订阅机制概述
Trader API 提供了完整的数据源订阅机制,通过策略的 subscribes() 方法定义需要订阅的数据类型。系统支持三种不同的数据源订阅方式:
| 数据源类型 | 数据来源 | 更新方式 | 适用场景 | 延迟特性 |
|---|---|---|---|---|
| SubscribeWs | 交易所 WebSocket | 实时推送 | 高频交易、实时行情 | 极低延迟 |
| SubscribeRest | 交易所 REST API | 定时轮询 | 账户状态、合约信息 | 秒级延迟 |
| SubscribeTimer | 系统定时器 | 定时触发 | 策略逻辑、状态检查 | 可配置间隔 |
数据流向图
+-------------------+ +---------------+ +----------------------+
| Exchange API | | System Timer | | Strategy Callback |
| | | | | |
| WebSocket ----+ | | Timer ----+ | | +-- on_bbo() |
| REST API ---+ | | | | | | +-- on_kline() |
| | | | | | | | +-- on_index_kline()|
| | | | | | | | +-- on_order() |
+--------------|-|--+ +------------|--+ | +-- on_timer() |
| | | | +-- on_balance() |
| +------------------+ | | |
+------------------+ | | +----------------------+
| | | ^
v v v |
+------------------+ |
| Trader Engine |------------+
| (Data Dispatch) |
+------------------+
中文说明:
- Exchange API (交易所API): WebSocket实时推送 + REST API轮询
- System Timer (系统定时器): 策略定时任务触发器
- Strategy Callback (策略回调): 数据到达后触发的回调函数
- Trader Engine (数据分发中心): 统一接收并分发数据到策略
8.2 WebSocket 实时订阅 (SubscribeWs)
WebSocket 订阅是最重要的数据源,提供交易所的实时推送数据,具有极低延迟的特性。
支持的频道类型总览
| 频道分类 | 频道类型 | 说明 | 回调函数 | 延迟级别 |
|---|---|---|---|---|
| 市场行情 | Bbo | 最佳买卖价 | on_bbo | 毫秒级 |
Depth | 市场深度 | on_depth | 毫秒级 | |
Trade | 成交记录 | on_trade | 毫秒级 | |
AggTrade | 聚合成交 | on_agg_trade | 毫秒级 | |
Kline | K线数据 | on_kline | 秒级 | |
IndexKline | 指数K线数据 | on_index_kline | 秒级 | |
MarkPrice | 标记价格 | on_mark_price | 秒级 | |
Funding | 资金费率 | on_funding | 分钟级 | |
| 账户数据 | Order | 订单更新 | on_order | 毫秒级 |
Position | 持仓更新 | on_position | 毫秒级 | |
Balance | 余额更新 | on_balance | 秒级 | |
FundingFee | 资金费结算 | on_funding_fee | 小时级 | |
OrderAndFill | 订单和成交 | on_order_and_fill | 毫秒级 |
🚀 WebSocket订阅配置
基础配置格式:
{
"account_id": 0, # 账户ID(市场数据可选,账户数据必须)
"event_handle_mode": "Sync", # 事件处理模式:Latest/Sync/Async/Batch(可选)
# 默认行为:仅订阅 BBO/Depth/Trade 时默认 Batch,其他情况默认 Sync
"sub": {
"SubscribeWs": [
# 市场数据订阅(无需账户ID)
{
"频道类型": 配置参数
},
# 账户数据订阅(需要账户ID)
{
"频道类型": 配置参数
}
]
}
}
⚠️ 重要说明:
event_handle_mode参数仅对SubscribeWs生效,SubscribeRest和SubscribeTimer会忽略此参数- 默认行为:仅订阅 BBO/Depth/Trade 时默认使用
Batch模式,其他情况默认使用Sync模式 - 不同数据类型推荐使用不同模式:
- 高频数据(Bbo、Trade)→
"Latest"模式 - 重要数据(Depth、Order、Position)→
"Sync"模式 - 高频重要数据 →
"Async"模式
- 高频数据(Bbo、Trade)→
8.2.1 Bbo 频道(最佳买卖价)
获取交易对的实时最佳买一价和卖一价,适用于快速获取当前市场价格。
频道说明
- 频道名称:
Bbo - 推送频率:有变化时推送,毫秒级延迟
- 回调函数:
on_bbo(exchange, bbo_list),bbo_list为数组类型,包含多个 BBO 数据 - 适用场景:高频交易、价格监控、套利策略
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| symbol | String | 交易对,如 BTC_USDT |
| bid_price | Float | 最佳买价 |
| bid_qty | Float | 买一档数量 |
| ask_price | Float | 最佳卖价 |
| ask_qty | Float | 卖一档数量 |
| timestamp | Integer | 数据更新时间,Unix 时间戳(毫秒) |
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"bid_price": 50000.5, # 最佳买价
"bid_qty": 1.5, # 买一数量
"ask_price": 50001.0, # 最佳卖价
"ask_qty": 2.0, # 卖一数量
"timestamp": 1640995200000 # 时间戳(毫秒)
}
回调处理示例
def on_bbo(self, exchange, bbo_list):
"""
最佳买卖价回调
注意:bbo_list 是数组类型,包含多个 BBO 数据。
在 Latest 模式下,数组通常只包含一条最新消息。
在 Batch 模式下,数组包含累积期间的所有消息。
"""
for bbo in bbo_list:
symbol = bbo.get('symbol')
bid_price = bbo.get('bid_price')
ask_price = bbo.get('ask_price')
spread = ask_price - bid_price
spread_pct = spread / bid_price * 100
self.trader.log(
f"{symbol} BBO: 买一 {bid_price} 卖一 {ask_price} "
f"价差 {spread:.2f} ({spread_pct:.4f}%)"
)
# 交易信号判断
if spread_pct < 0.01: # 价差小于0.01%
self.generate_trading_signal(symbol, bbo)
8.2.2 Depth 频道(市场深度)
获取交易对的订单簿深度数据,支持配置深度档位。
频道说明
- 频道名称:
Depth - 推送频率:有变化时推送,毫秒级延迟
- 回调函数:
on_depth(exchange, depth_list),depth_list为数组类型,包含多个深度数据 - 适用场景:深度分析、大单监控、流动性评估
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"levels": 10 # 订阅10档深度
}
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| symbol | String | 交易对,如 BTC_USDT |
| bids | Array | 买盘数组,按价格从高到低排序 |
| > price | Float | 买盘价格 |
| > amount | Float | 该价格档位的数量 |
| asks | Array | 卖盘数组,按价格从低到高排序 |
| > price | Float | 卖盘价格 |
| > amount | Float | 该价格档位的数量 |
| timestamp | Integer | 数据更新时间,Unix 时间戳(毫秒) |
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"bids": [ # 买盘,按价格从高到低排序
{"price": 50000.0, "amount": 1.5},
{"price": 49999.5, "amount": 2.0},
{"price": 49999.0, "amount": 3.2},
# ...更多档位
],
"asks": [ # 卖盘,按价格从低到高排序
{"price": 50001.0, "amount": 1.0},
{"price": 50001.5, "amount": 1.8},
{"price": 50002.0, "amount": 2.5},
# ...更多档位
],
"timestamp": 1640995200000 # 时间戳(毫秒)
}
回调处理示例
def on_depth(self, exchange, depth_list):
"""
市场深度回调
注意:depth_list 是数组类型,包含多个深度数据。
在 Sync/Async 模式下,数组通常只包含一条最新消息。
在 Batch 模式下,数组包含累积期间的所有消息。
"""
for depth in depth_list:
symbol = depth.get('symbol')
bids = depth.get('bids', [])
asks = depth.get('asks', [])
if bids and asks:
best_bid = bids[0]['price']
best_ask = asks[0]['price']
# 计算买卖盘总量
bid_volume = sum(entry['amount'] for entry in bids)
ask_volume = sum(entry['amount'] for entry in asks)
# 买卖力量比
volume_ratio = bid_volume / ask_volume if ask_volume > 0 else 0
self.trader.log(
f"{symbol} 深度: 最优买 {best_bid} 最优卖 {best_ask} "
f"买盘量 {bid_volume:.2f} 卖盘量 {ask_volume:.2f} "
f"买卖比 {volume_ratio:.2f}"
)
# 大单检测
for entry in bids[:3]:
if entry['amount'] > 10: # 大单阈值
self.trader.log(f"🐋 {symbol} 大买单: {entry['amount']} @ {entry['price']}")
8.2.3 Trade 频道(成交记录)
获取交易对的实时逐笔成交数据。
频道说明
- 频道名称:
Trade - 推送频率:每笔成交推送,毫秒级延迟
- 回调函数:
on_trade(exchange, trade_list),trade_list为数组类型,包含多个成交数据 - 适用场景:成交监控、大单追踪、成交量分析
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [
{
"Trade": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| id | String | 成交记录唯一标识 |
| symbol | String | 交易对,如 BTC_USDT |
| price | Float | 成交价格 |
| amount | Float | 成交数量(基础货币) |
| side | String | 主动成交方向:Buy 买入 / Sell 卖出 |
| timestamp | Integer | 成交时间,Unix 时间戳(毫秒) |
推送数据示例
{
"id": "123456789", # 成交ID
"symbol": "BTC_USDT", # 交易对
"price": 50000.5, # 成交价格
"amount": 0.5, # 成交数量(基础货币)
"side": "Buy", # 成交方向:Buy/Sell
"timestamp": 1640995200000 # 成交时间(毫秒)
}
回调处理示例
def on_trade(self, exchange, trade_list):
"""
成交记录回调
注意:trade_list 是数组类型,包含多个成交数据。
在 Latest 模式下,数组通常只包含一条最新消息。
在 Batch 模式下,数组包含累积期间的所有消息。
"""
for trade in trade_list:
symbol = trade.get('symbol')
price = trade.get('price')
amount = trade.get('amount')
side = trade.get('side')
value = price * amount
# 大单监控
if value > 100000: # 成交额超过10万
direction = "买入" if side == "Buy" else "卖出"
self.trader.log(
f"🔥 {symbol} 大单成交: {direction} {amount} @ {price} "
f"成交额 ${value:,.0f}",
color="red" if side == "Sell" else "green"
)
8.2.4 AggTrade 频道(聚合成交)
获取交易对的聚合成交数据,将短时间内的成交合并推送。
频道说明
- 频道名称:
AggTrade - 推送频率:聚合推送,毫秒级延迟
- 回调函数:
on_agg_trade(exchange, symbol, agg_trade) - 适用场景:减少数据量、成交趋势分析
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [
{
"AggTrade": ["BTC_USDT"]
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| id | String | 聚合成交唯一标识 |
| symbol | String | 交易对,如 BTC_USDT |
| price | Float | 成交价格 |
| amount | Float | 聚合成交总数量(基础货币) |
| side | String | 主动成交方向:Buy 买入 / Sell 卖出 |
| first_trade_id | String | 此聚合包含的首笔成交 ID |
| last_trade_id | String | 此聚合包含的末笔成交 ID |
| timestamp | Integer | 成交时间,Unix 时间戳(毫秒) |
推送数据示例
{
"id": "987654321", # 聚合成交ID
"symbol": "BTC_USDT", # 交易对
"price": 50000.5, # 成交价格
"amount": 5.5, # 聚合成交数量(基础货币)
"side": "Buy", # 成交方向:Buy/Sell
"first_trade_id": "123456780", # 首笔成交ID
"last_trade_id": "123456789", # 末笔成交ID
"timestamp": 1640995200000 # 时间戳(毫秒)
}
回调处理示例
def on_agg_trade(self, exchange, symbol, agg_trade):
"""聚合成交回调"""
price = agg_trade.get('price')
amount = agg_trade.get('amount')
side = agg_trade.get('side')
self.trader.log(f"{symbol} 聚合成交: {side} {amount} @ {price}")
8.2.5 Kline 频道(K线数据)
获取交易对的 K 线(蜡烛图)数据,支持多种时间周期。
频道说明
- 频道名称:
Kline - 推送频率:K线更新时推送,秒级延迟
- 回调函数:
on_kline(exchange, kline) - 适用场景:技术分析、趋势判断、量价分析
支持的时间周期
| 分钟级 | 小时级 | 天级及以上 |
|---|---|---|
1m, 3m, 5m, 15m, 30m | 1h, 2h, 4h, 6h, 8h, 12h | 1d, 3d, 1w, 1M |
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m"
}
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| symbol | String | 交易对,如 BTC_USDT |
| interval | String | K线时间周期,如 1m, 5m, 1h, 1d |
| candles | Array | K线数据数组 |
| > timestamp | Integer | K线开盘时间,Unix 时间戳(毫秒) |
| > open | Float | 开盘价 |
| > high | Float | 最高价 |
| > low | Float | 最低价 |
| > close | Float | 收盘价 |
| > volume | Float | 成交量(基础货币) |
| > quote_volume | Float | 成交额(报价货币) |
| > trades | Integer | 成交笔数(可选) |
| > taker_buy_volume | Float | Taker 买入成交量(可选) |
| > taker_buy_quote_volume | Float | Taker 买入成交额(可选) |
| > confirm | Boolean | K线状态:false 未完结,true 已完结 |
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"interval": "1m", # 时间周期
"candles": [
{
"timestamp": 1640995200000, # 开盘时间(毫秒)
"open": 50000.0, # 开盘价
"high": 50500.0, # 最高价
"low": 49800.0, # 最低价
"close": 50200.0, # 收盘价
"volume": 123.45, # 成交量(基础货币)
"quote_volume": 6180000.0, # 成交额(报价货币)
"trades": 1250, # 成交笔数
"taker_buy_volume": 65.2, # 主动买入成交量
"taker_buy_quote_volume": 3270000.0, # 主动买入成交额
"confirm": False # false=未完结,true=已完结
}
]
}
回调处理示例
def on_kline(self, exchange, kline):
"""K线数据回调"""
symbol = kline['symbol']
interval = kline['interval']
for candle in kline['candles']:
open_price = candle['open']
high_price = candle['high']
low_price = candle['low']
close_price = candle['close']
volume = candle['volume']
is_confirmed = candle['confirm']
# 计算涨跌幅
change_pct = (close_price - open_price) / open_price * 100
status = "✅ 已完结" if is_confirmed else "⏳ 未完结"
self.trader.log(
f"{symbol} {interval} K线: "
f"O:{open_price} H:{high_price} L:{low_price} C:{close_price} "
f"涨跌:{change_pct:+.2f}% 成交量:{volume:.2f} {status}"
)
# 只对已完结的K线进行技术分析
if is_confirmed:
self.analyze_kline_signal(symbol, candle)
def analyze_kline_signal(self, symbol, candle):
"""分析K线形态信号"""
body = abs(candle['close'] - candle['open'])
range_size = candle['high'] - candle['low']
if range_size > 0:
body_ratio = body / range_size
# 大阳线/大阴线检测
if body_ratio > 0.7:
if candle['close'] > candle['open']:
self.trader.log(f"📈 {symbol}: 大阳线信号", color="green")
else:
self.trader.log(f"📉 {symbol}: 大阴线信号", color="red")
# 十字星检测
elif body_ratio < 0.1:
self.trader.log(f"✴️ {symbol}: 十字星形态", color="yellow")
8.2.6 IndexKline 频道(指数K线数据)
获取交易对的指数K线数据。指数价格通常是多个交易所现货价格的加权平均,用于衍生品交易的公允价格参考。 目前仅OkxSwap、GateSwap、HuobiSwap支持。
频道说明
- 频道名称:
IndexKline - 推送频率:K线更新时推送,秒级延迟
- 回调函数:
on_index_kline(exchange, index_kline) - 适用场景:指数价格分析、套利策略、跨交易所价格比较
- 支持交易所:OKX、Gate、Huobi(合约交易)
与普通 Kline 的区别
| 特性 | Kline(普通K线) | IndexKline(指数K线) |
|---|---|---|
| 价格来源 | 单一交易所成交价 | 多交易所加权平均价 |
| 成交量 | 有 | 通常无 |
| 用途 | 技术分析、交易决策 | 公允价格参考、套利分析 |
| 波动性 | 较高 | 较平滑 |
支持的时间周期
| 分钟级 | 小时级 | 天级及以上 |
|---|---|---|
1m, 3m, 5m, 15m, 30m | 1h, 2h, 4h, 6h, 8h, 12h | 1d, 3d, 1w, 1M |
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [
{
"IndexKline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m"
}
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| symbol | String | 交易对,如 BTC_USDT |
| interval | String | K线时间周期,如 1m, 5m, 1h, 1d |
| candles | Array | K线数据数组 |
| > timestamp | Integer | K线开盘时间,Unix 时间戳(毫秒) |
| > open | Float | 开盘价(指数价格) |
| > high | Float | 最高价(指数价格) |
| > low | Float | 最低价(指数价格) |
| > close | Float | 收盘价(指数价格) |
| > volume | Float | 成交量(可能为0,取决于交易所) |
| > quote_volume | Float | 成交额(可能为0,取决于交易所) |
| > confirm | Boolean | K线状态:false 未完结,true 已完结 |
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"interval": "1m", # 时间周期
"candles": [
{
"timestamp": 1640995200000, # 开盘时间(毫秒)
"open": 50000.0, # 指数开盘价
"high": 50500.0, # 指数最高价
"low": 49800.0, # 指数最低价
"close": 50200.0, # 指数收盘价
"volume": 0.0, # 成交量(指数通常为0)
"quote_volume": 0.0, # 成交额(指数通常为0)
"confirm": False # false=未完结,true=已完结
}
]
}
回调处理示例
def on_index_kline(self, exchange, index_kline):
"""指数K线数据回调"""
symbol = index_kline['symbol']
interval = index_kline['interval']
for candle in index_kline['candles']:
open_price = candle['open']
high_price = candle['high']
low_price = candle['low']
close_price = candle['close']
is_confirmed = candle['confirm']
# 计算指数价格涨跌幅
change_pct = (close_price - open_price) / open_price * 100
status = "已完结" if is_confirmed else "未完结"
self.trader.log(
f"{symbol} 指数{interval} K线: "
f"O:{open_price} H:{high_price} L:{low_price} C:{close_price} "
f"涨跌:{change_pct:+.2f}% {status}"
)
# 只对已完结的K线进行分析
if is_confirmed:
self.analyze_index_price(symbol, candle)
def analyze_index_price(self, symbol, candle):
"""分析指数价格与现货价格的偏离"""
index_price = candle['close']
# 获取当前现货/合约价格进行比较
# 可用于套利策略或风控
self.trader.log(f"{symbol}: 指数价格 {index_price}")
同时订阅 Kline 和 IndexKline 示例
def subscribes(self):
"""同时订阅普通K线和指数K线,便于对比分析"""
return [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Kline": {
"symbols": ["BTC_USDT"],
"interval": "1m"
}
},
{
"IndexKline": {
"symbols": ["BTC_USDT"],
"interval": "1m"
}
}
]
}
}
]
def on_kline(self, exchange, kline):
"""普通K线回调"""
self.last_spot_price = kline['candles'][-1]['close']
self.check_basis()
def on_index_kline(self, exchange, index_kline):
"""指数K线回调"""
self.last_index_price = index_kline['candles'][-1]['close']
self.check_basis()
def check_basis(self):
"""检查基差(现货与指数的价格差异)"""
if hasattr(self, 'last_spot_price') and hasattr(self, 'last_index_price'):
basis = self.last_spot_price - self.last_index_price
basis_pct = basis / self.last_index_price * 100
self.trader.log(f"基差: {basis:.2f} ({basis_pct:+.3f}%)")
8.2.7 MarkPrice 频道(标记价格)
获取交易对的标记价格,用于计算未实现盈亏和强平价格。
频道说明
- 频道名称:
MarkPrice - 推送频率:秒级推送
- 回调函数:
on_mark_price(exchange, symbol, mark_price) - 适用场景:风控计算、盈亏监控
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"MarkPrice": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| symbol | String | 交易对,如 BTC_USDT |
| price | Float | 标记价格,用于计算未实现盈亏和强平价格 |
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"price": 50100.5 # 标记价格
}
回调处理示例
def on_mark_price(self, exchange, symbol, mark_price):
"""标记价格回调"""
price = mark_price.get('price')
self.trader.log(f"{symbol} 标记价格: {price}")
# 可结合持仓数据计算未实现盈亏
if symbol in self.positions:
position = self.positions[symbol]
entry_price = position.get('entry_price', 0)
amount = position.get('amount', 0)
if entry_price > 0 and amount != 0:
pnl = (price - entry_price) * amount
self.trader.log(f"{symbol} 预估浮盈: {pnl:.2f}")
8.2.8 Funding 频道(资金费率)
获取永续合约的资金费率信息。
频道说明
- 频道名称:
Funding - 推送频率:分钟级推送
- 回调函数:
on_funding(exchange, symbol, funding) - 适用场景:资金费率套利、持仓成本计算
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"Funding": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| symbol | String | 交易对,如 BTC_USDT |
| funding_rate | Float | 当前资金费率,如 0.0001 表示 0.01% |
| next_funding_at | Integer | 下次资金费结算时间,Unix 时间戳(毫秒) |
| min_funding_rate | Float | 资金费率最小值(下限) |
| max_funding_rate | Float | 资金费率最大值(上限) |
| funding_interval | Integer | 资金费结算周期(小时),可选,通常为 8 |
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"funding_rate": 0.0001, # 当前资金费率
"next_funding_at": 1640998800000,# 下次结算时间(毫秒)
"min_funding_rate": -0.00375, # 资金费率最小值
"max_funding_rate": 0.00375, # 资金费率最大值
"funding_interval": 8 # 资金费结算周期(小时),可选
}
回调处理示例
def on_funding(self, exchange, symbol, funding):
"""资金费率回调"""
rate = funding.get('funding_rate', 0)
next_time = funding.get('next_funding_at', 0)
min_rate = funding.get('min_funding_rate', 0)
max_rate = funding.get('max_funding_rate', 0)
rate_pct = rate * 100
self.trader.log(
f"{symbol} 资金费率: {rate_pct:.4f}% "
f"(范围: {min_rate*100:.4f}% ~ {max_rate*100:.4f}%)"
)
# 高资金费率预警
if abs(rate) > 0.001: # 费率超过0.1%
direction = "多头支付空头" if rate > 0 else "空头支付多头"
self.trader.log(
f"💰 {symbol} 高资金费率: {rate_pct:.4f}% ({direction})",
color="yellow"
)
8.2.9 Order 频道(订单更新)
获取账户订单状态的实时更新。
频道说明
- 频道名称:
Order - 推送频率:订单状态变化时推送,毫秒级延迟
- 回调函数:
on_order(exchange, order) - 适用场景:订单管理、成交确认
- 权限要求:需要账户权限(account_id 必须指定)
订阅示例
def subscribes(self):
return [
{
"account_id": 0, # 必须指定账户ID
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"Order": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| id | String | 交易所返回的订单 ID |
| cid | String | 用户自定义的订单 ID(可选) |
| symbol | String | 交易对,如 BTC_USDT |
| side | String | 订单方向:Buy 买入 / Sell 卖出 |
| pos_side | String | 仓位方向:Long 多 / Short 空(可选,双向持仓时使用) |
| order_type | String | 订单类型:Limit 限价 / Market 市价 |
| time_in_force | String | 有效期类型:GTC / IOC / FOK / PostOnly / GTD |
| price | Float | 委托价格(市价单可能为空) |
| amount | Float | 委托数量(基础货币) |
| quote_amount | Float | 委托数量(报价货币,可选) |
| filled | Float | 已成交数量 |
| filled_avg_price | Float | 成交均价 |
| status | String | 订单状态:Open / PartiallyFilled / Filled / Canceled |
| timestamp | Integer | 订单创建时间,Unix 时间戳(毫秒) |
| fee_detail | Array | 手续费详情列表(可选) |
| > fee | Float | 手续费金额 |
| > fee_rate | Float | 手续费费率(与 fee 互斥) |
| > fee_coin | String | 手续费币种,如 USDT |
| > is_maker | Boolean | 是否为 maker 成交 |
| source | String | 订单来源:Order 订单事件 / UserTrade 成交事件 |
推送数据示例
{
"id": "123456789", # 订单ID
"cid": "my_order_1", # 客户端订单ID(可选)
"symbol": "BTC_USDT", # 交易对
"side": "Buy", # 方向:Buy/Sell
"pos_side": "Long", # 仓位方向:Long/Short(可选,双向持仓时使用)
"order_type": "Limit", # 类型:Limit/Market
"time_in_force": "GTC", # 有效期:GTC/IOC/FOK/PostOnly
"price": 50000.0, # 委托价格(市价单可能为空)
"amount": 1.0, # 委托数量(基础货币)
"quote_amount": None, # 委托数量(报价货币,可选)
"filled": 0.5, # 已成交数量
"filled_avg_price": 49999.5, # 成交均价
"status": "PartiallyFilled", # 状态
"timestamp": 1640995200000, # 创建时间(毫秒)
"fee_detail": [ # 手续费详情(可选)
{
"fee": 0.015, # 手续费金额
"fee_coin": "USDT", # 手续费币种
"is_maker": True # 是否为maker
}
],
"source": "Order" # 来源:Order/UserTrade
}
订单状态说明
| 状态 | 说明 |
|---|---|
Open | 新订单,等待成交 |
PartiallyFilled | 部分成交 |
Filled | 完全成交 |
Canceled | 已撤销 |
有效期类型说明
| 类型 | 说明 |
|---|---|
GTC | Good-Til-Canceled,一直有效直到取消 |
IOC | Immediate-Or-Cancel,立即成交或取消 |
FOK | Fill-Or-Kill,全部成交或取消 |
PostOnly | 只做maker,否则取消 |
GTD | Good-Til-Date,指定日期前有效 |
回调处理示例
def on_order(self, exchange, order):
"""订单状态回调"""
order_id = order.get('id')
symbol = order.get('symbol')
side = order.get('side')
status = order.get('status')
filled = order.get('filled', 0)
amount = order.get('amount', 0)
# 状态日志
self.trader.log(
f"📋 订单更新: {symbol} {side} {order_id} "
f"状态:{status} 成交:{filled}/{amount}"
)
# 成交处理
if status == 'Filled':
avg_price = order.get('filled_avg_price')
self.trader.log(
f"✅ 订单完全成交: {symbol} {side} {filled} @ {avg_price}",
color="green"
)
self.on_order_filled(order)
# 部分成交处理
elif status == 'PartiallyFilled':
self.trader.log(
f"⏳ 订单部分成交: {symbol} {side} {filled}/{amount}",
color="blue"
)
# 撤销处理
elif status == 'Canceled':
self.trader.log(
f"❌ 订单已撤销: {symbol} {side} {order_id}",
color="yellow"
)
8.2.10 Position 频道(持仓更新)
获取账户持仓的实时更新。
频道说明
- 频道名称:
Position - 推送频率:持仓变化时推送,毫秒级延迟
- 回调函数:
on_position(exchange, position) - 适用场景:持仓管理、风控监控
- 权限要求:需要账户权限(account_id 必须指定)
订阅示例
def subscribes(self):
return [
{
"account_id": 0, # 必须指定账户ID
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"Position": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| id | String | 仓位唯一标识(可选) |
| symbol | String | 交易对,如 BTC_USDT |
| side | String | 仓位方向:Long 多头 / Short 空头 |
| amount | Float | 持仓数量 |
| entry_price | Float | 开仓均价 |
| unrealized_pnl | Float | 未实现盈亏 |
| leverage | Integer | 杠杆倍数 |
| margin_mode | String | 保证金模式:Cross 全仓 / Isolated 逐仓 |
| margin | Float | 维持保证金金额(可选) |
| mmr | Float | 维持保证金率(可选),如 0.05 表示 5% |
| mark_price | Float | 当前标记价格(可选) |
| liquidation_price | Float | 强制平仓价格(可选) |
| ctime | Integer | 仓位创建时间,Unix 时间戳(毫秒,可选) |
| utime | Integer | 仓位更新时间,Unix 时间戳(毫秒,可选) |
| adl | Object | ADL(自动减仓)风险评分(可选) |
| > score | Float | 标准化评分,范围 0-1 |
| > raw | Float | 原始等级值 |
| > source | String | 数据来源字段名 |
推送数据示例
{
"id": "pos_123456", # 仓位ID(可选)
"symbol": "BTC_USDT", # 交易对
"side": "Long", # 方向:Long/Short
"amount": 1.5, # 持仓数量
"entry_price": 49500.0, # 开仓均价
"unrealized_pnl": 900.0, # 未实现盈亏
"leverage": 10, # 杠杆倍数
"margin_mode": "Cross", # 保证金模式:Cross(全仓)/Isolated(逐仓)
"margin": 5000.0, # 维持保证金(可选)
"mmr": 0.05, # 维持保证金率(可选)
"mark_price": 50100.0, # 标记价格(可选)
"liquidation_price": 45000.0, # 强平价格(可选)
"ctime": 1640995200000, # 创建时间(可选)
"utime": 1640995210000, # 更新时间(可选)
"adl": { # ADL风险评分(可选)
"score": 0.8, # 标准化评分(0-1)
"raw": 4, # 原始等级
"source": "adl_quantile" # 来源字段名
}
}
回调处理示例
def on_position(self, exchange, position):
"""持仓更新回调"""
symbol = position.get('symbol')
side = position.get('side')
amount = position.get('amount', 0)
entry_price = position.get('entry_price', 0)
unrealized_pnl = position.get('unrealized_pnl', 0)
liquidation_price = position.get('liquidation_price')
# 持仓日志
if abs(amount) > 0:
pnl_color = "green" if unrealized_pnl >= 0 else "red"
self.trader.log(
f"📊 持仓更新: {symbol} {side} "
f"数量:{amount} 开仓价:{entry_price} "
f"浮盈:{unrealized_pnl:+.2f}",
color=pnl_color
)
# 强平风险预警
mark_price = position.get('mark_price')
if mark_price and liquidation_price:
distance = abs(mark_price - liquidation_price) / mark_price * 100
if distance < 5: # 距离强平价格不足5%
self.trader.log(
f"🚨 {symbol} 强平风险: 当前价 {mark_price} "
f"强平价 {liquidation_price} 距离 {distance:.1f}%",
level="ERROR",
color="red"
)
# ADL风险监控
adl = position.get('adl')
if adl and adl.get('score', 0) > 0.8:
self.trader.log(
f"⚠️ {symbol} ADL风险较高: {adl['score']:.2f}",
level="WARN"
)
else:
self.trader.log(f"📭 {symbol} 持仓已清空")
8.2.11 Balance 频道(余额更新)
获取账户余额的实时更新。
频道说明
- 频道名称:
Balance - 推送频率:余额变化时推送,秒级延迟
- 回调函数:
on_balance(account_id, balances) - 适用场景:资金监控、风控检查
- 权限要求:需要账户权限(account_id 必须指定)
订阅示例
def subscribes(self):
return [
{
"account_id": 0, # 必须指定账户ID
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
"Balance" # 无需指定交易对
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| asset | String | 币种符号(大写),如 USDT, BTC |
| balance | Float | 钱包总余额(含未实现盈亏) |
| available_balance | Float | 可用于交易的余额 |
| unrealized_pnl | Float | 未实现盈亏(以该币种计价) |
推送数据示例
[
{
"asset": "USDT", # 币种
"balance": 10000.0, # 总余额(含浮动盈亏)
"available_balance": 8000.0, # 可用余额
"unrealized_pnl": 500.0 # 未实现盈亏
},
{
"asset": "BTC",
"balance": 1.5,
"available_balance": 1.0,
"unrealized_pnl": 0.0
}
]
回调处理示例
def on_balance(self, account_id, balances):
"""余额更新回调"""
for balance in balances:
asset = balance.get('asset')
total = balance.get('balance', 0)
available = balance.get('available_balance', 0)
unrealized_pnl = balance.get('unrealized_pnl', 0)
if total > 0:
# 计算已用余额(总余额 - 可用余额 - 未实现盈亏)
used = total - available - unrealized_pnl
self.trader.log(
f"💰 {asset} 余额: 总计 {total} 可用 {available} "
f"浮盈 {unrealized_pnl}"
)
# 可用余额不足预警
available_rate = available / total if total > 0 else 0
if available_rate < 0.2: # 可用余额不足20%
self.trader.log(
f"⚠️ {asset} 可用余额不足: {available_rate:.1%}",
level="WARN"
)
# USDT 余额监控
if asset == "USDT" and available < 100:
self.trader.log(
f"🚨 USDT 可用余额不足: {available}",
level="ERROR"
)
8.2.12 FundingFee 频道(资金费结算)
获取资金费用结算通知。
频道说明
- 频道名称:
FundingFee - 推送频率:每次资金费结算时推送(通常每8小时)
- 回调函数:
on_funding_fee(exchange, funding_fee) - 适用场景:资金费用统计、成本分析
- 权限要求:需要账户权限(account_id 必须指定)
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"FundingFee": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| symbol | String | 交易对,如 BTC_USDT |
| funding_fee | Float | 本次结算的资金费金额,正数为收入,负数为支出 |
| timestamp | Integer | 结算时间,Unix 时间戳(毫秒) |
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"funding_fee": -15.5, # 资金费用(负数为支出,正数为收入)
"timestamp": 1640998800000 # 结算时间(毫秒)
}
回调处理示例
def on_funding_fee(self, exchange, funding_fee):
"""资金费结算回调"""
symbol = funding_fee.get('symbol')
fee = funding_fee.get('funding_fee', 0)
timestamp = funding_fee.get('timestamp', 0)
fee_type = "收入" if fee > 0 else "支出"
fee_color = "green" if fee > 0 else "red"
self.trader.log(
f"💸 {symbol} 资金费结算: {fee_type} {abs(fee):.2f} USDT",
color=fee_color
)
# 累计统计
self.total_funding_fee += fee
8.2.13 OrderAndFill 频道(订单和成交)
同时获取订单更新和成交明细,提供更完整的交易信息。
频道说明
- 频道名称:
OrderAndFill - 推送频率:订单或成交变化时推送,毫秒级延迟
- 回调函数:
on_order_and_fill(exchange, order_and_fill) - 适用场景:需要同时跟踪订单和成交明细的策略
- 权限要求:需要账户权限(account_id 必须指定)
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"OrderAndFill": ["BTC_USDT"]
}
]
}
}
]
推送数据参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| order | Object | 订单信息,结构同 Order 频道 |
| > id | String | 订单 ID |
| > symbol | String | 交易对 |
| > side | String | 订单方向:Buy / Sell |
| > order_type | String | 订单类型:Limit / Market |
| > price | Float | 委托价格 |
| > amount | Float | 委托数量 |
| > filled | Float | 已成交数量 |
| > status | String | 订单状态 |
| fills | Array | 成交明细列表 |
| > trade_id | String | 成交记录 ID |
| > price | Float | 成交价格 |
| > amount | Float | 成交数量 |
| > fee | Float | 手续费金额 |
| > fee_coin | String | 手续费币种 |
| > is_maker | Boolean | 是否为 maker 成交 |
| > timestamp | Integer | 成交时间,Unix 时间戳(毫秒) |
推送数据示例
{
"order": {
"order_id": "123456789",
"symbol": "BTC_USDT",
"side": "buy",
"order_type": "limit",
"price": 50000.0,
"qty": 1.0,
"filled_qty": 0.5,
"status": "partially_filled"
},
"fills": [
{
"trade_id": "987654321",
"price": 49999.5,
"qty": 0.3,
"fee": 0.015,
"fee_asset": "USDT",
"is_maker": True,
"timestamp": 1640995200000
},
{
"trade_id": "987654322",
"price": 50000.0,
"qty": 0.2,
"fee": 0.01,
"fee_asset": "USDT",
"is_maker": False,
"timestamp": 1640995210000
}
]
}
回调处理示例
def on_order_and_fill(self, exchange, order_and_fill):
"""订单和成交回调"""
order = order_and_fill.get('order', {})
fills = order_and_fill.get('fills', [])
order_id = order.get('order_id')
symbol = order.get('symbol')
status = order.get('status')
self.trader.log(f"📋 订单 {order_id} 状态: {status}")
# 处理成交明细
for fill in fills:
price = fill.get('price')
qty = fill.get('qty')
fee = fill.get('fee')
is_maker = fill.get('is_maker')
order_type = "Maker" if is_maker else "Taker"
self.trader.log(
f" └─ 成交: {qty} @ {price} ({order_type}) 手续费: {fee}"
)
# 统计maker/taker成交量
if is_maker:
self.total_maker_volume += qty * price
else:
self.total_taker_volume += qty * price
8.3 REST轮询订阅 (SubscribeRest)
REST轮询订阅适用于不需要极低延迟但需要定期更新的数据,如账户余额、持仓状态、合约信息等。系统会按照指定的时间间隔主动调用REST API获取数据。
🔄 REST轮询支持的数据类型
| 数据类型 | 说明 | 回调函数 | 更新频率建议 | 适用场景 |
|---|---|---|---|---|
Balance | 账户余额 | on_balance | 1-5秒 | 余额监控、风控 |
BalanceByCoin | 指定币种余额 | on_balance | 1-5秒 | 特定币种余额监控 |
Position | 持仓信息 | on_position | 2-10秒 | 持仓管理、盈亏计算 |
Funding | 资金费率 | on_funding | 30-300秒 | 资金费率监控 |
Instrument | 合约信息 | on_instrument | 60-300秒 | 合约规则更新 |
⚙️ REST轮询配置
基础配置格式:
{
"account_id": 0, # 必须指定账户ID
# 注意:event_handle_mode对SubscribeRest不生效,REST轮询始终按顺序处理
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 5, # 轮询间隔(秒)
"nanos": 0 # 纳秒精度(通常为0)
},
"rest_type": "数据类型" # 要轮询的数据类型
}
}
}
⚠️ 重要说明:
event_handle_mode参数对SubscribeRest不生效,REST轮询始终按顺序串行处理- REST轮询适用于不需要极低延迟的数据,如余额、持仓等
- 如需高频数据,请使用
SubscribeWs配合合适的event_handle_mode
特殊配置格式(BalanceByCoin):
{
"account_id": 0, # 必须指定账户ID
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 5, # 轮询间隔(秒)
"nanos": 0 # 纳秒精度(通常为0)
},
"rest_type": {
"BalanceByCoin": "币种名称" # 指定要监控的币种,如"USDT"、"BTC"
}
}
}
}
实际示例:
# 轮询账户余额(每5秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
}
# 轮询指定币种余额(每2秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
}
# 轮询持仓信息(每10秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
}
# 轮询合约信息(每5分钟一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}
💡 BalanceByCoin 特殊说明
BalanceByCoin 是 Balance 的优化版本,专门用于监控特定币种的余额变化。相比 Balance 订阅所有币种余额,BalanceByCoin 具有以下优势:
使用场景:
- 高频监控: 对特定币种(如USDT、BTC)进行高频余额监控
- 资源优化: 只获取指定币种数据,减少网络传输和数据处理开销
- 精确风控: 针对特定币种设置精确的风控阈值
- 性能提升: 减少不必要的数据处理,提高策略执行效率
配置示例:
# 监控USDT余额(每2秒更新)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
}
# 监控BTC余额(每5秒更新)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": {
"BalanceByCoin": "BTC"
}
}
}
}
回调处理:
def on_balance(self, account_id, balances):
"""BalanceByCoin回调处理"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)
# 由于使用BalanceByCoin,这里只会收到指定币种的数据
self.trader.log(f"{asset}余额: 可用{available} 总计{total}")
# 针对特定币种的风控逻辑
if asset == "USDT" and available < 100:
self.trader.log(f"🚨 USDT余额不足: {available}", level="ERROR")
elif asset == "BTC" and available < 0.001:
self.trader.log(f"⚠️ BTC余额不足: {available}", level="WARN")
8.3.1 Balance 轮询(账户余额)
定时获取账户所有币种的余额信息。
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
}
]
回调处理示例
def on_balance(self, account_id, balances):
"""余额轮询回调"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)
if total > 0:
self.trader.log(f"余额: {asset} 可用 {available} 总计 {total}")
8.3.2 BalanceByCoin 轮询(指定币种余额)
定时获取指定币种的余额信息,减少数据传输量。
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
}
]
回调处理示例
def on_balance(self, account_id, balances):
"""指定币种余额回调"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
# 只会收到指定币种的数据
if asset == "USDT" and available < 100:
self.trader.log(f"🚨 USDT余额不足: {available}", level="ERROR")
8.3.3 Position 轮询(持仓信息)
定时获取账户持仓信息。
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
}
]
回调处理示例
def on_position(self, account_id, positions):
"""持仓轮询回调"""
for position in positions:
symbol = position.get('symbol')
amount = position.get('amount', 0)
entry_price = position.get('entry_price', 0)
unrealized_pnl = position.get('unrealized_pnl', 0)
if abs(amount) > 0:
self.trader.log(
f"持仓: {symbol} 数量 {amount} "
f"开仓价 {entry_price} 浮盈 {unrealized_pnl}"
)
8.3.4 Funding 轮询(资金费率)
定时获取资金费率信息。
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 60, "nanos": 0},
"rest_type": "Funding"
}
}
}
]
回调处理示例
def on_funding(self, account_id, funding_rates):
"""资金费率轮询回调"""
for funding in funding_rates:
symbol = funding.get('symbol')
rate = funding.get('funding_rate', 0)
if abs(rate) > 0.001:
self.trader.log(f"高资金费率: {symbol} {rate * 100:.3f}%")
8.3.5 Instrument 轮询(合约信息)
定时获取合约规则信息。
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}
]
回调处理示例
def on_instrument(self, account_id, instruments):
"""合约信息轮询回调"""
for instrument in instruments:
symbol = instrument.get('symbol')
status = instrument.get('status')
if status != 'trading':
self.trader.log(f"⚠️ {symbol} 状态异常: {status}", level="WARN")
8.4 定时器订阅 (SubscribeTimer)
定时器订阅用于执行策略相关的定时任务,如定期检查订单状态、执行风控逻辑、统计数据更新等。不依赖外部数据源,完全由系统定时器驱动。
⏰ 定时器特点
- 精确定时: 支持秒级和纳秒级精度
- 多定时器: 可以设置多个不同名称的定时器
- 独立执行: 不依赖交易所连接状态
- 灵活配置: 可以动态调整执行间隔
- 错开启动: 支持初始延迟,避免多个定时器同时启动造成资源竞争
🎯 定时器配置
基础配置格式:
{
# 注意:event_handle_mode对SubscribeTimer不生效,定时器始终按顺序执行
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 间隔秒数, # 定时器间隔(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
},
"name": "定时器名称", # 定时器标识名称
"initial_delay": { # 可选:初始延迟时间
"secs": 延迟秒数, # 启动前延迟(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
}
}
}
}
⚠️ 重要说明:
event_handle_mode参数对SubscribeTimer不生效,定时器始终按顺序串行执行- 定时器适用于策略逻辑、风控检查、统计更新等定时任务
- 如需高频数据处理,请使用
SubscribeWs配合合适的event_handle_mode
字段说明:
update_interval: 定时器执行间隔,必须字段name: 定时器名称,用于标识和日志记录,必须字段initial_delay: 初始延迟时间,可选字段,用于错开多个定时器的启动时间,避免资源竞争
initial_delay使用场景:
| 场景类型 | 建议延迟时间 | 原因说明 | 示例 |
|---|---|---|---|
| 风控检查 | 30-60秒 | 等待数据稳定,避免启动时误报 | 余额检查、持仓风控 |
| 统计计算 | 错开15-60秒 | 避免多个计算任务同时执行 | 收益统计、交易分析 |
| 网络请求 | 错开10-30秒 | 分散API调用,避免频率限制 | 外部数据获取 |
| 文件操作 | 错开30-120秒 | 避免同时写文件造成冲突 | 日志轮转、数据备份 |
| 连接监控 | 0秒(立即) | 需要尽快检测连接状态 | WebSocket状态检查 |
实际应用示例:
# 每30秒检查一次价格预警(立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "price_alert_check"
}
}
}
# 每分钟更新一次策略统计(延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}
# 每10秒检查一次订单状态(延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "order_status_check",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
}
# 风控检查(延迟30秒启动,避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 30, "nanos": 0}
}
}
}
# 对应的回调函数处理
def on_timer_subscribe(self, timer_name):
"""定时器回调处理"""
if timer_name == "price_alert_check":
self.check_price_alerts()
elif timer_name == "stats_update":
self.update_strategy_stats()
elif timer_name == "order_status_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
💡 定时器完整示例
订阅配置:
def subscribes(self):
"""定时器订阅配置示例"""
return [
# 策略监控定时器(每30秒,立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor"
}
}
},
# 风控检查定时器(每5分钟,延迟60秒启动避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0}
}
}
},
# 统计数据更新(每分钟,延迟15秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 15, "nanos": 0}
}
}
},
# 价格预警检查(每10秒,延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "price_alert",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
},
# 订单状态检查(每15秒,延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 15, "nanos": 0},
"name": "order_check",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}
]
回调函数处理:
def on_timer_subscribe(self, timer_name):
"""定时器回调统一处理"""
try:
if timer_name == "strategy_monitor":
self.strategy_monitor()
elif timer_name == "risk_check":
self.risk_management_check()
elif timer_name == "stats_update":
self.update_strategy_statistics()
elif timer_name == "price_alert":
self.check_price_alerts()
elif timer_name == "order_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
except Exception as e:
self.trader.log(f"定时器{timer_name}执行异常: {e}", level="ERROR")
def strategy_monitor(self):
"""策略监控逻辑"""
# 检查策略运行状态
uptime = time.time() - self.start_time
self.trader.tlog(
"策略监控",
f"策略运行时间: {uptime:.0f}秒, 状态: 正常",
interval=300, # 5分钟记录一次
color="green"
)
# 检查WebSocket连接状态
if hasattr(self, 'last_data_time'):
time_since_data = time.time() - self.last_data_time
if time_since_data > 60: # 超过1分钟没有数据
self.trader.log("⚠️ WebSocket数据中断超过1分钟", level="WARN")
def risk_management_check(self):
"""风控检查"""
# 检查总持仓风险
total_exposure = self.calculate_total_exposure()
max_exposure = 100000 # 最大敞口100k
if total_exposure > max_exposure:
self.trader.log(
f"🚨 总敞口超限: {total_exposure} > {max_exposure}",
level="ERROR",
color="red"
)
# 执行风控措施
self.emergency_close_positions()
# 检查浮动盈亏
total_pnl = self.calculate_total_pnl()
if total_pnl < -5000: # 浮亏超过5k
self.trader.log(
f"🚨 浮亏过大: {total_pnl}",
level="ERROR",
color="red"
)
def update_strategy_statistics(self):
"""更新策略统计"""
# 计算成功率
if self.total_trades > 0:
win_rate = self.successful_trades / self.total_trades * 100
self.trader.tlog(
"策略统计",
f"交易次数: {self.total_trades}, 成功率: {win_rate:.1f}%",
interval=300,
color="blue"
)
def check_price_alerts(self):
"""价格预警检查"""
for symbol in self.monitored_symbols:
current_price = self.get_current_price(symbol)
if current_price:
# 检查价格突破
if symbol in self.price_alerts:
alert = self.price_alerts[symbol]
if alert['type'] == 'above' and current_price > alert['price']:
self.trader.log(f"💡 {symbol} 突破预警价格 {alert['price']}", color="yellow")
elif alert['type'] == 'below' and current_price < alert['price']:
self.trader.log(f"💡 {symbol} 跌破预警价格 {alert['price']}", color="yellow")
def check_pending_orders(self):
"""检查挂单状态"""
# 检查长时间未成交的订单
for order_id, order_info in self.pending_orders.items():
order_age = time.time() - order_info['created_time']
if order_age > 600: # 超过10分钟未成交
self.trader.log(
f"⏰ 订单{order_id}长时间未成交: {order_age:.0f}秒",
level="WARN"
)
# 可以选择撤销长时间未成交的订单
if order_age > 1800: # 超过30分钟强制撤销
self.cancel_order(order_id)
self.trader.log(f"🗑️ 强制撤销超时订单: {order_id}")
8.5 综合订阅配置示例
以下是一个完整的策略订阅配置示例,展示如何同时使用三种数据源:
def subscribes(self):
"""综合订阅配置示例"""
return [
# 1. 高频市场数据订阅
{
"account_id": 0,
"event_handle_mode": "Latest", # 高频市场数据使用Latest模式
"sub": {
"SubscribeWs": [
# 高频市场行情数据
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m"
}
},
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
}
]
}
},
# 2. 重要数据订阅
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": ["BTC_USDT"],
"levels": 5
}
},
# 账户数据
{
"Order": ["BTC_USDT", "ETH_USDT"]
},
{
"Position": ["BTC_USDT", "ETH_USDT"]
}
]
}
},
# 2. REST轮询数据订阅
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance" # 每5秒更新余额
}
}
},
# 3. 定时器任务订阅
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor" # 每30秒执行策略监控
}
}
},
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check" # 每5分钟执行风控检查
}
}
}
]
8.6 数据源选择与最佳实践
🎯 数据源选择指南
根据不同的应用场景,选择合适的数据源类型:
| 应用场景 | 推荐数据源 | 订阅频道 | 更新频率 | 理由 |
|---|---|---|---|---|
| 高频交易 | WebSocket | Bbo, Depth, Trade | 毫秒级 | 需要极低延迟的实时数据 |
| 技术分析 | WebSocket | Kline, MarkPrice | 秒级 | K线完结状态重要,实时性要求高 |
| 套利策略 | WebSocket | Bbo, Funding | 秒-分钟级 | 价差和资金费率变化敏感 |
| 风控监控 | REST轮询 | Balance, Position | 1-10秒 | 定期检查即可,不需要实时 |
| 策略逻辑 | 定时器 | Timer | 10秒-5分钟 | 独立于市场数据的逻辑执行 |
| 账户管理 | WebSocket + REST | Order + Balance | 混合 | 实时订单状态 + 定期余额检查 |
✅ 最佳实践建议
1. 分层订阅策略
def subscribes(self):
"""分层订阅策略示例"""
return [
# 核心层:高频实时数据
{
"account_id": 0,
"event_handle_mode": "Latest", # 高频数据使用Latest模式
"sub": {"SubscribeWs": [
{"Bbo": self.trading_symbols}, # 交易信号生成
]}
},
# 核心层:重要数据
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {"SubscribeWs": [
{"Order": self.trading_symbols} # 订单状态监控
]}
},
# 支撑层:中频数据
{
"account_id": 0,
"sub": {"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Balance" # 余额监控
}}
},
# 管理层:低频定时任务
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_management", # 风控检查
"initial_delay": {"secs": 30, "nanos": 0} # 延迟启动避免误报
}}
}
]
2. 数据处理优化
def on_bbo(self, exchange, bbo_list):
"""优化的BBO处理"""
try:
for bbo in bbo_list:
symbol = bbo.get('symbol')
# 快速数据验证
if not self.validate_bbo_data(bbo):
continue
# 核心交易逻辑(最小化处理时间)
signal = self.generate_trading_signal(symbol, bbo)
if signal:
self.execute_trade(signal)
except Exception as e:
# 错误处理不影响主流程
self.trader.log(f"BBO处理异常: {e}", level="ERROR")
def on_kline(self, exchange, kline):
"""K线数据处理最佳实践"""
for candle in kline['candles']:
# 关键:只处理已完结的K线
if candle['confirm']:
self.analyze_completed_candle(candle)
else:
# 未完结K线可用于实时监控,但不做技术分析
self.monitor_current_candle(candle)
3. 资源管理与性能优化
class PerformanceOptimizedStrategy:
def __init__(self):
# 数据缓存,避免重复计算
self.price_cache = {}
self.last_update_time = {}
def subscribes(self):
"""性能优化的订阅配置"""
# 只订阅必要的交易对
core_symbols = ["BTC_USDT", "ETH_USDT"] # 核心交易对
monitor_symbols = ["SOL_USDT", "BNB_USDT"] # 监控交易对
return [
# 核心交易对 - 高频订阅
{
"account_id": 0,
"event_handle_mode": "Latest", # 核心交易对使用Latest模式
"sub": {"SubscribeWs": [
{"Bbo": core_symbols},
{"Kline": {"symbols": core_symbols, "interval": "1m"}}
]}
},
# 监控交易对 - 低频订阅
{
"account_id": 0,
"event_handle_mode": "Sync", # 监控数据使用Sync模式
"sub": {"SubscribeWs": [
{"Bbo": monitor_symbols}
]}
}
]
def on_bbo(self, exchange, bbo_list):
"""带缓存的BBO处理"""
current_time = time.time()
for bbo in bbo_list:
symbol = bbo.get('symbol')
last_time = self.last_update_time.get(symbol, 0)
# 限制处理频率,避免过度计算
if current_time - last_time < 0.1: # 100ms内不重复处理
continue
self.last_update_time[symbol] = current_time
self.process_bbo_data(symbol, bbo)
✅ 推荐配置模式
高频交易模式:
# 追求极低延迟,最小化订阅
["Bbo", "Order"] # 仅订阅必要数据
技术分析模式:
# 需要完整市场数据
["Kline", "Bbo", "Depth", "MarkPrice"]
稳健套利模式:
# 平衡实时性和稳定性
WebSocket: ["Bbo", "Funding", "Order", "Position"]
REST: ["Balance"] (每5秒)
Timer: ["risk_check"] (每分钟,延迟30秒启动)
❌ 常见误区避免
1. 过度订阅
# ❌ 错误:订阅过多不必要的数据
["Bbo", "Depth", "Trade", "Kline", "MarkPrice", "Funding"] # 太多了
# ✅ 正确:只订阅策略需要的数据
["Bbo", "Order"] # 简单策略只需要这些
2. 忽略数据状态
# ❌ 错误:不检查K线完结状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
self.analyze_signal(candle) # 可能分析未完结K线
# ✅ 正确:检查K线状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
if candle['confirm']: # 只分析已完结的K线
self.analyze_signal(candle)
3. 阻塞回调函数
# ❌ 错误:在回调中执行耗时操作
def on_bbo(self, exchange, bbo_list):
for bbo in bbo_list:
time.sleep(0.1) # 阻塞其他数据处理
self.complex_calculation() # 复杂计算
# ✅ 正确:异步处理耗时操作
def on_bbo(self, exchange, bbo_list):
# 快速缓存数据
for bbo in bbo_list:
symbol = bbo.get('symbol')
self.bbo_cache[symbol] = bbo
# 复杂处理放在定时器中进行
def on_timer_subscribe(self, timer_name):
if timer_name == "complex_analysis":
self.perform_complex_analysis()
4. 忽略初始延迟配置
# ❌ 错误:多个定时器同时启动,造成资源竞争
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup"}}
]
# ✅ 正确:使用初始延迟错开启动时间
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report",
"initial_delay": {"secs": 60, "nanos": 0}}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup",
"initial_delay": {"secs": 120, "nanos": 0}}}
]
5. 初始延迟最佳实践
def subscribes(self):
"""使用initial_delay的最佳实践"""
return [
# 立即启动的监控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "connection_monitor" # 无延迟,立即监控连接状态
}}
},
# 延迟启动的风控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0} # 延迟1分钟,等数据稳定
}}
},
# 错开启动的统计类定时器(避免同时执行)
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "stats_calc",
"initial_delay": {"secs": 30, "nanos": 0} # 错开30秒
}}
},
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "report_gen",
"initial_delay": {"secs": 90, "nanos": 0} # 错开90秒
}}
}
]
🔧 调试与监控
数据流监控:
def on_bbo(self, exchange, bbo_list):
# 统计数据接收频率
for bbo in bbo_list:
symbol = bbo.get('symbol')
self.data_stats[symbol] = self.data_stats.get(symbol, 0) + 1
def on_timer_subscribe(self, timer_name):
if timer_name == "data_monitor":
for symbol, count in self.data_stats.items():
self.trader.tlog(
"数据统计",
f"{symbol}: {count}次/分钟",
interval=60
)
self.data_stats.clear()
通过合理的数据源选择和配置,可以构建高效、稳定的量化交易策略。