跳到主要内容

九、数据源订阅

九、数据源订阅

9.1 订阅机制概述

Trader API v2提供了完整的数据源订阅机制,通过策略的subscribes()方法定义需要订阅的数据类型。系统支持三种不同的数据源订阅方式,每种方式适用于不同的数据获取场景。

📊 三种数据源类型

数据源类型数据来源更新方式适用场景延迟特性
SubscribeWs交易所WebSocket实时推送高频交易、实时行情极低延迟
SubscribeRest交易所REST API定时轮询账户状态、合约信息、指定币种余额秒级延迟
SubscribeTimer系统定时器定时触发策略逻辑、状态检查可配置间隔

🔄 数据流向图

┌─────────────────┐    ┌──────────────┐    ┌─────────────────┐
│ 交易所API │ │ 系统定时器 │ │ 策略回调函数 │
│ │ │ │ │ │
│ WebSocket ────┐ │ │ Timer ───┐ │ │ ┌─ on_bbo() │
│ REST API ──┐ │ │ │ │ │ │ ├─ on_kline() │
│ │ │ │ │ │ │ │ ├─ on_order() │
└───────────┼──┼─┘ └────────────┼─┘ │ ├─ on_timer() │
│ │ │ │ └─ on_balance()│
│ └─────────────────┐ │ │ │
└──────────────────┐ │ │ └─────────────────┘
│ │ │ ▲
▼ ▼ ▼ │
┌─────────────────┐ │
│ Trader Engine │──────┘
│ 数据分发中心 │
└─────────────────┘

9.2 WebSocket实时订阅 (SubscribeWs)

WebSocket订阅是最重要的数据源,提供交易所的实时推送数据,具有极低延迟的特性,是高频交易和实时行情分析的核心。

📡 WebSocket支持的频道类型

频道分类频道类型说明回调函数延迟级别
市场行情Bbo最佳买卖价on_bbo毫秒级
Depth市场深度on_depth毫秒级
Trade成交记录on_trade毫秒级
AggTrade聚合成交on_agg_trade毫秒级
KlineK线数据on_kline秒级
MarkPrice标记价格on_mark_price秒级
Funding资金费率on_funding分钟级
账户数据Order订单更新on_order毫秒级
Position持仓更新on_position毫秒级
Balance余额更新on_balance秒级
FundingFee资金费结算on_funding_fee小时级
OrderAndFill订单和成交on_order_and_fill毫秒级

🚀 WebSocket订阅配置

基础配置格式:

{
"account_id": 0, # 账户ID(市场数据可选,账户数据必须)
"event_handle_mode": "Sync", # 事件处理模式:Latest/Sync/Async(可选,默认Sync)
"sub": {
"SubscribeWs": [
# 市场数据订阅(无需账户ID)
{
"频道类型": 配置参数
},
# 账户数据订阅(需要账户ID)
{
"频道类型": 配置参数
}
]
}
}

⚠️ 重要说明

  • event_handle_mode参数仅对SubscribeWs生效,SubscribeRestSubscribeTimer会忽略此参数
  • 不指定event_handle_mode时默认使用"Sync"模式
  • 不同数据类型推荐使用不同模式:
    • 高频数据(Bbo、Trade)→ "Latest"模式
    • 重要数据(Depth、Order、Position)→ "Sync"模式
    • 高频重要数据 → "Async"模式

📈 Kline数据订阅详解

📈 Kline订阅配置

基础格式:

{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"], # 订阅的交易对列表
"interval": "1m" # K线时间间隔
}
}

支持的时间间隔:

  • "1m", "3m", "5m", "15m", "30m": 分钟级别
  • "1h", "2h", "4h", "6h", "8h", "12h": 小时级别
  • "1d", "3d": 天级别
  • "1w": 周级别
  • "1M": 月级别

🔄 Kline回调数据结构

当订阅Kline数据后,每当有新的K线数据时,会触发on_kline(exchange, kline)回调:

回调参数:

  • exchange: 字符串,交易所名称
  • kline: K线数据对象,包含以下字段:
    • symbol: 交易对符号
    • interval: K线时间粒度
    • candles: 蜡烛图数据列表(通常只包含最新的一根K线)

Candle数据结构:

