跳到主要内容

八、数据源订阅

八、数据源订阅

8.1 订阅机制概述

Trader API 提供了完整的数据源订阅机制,通过策略的 subscribes() 方法定义需要订阅的数据类型。系统支持三种不同的数据源订阅方式:

数据源类型数据来源更新方式适用场景延迟特性
SubscribeWs交易所 WebSocket实时推送高频交易、实时行情极低延迟
SubscribeRest交易所 REST API定时轮询账户状态、合约信息秒级延迟
SubscribeTimer系统定时器定时触发策略逻辑、状态检查可配置间隔

数据流向图

+-------------------+     +---------------+     +----------------------+
| Exchange API | | System Timer | | Strategy Callback |
| | | | | |
| WebSocket ----+ | | Timer ----+ | | +-- on_bbo() |
| REST API ---+ | | | | | | +-- on_kline() |
| | | | | | | | +-- on_index_kline()|
| | | | | | | | +-- on_order() |
+--------------|-|--+ +------------|--+ | +-- on_timer() |
| | | | +-- on_balance() |
| +------------------+ | | |
+------------------+ | | +----------------------+
| | | ^
v v v |
+------------------+ |
| Trader Engine |------------+
| (Data Dispatch) |
+------------------+

中文说明:

  • Exchange API (交易所API): WebSocket实时推送 + REST API轮询
  • System Timer (系统定时器): 策略定时任务触发器
  • Strategy Callback (策略回调): 数据到达后触发的回调函数
  • Trader Engine (数据分发中心): 统一接收并分发数据到策略

8.2 WebSocket 实时订阅 (SubscribeWs)

WebSocket 订阅是最重要的数据源,提供交易所的实时推送数据,具有极低延迟的特性。

支持的频道类型总览

频道分类频道类型说明回调函数延迟级别
市场行情Bbo最佳买卖价on_bbo毫秒级
Depth市场深度on_depth毫秒级
Trade成交记录on_trade毫秒级
AggTrade聚合成交on_agg_trade毫秒级
KlineK线数据on_kline秒级
IndexKline指数K线数据on_index_kline秒级
MarkPrice标记价格on_mark_price秒级
Funding资金费率on_funding分钟级
账户数据Order订单更新on_order毫秒级
Position持仓更新on_position毫秒级
Balance余额更新on_balance秒级
FundingFee资金费结算on_funding_fee小时级
OrderAndFill订单和成交on_order_and_fill毫秒级

🚀 WebSocket订阅配置

基础配置格式:

{
"account_id": 0, # 账户ID(市场数据可选,账户数据必须)
"event_handle_mode": "Sync", # 事件处理模式:Latest/Sync/Async/Batch(可选)
# 默认行为:仅订阅 BBO/Depth/Trade 时默认 Batch,其他情况默认 Sync
"sub": {
"SubscribeWs": [
# 市场数据订阅(无需账户ID)
{
"频道类型": 配置参数
},
# 账户数据订阅(需要账户ID)
{
"频道类型": 配置参数
}
]
}
}

⚠️ 重要说明

  • event_handle_mode参数仅对SubscribeWs生效,SubscribeRestSubscribeTimer会忽略此参数
  • 默认行为:仅订阅 BBO/Depth/Trade 时默认使用 Batch 模式,其他情况默认使用 Sync 模式
  • 不同数据类型推荐使用不同模式:
    • 高频数据(Bbo、Trade)→ "Latest"模式
    • 重要数据(Depth、Order、Position)→ "Sync"模式
    • 高频重要数据 → "Async"模式

8.2.1 Bbo 频道(最佳买卖价)

获取交易对的实时最佳买一价和卖一价,适用于快速获取当前市场价格。

频道说明
  • 频道名称Bbo
  • 推送频率:有变化时推送,毫秒级延迟
  • 回调函数on_bbo(exchange, bbo_list)bbo_list 为数组类型,包含多个 BBO 数据
  • 适用场景:高频交易、价格监控、套利策略
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
参数名类型描述
symbolString交易对,如 BTC_USDT
bid_priceFloat最佳买价
bid_qtyFloat买一档数量
ask_priceFloat最佳卖价
ask_qtyFloat卖一档数量
timestampInteger数据更新时间,Unix 时间戳(毫秒)
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"bid_price": 50000.5, # 最佳买价
"bid_qty": 1.5, # 买一数量
"ask_price": 50001.0, # 最佳卖价
"ask_qty": 2.0, # 卖一数量
"timestamp": 1640995200000 # 时间戳(毫秒)
}
回调处理示例
def on_bbo(self, exchange, bbo_list):
"""
最佳买卖价回调

注意:bbo_list 是数组类型,包含多个 BBO 数据。
在 Latest 模式下,数组通常只包含一条最新消息。
在 Batch 模式下,数组包含累积期间的所有消息。
"""
for bbo in bbo_list:
symbol = bbo.get('symbol')
bid_price = bbo.get('bid_price')
ask_price = bbo.get('ask_price')
spread = ask_price - bid_price
spread_pct = spread / bid_price * 100

self.trader.log(
f"{symbol} BBO: 买一 {bid_price} 卖一 {ask_price} "
f"价差 {spread:.2f} ({spread_pct:.4f}%)"
)

# 交易信号判断
if spread_pct < 0.01: # 价差小于0.01%
self.generate_trading_signal(symbol, bbo)

8.2.2 Depth 频道(市场深度)

获取交易对的订单簿深度数据,支持配置深度档位。

频道说明
  • 频道名称Depth
  • 推送频率:有变化时推送,毫秒级延迟
  • 回调函数on_depth(exchange, depth_list)depth_list 为数组类型,包含多个深度数据
  • 适用场景:深度分析、大单监控、流动性评估
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"levels": 10 # 订阅10档深度
}
}
]
}
}
]
推送数据参数
参数名类型描述
symbolString交易对,如 BTC_USDT
bidsArray买盘数组,按价格从高到低排序
> priceFloat买盘价格
> amountFloat该价格档位的数量
asksArray卖盘数组,按价格从低到高排序
> priceFloat卖盘价格
> amountFloat该价格档位的数量
timestampInteger数据更新时间,Unix 时间戳(毫秒)
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"bids": [ # 买盘,按价格从高到低排序
{"price": 50000.0, "amount": 1.5},
{"price": 49999.5, "amount": 2.0},
{"price": 49999.0, "amount": 3.2},
# ...更多档位
],
"asks": [ # 卖盘,按价格从低到高排序
{"price": 50001.0, "amount": 1.0},
{"price": 50001.5, "amount": 1.8},
{"price": 50002.0, "amount": 2.5},
# ...更多档位
],
"timestamp": 1640995200000 # 时间戳(毫秒)
}
回调处理示例
def on_depth(self, exchange, depth_list):
"""
市场深度回调

注意:depth_list 是数组类型,包含多个深度数据。
在 Sync/Async 模式下,数组通常只包含一条最新消息。
在 Batch 模式下,数组包含累积期间的所有消息。
"""
for depth in depth_list:
symbol = depth.get('symbol')
bids = depth.get('bids', [])
asks = depth.get('asks', [])

