跳到主要内容

OpenQuant 执行流程详解

🚀 OpenQuant 执行流程详解

📋 概述

OpenQuant系统的执行流程基于Rust核心引擎和Python策略包装器,从配置文件加载到策略启动运行,包含多个关键步骤。以下是基于实际代码的详细执行流程说明:

🔄 策略执行流程

第一步:配置文件准备

配置文件结构 (config.toml):

# 策略文件路径
strategy_path = "misc/examples/test/strategy.py"
# 策略配置文件路径
strategy_config_path = "misc/examples/test/strategy.json"
# 交易引擎版本
trader_version = "V2"

[web]
is_production = false
secret_id = "your_secret_id"
secret_key = "your_secret_key"

[[exchanges]]
exchange = "GateSwap"
is_testnet = true
key = "your_api_key"
secret = "your_secret"
# 可选:为该交易所单独配置代理
proxy = "http://127.0.0.1:7890"

交易所配置说明

  • proxy: 可选字符串,为该交易所账户单独配置代理
  • 支持协议类型:
    • HTTP代理:http://host:port
    • HTTPS代理:https://host:port
    • SOCKS5代理:socks5://host:port
  • 支持用户名密码验证:
    • 格式:http://username:password@host:port
    • 格式:socks5://username:password@host:port
  • 如不配置则使用系统默认网络连接
  • 每个交易所账户可以配置不同的代理,实现灵活的网络路由

代理配置示例

# HTTP代理(无认证)
proxy = "http://127.0.0.1:7890"

# HTTP代理(带认证)
proxy = "http://user123:pass456@127.0.0.1:7890"

# SOCKS5代理(无认证)
proxy = "socks5://127.0.0.1:1080"

# SOCKS5代理(带认证)
proxy = "socks5://myuser:mypass@127.0.0.1:1080"

🔐 访问密钥获取与启动鉴权流程

为保证与 Web 平台的安全交互,OpenQuant 在启动流程中会使用访问密钥进行鉴权。请按以下步骤配置与检查:

  • 获取访问密钥

    • 若无账号:请联系网页负责人创建账号。
    • 登录站点:
    • 登录后点击右上角“用户名” → “访问密钥” → “新建密钥”,保存生成的 SecretIdSecretKey
  • 在配置文件中设置密钥

    • 将上一步获取的密钥写入 [web]secret_idsecret_key
    • 使用 is_production 选择环境:
      • true:使用正式网域名 https://clm.nb8.net
      • false:使用测试网域名 https://test.nb8.net
    • 注意:请确保密钥来源与所选环境一致(生产密钥配生产环境,测试密钥配测试环境)。
  • 启动时鉴权行为(代码已集成)

    • 程序启动后会先上报本机系统信息,随后调用授权接口校验密钥。
    • 若未授权或密钥无效,程序会报错并中止,提示前往网页端完成授权或重新生成密钥。
  • 常见问题排查

    • 鉴权失败:核对 is_production 与密钥来源一致;检查 secret_id/secret_key 是否正确、未留空。
    • 仍失败:在网页端“访问密钥”页重新“新建密钥”,更新配置后重试。
    • 安全性:避免将真实密钥提交到版本库,可通过本地未纳管配置或环境变量注入。

第二步:加载策略代码

功能: 加载Python策略文件并创建策略实例

策略文件要求:

  • 必须继承自 base_strategy.BaseStrategy
  • 必须实现 start() 方法
  • 必须实现 subscribes() 方法

策略文件结构 (实际示例):

import base_strategy
import traderv2

class Strategy(base_strategy.BaseStrategy):
def __init__(self, cex_configs, dex_configs, config, trader: traderv2.TraderV2):
self.cex_configs = cex_configs
self.dex_configs = dex_configs
self.trader = trader
self.config = config

def name(self):
return "测试策略"

def start(self):
"""策略启动函数 - 在这里执行策略逻辑"""
self.trader.log("启动测试策略")
# 策略具体逻辑...

def subscribes(self):
"""返回数据订阅配置"""
return [
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [{"Bbo": ["BTC_USDT"]}]
}
},
{
"account_id": 0,
"event_handle_mode": "Async",
"sub": {
"SubscribeWs": [
{"Order": ["BTC_USDT"]},
{"Pisition": ["BTC_USDT"]}
]
}
}
]

