九、数据源订阅
九、数据源订阅
9.1 订阅机制概述
Trader API v2提供了完整的数据源订阅机制,通过策略的subscribes()方法定义需要订阅的数据类型。系统支持三种不同的数据源订阅方式,每种方式适用于不同的数据获取场景。
📊 三种数据源类型
| 数据源类型 | 数据来源 | 更新方式 | 适用场景 | 延迟特性 |
|---|---|---|---|---|
| SubscribeWs | 交易所WebSocket | 实时推送 | 高频交易、实时行情 | 极低延迟 |
| SubscribeRest | 交易所REST API | 定时轮询 | 账户状态、合约信息、指定币种余额 | 秒级延迟 |
| SubscribeTimer | 系统定时器 | 定时触发 | 策略逻辑、状态检查 | 可配置间隔 |
🔄 数据流向图
┌─────────────────┐ ┌──────────────┐ ┌─────────────────┐
│ 交易所API │ │ 系统定时器 │ │ 策略回调函数 │
│ │ │ │ │ │
│ WebSocket ────┐ │ │ Timer ───┐ │ │ ┌─ on_bbo() │
│ REST API ──┐ │ │ │ │ │ │ ├─ on_kline() │
│ │ │ │ │ │ │ │ ├─ on_order() │
└───────────┼──┼─┘ └────────────┼─┘ │ ├─ on_timer() │
│ │ │ │ └─ on_balance()│
│ └─────────────────┐ │ │ │
└──────────────────┐ │ │ └─────────────────┘
│ │ │ ▲
▼ ▼ ▼ │
┌─────────────────┐ │
│ Trader Engine │──────┘
│ 数据分发中心 │
└─────────────────┘
9.2 WebSocket实时订阅 (SubscribeWs)
WebSocket订阅是最重要的数据源,提供交易所的实时推送数据,具有极低延迟的特性,是高频交易和实时行情分析的核心。
📡 WebSocket支持的频道类型
| 频道分类 | 频道类型 | 说明 | 回调函数 | 延迟级别 |
|---|---|---|---|---|
| 市场行情 | Bbo | 最佳买卖价 | on_bbo | 毫秒级 |
Depth | 市场深度 | on_depth | 毫秒级 | |
Trade | 成交记录 | on_trade | 毫秒级 | |
AggTrade | 聚合成交 | on_agg_trade | 毫秒级 | |
Kline | K线数据 | on_kline | 秒级 | |
MarkPrice | 标记价格 | on_mark_price | 秒级 | |
Funding | 资金费率 | on_funding | 分钟级 | |
| 账户数据 | Order | 订单更新 | on_order | 毫秒级 |
Position | 持仓更新 | on_position | 毫秒级 | |
Balance | 余额更新 | on_balance | 秒级 | |
FundingFee | 资金费结算 | on_funding_fee | 小时级 | |
OrderAndFill | 订单和成交 | on_order_and_fill | 毫秒级 |
🚀 WebSocket订阅配置
基础配置格式:
{
"account_id": 0, # 账户ID(市场数据可选,账户数据必须)
"event_handle_mode": "Sync", # 事件处理模式:Latest/Sync/Async(可选,默认Sync)
"sub": {
"SubscribeWs": [
# 市场数据订阅(无需账户ID)
{
"频道类型": 配置参数
},
# 账户数据订阅(需要账户ID)
{
"频道类型": 配置参数
}
]
}
}
⚠️ 重要说明:
event_handle_mode参数仅对SubscribeWs生效,SubscribeRest和SubscribeTimer会忽略此参数- 不指定
event_handle_mode时默认使用"Sync"模式 - 不同数据类型推荐使用不同模式:
- 高频数据(Bbo、Trade)→
"Latest"模式 - 重要数据(Depth、Order、Position)→
"Sync"模式 - 高频重要数据 →
"Async"模式
- 高频数据(Bbo、Trade)→
📈 Kline数据订阅详解
📈 Kline订阅配置
基础格式:
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"], # 订阅的交易对列表
"interval": "1m" # K线时间间隔
}
}
支持的时间间隔:
"1m","3m","5m","15m","30m": 分钟级别"1h","2h","4h","6h","8h","12h": 小时级别"1d","3d": 天级别"1w": 周级别"1M": 月级别
🔄 Kline回调数据结构
当订阅Kline数据后,每当有新的K线数据时,会触发on_kline(exchange, kline)回调:
回调参数:
exchange: 字符串,交易所名称kline: K线数据对象,包含以下字段:symbol: 交易对符号interval: K线时间粒度candles: 蜡烛图数据列表(通常只包含最新的一根K线)
Candle数据结构:
{
"timestamp": 1640995200000, # 开盘时间,Unix时间戳(毫秒)
"open": 50000.0, # 开盘价
"high": 50500.0, # 最高价
"low": 49800.0, # 最低价
"close": 50200.0, # 收盘价
"volume": 123.45, # 成交量(基础货币)
"quote_volume": 6180000.0, # 成交额(报价货币)
"trades": 1250, # 成交笔数(可选)
"taker_buy_volume": 65.2, # 主动买入成交量(可选)
"taker_buy_quote_volume": 3270000.0, # 主动买入成交额(可选)
"confirm": False # K线状态:false=未完结,true=已完结
}
📊 Kline订阅示例
完整订阅示例:
def subscribes(self):
"""返回订阅配置"""
return [
{
"account_id": 0,
"event_handle_mode": "Latest", # K线和BBO数据使用Latest模式,追求最低延迟
"sub": {
"SubscribeWs": [
# 订阅多个交易对的1分钟K线
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT", "SOL_USDT"],
"interval": "1m"
}
},
# 同时订阅BBO数据
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
}
]
}
},
# 可以订阅不同时间周期的K线
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {
"SubscribeWs": [
{
"Kline": {
"symbols": ["BTC_USDT"],
"interval": "5m"
}
}
]
}
}
]
def on_kline(self, exchange, kline):
"""K线数据回调处理"""
symbol = kline['symbol']
interval = kline['interval']
candles = kline['candles']
for candle in candles:
timestamp = candle['timestamp']
open_price = candle['open']
high_price = candle['high']
low_price = candle['low']
close_price = candle['close']
volume = candle['volume']
is_confirmed = candle['confirm']
# 计算涨跌幅
price_change_pct = (close_price - open_price) / open_price * 100
status = "已完结" if is_confirmed else "未完结"
self.trader.log(
f"{exchange} {symbol} {interval} K线更新: "
f"价格 {close_price} (涨跌 {price_change_pct:+.2f}%), "
f"成交量 {volume:.2f}, 状态: {status}",
level="DEBUG"
)
# 只对已完结的K线进行技术分析
if is_confirmed:
self.analyze_kline_signal(symbol, candle)
def analyze_kline_signal(self, symbol, candle):
"""分析K线信号"""
# 检测大阳线/大阴线
body_ratio = abs(candle['close'] - candle['open']) / (candle['high'] - candle['low'])
if body_ratio > 0.7: # 实体超过70%
if candle['close'] > candle['open']:
signal_type = "大阳线"
color = "green"
else:
signal_type = "大阴线"
color = "red"
self.trader.tlog(
f"K线信号_{symbol}",
f"{symbol}: 检测到{signal_type},实体比例 {body_ratio:.1%}",
color=color,
interval=60
)
🔥 其他WebSocket订阅示例
深度数据订阅:
{
"Depth": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"levels": 5 # 订阅5档深度
}
}
账户数据订阅:
{
"account_id": 0,
"event_handle_mode": "Sync", # 账户数据使用Sync模式,保证数据完整性
"sub": {
"SubscribeWs": [
{
"Order": ["BTC_USDT", "ETH_USDT"] # 订单更新
},
{
"Position": ["BTC_USDT", "ETH_USDT"] # 持仓更新
},
{
"OrderAndFill": ["BTC_USDT"] # 订单和成交数据
}
]
}
}
💡 WebSocket完整示例
订阅配置:
def subscribes(self):
"""WebSocket订阅配置示例"""
return [
# 高频市场数据使用Latest模式
{
"account_id": 0,
"event_handle_mode": "Latest", # 高频数据使用Latest模式
"sub": {
"SubscribeWs": [
# 高频市场行情数据
{
"Bbo": ["BTC_USDT", "ETH_USDT"] # 最佳买卖价
},
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m" # 1分钟K线
}
}
]
}
},
# 重要数据使用Sync模式
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": ["BTC_USDT"],
"levels": 10 # 10档深度
}
},
{
"Funding": ["BTC_USDT", "ETH_USDT"] # 资金费率
},
# 账户数据
{
"Order": ["BTC_USDT", "ETH_USDT"] # 订单状态
},
{
"Position": ["BTC_USDT", "ETH_USDT"] # 持仓变化
},
"Balance" # 账户余额
]
}
}
]
回调函数处理:
def on_bbo(self, exchange, symbol, bbo):
"""最佳买卖价回调"""
bid_price = bbo.get('bid_price')
ask_price = bbo.get('ask_price')
spread = ask_price - bid_price
self.trader.log(f"{symbol} BBO: 买{bid_price} 卖{ask_price} 价差{spread}")
def on_kline(self, exchange, kline):
"""K线数据回调"""
symbol = kline['symbol']
interval = kline['interval']
for candle in kline['candles']:
if candle['confirm']: # 只处理已完结的K线
close_price = candle['close']
volume = candle['volume']
self.trader.log(f"{symbol} {interval} K线完结: 价格{close_price} 成交量{volume}")
def on_depth(self, exchange, symbol, depth):
"""深度数据回调"""
bids = depth.get('bids', [])
asks = depth.get('asks', [])
if bids and asks:
best_bid = bids[0][0] if bids[0] else 0
best_ask = asks[0][0] if asks[0] else 0
self.trader.log(f"{symbol} 深度: 最优买{best_bid} 最优卖{best_ask}")
def on_order(self, exchange, order):
"""订单状态回调"""
symbol = order.get('symbol')
status = order.get('status')
side = order.get('side')
self.trader.log(f"订单更新: {symbol} {side} {status}")
def on_position(self, exchange, position):
"""持仓变化回调
参数:
exchange: 交易所名称
position: 持仓数据,数据结构参见[7.2 Position(持仓)数据结构](#72-position持仓数据结构)
"""
symbol = position.get('symbol')
amount = position.get('amount', 0)
side = position.get('side')
unrealized_pnl = position.get('unrealized_pnl', 0)
self.trader.log(f"持仓更新: {symbol} {side} 数量{amount} 未实现盈亏{unrealized_pnl}")
9.3 REST轮询订阅 (SubscribeRest)
REST轮询订阅适用于不需要极低延迟但需要定期更新的数据,如账户余额、持仓状态、合约信息等。系统会按照指定的时间间隔主动调用REST API获取数据。
🔄 REST轮询支持的数据类型
| 数据类型 | 说明 | 回调函数 | 更新频率建议 | 适用场景 |
|---|---|---|---|---|
Balance | 账户余额 | on_balance | 1-5秒 | 余额监控、风控 |
BalanceByCoin | 指定币种余额 | on_balance | 1-5秒 | 特定币种余额监控 |
Position | 持仓信息 | on_position | 2-10秒 | 持仓管理、盈亏计算 |
Funding | 资金费率 | on_funding | 30-300秒 | 资金费率监控 |
Instrument | 合约信息 | on_instrument | 60-300秒 | 合约规则更新 |
⚙️ REST轮询配置
基础配置格式:
{
"account_id": 0, # 必须指定账户ID
# 注意:event_handle_mode对SubscribeRest不生效,REST轮询始终按顺序处理
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 5, # 轮询间隔(秒)
"nanos": 0 # 纳秒精度(通常为0)
},
"rest_type": "数据类型" # 要轮询的数据类型
}
}
}
⚠️ 重要说明:
event_handle_mode参数对SubscribeRest不生效,REST轮询始终按顺序串行处理- REST轮询适用于不需要极低延迟的数据,如余额、持仓等
- 如需高频数据,请使用
SubscribeWs配合合适的event_handle_mode
特殊配置格式(BalanceByCoin):
{
"account_id": 0, # 必须指定账户ID
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 5, # 轮询间隔(秒)
"nanos": 0 # 纳秒精度(通常为0)
},
"rest_type": {
"BalanceByCoin": "币种名称" # 指定要监控的币种,如"USDT"、"BTC"
}
}
}
}
实际示例:
# 轮询账户余额(每5秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
}
# 轮询指定币种余额(每2秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
}
# 轮询持仓信息(每10秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
}
# 轮询合约信息(每5分钟一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}
💡 BalanceByCoin 特殊说明
BalanceByCoin 是 Balance 的优化版本,专门用于监控特定币种的余额变化。相比 Balance 订阅所有币种余额,BalanceByCoin 具有以下优势:
使用场景:
- 高频监控: 对特定币种(如USDT、BTC)进行高频余额监控
- 资源优化: 只获取指定币种数据,减少网络传输和数据处理开销
- 精确风控: 针对特定币种设置精确的风控阈值
- 性能提升: 减少不必要的数据处理,提高策略执行效率
配置示例:
# 监控USDT余额(每2秒更新)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
}
# 监控BTC余额(每5秒更新)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": {
"BalanceByCoin": "BTC"
}
}
}
}
回调处理:
def on_balance(self, account_id, balances):
"""BalanceByCoin回调处理"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)
# 由于使用BalanceByCoin,这里只会收到指定币种的数据
self.trader.log(f"{asset}余额: 可用{available} 总计{total}")
# 针对特定币种的风控逻辑
if asset == "USDT" and available < 100:
self.trader.log(f"🚨 USDT余额不足: {available}", level="ERROR")
elif asset == "BTC" and available < 0.001:
self.trader.log(f"⚠️ BTC余额不足: {available}", level="WARN")
💡 REST轮询完整示例
订阅配置:
def subscribes(self):
"""REST轮询订阅配置示例"""
return [
# 轮询账户余额
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
},
# 轮询指定币种余额(USDT)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 2, "nanos": 0},
"rest_type": {
"BalanceByCoin": "USDT"
}
}
}
},
# 轮询持仓信息
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
},
# 轮询资金费率(备用方案,当WebSocket不稳定时)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 60, "nanos": 0},
"rest_type": "Funding"
}
}
},
# 轮询合约信息更新
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}
]
回调函数处理:
def on_balance(self, account_id, balances):
"""余额轮询回调
参数:
account_id: 账户ID
balances: 余额数据列表,每个元素包含币种余额信息
"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)
if total > 0: # 只关注有余额的币种
self.trader.log(f"余额更新: {asset} 可用{available} 总计{total}")
# 风控检查
if available < total * 0.1: # 可用余额不足10%
self.trader.log(f"⚠️ {asset} 可用余额不足: {available}", level="WARN")
# 特定币种监控(当使用BalanceByCoin订阅时)
if asset == "USDT":
if available < 100: # USDT余额不足100
self.trader.log(f"🚨 USDT余额不足: {available}", level="ERROR")
def on_position(self, account_id, positions):
"""持仓轮询回调
参数:
account_id: 账户ID
positions: 持仓数据列表,每个元素的数据结构参见[7.2 Position(持仓)数据结构](#72-position持仓数据结构)
"""
for position in positions:
symbol = position.get('symbol')
amount = position.get('amount', 0)
side = position.get('side')
unrealized_pnl = position.get('unrealized_pnl', 0)
if abs(amount) > 0: # 只关注有持仓的交易对
self.trader.log(f"持仓检查: {symbol} {side} 数量{amount} 浮盈{unrealized_pnl}")
# 持仓风控
if unrealized_pnl < -1000: # 浮亏超过1000
self.trader.log(f"🚨 {symbol} 浮亏过大: {unrealized_pnl}", level="ERROR")
def on_funding(self, account_id, funding_rates):
"""资金费率轮询回调
参数:
account_id: 账户ID
funding_rates: 资金费率数据列表
"""
for funding in funding_rates:
symbol = funding.get('symbol')
rate = funding.get('funding_rate', 0)
next_time = funding.get('next_funding_at', 0)
# 资金费率监控
if abs(rate) > 0.001: # 费率超过0.1%
rate_pct = rate * 100
self.trader.log(f"高资金费率: {symbol} {rate_pct:.3f}% 下次结算{next_time}")
def on_instrument(self, account_id, instruments):
"""合约信息轮询回调
参数:
account_id: 账户ID
instruments: 合约信息数据列表
"""
for instrument in instruments:
symbol = instrument.get('symbol')
status = instrument.get('status')
# 监控交易对状态变化
if status != 'trading':
self.trader.log(f"⚠️ 交易对状态异常: {symbol} 状态为 {status}", level="WARN")
9.4 定时器订阅 (SubscribeTimer)
定时器订阅用于执行策略相关的定时任务,如定期检查订单状态、执行风控逻辑、统计数据更新等。不依赖外部数据源,完全由系统定时器驱动。
⏰ 定时器特点
- 精确定时: 支持秒级和纳秒级精度
- 多定时器: 可以设置多个不同名称的定时器
- 独立执行: 不依赖交易所连接状态
- 灵活配置: 可以动态调整执行间隔
- 错开启动: 支持初始延迟,避免多个定时器同时启动造成资源竞争
🎯 定时器配置
基础配置格式:
{
# 注意:event_handle_mode对SubscribeTimer不生效,定时器始终按顺序执行
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 间隔秒数, # 定时器间隔(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
},
"name": "定时器名称", # 定时器标识名称
"initial_delay": { # 可选:初始延迟时间
"secs": 延迟秒数, # 启动前延迟(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
}
}
}
}
⚠️ 重要说明:
event_handle_mode参数对SubscribeTimer不生效,定时器始终按顺序串行执行- 定时器适用于策略逻辑、风控检查、统计更新等定时任务
- 如需高频数据处理,请使用
SubscribeWs配合合适的event_handle_mode
字段说明:
update_interval: 定时器执行间隔,必须字段name: 定时器名称,用于标识和日志记录,必须字段initial_delay: 初始延迟时间,可选字段,用于错开多个定时器的启动时间,避免资源竞争
initial_delay使用场景:
| 场景类型 | 建议延迟时间 | 原因说明 | 示例 |
|---|---|---|---|
| 风控检查 | 30-60秒 | 等待数据稳定,避免启动时误报 | 余额检查、持仓风控 |
| 统计计算 | 错开15-60秒 | 避免多个计算任务同时执行 | 收益统计、交易分析 |
| 网络请求 | 错开10-30秒 | 分散API调用,避免频率限制 | 外部数据获取 |
| 文件操作 | 错开30-120秒 | 避免同时写文件造成冲突 | 日志轮转、数据备份 |
| 连接监控 | 0秒(立即) | 需要尽快检测连接状态 | WebSocket状态检查 |
实际应用示例:
# 每30秒检查一次价格预警(立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "price_alert_check"
}
}
}
# 每分钟更新一次策略统计(延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}
# 每10秒检查一次订单状态(延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "order_status_check",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
}
# 风控检查(延迟30秒启动,避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 30, "nanos": 0}
}
}
}
# 对应的回调函数处理
def on_timer_subscribe(self, timer_name):
"""定时器回调处理"""
if timer_name == "price_alert_check":
self.check_price_alerts()
elif timer_name == "stats_update":
self.update_strategy_stats()
elif timer_name == "order_status_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
💡 定时器完整示例
订阅配置:
def subscribes(self):
"""定时器订阅配置示例"""
return [
# 策略监控定时器(每30秒,立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor"
}
}
},
# 风控检查定时器(每5分钟,延迟60秒启动避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0}
}
}
},
# 统计数据更新(每分钟,延迟15秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 15, "nanos": 0}
}
}
},
# 价格预警检查(每10秒,延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "price_alert",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
},
# 订单状态检查(每15秒,延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 15, "nanos": 0},
"name": "order_check",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}
]
回调函数处理:
def on_timer_subscribe(self, timer_name):
"""定时器回调统一处理"""
try:
if timer_name == "strategy_monitor":
self.strategy_monitor()
elif timer_name == "risk_check":
self.risk_management_check()
elif timer_name == "stats_update":
self.update_strategy_statistics()
elif timer_name == "price_alert":
self.check_price_alerts()
elif timer_name == "order_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
except Exception as e:
self.trader.log(f"定时器{timer_name}执行异常: {e}", level="ERROR")
def strategy_monitor(self):
"""策略监控逻辑"""
# 检查策略运行状态
uptime = time.time() - self.start_time
self.trader.tlog(
"策略监控",
f"策略运行时间: {uptime:.0f}秒, 状态: 正常",
interval=300, # 5分钟记录一次
color="green"
)
# 检查WebSocket连接状态
if hasattr(self, 'last_data_time'):
time_since_data = time.time() - self.last_data_time
if time_since_data > 60: # 超过1分钟没有数据
self.trader.log("⚠️ WebSocket数据中断超过1分钟", level="WARN")
def risk_management_check(self):
"""风控检查"""
# 检查总持仓风险
total_exposure = self.calculate_total_exposure()
max_exposure = 100000 # 最大敞口100k
if total_exposure > max_exposure:
self.trader.log(
f"🚨 总敞口超限: {total_exposure} > {max_exposure}",
level="ERROR",
color="red"
)
# 执行风控措施
self.emergency_close_positions()
# 检查浮动盈亏
total_pnl = self.calculate_total_pnl()
if total_pnl < -5000: # 浮亏超过5k
self.trader.log(
f"🚨 浮亏过大: {total_pnl}",
level="ERROR",
color="red"
)
def update_strategy_statistics(self):
"""更新策略统计"""
# 计算成功率
if self.total_trades > 0:
win_rate = self.successful_trades / self.total_trades * 100
self.trader.tlog(
"策略统计",
f"交易次数: {self.total_trades}, 成功率: {win_rate:.1f}%",
interval=300,
color="blue"
)
# 更新到Web平台
if hasattr(self.trader, 'update_trade_stats'):
self.trader.update_trade_stats(
maker_volume=self.total_maker_volume,
taker_volume=self.total_taker_volume,
profit=self.total_profit
)
def check_price_alerts(self):
"""价格预警检查"""
for symbol in self.monitored_symbols:
current_price = self.get_current_price(symbol)
if current_price:
# 检查价格突破
if symbol in self.price_alerts:
alert = self.price_alerts[symbol]
if alert['type'] == 'above' and current_price > alert['price']:
self.trader.log(f"💡 {symbol} 突破预警价格 {alert['price']}", color="yellow")
elif alert['type'] == 'below' and current_price < alert['price']:
self.trader.log(f"💡 {symbol} 跌破预警价格 {alert['price']}", color="yellow")
def check_pending_orders(self):
"""检查挂单状态"""
# 检查长时间未成交的订单
for order_id, order_info in self.pending_orders.items():
order_age = time.time() - order_info['created_time']
if order_age > 600: # 超过10分钟未成交
self.trader.log(
f"⏰ 订单{order_id}长时间未成交: {order_age:.0f}秒",
level="WARN"
)
# 可以选择撤销长时间未成交的订单
if order_age > 1800: # 超过30分钟强制撤销
self.cancel_order(order_id)
self.trader.log(f"🗑️ 强制撤销超时订单: {order_id}")
9.5 综合订阅配置示例
以下是一个完整的策略订阅配置示例,展示如何同时使用三种数据源:
def subscribes(self):
"""综合订阅配置示例"""
return [
# 1. 高频市场数据订阅
{
"account_id": 0,
"event_handle_mode": "Latest", # 高频市场数据使用Latest模式
"sub": {
"SubscribeWs": [
# 高频市场行情数据
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m"
}
},
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
}
]
}
},
# 2. 重要数据订阅
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": ["BTC_USDT"],
"levels": 5
}
},
# 账户数据
{
"Order": ["BTC_USDT", "ETH_USDT"]
},
{
"Position": ["BTC_USDT", "ETH_USDT"]
}
]
}
},
# 2. REST轮询数据订阅
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance" # 每5秒更新余额
}
}
},
# 3. 定时器任务订阅
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor" # 每30秒执行策略监控
}
}
},
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check" # 每5分钟执行风控检查
}
}
}
]
9.6 数据源选择与最佳实践
🎯 数据源选择指南
根据不同的应用场景,选择合适的数据源类型:
| 应用场景 | 推荐数据源 | 订阅频道 | 更新频率 | 理由 |
|---|---|---|---|---|
| 高频交易 | WebSocket | Bbo, Depth, Trade | 毫秒级 | 需要极低延迟的实时数据 |
| 技术分析 | WebSocket | Kline, MarkPrice | 秒级 | K线完结状态重要,实时性要求高 |
| 套利策略 | WebSocket | Bbo, Funding | 秒-分钟级 | 价差和资金费率变化敏感 |
| 风控监控 | REST轮询 | Balance, Position | 1-10秒 | 定期检查即可,不需要实时 |
| 策略逻辑 | 定时器 | Timer | 10秒-5分钟 | 独立于市场数据的逻辑执行 |
| 账户管理 | WebSocket + REST | Order + Balance | 混合 | 实时订单状态 + 定期余额检查 |
✅ 最佳实践建议
1. 分层订阅策略
def subscribes(self):
"""分层订阅策略示例"""
return [
# 核心层:高频实时数据
{
"account_id": 0,
"event_handle_mode": "Latest", # 高频数据使用Latest模式
"sub": {"SubscribeWs": [
{"Bbo": self.trading_symbols}, # 交易信号生成
]}
},
# 核心层:重要数据
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {"SubscribeWs": [
{"Order": self.trading_symbols} # 订单状态监控
]}
},
# 支撑层:中频数据
{
"account_id": 0,
"sub": {"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Balance" # 余额监控
}}
},
# 管理层:低频定时任务
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_management", # 风控检查
"initial_delay": {"secs": 30, "nanos": 0} # 延迟启动避免误报
}}
}
]
2. 数据处理优化
def on_bbo(self, exchange, symbol, bbo):
"""优化的BBO处理"""
try:
# 快速数据验证
if not self.validate_bbo_data(bbo):
return
# 核心交易逻辑(最小化处理时间)
signal = self.generate_trading_signal(symbol, bbo)
if signal:
self.execute_trade(signal)
except Exception as e:
# 错误处理不影响主流程
self.trader.log(f"BBO处理异常: {e}", level="ERROR")
def on_kline(self, exchange, kline):
"""K线数据处理最佳实践"""
for candle in kline['candles']:
# 关键:只处理已完结的K线
if candle['confirm']:
self.analyze_completed_candle(candle)
else:
# 未完结K线可用于实时监控,但不做技术分析
self.monitor_current_candle(candle)
3. 资源管理与性能优化
class PerformanceOptimizedStrategy:
def __init__(self):
# 数据缓存,避免重复计算
self.price_cache = {}
self.last_update_time = {}
def subscribes(self):
"""性能优化的订阅配置"""
# 只订阅必要的交易对
core_symbols = ["BTC_USDT", "ETH_USDT"] # 核心交易对
monitor_symbols = ["SOL_USDT", "BNB_USDT"] # 监控交易对
return [
# 核心交易对 - 高频订阅
{
"account_id": 0,
"event_handle_mode": "Latest", # 核心交易对使用Latest模式
"sub": {"SubscribeWs": [
{"Bbo": core_symbols},
{"Kline": {"symbols": core_symbols, "interval": "1m"}}
]}
},
# 监控交易对 - 低频订阅
{
"account_id": 0,
"event_handle_mode": "Sync", # 监控数据使用Sync模式
"sub": {"SubscribeWs": [
{"Bbo": monitor_symbols}
]}
}
]
def on_bbo(self, exchange, symbol, bbo):
"""带缓存的BBO处理"""
current_time = time.time()
last_time = self.last_update_time.get(symbol, 0)
# 限制处理频率,避免过度计算
if current_time - last_time < 0.1: # 100ms内不重复处理
return
self.last_update_time[symbol] = current_time
self.process_bbo_data(symbol, bbo)
✅ 推荐配置模式
高频交易模式:
# 追求极低延迟,最小化订阅
["Bbo", "Order"] # 仅订阅必要数据
技术分析模式:
# 需要完整市场数据
["Kline", "Bbo", "Depth", "MarkPrice"]
稳健套利模式:
# 平衡实时性和稳定性
WebSocket: ["Bbo", "Funding", "Order", "Position"]
REST: ["Balance"] (每5秒)
Timer: ["risk_check"] (每分钟,延迟30秒启动)
❌ 常见误区避免
1. 过度订阅
# ❌ 错误:订阅过多不必要的数据
["Bbo", "Depth", "Trade", "Kline", "MarkPrice", "Funding"] # 太多了
# ✅ 正确:只订阅策略需要的数据
["Bbo", "Order"] # 简单策略只需要这些
2. 忽略数据状态
# ❌ 错误:不检查K线完结状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
self.analyze_signal(candle) # 可能分析未完结K线
# ✅ 正确:检查K线状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
if candle['confirm']: # 只分析已完结的K线
self.analyze_signal(candle)
3. 阻塞回调函数
# ❌ 错误:在回调中执行耗时操作
def on_bbo(self, exchange, symbol, bbo):
time.sleep(0.1) # 阻塞其他数据处理
self.complex_calculation() # 复杂计算
# ✅ 正确:异步处理耗时操作
def on_bbo(self, exchange, symbol, bbo):
# 快速缓存数据
self.bbo_cache[symbol] = bbo
# 复杂处理放在定时器中进行
def on_timer_subscribe(self, timer_name):
if timer_name == "complex_analysis":
self.perform_complex_analysis()
4. 忽略初始延迟配置
# ❌ 错误:多个定时器同时启动,造成资源竞争
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup"}}
]
# ✅ 正确:使用初始延迟错开启动时间
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report",
"initial_delay": {"secs": 60, "nanos": 0}}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup",
"initial_delay": {"secs": 120, "nanos": 0}}}
]
5. 初始延迟最佳实践
def subscribes(self):
"""使用initial_delay的最佳实践"""
return [
# 立即启动的监控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "connection_monitor" # 无延迟,立即监控连接状态
}}
},
# 延迟启动的风控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0} # 延迟1分钟,等数据稳定
}}
},
# 错开启动的统计类定时器(避免同时执行)
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "stats_calc",
"initial_delay": {"secs": 30, "nanos": 0} # 错开30秒
}}
},
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "report_gen",
"initial_delay": {"secs": 90, "nanos": 0} # 错开90秒
}}
}
]
🔧 调试与监控
数据流监控:
def on_bbo(self, exchange, symbol, bbo):
# 统计数据接收频率
self.data_stats[symbol] = self.data_stats.get(symbol, 0) + 1
def on_timer_subscribe(self, timer_name):
if timer_name == "data_monitor":
for symbol, count in self.data_stats.items():
self.trader.tlog(
"数据统计",
f"{symbol}: {count}次/分钟",
interval=60
)
self.data_stats.clear()
通过合理的数据源选择和配置,可以构建高效、稳定的量化交易策略。