if bids and asks:
best_bid = bids[0]['price']
best_ask = asks[0]['price']

# 计算买卖盘总量
bid_volume = sum(entry['amount'] for entry in bids)
ask_volume = sum(entry['amount'] for entry in asks)

# 买卖力量比
volume_ratio = bid_volume / ask_volume if ask_volume > 0 else 0

self.trader.log(
f"{symbol} 深度: 最优买 {best_bid} 最优卖 {best_ask} "
f"买盘量 {bid_volume:.2f} 卖盘量 {ask_volume:.2f} "
f"买卖比 {volume_ratio:.2f}"
)

# 大单检测
for entry in bids[:3]:
if entry['amount'] > 10: # 大单阈值
self.trader.log(f"🐋 {symbol} 大买单: {entry['amount']} @ {entry['price']}")

8.2.3 Trade 频道(成交记录)

获取交易对的实时逐笔成交数据。

频道说明
  • 频道名称Trade
  • 推送频率:每笔成交推送,毫秒级延迟
  • 回调函数on_trade(exchange, trade_list)trade_list 为数组类型,包含多个成交数据
  • 适用场景:成交监控、大单追踪、成交量分析
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [
{
"Trade": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
参数名类型描述
idString成交记录唯一标识
symbolString交易对,如 BTC_USDT
priceFloat成交价格
amountFloat成交数量(基础货币)
sideString主动成交方向:Buy 买入 / Sell 卖出
timestampInteger成交时间,Unix 时间戳(毫秒)
推送数据示例
{
"id": "123456789", # 成交ID
"symbol": "BTC_USDT", # 交易对
"price": 50000.5, # 成交价格
"amount": 0.5, # 成交数量(基础货币)
"side": "Buy", # 成交方向:Buy/Sell
"timestamp": 1640995200000 # 成交时间(毫秒)
}
回调处理示例
def on_trade(self, exchange, trade_list):
"""
成交记录回调

注意:trade_list 是数组类型,包含多个成交数据。
在 Latest 模式下,数组通常只包含一条最新消息。
在 Batch 模式下,数组包含累积期间的所有消息。
"""
for trade in trade_list:
symbol = trade.get('symbol')
price = trade.get('price')
amount = trade.get('amount')
side = trade.get('side')
value = price * amount

# 大单监控
if value > 100000: # 成交额超过10万
direction = "买入" if side == "Buy" else "卖出"
self.trader.log(
f"🔥 {symbol} 大单成交: {direction} {amount} @ {price} "
f"成交额 ${value:,.0f}",
color="red" if side == "Sell" else "green"
)

8.2.4 AggTrade 频道(聚合成交)

获取交易对的聚合成交数据,将短时间内的成交合并推送。

频道说明
  • 频道名称AggTrade
  • 推送频率:聚合推送,毫秒级延迟
  • 回调函数on_agg_trade(exchange, symbol, agg_trade)
  • 适用场景:减少数据量、成交趋势分析
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [
{
"AggTrade": ["BTC_USDT"]
}
]
}
}
]
推送数据参数
参数名类型描述
idString聚合成交唯一标识
symbolString交易对,如 BTC_USDT
priceFloat成交价格
amountFloat聚合成交总数量(基础货币)
sideString主动成交方向:Buy 买入 / Sell 卖出
first_trade_idString此聚合包含的首笔成交 ID
last_trade_idString此聚合包含的末笔成交 ID
timestampInteger成交时间,Unix 时间戳(毫秒)
推送数据示例
{
"id": "987654321", # 聚合成交ID
"symbol": "BTC_USDT", # 交易对
"price": 50000.5, # 成交价格
"amount": 5.5, # 聚合成交数量(基础货币)
"side": "Buy", # 成交方向:Buy/Sell
"first_trade_id": "123456780", # 首笔成交ID
"last_trade_id": "123456789", # 末笔成交ID
"timestamp": 1640995200000 # 时间戳(毫秒)
}
回调处理示例
def on_agg_trade(self, exchange, symbol, agg_trade):
"""聚合成交回调"""
price = agg_trade.get('price')
amount = agg_trade.get('amount')
side = agg_trade.get('side')

self.trader.log(f"{symbol} 聚合成交: {side} {amount} @ {price}")

8.2.5 Kline 频道(K线数据)

获取交易对的 K 线(蜡烛图)数据,支持多种时间周期。

频道说明
  • 频道名称Kline
  • 推送频率:K线更新时推送,秒级延迟
  • 回调函数on_kline(exchange, kline)
  • 适用场景:技术分析、趋势判断、量价分析
支持的时间周期
分钟级小时级天级及以上
1m, 3m, 5m, 15m, 30m1h, 2h, 4h, 6h, 8h, 12h1d, 3d, 1w, 1M
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m"
}
}
]
}
}
]
推送数据参数
参数名类型描述
symbolString交易对,如 BTC_USDT
intervalStringK线时间周期,如 1m, 5m, 1h, 1d
candlesArrayK线数据数组
> timestampIntegerK线开盘时间,Unix 时间戳(毫秒)
> openFloat开盘价
> highFloat最高价
> lowFloat最低价
> closeFloat收盘价
> volumeFloat成交量(基础货币)
> quote_volumeFloat成交额(报价货币)
> tradesInteger成交笔数(可选)
> taker_buy_volumeFloatTaker 买入成交量(可选)
> taker_buy_quote_volumeFloatTaker 买入成交额(可选)
> confirmBooleanK线状态:false 未完结,true 已完结
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"interval": "1m", # 时间周期
"candles": [
{
"timestamp": 1640995200000, # 开盘时间(毫秒)
"open": 50000.0, # 开盘价
"high": 50500.0, # 最高价
"low": 49800.0, # 最低价
"close": 50200.0, # 收盘价
"volume": 123.45, # 成交量(基础货币)
"quote_volume": 6180000.0, # 成交额(报价货币)
"trades": 1250, # 成交笔数
"taker_buy_volume": 65.2, # 主动买入成交量
"taker_buy_quote_volume": 3270000.0, # 主动买入成交额
"confirm": False # false=未完结,true=已完结
}
]
}
回调处理示例
def on_kline(self, exchange, kline):
"""K线数据回调"""
symbol = kline['symbol']
interval = kline['interval']

for candle in kline['candles']:
open_price = candle['open']
high_price = candle['high']
low_price = candle['low']
close_price = candle['close']
volume = candle['volume']
is_confirmed = candle['confirm']

# 计算涨跌幅
change_pct = (close_price - open_price) / open_price * 100
status = "✅ 已完结" if is_confirmed else "⏳ 未完结"

self.trader.log(
f"{symbol} {interval} K线: "
f"O:{open_price} H:{high_price} L:{low_price} C:{close_price} "
f"涨跌:{change_pct:+.2f}% 成交量:{volume:.2f} {status}"
)