{
"timestamp": 1640995200000, # 开盘时间,Unix时间戳(毫秒)
"open": 50000.0, # 开盘价
"high": 50500.0, # 最高价
"low": 49800.0, # 最低价
"close": 50200.0, # 收盘价
"volume": 123.45, # 成交量(基础货币)
"quote_volume": 6180000.0, # 成交额(报价货币)
"trades": 1250, # 成交笔数(可选)
"taker_buy_volume": 65.2, # 主动买入成交量(可选)
"taker_buy_quote_volume": 3270000.0, # 主动买入成交额(可选)
"confirm": False # K线状态:false=未完结,true=已完结
}

📊 Kline订阅示例

完整订阅示例:

def subscribes(self):
"""返回订阅配置"""
return [
{
"account_id": 0,
"event_handle_mode": "Latest", # K线和BBO数据使用Latest模式,追求最低延迟
"sub": {
"SubscribeWs": [
# 订阅多个交易对的1分钟K线
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT", "SOL_USDT"],
"interval": "1m"
}
},
# 同时订阅BBO数据
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
}
]
}
},
# 可以订阅不同时间周期的K线
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {
"SubscribeWs": [
{
"Kline": {
"symbols": ["BTC_USDT"],
"interval": "5m"
}
}
]
}
}
]

def on_kline(self, exchange, kline):
"""K线数据回调处理"""
symbol = kline['symbol']
interval = kline['interval']
candles = kline['candles']

for candle in candles:
timestamp = candle['timestamp']
open_price = candle['open']
high_price = candle['high']
low_price = candle['low']
close_price = candle['close']
volume = candle['volume']
is_confirmed = candle['confirm']

# 计算涨跌幅
price_change_pct = (close_price - open_price) / open_price * 100

status = "已完结" if is_confirmed else "未完结"

self.trader.log(
f"{exchange} {symbol} {interval} K线更新: "
f"价格 {close_price} (涨跌 {price_change_pct:+.2f}%), "
f"成交量 {volume:.2f}, 状态: {status}",
level="DEBUG"
)

# 只对已完结的K线进行技术分析
if is_confirmed:
self.analyze_kline_signal(symbol, candle)

def analyze_kline_signal(self, symbol, candle):
"""分析K线信号"""
# 检测大阳线/大阴线
body_ratio = abs(candle['close'] - candle['open']) / (candle['high'] - candle['low'])

if body_ratio > 0.7: # 实体超过70%
if candle['close'] > candle['open']:
signal_type = "大阳线"
color = "green"
else:
signal_type = "大阴线"
color = "red"

self.trader.tlog(
f"K线信号_{symbol}",
f"{symbol}: 检测到{signal_type},实体比例 {body_ratio:.1%}",
color=color,
interval=60
)

🔥 其他WebSocket订阅示例

深度数据订阅:

{
"Depth": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"levels": 5 # 订阅5档深度
}
}

账户数据订阅:

{
"account_id": 0,
"event_handle_mode": "Sync", # 账户数据使用Sync模式,保证数据完整性
"sub": {
"SubscribeWs": [
{
"Order": ["BTC_USDT", "ETH_USDT"] # 订单更新
},
{
"Position": ["BTC_USDT", "ETH_USDT"] # 持仓更新
},
{
"OrderAndFill": ["BTC_USDT"] # 订单和成交数据
}
]
}
}

💡 WebSocket完整示例

订阅配置:

def subscribes(self):
"""WebSocket订阅配置示例"""
return [
# 高频市场数据使用Latest模式
{
"account_id": 0,
"event_handle_mode": "Latest", # 高频数据使用Latest模式
"sub": {
"SubscribeWs": [
# 高频市场行情数据
{
"Bbo": ["BTC_USDT", "ETH_USDT"] # 最佳买卖价
},
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m" # 1分钟K线
}
}
]
}
},
# 重要数据使用Sync模式
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": ["BTC_USDT"],
"levels": 10 # 10档深度
}
},
{
"Funding": ["BTC_USDT", "ETH_USDT"] # 资金费率
},
# 账户数据
{
"Order": ["BTC_USDT", "ETH_USDT"] # 订单状态
},
{
"Position": ["BTC_USDT", "ETH_USDT"] # 持仓变化
},
"Balance" # 账户余额
]
}
}
]

回调函数处理:

def on_bbo(self, exchange, symbol, bbo):
"""最佳买卖价回调"""
bid_price = bbo.get('bid_price')
ask_price = bbo.get('ask_price')
spread = ask_price - bid_price