第三步:数据订阅配置

功能: 根据策略的subscribes()方法配置数据订阅

订阅类型:

  1. WebSocket订阅 (SubscribeWs):

    • Order: 订单更新
    • OrderAndFill: 订单和成交更新
    • Bbo: 最优买卖价
    • Depth: 订单簿深度
    • Kline: K线数据
    • Funding: 资金费率
  2. REST API订阅 (SubscribeRest):

    • BalanceByCoin: 按币种查询余额
    • 定时查询接口
  3. 定时器订阅 (SubscribeTimer):

    • 定时执行策略逻辑

第四步:启动策略

功能: 启动策略运行,调用策略的start()方法

启动流程:

  1. 调用策略的 subscribes() 方法获取数据订阅配置
  2. 建立数据连接(WebSocket/REST)
  3. 执行策略的 start() 方法
  4. 进入事件循环,开始处理数据推送和回调

📊 数据流和事件回调

策略启动后,系统进入事件驱动模式:

事件回调方法:

市场数据回调:

  • on_bbo(exchange, bbo): 最优买卖价回调
  • on_depth(exchange, depth): 深度数据回调
  • on_ticker(exchange, ticker): 行情数据回调
  • on_trade(exchange, trade): 成交数据回调
  • on_kline(exchange, kline): K线数据回调
  • on_funding(exchange, funding): 资金费率回调
  • on_mark_price(exchange, mark_price): 标记价格回调
  • on_instrument(exchange, instruments): 产品信息回调
  • on_instrument_updated(exchange, instruments): 产品更新回调
  • on_instrument_added(exchange, instruments): 产品新增回调
  • on_instrument_removed(exchange, instruments): 产品移除回调

账户数据回调:

  • on_order(account_id, order): 订单更新回调
  • on_order_and_fill(account_id, order): 订单和成交回调
  • on_position(account_id, positions): 持仓更新回调
  • on_balance(account_id, balances): 余额更新回调
  • on_funding_fee(account_id, funding_fee): 资金费结算回调

异步命令结果回调:

  • on_order_submitted(account_id, result, order): 订单提交回调
  • on_order_amended(account_id, result, order): 订单修改回调
  • on_order_canceled(account_id, result, order_id, symbol): 订单取消回调
  • on_batch_order_submitted(account_id, batch_order_rsp): 批量下单回调
  • on_batch_order_canceled(account_id, batch_order_rsp): 批量取消订单回调
  • on_batch_order_canceled_by_ids(account_id, batch_order_rsp): 按ID批量取消订单回调

网络与连接回调:

  • on_ws_connected(exchange, account_id): WebSocket连接成功回调
  • on_ws_disconnected(exchange, account_id): WebSocket断开连接回调
  • on_subscribe(ws_id, channels, result): 运行时订阅结果回调
  • on_unsubscribe(ws_id, channels, result): 运行时取消订阅结果回调

系统事件回调:

  • on_timer_subscribe(timer_name): 定时器回调
  • on_stop(): 策略停止回调
  • on_config_update(config): 配置热更新回调
  • on_latency(latency_type, latency_value): 延迟数据回调
  • on_cmd(cmd): Web命令回调
  • on_signal(signal): 信号回调

⚡ 事件回调并行与串行机制

OpenQuant系统采用智能的并行/串行处理机制来优化性能和保证数据一致性:

🔄 订阅组间并行处理

不同订阅组之间是并行处理的,这意味着:

def subscribes(self):
return [
# 订阅组1 - 与订阅组2并行处理
{
"account_id": 0,
"event_handle_mode": "Sync", # 订单数据使用Sync模式,保证数据完整性
"sub": {
"SubscribeWs": [
{"Order": ["BTC_USDT", "ETH_USDT"]}
]
}
},
# 订阅组2 - 与订阅组1并行处理
{
"account_id": 0,
"event_handle_mode": "Latest", # BBO数据使用Latest模式,追求最低延迟
"sub": {
"SubscribeWs": [
{"Bbo": ["BTC_USDT", "ETH_USDT"]}
]
}
}
]
  • 订单更新(Order)和最优买卖价(Bbo)回调可以同时执行
  • 不同订阅组的数据处理不会相互阻塞

🔗 同一订阅组内串行处理

同一订阅组内的数据是串行处理的,确保数据处理的顺序性:

{
"account_id": 0,
"sub": {
"SubscribeWs": [
{"Order": ["BTC_USDT", "ETH_USDT"]}, # 这些数据类型
{"OrderAndFill": ["BTC_USDT", "ETH_USDT"]}, # 在同一组内
{"Bbo": ["BTC_USDT", "ETH_USDT"]} # 串行处理
]
}
}

🎛️ Event Handle Mode 配置影响

通过订阅时指定event_handle_mode,可以控制 Websocket 事件的处理方式: 所有订阅的默认行为都是 Sync

⚠️ 重要变更:从0.9.3版本开始,新增了event_handle_mode配置参数,这是一个重要的不兼容改动。现有策略需要根据数据处理需求选择合适的模式。

🔄 配置迁移:旧版本的data_source.market_mode配置已完全被event_handle_mode取代,请将原有配置迁移到新的参数。

配置文件示例:

def subscribes(self):
"""返回数据订阅配置"""
return [
{
"account_id": 0,
"event_handle_mode": "Latest", # 新增:指定事件处理模式
# "event_handle_mode": "Sync", # 默认模式,可省略
# "event_handle_mode": "Async", # 异步队列模式
"sub": {
"SubscribeWs": [{"Bbo": ["BTC_USDT"]}]
}
},
{
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {
"SubscribeWs": [
{"Depth": ["BTC_USDT"]},
{"Order": ["BTC_USDT"]}
]
}
}
]

模式选择指南

  • Latest模式:适用于高频数据(BBO、Trade),追求最低延迟
  • Sync模式:适用于重要数据(Depth、Funding、Order),保证数据完整性
  • Async模式:适用于高频重要数据,平衡延迟和完整性

📈 Latest模式 - Symbol间并行

event_handle_mode配置为"Latest"时,不同symbol间并行处理:

# 配置为Latest模式时
{"Bbo": ["BTC_USDT", "ETH_USDT", "LTC_USDT"]}

处理方式:

  • BTC_USDTETH_USDTLTC_USDT的BBO数据可以并行处理
  • 每个symbol的on_bbo()回调可以同时执行
  • 适用于对实时性要求高的场景

📊 Sync模式 - 消息阻塞串行处理

event_handle_mode配置为"Sync"时,按照消息到达的顺序串行处理:

# 配置为Sync模式时
{"Depth": ["BTC_USDT", "ETH_USDT", "LTC_USDT"]}

处理方式:

  • 按照消息实际到达的时间顺序串行处理
  • 可能的处理顺序:ETH_USDTBTC_USDTLTC_USDTBTC_USDT...
  • 保证消息处理的严格时序性,不会并发处理
  • 适用于需要保证数据处理顺序一致性的场景

📊 Async模式 - 消息异步串行处理

event_handle_mode配置为"Async"时,按照消息到达的顺序串行处理:

# 配置为Async模式时
{"Depth": ["BTC_USDT", "ETH_USDT", "LTC_USDT"]}

处理方式:

  • 按照消息实际到达的时间顺序,将事件写入队列,并在后台按照原始事件顺序串行处理
  • 可能的处理顺序:ETH_USDTBTC_USDTLTC_USDTBTC_USDT...
  • 保证消息处理的严格时序性,不会并发处理
  • 适用于需要保证数据处理顺序一致性,并且消息数量较大有可能阻塞 Websocket 连接并导致断连的场景

💡 处理机制总结

同一个订阅组共享相同的 event_handle_mode 配置,如果需要不同处理方式,需要拆分到不同订阅组。 event_handle_mode 只对 SubscribeWs 生效,SubscribeRestSubscribeTimer 会忽略此内容。 三种模式行为概括如下: Latest:所有 event 在 symbol 内串行,symbol 间并行,接收速度大于消费速度时,会丢弃未被消费的旧事件,保留新收到的事件。 Sync:所有 event 串行,延迟相对较低,但如果事件处理太慢可能阻塞 Websocket 导致断连。 Async :所有 event 串行,但是引入额外的队列来缓存事件,相对于 Sync 模式有额外开销,但是不会导致 WebSocket 阻塞断连。

订阅组1 (并行) ┌─ Order: 消息1 → 消息2 → 消息3 (同组内串行)
├─ Bbo: BTC_USDT ∥ ETH_USDT (Latest模式并行)
└─ Depth: 消息A → 消息B → 消息C (Sync模式按到达顺序串行)