# 只对已完结的K线进行技术分析
if is_confirmed:
self.analyze_kline_signal(symbol, candle)

def analyze_kline_signal(self, symbol, candle):
"""分析K线形态信号"""
body = abs(candle['close'] - candle['open'])
range_size = candle['high'] - candle['low']

if range_size > 0:
body_ratio = body / range_size

# 大阳线/大阴线检测
if body_ratio > 0.7:
if candle['close'] > candle['open']:
self.trader.log(f"📈 {symbol}: 大阳线信号", color="green")
else:
self.trader.log(f"📉 {symbol}: 大阴线信号", color="red")

# 十字星检测
elif body_ratio < 0.1:
self.trader.log(f"✴️ {symbol}: 十字星形态", color="yellow")

8.2.6 IndexKline 频道(指数K线数据)

获取交易对的指数K线数据。指数价格通常是多个交易所现货价格的加权平均,用于衍生品交易的公允价格参考。 目前仅OkxSwap、GateSwap、HuobiSwap支持。

频道说明
  • 频道名称IndexKline
  • 推送频率:K线更新时推送,秒级延迟
  • 回调函数on_index_kline(exchange, index_kline)
  • 适用场景:指数价格分析、套利策略、跨交易所价格比较
  • 支持交易所:OKX、Gate、Huobi(合约交易)
与普通 Kline 的区别
特性Kline(普通K线)IndexKline(指数K线)
价格来源单一交易所成交价多交易所加权平均价
成交量通常无
用途技术分析、交易决策公允价格参考、套利分析
波动性较高较平滑
支持的时间周期
分钟级小时级天级及以上
1m, 3m, 5m, 15m, 30m1h, 2h, 4h, 6h, 8h, 12h1d, 3d, 1w, 1M
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [
{
"IndexKline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m"
}
}
]
}
}
]
推送数据参数
参数名类型描述
symbolString交易对,如 BTC_USDT
intervalStringK线时间周期,如 1m, 5m, 1h, 1d
candlesArrayK线数据数组
> timestampIntegerK线开盘时间,Unix 时间戳(毫秒)
> openFloat开盘价(指数价格)
> highFloat最高价(指数价格)
> lowFloat最低价(指数价格)
> closeFloat收盘价(指数价格)
> volumeFloat成交量(可能为0,取决于交易所)
> quote_volumeFloat成交额(可能为0,取决于交易所)
> confirmBooleanK线状态:false 未完结,true 已完结
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"interval": "1m", # 时间周期
"candles": [
{
"timestamp": 1640995200000, # 开盘时间(毫秒)
"open": 50000.0, # 指数开盘价
"high": 50500.0, # 指数最高价
"low": 49800.0, # 指数最低价
"close": 50200.0, # 指数收盘价
"volume": 0.0, # 成交量(指数通常为0)
"quote_volume": 0.0, # 成交额(指数通常为0)
"confirm": False # false=未完结,true=已完结
}
]
}
回调处理示例
def on_index_kline(self, exchange, index_kline):
"""指数K线数据回调"""
symbol = index_kline['symbol']
interval = index_kline['interval']

for candle in index_kline['candles']:
open_price = candle['open']
high_price = candle['high']
low_price = candle['low']
close_price = candle['close']
is_confirmed = candle['confirm']

# 计算指数价格涨跌幅
change_pct = (close_price - open_price) / open_price * 100
status = "已完结" if is_confirmed else "未完结"

self.trader.log(
f"{symbol} 指数{interval} K线: "
f"O:{open_price} H:{high_price} L:{low_price} C:{close_price} "
f"涨跌:{change_pct:+.2f}% {status}"
)

# 只对已完结的K线进行分析
if is_confirmed:
self.analyze_index_price(symbol, candle)

def analyze_index_price(self, symbol, candle):
"""分析指数价格与现货价格的偏离"""
index_price = candle['close']

# 获取当前现货/合约价格进行比较
# 可用于套利策略或风控
self.trader.log(f"{symbol}: 指数价格 {index_price}")
同时订阅 Kline 和 IndexKline 示例
def subscribes(self):
"""同时订阅普通K线和指数K线,便于对比分析"""
return [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Kline": {
"symbols": ["BTC_USDT"],
"interval": "1m"
}
},
{
"IndexKline": {
"symbols": ["BTC_USDT"],
"interval": "1m"
}
}
]
}
}
]

def on_kline(self, exchange, kline):
"""普通K线回调"""
self.last_spot_price = kline['candles'][-1]['close']
self.check_basis()

def on_index_kline(self, exchange, index_kline):
"""指数K线回调"""
self.last_index_price = index_kline['candles'][-1]['close']
self.check_basis()

def check_basis(self):
"""检查基差(现货与指数的价格差异)"""
if hasattr(self, 'last_spot_price') and hasattr(self, 'last_index_price'):
basis = self.last_spot_price - self.last_index_price
basis_pct = basis / self.last_index_price * 100
self.trader.log(f"基差: {basis:.2f} ({basis_pct:+.3f}%)")

8.2.7 MarkPrice 频道(标记价格)

获取交易对的标记价格,用于计算未实现盈亏和强平价格。

频道说明
  • 频道名称MarkPrice
  • 推送频率:秒级推送
  • 回调函数on_mark_price(exchange, symbol, mark_price)
  • 适用场景:风控计算、盈亏监控
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"MarkPrice": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
参数名类型描述
symbolString交易对,如 BTC_USDT
priceFloat标记价格,用于计算未实现盈亏和强平价格
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"price": 50100.5 # 标记价格
}
回调处理示例
def on_mark_price(self, exchange, symbol, mark_price):
"""标记价格回调"""
price = mark_price.get('price')

self.trader.log(f"{symbol} 标记价格: {price}")

# 可结合持仓数据计算未实现盈亏
if symbol in self.positions:
position = self.positions[symbol]
entry_price = position.get('entry_price', 0)
amount = position.get('amount', 0)
if entry_price > 0 and amount != 0:
pnl = (price - entry_price) * amount
self.trader.log(f"{symbol} 预估浮盈: {pnl:.2f}")

8.2.8 Funding 频道(资金费率)

获取永续合约的资金费率信息。

频道说明
  • 频道名称Funding
  • 推送频率:分钟级推送
  • 回调函数on_funding(exchange, symbol, funding)
  • 适用场景:资金费率套利、持仓成本计算
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"Funding": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
参数名类型描述
symbolString交易对,如 BTC_USDT
funding_rateFloat当前资金费率,如 0.0001 表示 0.01%
next_funding_atInteger下次资金费结算时间,Unix 时间戳(毫秒)
min_funding_rateFloat资金费率最小值(下限)
max_funding_rateFloat资金费率最大值(上限)
funding_intervalInteger资金费结算周期(小时),可选,通常为 8
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"funding_rate": 0.0001, # 当前资金费率
"next_funding_at": 1640998800000,# 下次结算时间(毫秒)
"min_funding_rate": -0.00375, # 资金费率最小值
"max_funding_rate": 0.00375, # 资金费率最大值
"funding_interval": 8 # 资金费结算周期(小时),可选
}
回调处理示例
def on_funding(self, exchange, symbol, funding):
"""资金费率回调"""
rate = funding.get('funding_rate', 0)
next_time = funding.get('next_funding_at', 0)
min_rate = funding.get('min_funding_rate', 0)
max_rate = funding.get('max_funding_rate', 0)