self.trader.log(f"{symbol} BBO: 买{bid_price}{ask_price} 价差{spread}")

def on_kline(self, exchange, kline):
"""K线数据回调"""
symbol = kline['symbol']
interval = kline['interval']

for candle in kline['candles']:
if candle['confirm']: # 只处理已完结的K线
close_price = candle['close']
volume = candle['volume']
self.trader.log(f"{symbol} {interval} K线完结: 价格{close_price} 成交量{volume}")

def on_depth(self, exchange, symbol, depth):
"""深度数据回调"""
bids = depth.get('bids', [])
asks = depth.get('asks', [])

if bids and asks:
best_bid = bids[0][0] if bids[0] else 0
best_ask = asks[0][0] if asks[0] else 0
self.trader.log(f"{symbol} 深度: 最优买{best_bid} 最优卖{best_ask}")

def on_order(self, exchange, order):
"""订单状态回调"""
symbol = order.get('symbol')
status = order.get('status')
side = order.get('side')

self.trader.log(f"订单更新: {symbol} {side} {status}")

def on_position(self, exchange, position):
"""持仓变化回调

参数:
exchange: 交易所名称
position: 持仓数据,数据结构参见[7.2 Position(持仓)数据结构](#72-position持仓数据结构)
"""
symbol = position.get('symbol')
amount = position.get('amount', 0)
side = position.get('side')
unrealized_pnl = position.get('unrealized_pnl', 0)

self.trader.log(f"持仓更新: {symbol} {side} 数量{amount} 未实现盈亏{unrealized_pnl}")

9.3 REST轮询订阅 (SubscribeRest)

REST轮询订阅适用于不需要极低延迟但需要定期更新的数据,如账户余额、持仓状态、合约信息等。系统会按照指定的时间间隔主动调用REST API获取数据。

🔄 REST轮询支持的数据类型

数据类型说明回调函数更新频率建议适用场景
Balance账户余额on_balance1-5秒余额监控、风控
BalanceByCoin指定币种余额on_balance1-5秒特定币种余额监控
Position持仓信息on_position2-10秒持仓管理、盈亏计算
Funding资金费率on_funding30-300秒资金费率监控
Instrument合约信息on_instrument60-300秒合约规则更新

⚙️ REST轮询配置

基础配置格式:

{
"account_id": 0, # 必须指定账户ID
# 注意:event_handle_mode对SubscribeRest不生效,REST轮询始终按顺序处理
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 5, # 轮询间隔(秒)
"nanos": 0 # 纳秒精度(通常为0)
},
"rest_type": "数据类型" # 要轮询的数据类型
}
}
}

⚠️ 重要说明

  • event_handle_mode参数对SubscribeRest不生效,REST轮询始终按顺序串行处理
  • REST轮询适用于不需要极低延迟的数据,如余额、持仓等
  • 如需高频数据,请使用SubscribeWs配合合适的event_handle_mode

特殊配置格式(BalanceByCoin):

{
"account_id": 0, # 必须指定账户ID
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 5, # 轮询间隔(秒)
"nanos": 0 # 纳秒精度(通常为0)
},
"rest_type": {
"BalanceByCoin": "币种名称" # 指定要监控的币种,如"USDT"、"BTC"
}
}
}
}

实际示例:

# 轮询账户余额(每5秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
}

# 轮询指定币种余额(每2秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
}

# 轮询持仓信息(每10秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
}

# 轮询合约信息(每5分钟一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}

💡 BalanceByCoin 特殊说明

BalanceByCoinBalance 的优化版本,专门用于监控特定币种的余额变化。相比 Balance 订阅所有币种余额,BalanceByCoin 具有以下优势:

使用场景:

  • 高频监控: 对特定币种(如USDT、BTC)进行高频余额监控
  • 资源优化: 只获取指定币种数据,减少网络传输和数据处理开销
  • 精确风控: 针对特定币种设置精确的风控阈值
  • 性能提升: 减少不必要的数据处理,提高策略执行效率

配置示例:

# 监控USDT余额(每2秒更新)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
}

# 监控BTC余额(每5秒更新)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": {
"BalanceByCoin": "BTC"
}
}
}
}

回调处理:

def on_balance(self, account_id, balances):
"""BalanceByCoin回调处理"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)

# 由于使用BalanceByCoin,这里只会收到指定币种的数据
self.trader.log(f"{asset}余额: 可用{available} 总计{total}")

# 针对特定币种的风控逻辑
if asset == "USDT" and available < 100:
self.trader.log(f"🚨 USDT余额不足: {available}", level="ERROR")
elif asset == "BTC" and available < 0.001:
self.trader.log(f"⚠️ BTC余额不足: {available}", level="WARN")

💡 REST轮询完整示例

订阅配置:

def subscribes(self):
"""REST轮询订阅配置示例"""
return [
# 轮询账户余额
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
},

# 轮询指定币种余额(USDT)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
},

# 轮询持仓信息
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
},

# 轮询资金费率(备用方案,当WebSocket不稳定时)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 60, "nanos": 0},
"rest_type": "Funding"
}
}
},

# 轮询合约信息更新
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}
]

回调函数处理:

def on_balance(self, account_id, balances):
"""余额轮询回调

参数:
account_id: 账户ID
balances: 余额数据列表,每个元素包含币种余额信息
"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)

if total > 0: # 只关注有余额的币种
self.trader.log(f"余额更新: {asset} 可用{available} 总计{total}")

# 风控检查
if available < total * 0.1: # 可用余额不足10%
self.trader.log(f"⚠️ {asset} 可用余额不足: {available}", level="WARN")

# 特定币种监控(当使用BalanceByCoin订阅时)
if asset == "USDT":
if available < 100: # USDT余额不足100
self.trader.log(f"🚨 USDT余额不足: {available}", level="ERROR")

def on_position(self, account_id, positions):
"""持仓轮询回调

参数:
account_id: 账户ID
positions: 持仓数据列表,每个元素的数据结构参见[7.2 Position(持仓)数据结构](#72-position持仓数据结构)
"""
for position in positions:
symbol = position.get('symbol')
amount = position.get('amount', 0)
side = position.get('side')
unrealized_pnl = position.get('unrealized_pnl', 0)

if abs(amount) > 0: # 只关注有持仓的交易对
self.trader.log(f"持仓检查: {symbol} {side} 数量{amount} 浮盈{unrealized_pnl}")

# 持仓风控
if unrealized_pnl < -1000: # 浮亏超过1000
self.trader.log(f"🚨 {symbol} 浮亏过大: {unrealized_pnl}", level="ERROR")

def on_funding(self, account_id, funding_rates):
"""资金费率轮询回调

参数:
account_id: 账户ID
funding_rates: 资金费率数据列表
"""
for funding in funding_rates:
symbol = funding.get('symbol')
rate = funding.get('funding_rate', 0)
next_time = funding.get('next_funding_at', 0)

# 资金费率监控
if abs(rate) > 0.001: # 费率超过0.1%
rate_pct = rate * 100
self.trader.log(f"高资金费率: {symbol} {rate_pct:.3f}% 下次结算{next_time}")

def on_instrument(self, account_id, instruments):
"""合约信息轮询回调

参数:
account_id: 账户ID
instruments: 合约信息数据列表
"""
for instrument in instruments:
symbol = instrument.get('symbol')
status = instrument.get('status')

# 监控交易对状态变化
if status != 'trading':
self.trader.log(f"⚠️ 交易对状态异常: {symbol} 状态为 {status}", level="WARN")

9.4 定时器订阅 (SubscribeTimer)

定时器订阅用于执行策略相关的定时任务,如定期检查订单状态、执行风控逻辑、统计数据更新等。不依赖外部数据源,完全由系统定时器驱动。

⏰ 定时器特点

  • 精确定时: 支持秒级和纳秒级精度
  • 多定时器: 可以设置多个不同名称的定时器
  • 独立执行: 不依赖交易所连接状态
  • 灵活配置: 可以动态调整执行间隔
  • 错开启动: 支持初始延迟,避免多个定时器同时启动造成资源竞争

🎯 定时器配置

基础配置格式:

{
# 注意:event_handle_mode对SubscribeTimer不生效,定时器始终按顺序执行
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 间隔秒数, # 定时器间隔(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
},
"name": "定时器名称", # 定时器标识名称
"initial_delay": { # 可选:初始延迟时间
"secs": 延迟秒数, # 启动前延迟(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
}
}
}
}

⚠️ 重要说明

  • event_handle_mode参数对SubscribeTimer不生效,定时器始终按顺序串行执行
  • 定时器适用于策略逻辑、风控检查、统计更新等定时任务
  • 如需高频数据处理,请使用SubscribeWs配合合适的event_handle_mode