订阅组2 (并行) ┌─ Balance查询 (定时)
└─ Timer事件 (定时)

🎯 最佳实践建议

  1. 高频数据使用Latest模式: 如BBO、Trade等实时性要求高的数据
  2. 重要数据使用Sync模式: 如Depth、Funding等需要完整处理的数据
  3. 重要高频数据使用Async模式:如部分特殊情况需要完整 BBO、Trade 等数据
  4. 合理分组订阅: 将相关性强的数据放在同一组,无关数据分开订阅
  5. 避免阻塞: 回调函数中避免长时间计算,必要时使用异步处理

⚠️ 注意事项

  • 回调函数执行时间: 应尽量保持回调函数的执行时间短,避免影响其他数据处理
  • 数据一致性: Sync模式按消息到达顺序串行处理保证时序一致性,Latest模式虽然快但可能跳过某些数据
  • 内存使用: Latest模式内存占用较低,Sync模式可能需要更多内存缓存
  • ⚠️ 线程安全问题: 并行处理时要特别注意数据的线程安全问题,避免竞态条件和数据不一致

📊 性能对比表

模式延迟吞吐量数据完整性内存使用适用场景
Latest最低最高可能丢失最低高频数据(BBO、Trade)
Sync中等中等完整中等重要数据(Depth、Funding)
Async中等中等完整较高高频重要数据

🔄 迁移指南

从旧版本升级到0.9.3

  1. 配置迁移:将原有的data_source.market_mode配置迁移到新的event_handle_mode参数
    # 旧版本配置(已废弃)
    [data_source.market_mode]
    mark_price = "Latest"
    bbo = "Latest"
    depth = "Latest"
    funding = "All"
    trade = "All"

    # 新版本配置
    # 在subscribes()方法中为每个订阅组指定event_handle_mode
  2. 无需修改:如果不指定event_handle_mode,系统默认使用"Sync"模式,保持向后兼容
  3. 性能优化:根据数据类型选择合适的模式:
    • BBO、Trade数据 → 使用"Latest"模式
    • Depth、Funding、Order数据 → 使用"Sync"模式
    • 高频重要数据 → 使用"Async"模式
  4. 测试验证:在生产环境使用前,建议在测试环境验证不同模式的效果

配置示例

def subscribes(self):
return [
# 高频数据使用Latest模式
{
"account_id": 0,
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [
{"Bbo": ["BTC_USDT", "ETH_USDT"]},
{"Trade": ["BTC_USDT", "ETH_USDT"]}
]
}
},
# 重要数据使用Sync模式
{
"account_id": 0,
"event_handle_mode": "Sync",
"sub": {
"SubscribeWs": [
{"Depth": ["BTC_USDT"]},
{"Order": ["BTC_USDT"]},
{"Funding": ["BTC_USDT"]}
]
}
}
]

🔒 线程安全注意事项

在并行处理环境中,策略开发者需要特别注意以下线程安全问题:

1. 共享变量访问:

class Strategy(base_strategy.BaseStrategy):
def __init__(self, cex_configs, dex_configs, config, trader):
# ❌ 危险:多个回调可能同时修改这些变量
self.total_volume = 0
self.order_count = 0
self.position_size = 0

def on_bbo(self, exchange, bbo):
# ❌ 危险:并行回调可能导致数据竞态
self.total_volume += bbo['volume']

def on_order(self, account_id, order):
# ❌ 危险:并发修改可能导致计数错误
self.order_count += 1

2. 安全的处理方式:

import threading

class Strategy(base_strategy.BaseStrategy):
def __init__(self, cex_configs, dex_configs, config, trader):
# ✅ 使用锁保护共享数据
self._lock = threading.Lock()
self.total_volume = 0
self.order_count = 0

def on_bbo(self, exchange, bbo):
# ✅ 使用锁保护临界区
with self._lock:
self.total_volume += bbo['volume']

def on_order(self, account_id, order):
# ✅ 原子操作或使用锁
with self._lock:
self.order_count += 1

3. 推荐的安全模式:

from collections import defaultdict
import threading

class Strategy(base_strategy.BaseStrategy):
def __init__(self, cex_configs, dex_configs, config, trader):
# ✅ 使用线程安全的数据结构
self._data_lock = threading.Lock()
self._symbol_data = defaultdict(dict)