rate_pct = rate * 100

self.trader.log(
f"{symbol} 资金费率: {rate_pct:.4f}% "
f"(范围: {min_rate*100:.4f}% ~ {max_rate*100:.4f}%)"
)

# 高资金费率预警
if abs(rate) > 0.001: # 费率超过0.1%
direction = "多头支付空头" if rate > 0 else "空头支付多头"
self.trader.log(
f"💰 {symbol} 高资金费率: {rate_pct:.4f}% ({direction})",
color="yellow"
)

8.2.9 Order 频道(订单更新)

获取账户订单状态的实时更新。

频道说明
  • 频道名称Order
  • 推送频率:订单状态变化时推送,毫秒级延迟
  • 回调函数on_order(exchange, order)
  • 适用场景:订单管理、成交确认
  • 权限要求:需要账户权限(account_id 必须指定)
订阅示例
def subscribes(self):
return [
{
"account_id": 0, # 必须指定账户ID
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"Order": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
参数名类型描述
idString交易所返回的订单 ID
cidString用户自定义的订单 ID(可选)
symbolString交易对,如 BTC_USDT
sideString订单方向:Buy 买入 / Sell 卖出
pos_sideString仓位方向:Long 多 / Short 空(可选,双向持仓时使用)
order_typeString订单类型:Limit 限价 / Market 市价
time_in_forceString有效期类型:GTC / IOC / FOK / PostOnly / GTD
priceFloat委托价格(市价单可能为空)
amountFloat委托数量(基础货币)
quote_amountFloat委托数量(报价货币,可选)
filledFloat已成交数量
filled_avg_priceFloat成交均价
statusString订单状态:Open / PartiallyFilled / Filled / Canceled
timestampInteger订单创建时间,Unix 时间戳(毫秒)
fee_detailArray手续费详情列表(可选)
> feeFloat手续费金额
> fee_rateFloat手续费费率(与 fee 互斥)
> fee_coinString手续费币种,如 USDT
> is_makerBoolean是否为 maker 成交
sourceString订单来源:Order 订单事件 / UserTrade 成交事件
推送数据示例
{
"id": "123456789", # 订单ID
"cid": "my_order_1", # 客户端订单ID(可选)
"symbol": "BTC_USDT", # 交易对
"side": "Buy", # 方向:Buy/Sell
"pos_side": "Long", # 仓位方向:Long/Short(可选,双向持仓时使用)
"order_type": "Limit", # 类型:Limit/Market
"time_in_force": "GTC", # 有效期:GTC/IOC/FOK/PostOnly
"price": 50000.0, # 委托价格(市价单可能为空)
"amount": 1.0, # 委托数量(基础货币)
"quote_amount": None, # 委托数量(报价货币,可选)
"filled": 0.5, # 已成交数量
"filled_avg_price": 49999.5, # 成交均价
"status": "PartiallyFilled", # 状态
"timestamp": 1640995200000, # 创建时间(毫秒)
"fee_detail": [ # 手续费详情(可选)
{
"fee": 0.015, # 手续费金额
"fee_coin": "USDT", # 手续费币种
"is_maker": True # 是否为maker
}
],
"source": "Order" # 来源:Order/UserTrade
}
订单状态说明
状态说明
Open新订单,等待成交
PartiallyFilled部分成交
Filled完全成交
Canceled已撤销
有效期类型说明
类型说明
GTCGood-Til-Canceled,一直有效直到取消
IOCImmediate-Or-Cancel,立即成交或取消
FOKFill-Or-Kill,全部成交或取消
PostOnly只做maker,否则取消
GTDGood-Til-Date,指定日期前有效
回调处理示例
def on_order(self, exchange, order):
"""订单状态回调"""
order_id = order.get('id')
symbol = order.get('symbol')
side = order.get('side')
status = order.get('status')
filled = order.get('filled', 0)
amount = order.get('amount', 0)

# 状态日志
self.trader.log(
f"📋 订单更新: {symbol} {side} {order_id} "
f"状态:{status} 成交:{filled}/{amount}"
)

# 成交处理
if status == 'Filled':
avg_price = order.get('filled_avg_price')
self.trader.log(
f"✅ 订单完全成交: {symbol} {side} {filled} @ {avg_price}",
color="green"
)
self.on_order_filled(order)

# 部分成交处理
elif status == 'PartiallyFilled':
self.trader.log(
f"⏳ 订单部分成交: {symbol} {side} {filled}/{amount}",
color="blue"
)

# 撤销处理
elif status == 'Canceled':
self.trader.log(
f"❌ 订单已撤销: {symbol} {side} {order_id}",
color="yellow"
)

8.2.10 Position 频道(持仓更新)

获取账户持仓的实时更新。

频道说明
  • 频道名称Position
  • 推送频率:持仓变化时推送,毫秒级延迟
  • 回调函数on_position(exchange, position)
  • 适用场景:持仓管理、风控监控
  • 权限要求:需要账户权限(account_id 必须指定)
订阅示例
def subscribes(self):
return [
{
"account_id": 0, # 必须指定账户ID
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"Position": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
参数名类型描述
idString仓位唯一标识(可选)
symbolString交易对,如 BTC_USDT
sideString仓位方向:Long 多头 / Short 空头
amountFloat持仓数量
entry_priceFloat开仓均价
unrealized_pnlFloat未实现盈亏
leverageInteger杠杆倍数
margin_modeString保证金模式:Cross 全仓 / Isolated 逐仓
marginFloat维持保证金金额(可选)
mmrFloat维持保证金率(可选),如 0.05 表示 5%
mark_priceFloat当前标记价格(可选)
liquidation_priceFloat强制平仓价格(可选)
ctimeInteger仓位创建时间,Unix 时间戳(毫秒,可选)
utimeInteger仓位更新时间,Unix 时间戳(毫秒,可选)
adlObjectADL(自动减仓)风险评分(可选)
> scoreFloat标准化评分,范围 0-1
> rawFloat原始等级值
> sourceString数据来源字段名
推送数据示例
{
"id": "pos_123456", # 仓位ID(可选)
"symbol": "BTC_USDT", # 交易对
"side": "Long", # 方向:Long/Short
"amount": 1.5, # 持仓数量
"entry_price": 49500.0, # 开仓均价
"unrealized_pnl": 900.0, # 未实现盈亏
"leverage": 10, # 杠杆倍数
"margin_mode": "Cross", # 保证金模式:Cross(全仓)/Isolated(逐仓)
"margin": 5000.0, # 维持保证金(可选)
"mmr": 0.05, # 维持保证金率(可选)
"mark_price": 50100.0, # 标记价格(可选)
"liquidation_price": 45000.0, # 强平价格(可选)
"ctime": 1640995200000, # 创建时间(可选)
"utime": 1640995210000, # 更新时间(可选)
"adl": { # ADL风险评分(可选)
"score": 0.8, # 标准化评分(0-1)
"raw": 4, # 原始等级
"source": "adl_quantile" # 来源字段名
}
}
回调处理示例
def on_position(self, exchange, position):
"""持仓更新回调"""
symbol = position.get('symbol')
side = position.get('side')
amount = position.get('amount', 0)
entry_price = position.get('entry_price', 0)
unrealized_pnl = position.get('unrealized_pnl', 0)
liquidation_price = position.get('liquidation_price')

# 持仓日志
if abs(amount) > 0:
pnl_color = "green" if unrealized_pnl >= 0 else "red"
self.trader.log(
f"📊 持仓更新: {symbol} {side} "
f"数量:{amount} 开仓价:{entry_price} "
f"浮盈:{unrealized_pnl:+.2f}",
color=pnl_color
)

# 强平风险预警
mark_price = position.get('mark_price')
if mark_price and liquidation_price:
distance = abs(mark_price - liquidation_price) / mark_price * 100
if distance < 5: # 距离强平价格不足5%
self.trader.log(
f"🚨 {symbol} 强平风险: 当前价 {mark_price} "
f"强平价 {liquidation_price} 距离 {distance:.1f}%",
level="ERROR",
color="red"
)

# ADL风险监控
adl = position.get('adl')
if adl and adl.get('score', 0) > 0.8:
self.trader.log(
f"⚠️ {symbol} ADL风险较高: {adl['score']:.2f}",
level="WARN"
)
else:
self.trader.log(f"📭 {symbol} 持仓已清空")

8.2.11 Balance 频道(余额更新)

获取账户余额的实时更新。

频道说明
  • 频道名称Balance
  • 推送频率:余额变化时推送,秒级延迟
  • 回调函数on_balance(account_id, balances)
  • 适用场景:资金监控、风控检查
  • 权限要求:需要账户权限(account_id 必须指定)
订阅示例
def subscribes(self):
return [
{
"account_id": 0, # 必须指定账户ID
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
"Balance" # 无需指定交易对
]
}
}
]
推送数据参数
参数名类型描述
assetString币种符号(大写),如 USDT, BTC
balanceFloat钱包总余额(含未实现盈亏)
available_balanceFloat可用于交易的余额
unrealized_pnlFloat未实现盈亏(以该币种计价)
推送数据示例
[
{
"asset": "USDT", # 币种
"balance": 10000.0, # 总余额(含浮动盈亏)
"available_balance": 8000.0, # 可用余额
"unrealized_pnl": 500.0 # 未实现盈亏
},
{
"asset": "BTC",
"balance": 1.5,
"available_balance": 1.0,
"unrealized_pnl": 0.0
}
]
回调处理示例
def on_balance(self, account_id, balances):
"""余额更新回调"""
for balance in balances:
asset = balance.get('asset')
total = balance.get('balance', 0)
available = balance.get('available_balance', 0)
unrealized_pnl = balance.get('unrealized_pnl', 0)

if total > 0:
# 计算已用余额(总余额 - 可用余额 - 未实现盈亏)
used = total - available - unrealized_pnl

self.trader.log(
f"💰 {asset} 余额: 总计 {total} 可用 {available} "
f"浮盈 {unrealized_pnl}"
)

# 可用余额不足预警
available_rate = available / total if total > 0 else 0
if available_rate < 0.2: # 可用余额不足20%
self.trader.log(
f"⚠️ {asset} 可用余额不足: {available_rate:.1%}",
level="WARN"
)

# USDT 余额监控
if asset == "USDT" and available < 100:
self.trader.log(
f"🚨 USDT 可用余额不足: {available}",
level="ERROR"
)

8.2.12 FundingFee 频道(资金费结算)

获取资金费用结算通知。

频道说明
  • 频道名称FundingFee
  • 推送频率:每次资金费结算时推送(通常每8小时)
  • 回调函数on_funding_fee(exchange, funding_fee)
  • 适用场景:资金费用统计、成本分析
  • 权限要求:需要账户权限(account_id 必须指定)
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"FundingFee": ["BTC_USDT", "ETH_USDT"]
}
]
}
}
]
推送数据参数
参数名类型描述
symbolString交易对,如 BTC_USDT
funding_feeFloat本次结算的资金费金额,正数为收入,负数为支出
timestampInteger结算时间,Unix 时间戳(毫秒)
推送数据示例
{
"symbol": "BTC_USDT", # 交易对
"funding_fee": -15.5, # 资金费用(负数为支出,正数为收入)
"timestamp": 1640998800000 # 结算时间(毫秒)
}
回调处理示例
def on_funding_fee(self, exchange, funding_fee):
"""资金费结算回调"""
symbol = funding_fee.get('symbol')
fee = funding_fee.get('funding_fee', 0)
timestamp = funding_fee.get('timestamp', 0)