字段说明:

  • update_interval: 定时器执行间隔,必须字段
  • name: 定时器名称,用于标识和日志记录,必须字段
  • initial_delay: 初始延迟时间,可选字段,用于错开多个定时器的启动时间,避免资源竞争

initial_delay使用场景:

场景类型建议延迟时间原因说明示例
风控检查30-60秒等待数据稳定,避免启动时误报余额检查、持仓风控
统计计算错开15-60秒避免多个计算任务同时执行收益统计、交易分析
网络请求错开10-30秒分散API调用,避免频率限制外部数据获取
文件操作错开30-120秒避免同时写文件造成冲突日志轮转、数据备份
连接监控0秒(立即)需要尽快检测连接状态WebSocket状态检查

实际应用示例:

# 每30秒检查一次价格预警(立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "price_alert_check"
}
}
}

# 每分钟更新一次策略统计(延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}

# 每10秒检查一次订单状态(延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "order_status_check",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
}

# 风控检查(延迟30秒启动,避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 30, "nanos": 0}
}
}
}

# 对应的回调函数处理
def on_timer_subscribe(self, timer_name):
"""定时器回调处理"""
if timer_name == "price_alert_check":
self.check_price_alerts()
elif timer_name == "stats_update":
self.update_strategy_stats()
elif timer_name == "order_status_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")

💡 定时器完整示例

订阅配置:

def subscribes(self):
"""定时器订阅配置示例"""
return [
# 策略监控定时器(每30秒,立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor"
}
}
},

# 风控检查定时器(每5分钟,延迟60秒启动避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0}
}
}
},

# 统计数据更新(每分钟,延迟15秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 15, "nanos": 0}
}
}
},

# 价格预警检查(每10秒,延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "price_alert",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
},

# 订单状态检查(每15秒,延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 15, "nanos": 0},
"name": "order_check",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}
]

回调函数处理:

def on_timer_subscribe(self, timer_name):
"""定时器回调统一处理"""
try:
if timer_name == "strategy_monitor":
self.strategy_monitor()
elif timer_name == "risk_check":
self.risk_management_check()
elif timer_name == "stats_update":
self.update_strategy_statistics()
elif timer_name == "price_alert":
self.check_price_alerts()
elif timer_name == "order_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
except Exception as e:
self.trader.log(f"定时器{timer_name}执行异常: {e}", level="ERROR")

def strategy_monitor(self):
"""策略监控逻辑"""
# 检查策略运行状态
uptime = time.time() - self.start_time
self.trader.tlog(
"策略监控",
f"策略运行时间: {uptime:.0f}秒, 状态: 正常",
interval=300, # 5分钟记录一次
color="green"
)

# 检查WebSocket连接状态
if hasattr(self, 'last_data_time'):
time_since_data = time.time() - self.last_data_time
if time_since_data > 60: # 超过1分钟没有数据
self.trader.log("⚠️ WebSocket数据中断超过1分钟", level="WARN")

def risk_management_check(self):
"""风控检查"""
# 检查总持仓风险
total_exposure = self.calculate_total_exposure()
max_exposure = 100000 # 最大敞口100k

if total_exposure > max_exposure:
self.trader.log(
f"🚨 总敞口超限: {total_exposure} > {max_exposure}",
level="ERROR",
color="red"
)
# 执行风控措施
self.emergency_close_positions()

# 检查浮动盈亏
total_pnl = self.calculate_total_pnl()
if total_pnl < -5000: # 浮亏超过5k
self.trader.log(
f"🚨 浮亏过大: {total_pnl}",
level="ERROR",
color="red"
)

def update_strategy_statistics(self):
"""更新策略统计"""
# 计算成功率
if self.total_trades > 0:
win_rate = self.successful_trades / self.total_trades * 100
self.trader.tlog(
"策略统计",
f"交易次数: {self.total_trades}, 成功率: {win_rate:.1f}%",
interval=300,
color="blue"
)

# 更新到Web平台
if hasattr(self.trader, 'update_trade_stats'):
self.trader.update_trade_stats(
maker_volume=self.total_maker_volume,
taker_volume=self.total_taker_volume,
profit=self.total_profit
)