def on_bbo(self, exchange, bbo):
symbol = bbo['symbol']
# ✅ 每个symbol使用独立的数据存储
with self._data_lock:
self._symbol_data[symbol]['last_bbo'] = bbo

def get_symbol_data(self, symbol):
# ✅ 读取时也要加锁
with self._data_lock:
return self._symbol_data[symbol].copy()

4. 常见线程安全问题:

  • 竞态条件: 多个回调同时修改同一变量
  • 数据不一致: 读写操作没有适当同步
  • 死锁风险: 多个锁的使用顺序不当
  • 性能影响: 过度使用锁可能影响并行性能

5. 最佳实践建议:

  • 优先使用不可变数据结构
  • 将状态数据按symbol分离存储
  • 使用原子操作替代锁(如可能)
  • 避免在回调中执行长时间操作
  • 考虑使用队列进行异步处理

🔧 策略执行时序图

程序启动

加载配置文件 (config.toml)

加载策略文件 (strategy.py)

创建策略实例

调用 subscribes() 获取订阅配置

建立数据连接

调用 start() 方法

进入事件循环 (处理数据回调)

策略持续运行...

监听 Ctrl+C 信号

调用 on_stop() 方法 (策略清理)

程序退出

🛑 策略停止机制

当用户按下 Ctrl+C 时,系统会优雅地停止策略:

  1. 信号捕获: 系统捕获 SIGINT 信号(Ctrl+C)
  2. 调用 on_stop(): 执行策略的 on_stop() 方法进行清理
  3. 资源释放: 关闭数据连接、保存状态等
  4. 程序退出: 安全退出程序

策略中的on_stop()实现示例:

def on_stop(self):
"""
策略停止时的回调函数
用于清理资源、保存状态等
"""
self.trader.log("策略正在停止...")

# 取消所有挂单
# self.cancel_all_orders()

# 保存策略状态
# self.save_strategy_state()

# 其他清理工作
self.trader.log("策略已安全停止")

⚠️ 重要:Result返回值处理说明

由于本API基于PyO3构建,所有返回数据的方法在Rust内部都返回Result类型。在Python中调用时,您需要特别注意以下几点:

🔍 返回值结构

大部分API方法返回的结果具有以下结构:

# 成功的返回值结构
{
"Ok": {
# 实际的数据内容
"data": "...",
"code": 200,
"msg": "success"
}
}

# 错误的返回值结构
{
"Err": {
"error": "错误描述",
"code": 400
}
}

💡 正确的使用方式

所有返回数据的方法都应该按以下方式处理:

# ✅ 正确的处理方式
result = trader.http_request("https://api.example.com/data", "GET", None)

if "Ok" in result:
# 成功情况 - 提取实际数据
data = result["Ok"]
print(f"请求成功: {data}")
else:
# 错误情况 - 处理错误信息
error = result.get("Err", "未知错误")
print(f"请求失败: {error}")



📑 API分类索引

分类功能模块主要用途
🚀 OpenQuant 执行流程详解配置文件加载、策略初始化、数据订阅、策略启动了解系统完整启动流程
一、核心交易功能交易所api指令执行、标识符生成执行交易所api、生成唯一ID
二、日志管理日志记录记录系统和交易信息
三、缓存管理缓存操作、缓存系统说明保存和恢复策略状态
四、外部通信HTTP请求获取外部数据
五、NB8 Web平台集成Web客户端管理等六个模块与Web平台交互和数据展示
六、OpenQuant Web通讯line、stats、page、table四种数据上报方式Web平台数据展示
七、交易所API直接访问账户查询、订单管理、止损订单、市场数据、市值数据、交易所特殊说明直接访问交易所API
八、数据结构说明K线数据结构、表格格式等理解数据结构和格式
九、数据源订阅WebSocket、REST、定时器三种数据源订阅接收实时和定时数据
十、Python交易所支持 (pyex)Python交易所适配器开发自定义交易所集成
十一、延迟统计功能系统延迟、行情延迟、下单延迟等数据统计功能性能监控和优化
十二、插件系统折扣币自动购买插件等扩展功能系统扩展和功能增强
十三、CLI命令行工具IP映射、依赖查询、系统状态、版本信息系统管理和信息查询