fee_type = "收入" if fee > 0 else "支出"
fee_color = "green" if fee > 0 else "red"

self.trader.log(
f"💸 {symbol} 资金费结算: {fee_type} {abs(fee):.2f} USDT",
color=fee_color
)

# 累计统计
self.total_funding_fee += fee

8.2.13 OrderAndFill 频道(订单和成交)

同时获取订单更新和成交明细,提供更完整的交易信息。

频道说明
  • 频道名称OrderAndFill
  • 推送频率:订单或成交变化时推送,毫秒级延迟
  • 回调函数on_order_and_fill(exchange, order_and_fill)
  • 适用场景:需要同时跟踪订单和成交明细的策略
  • 权限要求:需要账户权限(account_id 必须指定)
订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{
"OrderAndFill": ["BTC_USDT"]
}
]
}
}
]
推送数据参数
参数名类型描述
orderObject订单信息,结构同 Order 频道
> idString订单 ID
> symbolString交易对
> sideString订单方向:Buy / Sell
> order_typeString订单类型:Limit / Market
> priceFloat委托价格
> amountFloat委托数量
> filledFloat已成交数量
> statusString订单状态
fillsArray成交明细列表
> trade_idString成交记录 ID
> priceFloat成交价格
> amountFloat成交数量
> feeFloat手续费金额
> fee_coinString手续费币种
> is_makerBoolean是否为 maker 成交
> timestampInteger成交时间,Unix 时间戳(毫秒)
推送数据示例
{
"order": {
"order_id": "123456789",
"symbol": "BTC_USDT",
"side": "buy",
"order_type": "limit",
"price": 50000.0,
"qty": 1.0,
"filled_qty": 0.5,
"status": "partially_filled"
},
"fills": [
{
"trade_id": "987654321",
"price": 49999.5,
"qty": 0.3,
"fee": 0.015,
"fee_asset": "USDT",
"is_maker": True,
"timestamp": 1640995200000
},
{
"trade_id": "987654322",
"price": 50000.0,
"qty": 0.2,
"fee": 0.01,
"fee_asset": "USDT",
"is_maker": False,
"timestamp": 1640995210000
}
]
}
回调处理示例
def on_order_and_fill(self, exchange, order_and_fill):
"""订单和成交回调"""
order = order_and_fill.get('order', {})
fills = order_and_fill.get('fills', [])