def check_price_alerts(self):
"""价格预警检查"""
for symbol in self.monitored_symbols:
current_price = self.get_current_price(symbol)
if current_price:
# 检查价格突破
if symbol in self.price_alerts:
alert = self.price_alerts[symbol]
if alert['type'] == 'above' and current_price > alert['price']:
self.trader.log(f"💡 {symbol} 突破预警价格 {alert['price']}", color="yellow")
elif alert['type'] == 'below' and current_price < alert['price']:
self.trader.log(f"💡 {symbol} 跌破预警价格 {alert['price']}", color="yellow")

def check_pending_orders(self):
"""检查挂单状态"""
# 检查长时间未成交的订单
for order_id, order_info in self.pending_orders.items():
order_age = time.time() - order_info['created_time']

if order_age > 600: # 超过10分钟未成交
self.trader.log(
f"⏰ 订单{order_id}长时间未成交: {order_age:.0f}秒",
level="WARN"
)

# 可以选择撤销长时间未成交的订单
if order_age > 1800: # 超过30分钟强制撤销
self.cancel_order(order_id)
self.trader.log(f"🗑️ 强制撤销超时订单: {order_id}")

9.5 综合订阅配置示例

以下是一个完整的策略订阅配置示例,展示如何同时使用三种数据源:

def subscribes(self):
"""综合订阅配置示例"""
return [
# 1. 高频市场数据订阅
{
"account_id": 0,
"event_handle_mode": "Latest", # 高频市场数据使用Latest模式
"sub": {
"SubscribeWs": [
# 高频市场行情数据
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m"
}
},
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
}
]
}
},
# 2. 重要数据订阅
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": ["BTC_USDT"],
"levels": 5
}
},
# 账户数据
{
"Order": ["BTC_USDT", "ETH_USDT"]
},
{
"Position": ["BTC_USDT", "ETH_USDT"]
}
]
}
},

# 2. REST轮询数据订阅
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance" # 每5秒更新余额
}
}
},

# 3. 定时器任务订阅
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor" # 每30秒执行策略监控
}
}
},
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check" # 每5分钟执行风控检查
}
}
}
]

9.6 数据源选择与最佳实践

🎯 数据源选择指南

根据不同的应用场景,选择合适的数据源类型:

应用场景推荐数据源订阅频道更新频率理由
高频交易WebSocketBbo, Depth, Trade毫秒级需要极低延迟的实时数据
技术分析WebSocketKline, MarkPrice秒级K线完结状态重要,实时性要求高
套利策略WebSocketBbo, Funding秒-分钟级价差和资金费率变化敏感
风控监控REST轮询Balance, Position1-10秒定期检查即可,不需要实时
策略逻辑定时器Timer10秒-5分钟独立于市场数据的逻辑执行
账户管理WebSocket + RESTOrder + Balance混合实时订单状态 + 定期余额检查

✅ 最佳实践建议

1. 分层订阅策略

def subscribes(self):
"""分层订阅策略示例"""
return [
# 核心层:高频实时数据
{
"account_id": 0,
"event_handle_mode": "Latest", # 高频数据使用Latest模式
"sub": {"SubscribeWs": [
{"Bbo": self.trading_symbols}, # 交易信号生成
]}
},
# 核心层:重要数据
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {"SubscribeWs": [
{"Order": self.trading_symbols} # 订单状态监控
]}
},

# 支撑层:中频数据
{
"account_id": 0,
"sub": {"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Balance" # 余额监控
}}
},

# 管理层:低频定时任务
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_management", # 风控检查
"initial_delay": {"secs": 30, "nanos": 0} # 延迟启动避免误报
}}
}
]

2. 数据处理优化

def on_bbo(self, exchange, symbol, bbo):
"""优化的BBO处理"""
try:
# 快速数据验证
if not self.validate_bbo_data(bbo):
return

# 核心交易逻辑(最小化处理时间)
signal = self.generate_trading_signal(symbol, bbo)
if signal:
self.execute_trade(signal)

except Exception as e:
# 错误处理不影响主流程
self.trader.log(f"BBO处理异常: {e}", level="ERROR")

def on_kline(self, exchange, kline):
"""K线数据处理最佳实践"""
for candle in kline['candles']:
# 关键:只处理已完结的K线
if candle['confirm']:
self.analyze_completed_candle(candle)
else:
# 未完结K线可用于实时监控,但不做技术分析
self.monitor_current_candle(candle)

3. 资源管理与性能优化