order_id = order.get('order_id')
symbol = order.get('symbol')
status = order.get('status')

self.trader.log(f"📋 订单 {order_id} 状态: {status}")

# 处理成交明细
for fill in fills:
price = fill.get('price')
qty = fill.get('qty')
fee = fill.get('fee')
is_maker = fill.get('is_maker')

order_type = "Maker" if is_maker else "Taker"
self.trader.log(
f" └─ 成交: {qty} @ {price} ({order_type}) 手续费: {fee}"
)

# 统计maker/taker成交量
if is_maker:
self.total_maker_volume += qty * price
else:
self.total_taker_volume += qty * price

8.3 REST轮询订阅 (SubscribeRest)

REST轮询订阅适用于不需要极低延迟但需要定期更新的数据,如账户余额、持仓状态、合约信息等。系统会按照指定的时间间隔主动调用REST API获取数据。

🔄 REST轮询支持的数据类型

数据类型说明回调函数更新频率建议适用场景
Balance账户余额on_balance1-5秒余额监控、风控
BalanceByCoin指定币种余额on_balance1-5秒特定币种余额监控
Position持仓信息on_position2-10秒持仓管理、盈亏计算
Funding资金费率on_funding30-300秒资金费率监控
Instrument合约信息on_instrument60-300秒合约规则更新

⚙️ REST轮询配置

基础配置格式:

{
"account_id": 0, # 必须指定账户ID
# 注意:event_handle_mode对SubscribeRest不生效,REST轮询始终按顺序处理
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 5, # 轮询间隔(秒)
"nanos": 0 # 纳秒精度(通常为0)
},
"rest_type": "数据类型" # 要轮询的数据类型
}
}
}

⚠️ 重要说明

  • event_handle_mode参数对SubscribeRest不生效,REST轮询始终按顺序串行处理
  • REST轮询适用于不需要极低延迟的数据,如余额、持仓等
  • 如需高频数据,请使用SubscribeWs配合合适的event_handle_mode

特殊配置格式(BalanceByCoin):

{
"account_id": 0, # 必须指定账户ID
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 5, # 轮询间隔(秒)
"nanos": 0 # 纳秒精度(通常为0)
},
"rest_type": {
"BalanceByCoin": "币种名称" # 指定要监控的币种,如"USDT"、"BTC"
}
}
}
}

实际示例:

# 轮询账户余额(每5秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
}

# 轮询指定币种余额(每2秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
}

# 轮询持仓信息(每10秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
}

# 轮询合约信息(每5分钟一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}

💡 BalanceByCoin 特殊说明

BalanceByCoinBalance 的优化版本,专门用于监控特定币种的余额变化。相比 Balance 订阅所有币种余额,BalanceByCoin 具有以下优势:

使用场景:

  • 高频监控: 对特定币种(如USDT、BTC)进行高频余额监控
  • 资源优化: 只获取指定币种数据,减少网络传输和数据处理开销
  • 精确风控: 针对特定币种设置精确的风控阈值
  • 性能提升: 减少不必要的数据处理,提高策略执行效率

配置示例:

# 监控USDT余额(每2秒更新)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
}

# 监控BTC余额(每5秒更新)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": {
"BalanceByCoin": "BTC"
}
}
}
}

回调处理:

def on_balance(self, account_id, balances):
"""BalanceByCoin回调处理"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)

# 由于使用BalanceByCoin,这里只会收到指定币种的数据
self.trader.log(f"{asset}余额: 可用{available} 总计{total}")

# 针对特定币种的风控逻辑
if asset == "USDT" and available < 100:
self.trader.log(f"🚨 USDT余额不足: {available}", level="ERROR")
elif asset == "BTC" and available < 0.001:
self.trader.log(f"⚠️ BTC余额不足: {available}", level="WARN")

8.3.1 Balance 轮询(账户余额)

定时获取账户所有币种的余额信息。

订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
}
]
回调处理示例
def on_balance(self, account_id, balances):
"""余额轮询回调"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)

if total > 0:
self.trader.log(f"余额: {asset} 可用 {available} 总计 {total}")

8.3.2 BalanceByCoin 轮询(指定币种余额)

定时获取指定币种的余额信息,减少数据传输量。

订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
}
]
回调处理示例
def on_balance(self, account_id, balances):
"""指定币种余额回调"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)

# 只会收到指定币种的数据
if asset == "USDT" and available < 100:
self.trader.log(f"🚨 USDT余额不足: {available}", level="ERROR")

8.3.3 Position 轮询(持仓信息)

定时获取账户持仓信息。

订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
}
]
回调处理示例
def on_position(self, account_id, positions):
"""持仓轮询回调"""
for position in positions:
symbol = position.get('symbol')
amount = position.get('amount', 0)
entry_price = position.get('entry_price', 0)
unrealized_pnl = position.get('unrealized_pnl', 0)

if abs(amount) > 0:
self.trader.log(
f"持仓: {symbol} 数量 {amount} "
f"开仓价 {entry_price} 浮盈 {unrealized_pnl}"
)

8.3.4 Funding 轮询(资金费率)

定时获取资金费率信息。

订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 60, "nanos": 0},
"rest_type": "Funding"
}
}
}
]
回调处理示例
def on_funding(self, account_id, funding_rates):
"""资金费率轮询回调"""
for funding in funding_rates:
symbol = funding.get('symbol')
rate = funding.get('funding_rate', 0)

if abs(rate) > 0.001:
self.trader.log(f"高资金费率: {symbol} {rate * 100:.3f}%")

8.3.5 Instrument 轮询(合约信息)

定时获取合约规则信息。

订阅示例
def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}
]
回调处理示例
def on_instrument(self, account_id, instruments):
"""合约信息轮询回调"""
for instrument in instruments:
symbol = instrument.get('symbol')
status = instrument.get('status')

if status != 'trading':
self.trader.log(f"⚠️ {symbol} 状态异常: {status}", level="WARN")

8.4 定时器订阅 (SubscribeTimer)

定时器订阅用于执行策略相关的定时任务,如定期检查订单状态、执行风控逻辑、统计数据更新等。不依赖外部数据源,完全由系统定时器驱动。

⏰ 定时器特点

  • 精确定时: 支持秒级和纳秒级精度
  • 多定时器: 可以设置多个不同名称的定时器
  • 独立执行: 不依赖交易所连接状态
  • 灵活配置: 可以动态调整执行间隔
  • 错开启动: 支持初始延迟,避免多个定时器同时启动造成资源竞争

🎯 定时器配置

基础配置格式:

{
# 注意:event_handle_mode对SubscribeTimer不生效,定时器始终按顺序执行
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 间隔秒数, # 定时器间隔(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
},
"name": "定时器名称", # 定时器标识名称
"initial_delay": { # 可选:初始延迟时间
"secs": 延迟秒数, # 启动前延迟(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
}
}
}
}

⚠️ 重要说明

  • event_handle_mode参数对SubscribeTimer不生效,定时器始终按顺序串行执行
  • 定时器适用于策略逻辑、风控检查、统计更新等定时任务
  • 如需高频数据处理,请使用SubscribeWs配合合适的event_handle_mode

字段说明:

  • update_interval: 定时器执行间隔,必须字段
  • name: 定时器名称,用于标识和日志记录,必须字段
  • initial_delay: 初始延迟时间,可选字段,用于错开多个定时器的启动时间,避免资源竞争

initial_delay使用场景:

场景类型建议延迟时间原因说明示例
风控检查30-60秒等待数据稳定,避免启动时误报余额检查、持仓风控
统计计算错开15-60秒避免多个计算任务同时执行收益统计、交易分析
网络请求错开10-30秒分散API调用,避免频率限制外部数据获取
文件操作错开30-120秒避免同时写文件造成冲突日志轮转、数据备份
连接监控0秒(立即)需要尽快检测连接状态WebSocket状态检查

实际应用示例:

# 每30秒检查一次价格预警(立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "price_alert_check"
}
}
}

# 每分钟更新一次策略统计(延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}

# 每10秒检查一次订单状态(延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "order_status_check",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
}

# 风控检查(延迟30秒启动,避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 30, "nanos": 0}
}
}
}

# 对应的回调函数处理
def on_timer_subscribe(self, timer_name):
"""定时器回调处理"""
if timer_name == "price_alert_check":
self.check_price_alerts()
elif timer_name == "stats_update":
self.update_strategy_stats()
elif timer_name == "order_status_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")

💡 定时器完整示例

订阅配置:

def subscribes(self):
"""定时器订阅配置示例"""
return [
# 策略监控定时器(每30秒,立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor"
}
}
},

# 风控检查定时器(每5分钟,延迟60秒启动避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0}
}
}
},

# 统计数据更新(每分钟,延迟15秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 15, "nanos": 0}
}
}
},

# 价格预警检查(每10秒,延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "price_alert",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
},

# 订单状态检查(每15秒,延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 15, "nanos": 0},
"name": "order_check",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}
]

回调函数处理:

def on_timer_subscribe(self, timer_name):
"""定时器回调统一处理"""
try:
if timer_name == "strategy_monitor":
self.strategy_monitor()
elif timer_name == "risk_check":
self.risk_management_check()
elif timer_name == "stats_update":
self.update_strategy_statistics()
elif timer_name == "price_alert":
self.check_price_alerts()
elif timer_name == "order_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
except Exception as e:
self.trader.log(f"定时器{timer_name}执行异常: {e}", level="ERROR")

def strategy_monitor(self):
"""策略监控逻辑"""
# 检查策略运行状态
uptime = time.time() - self.start_time
self.trader.tlog(
"策略监控",
f"策略运行时间: {uptime:.0f}秒, 状态: 正常",
interval=300, # 5分钟记录一次
color="green"
)

# 检查WebSocket连接状态
if hasattr(self, 'last_data_time'):
time_since_data = time.time() - self.last_data_time
if time_since_data > 60: # 超过1分钟没有数据
self.trader.log("⚠️ WebSocket数据中断超过1分钟", level="WARN")

def risk_management_check(self):
"""风控检查"""
# 检查总持仓风险
total_exposure = self.calculate_total_exposure()
max_exposure = 100000 # 最大敞口100k

if total_exposure > max_exposure:
self.trader.log(
f"🚨 总敞口超限: {total_exposure} > {max_exposure}",
level="ERROR",
color="red"
)
# 执行风控措施
self.emergency_close_positions()

# 检查浮动盈亏
total_pnl = self.calculate_total_pnl()
if total_pnl < -5000: # 浮亏超过5k
self.trader.log(
f"🚨 浮亏过大: {total_pnl}",
level="ERROR",
color="red"
)

def update_strategy_statistics(self):
"""更新策略统计"""
# 计算成功率
if self.total_trades > 0:
win_rate = self.successful_trades / self.total_trades * 100
self.trader.tlog(
"策略统计",
f"交易次数: {self.total_trades}, 成功率: {win_rate:.1f}%",
interval=300,
color="blue"
)


def check_price_alerts(self):
"""价格预警检查"""
for symbol in self.monitored_symbols:
current_price = self.get_current_price(symbol)
if current_price:
# 检查价格突破
if symbol in self.price_alerts:
alert = self.price_alerts[symbol]
if alert['type'] == 'above' and current_price > alert['price']:
self.trader.log(f"💡 {symbol} 突破预警价格 {alert['price']}", color="yellow")
elif alert['type'] == 'below' and current_price < alert['price']:
self.trader.log(f"💡 {symbol} 跌破预警价格 {alert['price']}", color="yellow")

def check_pending_orders(self):
"""检查挂单状态"""
# 检查长时间未成交的订单
for order_id, order_info in self.pending_orders.items():
order_age = time.time() - order_info['created_time']

if order_age > 600: # 超过10分钟未成交
self.trader.log(
f"⏰ 订单{order_id}长时间未成交: {order_age:.0f}秒",
level="WARN"
)

# 可以选择撤销长时间未成交的订单
if order_age > 1800: # 超过30分钟强制撤销
self.cancel_order(order_id)
self.trader.log(f"🗑️ 强制撤销超时订单: {order_id}")

8.5 综合订阅配置示例

以下是一个完整的策略订阅配置示例,展示如何同时使用三种数据源:

def subscribes(self):
"""综合订阅配置示例"""
return [
# 1. 高频市场数据订阅
{
"account_id": 0,
"event_handle_mode": "Latest", # 高频市场数据使用Latest模式
"sub": {
"SubscribeWs": [
# 高频市场行情数据
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m"
}
},
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
}
]
}
},
# 2. 重要数据订阅
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": ["BTC_USDT"],
"levels": 5
}
},
# 账户数据
{
"Order": ["BTC_USDT", "ETH_USDT"]
},
{
"Position": ["BTC_USDT", "ETH_USDT"]
}
]
}
},

# 2. REST轮询数据订阅
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance" # 每5秒更新余额
}
}
},

# 3. 定时器任务订阅
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor" # 每30秒执行策略监控
}
}
},
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check" # 每5分钟执行风控检查
}
}
}
]

8.6 数据源选择与最佳实践

🎯 数据源选择指南

根据不同的应用场景,选择合适的数据源类型:

应用场景推荐数据源订阅频道更新频率理由
高频交易WebSocketBbo, Depth, Trade毫秒级需要极低延迟的实时数据
技术分析WebSocketKline, MarkPrice秒级K线完结状态重要,实时性要求高
套利策略WebSocketBbo, Funding秒-分钟级价差和资金费率变化敏感
风控监控REST轮询Balance, Position1-10秒定期检查即可,不需要实时
策略逻辑定时器Timer10秒-5分钟独立于市场数据的逻辑执行
账户管理WebSocket + RESTOrder + Balance混合实时订单状态 + 定期余额检查

✅ 最佳实践建议

1. 分层订阅策略

def subscribes(self):
"""分层订阅策略示例"""
return [
# 核心层:高频实时数据
{
"account_id": 0,
"event_handle_mode": "Latest", # 高频数据使用Latest模式
"sub": {"SubscribeWs": [
{"Bbo": self.trading_symbols}, # 交易信号生成
]}
},
# 核心层:重要数据
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {"SubscribeWs": [
{"Order": self.trading_symbols} # 订单状态监控
]}
},

# 支撑层:中频数据
{
"account_id": 0,
"sub": {"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Balance" # 余额监控
}}
},

# 管理层:低频定时任务
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_management", # 风控检查
"initial_delay": {"secs": 30, "nanos": 0} # 延迟启动避免误报
}}
}
]

2. 数据处理优化

def on_bbo(self, exchange, bbo_list):
"""优化的BBO处理"""
try:
for bbo in bbo_list:
symbol = bbo.get('symbol')
# 快速数据验证
if not self.validate_bbo_data(bbo):
continue

# 核心交易逻辑(最小化处理时间)
signal = self.generate_trading_signal(symbol, bbo)
if signal:
self.execute_trade(signal)

except Exception as e:
# 错误处理不影响主流程
self.trader.log(f"BBO处理异常: {e}", level="ERROR")

def on_kline(self, exchange, kline):
"""K线数据处理最佳实践"""
for candle in kline['candles']:
# 关键:只处理已完结的K线
if candle['confirm']:
self.analyze_completed_candle(candle)
else:
# 未完结K线可用于实时监控,但不做技术分析
self.monitor_current_candle(candle)

3. 资源管理与性能优化

class PerformanceOptimizedStrategy:
def __init__(self):
# 数据缓存,避免重复计算
self.price_cache = {}
self.last_update_time = {}

def subscribes(self):
"""性能优化的订阅配置"""
# 只订阅必要的交易对
core_symbols = ["BTC_USDT", "ETH_USDT"] # 核心交易对
monitor_symbols = ["SOL_USDT", "BNB_USDT"] # 监控交易对

return [
# 核心交易对 - 高频订阅
{
"account_id": 0,
"event_handle_mode": "Latest", # 核心交易对使用Latest模式
"sub": {"SubscribeWs": [
{"Bbo": core_symbols},
{"Kline": {"symbols": core_symbols, "interval": "1m"}}
]}
},

# 监控交易对 - 低频订阅
{
"account_id": 0,
"event_handle_mode": "Sync", # 监控数据使用Sync模式
"sub": {"SubscribeWs": [
{"Bbo": monitor_symbols}
]}
}
]

def on_bbo(self, exchange, bbo_list):
"""带缓存的BBO处理"""
current_time = time.time()
for bbo in bbo_list:
symbol = bbo.get('symbol')
last_time = self.last_update_time.get(symbol, 0)

# 限制处理频率,避免过度计算
if current_time - last_time < 0.1: # 100ms内不重复处理
continue

self.last_update_time[symbol] = current_time
self.process_bbo_data(symbol, bbo)

✅ 推荐配置模式

高频交易模式:

# 追求极低延迟,最小化订阅
["Bbo", "Order"] # 仅订阅必要数据

技术分析模式:

# 需要完整市场数据
["Kline", "Bbo", "Depth", "MarkPrice"]

稳健套利模式:

# 平衡实时性和稳定性
WebSocket: ["Bbo", "Funding", "Order", "Position"]
REST: ["Balance"] (5)
Timer: ["risk_check"] (每分钟,延迟30秒启动)

❌ 常见误区避免

1. 过度订阅

# ❌ 错误:订阅过多不必要的数据
["Bbo", "Depth", "Trade", "Kline", "MarkPrice", "Funding"] # 太多了

# ✅ 正确:只订阅策略需要的数据
["Bbo", "Order"] # 简单策略只需要这些

2. 忽略数据状态

# ❌ 错误:不检查K线完结状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
self.analyze_signal(candle) # 可能分析未完结K线

# ✅ 正确:检查K线状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
if candle['confirm']: # 只分析已完结的K线
self.analyze_signal(candle)

3. 阻塞回调函数

# ❌ 错误:在回调中执行耗时操作
def on_bbo(self, exchange, bbo_list):
for bbo in bbo_list:
time.sleep(0.1) # 阻塞其他数据处理
self.complex_calculation() # 复杂计算

# ✅ 正确:异步处理耗时操作
def on_bbo(self, exchange, bbo_list):
# 快速缓存数据
for bbo in bbo_list:
symbol = bbo.get('symbol')
self.bbo_cache[symbol] = bbo
# 复杂处理放在定时器中进行

def on_timer_subscribe(self, timer_name):
if timer_name == "complex_analysis":
self.perform_complex_analysis()

4. 忽略初始延迟配置

# ❌ 错误:多个定时器同时启动,造成资源竞争
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup"}}
]

# ✅ 正确:使用初始延迟错开启动时间
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report",
"initial_delay": {"secs": 60, "nanos": 0}}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup",
"initial_delay": {"secs": 120, "nanos": 0}}}
]

5. 初始延迟最佳实践

def subscribes(self):
"""使用initial_delay的最佳实践"""
return [
# 立即启动的监控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "connection_monitor" # 无延迟,立即监控连接状态
}}
},

# 延迟启动的风控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0} # 延迟1分钟,等数据稳定
}}
},

# 错开启动的统计类定时器(避免同时执行)
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "stats_calc",
"initial_delay": {"secs": 30, "nanos": 0} # 错开30秒
}}
},
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "report_gen",
"initial_delay": {"secs": 90, "nanos": 0} # 错开90秒
}}
}
]

🔧 调试与监控

数据流监控:

def on_bbo(self, exchange, bbo_list):
# 统计数据接收频率
for bbo in bbo_list:
symbol = bbo.get('symbol')
self.data_stats[symbol] = self.data_stats.get(symbol, 0) + 1

def on_timer_subscribe(self, timer_name):
if timer_name == "data_monitor":
for symbol, count in self.data_stats.items():
self.trader.tlog(
"数据统计",
f"{symbol}: {count}次/分钟",
interval=60
)
self.data_stats.clear()

通过合理的数据源选择和配置,可以构建高效、稳定的量化交易策略。