class PerformanceOptimizedStrategy:
def __init__(self):
# 数据缓存,避免重复计算
self.price_cache = {}
self.last_update_time = {}

def subscribes(self):
"""性能优化的订阅配置"""
# 只订阅必要的交易对
core_symbols = ["BTC_USDT", "ETH_USDT"] # 核心交易对
monitor_symbols = ["SOL_USDT", "BNB_USDT"] # 监控交易对

return [
# 核心交易对 - 高频订阅
{
"account_id": 0,
"event_handle_mode": "Latest", # 核心交易对使用Latest模式
"sub": {"SubscribeWs": [
{"Bbo": core_symbols},
{"Kline": {"symbols": core_symbols, "interval": "1m"}}
]}
},

# 监控交易对 - 低频订阅
{
"account_id": 0,
"event_handle_mode": "Sync", # 监控数据使用Sync模式
"sub": {"SubscribeWs": [
{"Bbo": monitor_symbols}
]}
}
]

def on_bbo(self, exchange, symbol, bbo):
"""带缓存的BBO处理"""
current_time = time.time()
last_time = self.last_update_time.get(symbol, 0)

# 限制处理频率,避免过度计算
if current_time - last_time < 0.1: # 100ms内不重复处理
return

self.last_update_time[symbol] = current_time
self.process_bbo_data(symbol, bbo)

✅ 推荐配置模式

高频交易模式:

# 追求极低延迟,最小化订阅
["Bbo", "Order"] # 仅订阅必要数据

技术分析模式:

# 需要完整市场数据
["Kline", "Bbo", "Depth", "MarkPrice"]

稳健套利模式:

# 平衡实时性和稳定性
WebSocket: ["Bbo", "Funding", "Order", "Position"]
REST: ["Balance"] (5)
Timer: ["risk_check"] (每分钟,延迟30秒启动)

❌ 常见误区避免

1. 过度订阅

# ❌ 错误:订阅过多不必要的数据
["Bbo", "Depth", "Trade", "Kline", "MarkPrice", "Funding"] # 太多了

# ✅ 正确:只订阅策略需要的数据
["Bbo", "Order"] # 简单策略只需要这些

2. 忽略数据状态

# ❌ 错误:不检查K线完结状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
self.analyze_signal(candle) # 可能分析未完结K线

# ✅ 正确:检查K线状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
if candle['confirm']: # 只分析已完结的K线
self.analyze_signal(candle)

3. 阻塞回调函数

# ❌ 错误:在回调中执行耗时操作
def on_bbo(self, exchange, symbol, bbo):
time.sleep(0.1) # 阻塞其他数据处理
self.complex_calculation() # 复杂计算

# ✅ 正确:异步处理耗时操作
def on_bbo(self, exchange, symbol, bbo):
# 快速缓存数据
self.bbo_cache[symbol] = bbo
# 复杂处理放在定时器中进行

def on_timer_subscribe(self, timer_name):
if timer_name == "complex_analysis":
self.perform_complex_analysis()

4. 忽略初始延迟配置

# ❌ 错误:多个定时器同时启动,造成资源竞争
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup"}}
]

# ✅ 正确:使用初始延迟错开启动时间
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report",
"initial_delay": {"secs": 60, "nanos": 0}}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup",
"initial_delay": {"secs": 120, "nanos": 0}}}
]

5. 初始延迟最佳实践

def subscribes(self):
"""使用initial_delay的最佳实践"""
return [
# 立即启动的监控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "connection_monitor" # 无延迟,立即监控连接状态
}}
},

# 延迟启动的风控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0} # 延迟1分钟,等数据稳定
}}
},

# 错开启动的统计类定时器(避免同时执行)
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "stats_calc",
"initial_delay": {"secs": 30, "nanos": 0} # 错开30秒
}}
},
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "report_gen",
"initial_delay": {"secs": 90, "nanos": 0} # 错开90秒
}}
}
]

🔧 调试与监控

数据流监控:

def on_bbo(self, exchange, symbol, bbo):
# 统计数据接收频率
self.data_stats[symbol] = self.data_stats.get(symbol, 0) + 1

def on_timer_subscribe(self, timer_name):
if timer_name == "data_monitor":
for symbol, count in self.data_stats.items():
self.trader.tlog(
"数据统计",
f"{symbol}: {count}次/分钟",
interval=60
)
self.data_stats.clear()

通过合理的数据源选择和配置,可以构建高效、稳定的量化交易策略。