跳到主要内容

Web平台策略开发指南

本指南将帮助您在Web平台上开发策略,包括策略的创建、编写、配置、部署和调试。

目录

概述

OpenQuant 的 Web 平台提供了一个可视化的策略编辑器,可以帮助您轻松创建、编辑和调试策略。

前置条件

在开始编写策略之前,请确保您已经:

  • ✅ 完成快速入门中的步骤
  • ✅ 创建了 API Key 和加密配置
  • ✅ 部署了托管者并确保其正常运行

策略创建

创建新策略

  1. 点击进入“策略”库页面
  2. 点击右上角“新增”按钮,进入新增策略页面
  3. 填写策略基本信息:
    • 策略语言:选择 Python 或 Rust(目前支持这两种语言)
    • 策略名称:为您的策略起一个易于识别的名称
    • 分组:选择策略所属的分组(便于管理)
    • 策略描述:简要描述策略的功能和特点

新增策略

策略语言选择
  • Python:适合快速开发和原型验证,语法简洁,生态丰富
  • Rust:适合高性能场景,延迟更低,但开发复杂度较高

下面以实现 Python 深度不平衡策略为例,展示完整的策略创建和编写流程。

策略编辑

编辑策略代码

创建策略后,系统会自动生成一个默认的 strategy.py 文件。您可以在此基础上进行编辑:

  1. 在“策略代码”区域找到 strategy.py 文件
  2. 点击文件名进入编辑模式
  3. 修改代码内容

新建文件

如果需要创建额外的文件(如配置文件、工具类等),可以:

  1. 点击“新建文件”按钮
  2. 输入文件名(如utils.py 等)
  3. 按回车确认
  4. 在“文件列表”中点击文件名进行编辑

新建文件

提示

该图仅为展示前端文件交互用法,实际使用时请根据您的策略需求创建相应的文件。

完整策略示例

下面是一个完整的深度不平衡策略示例。该策略通过监控订单簿买卖盘深度不平衡来捕捉交易机会。

代码说明

这是一个完整的示例策略,包含约 800 行代码。建议您:

  1. 先理解整体结构
  2. 根据您的需求修改关键部分
  3. 参考完整示例文档了解更多策略模式

策略核心逻辑

  • 监控订单簿前 5 档深度
  • 计算买卖盘不平衡比例
  • 当不平衡超过阈值时触发交易信号
  • 使用限价单执行交易
点击展开查看完整策略代码(约 800 行)
"""
订单簿深度不平衡策略
实现原理:基于买卖盘深度不平衡检测市场情绪,捕捉短期价格波动
"""

import time
import traderv2 # type: ignore
import base_strategy

class Strategy(base_strategy.BaseStrategy):
"""深度不平衡策略实现类,通过监控买卖盘深度差异捕捉短期价格波动机会"""

def __init__(self, cex_configs, dex_configs, config, trader: traderv2.TraderV2):
# 访问配置值
self.cex_configs = cex_configs # 中心化交易所配置
self.has_account = len(cex_configs) > 0 # 是否有可用账户
self.base_index = len(self.cex_configs)
self.dex_configs = dex_configs # 去中心化交易所配置
self.trader = trader # 交易执行器
self.stop_flag = False # 停止标志

# 持仓历史缓存,用于记录开仓价格和跟踪订单状态
self.position_cache = {} # 持仓历史缓存
self.positions = {} # 当前持仓信息
self.balances = {} # 账户余额信息
self.orders = {} # 订单信息
self.last_signal_time = {} # 上次信号触发时间

# 初始化配置
self.update_config(config)

# 记录策略启动时间和初始资产
self.start_time = time.time()
self.initial_equity = 10000.0 # 默认初始资产,会在连接账户后更新

# 尝试加载缓存的交易数据
self.load_cache_data()

# 交易对信息字典
self.instruments = {} # 交易对基本信息
self.bbos = {} # 最优买卖价信息
self.fee_rates = {} # 手续费率

self.flight_orders = {} # 在途订单,尚未收到交易所确认

self.last_trigger_time = {} # 上次触发交易的时间

# 强停状态
self.force_stop = False # 强制停止标志,用于紧急情况

positions = self.trader.get_positions(0).get('Ok', [])
# 过滤出数量大于0的持仓
positions = [pos for pos in positions if pos['amount'] > 0]
if positions:
self.trader.log(f"初始持仓: {len(positions)}", level="INFO", color="blue")
for pos in positions:
symbol = pos['symbol']
self.trader.log(f"\t{symbol}:{pos}", level="INFO", color="blue")
self.positions[symbol] = pos
else:
self.trader.log("无初始持仓", level="INFO", color="red")

def update_config(self, config):
"""
更新策略配置参数

参数:
config: 包含策略参数的字典
"""
self.min_seconds_between_triggers = config['min_seconds_between_triggers'] # 触发信号最小间隔时间(秒)
self.symbols = config['symbols'] # 交易品种列表
self.trigger_imbalance_ratio = config['trigger_imbalance_ratio'] # 触发信号的深度不平衡比率阈值
self.trigger_min_size = config['trigger_min_size'] # 触发信号的最小深度尺寸
self.order_timeout = config.get('order_timeout', 30) # 订单超时时间(秒)
self.leverage = config['leverage'] # 杠杆倍数

def name(self):
"""返回策略名称"""
return "Depth Imbalance Strategy V2"

def start(self):
"""
策略启动函数

完成以下初始化工作:
1. 查询账户余额
2. 初始化持仓信息
3. 设置持仓模式(如适用)
4. 初始化交易对信息
5. 设置杠杆倍数
"""
self.trader.log("启动深度不平衡策略V2", level="INFO", color="blue")

# 查询账户余额
if self.has_account:
# 方式1: 使用get_usdt_balance
balance = self.trader.get_usdt_balance(0)
self.trader.log(f"余额: {balance}", level="DEBUG", color="blue")
self.balances = {
"USDT": {
"available_balance": balance.get('Ok', {}).get('available_balance', 0),
"balance": balance.get('Ok', {}).get('balance', 0)
}
}
self.initial_equity = self.balances.get("USDT", {}).get("balance", 0.0)

# 方式2:生成命令然后publish
cmd = self.trader.get_usdt_balance(0, generate=True)['Ok']
self.trader.log(f"publish 方法命令: {cmd}", level="DEBUG", color="blue")
balance = self.trader.publish(cmd)
self.trader.log(f"publish 方法余额: {balance}", level="DEBUG", color="blue")

# 初始化仓位信息
positions = self.trader.get_positions(0)
self.positions = {}
if 'Ok' in positions and isinstance(positions['Ok'], list):
for pos in positions['Ok']:
if isinstance(pos, dict) and 'symbol' in pos:
self.positions[pos['symbol']] = pos

# 如果交易所是合约交易所,设置单向持仓
if self.cex_configs[0]['exchange'].endswith('Swap'):
try:
# 查询是否双向持仓
is_dual = self.trader.is_dual_side(0)
if 'Ok' in is_dual:
# 如果当前是双向持仓,改为单向持仓
if is_dual['Ok']:
# 设置单向持仓
res = self.trader.set_dual_side(0, False)
self.trader.log(f"设置单向持仓: {res}", level="INFO", color="green")
except Exception as e:
self.trader.log(f"设置持仓模式时发生错误: {str(e)}", level="ERROR", color="red")

# 初始化交易对信息
for symbol in self.symbols:
# 查询合约信息
instrument = self.trader.get_instrument(0, symbol)
time.sleep(0.1) # 避免请求过快
if 'Ok' in instrument:
self.instruments[symbol] = instrument['Ok']
self.trader.log(f"交易对 {symbol} 信息: {instrument['Ok']}", level="INFO", color="green")
else:
raise Exception(f"未找到交易对 {symbol} 信息")

# 查询Bbo(最佳买卖价)
bbo = self.trader.get_bbo(0, symbol)
time.sleep(0.1)
if 'Ok' in bbo:
self.bbos[symbol] = bbo['Ok']
else:
raise Exception(f"未找到交易对 {symbol} 的BBO")

# 查询手续费率
fee = self.trader.get_fee_rate(0, symbol)
time.sleep(0.1)
if 'Ok' in fee:
self.fee_rates[symbol] = fee['Ok']
else:
raise Exception(f"未找到交易对 {symbol} 的手续费率")

# 如果是合约,设置杠杆
if self.cex_configs[0]['exchange'].endswith('Swap'):
# 设置杠杆倍数
leverage_res = self.trader.set_leverage(0, symbol, self.leverage)
self.trader.log(f"设置杠杆 {self.leverage}x: {leverage_res}", level="INFO", color="green")

# 记录初始资产
self.initial_equity = self.balances.get("USDT", {}).get("balance", 10000.0)
self.trader.log(f"策略初始资产: {self.initial_equity} USDT", level="INFO", color="green")


def subscribes(self):
"""
返回策略需要订阅的行情和账户数据

订阅内容包括:
1. 深度行情数据
2. 最优买卖价(BBO)
3. 订单和持仓更新(如有账户)
4. 余额定时更新
5. 定时检查订单状态

返回:
包含订阅信息的列表
"""
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": self.symbols,
"levels": 5 # 订阅5档深度
},
},
{
"Bbo": self.symbols
}
]
}
},
]

if self.has_account:
# 订阅交易所websocket
subs.append({
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"OrderAndFill": self.symbols # 订阅订单以及用户私有成交更新
},
{
"Position": self.symbols # 订阅持仓更新
},
{
"Order": self.symbols # 订阅订单更新
}
]
}
},)

# 定时更新持仓和余额信息
subs.append({
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 2,
"nanos": 0
},
"rest_type": "Balance" # 每秒更新账户余额
}
}
},)

# 定时检查订单状态
subs.append({
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 10,
"nanos": 0
},
"name": "cancel_all_orders" # 每10秒检查一次订单状态
}
}
},)

# 定时检查风险
subs.append({
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 10,
"nanos": 0
},
"name": "risk_monitor" # 每10秒检查一次风险
}
}
},)

# 定时上传表格
subs.append({
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 5,
"nanos": 0
},
"name": "upload_tables" # 每5秒上传一次表格
}
}
},)

return subs

def on_bbo(self, exchange, bbo):
"""
处理最优买卖价(BBO)数据事件

更新交易对的最新买卖价信息,用于后续交易决策

参数:
exchange: 交易所名称
bbo: 最优买卖价数据
"""
if self.force_stop:
return

symbol = bbo['symbol']
if symbol not in self.symbols:
return

# 更新BBO数据
self.bbos[symbol] = bbo

def on_depth(self, exchange, depth):
"""
处理深度数据事件 - 策略核心逻辑

根据订单簿深度计算买卖盘不平衡程度,在满足条件时生成交易信号

参数:
exchange: 交易所名称
depth: 深度数据,包含买卖盘各档位价格和数量
"""
if self.force_stop:
return

symbol = depth['symbol']
if symbol not in self.symbols:
return

# 计算买卖深度总量(前5档)
bid_total = sum(level[1] for level in depth['bids'][:5])
ask_total = sum(level[1] for level in depth['asks'][:5])

# 如果深度为空,跳过
if bid_total == 0 or ask_total == 0:
return

# 计算不平衡比例:(买盘量-卖盘量)/(买盘量+卖盘量)
# 范围在[-1,1]之间,正值表示买盘压力更大,负值表示卖盘压力更大
imbalance_ratio = (bid_total - ask_total) / (bid_total + ask_total)

# 检查是否满足触发信号的时间间隔条件
now = time.time()
last_signal_time = self.last_signal_time.get(symbol, 0)
time_since_last_signal = now - last_signal_time

# 如果距离上次信号时间不足,跳过
if time_since_last_signal < self.min_seconds_between_triggers:
return

# 计算深度总量,用于判断市场流动性
total_depth = bid_total + ask_total

# 检查信号触发条件:
# 1. 不平衡比例绝对值超过阈值
# 2. 总深度超过最小要求(确保足够流动性)
if abs(imbalance_ratio) > self.trigger_imbalance_ratio and total_depth > self.trigger_min_size:
# 更新最后信号时间
self.last_signal_time[symbol] = now

# 根据不平衡方向确定交易方向
# 买盘压力大(正比例)时做多,卖盘压力大(负比例)时做空
side = "Buy" if imbalance_ratio > 0 else "Sell"

# 打印信号信息
color = "green" if side == "Buy" else "red"
self.trader.log(f"深度不平衡信号: {symbol} {side} 不平衡比例: {imbalance_ratio:.4f} 总深度: {total_depth:.4f}",
level="INFO", color=color)

# 执行交易
if self.has_account:
order_data = self.build_trade_order(symbol, side)
if order_data:
self.flight_orders[symbol] = order_data
cmd = {
"account_id": 0,
"method": "PlaceOrder",
"sync": False,
**order_data,
}
return {
"cmds": [cmd],
}

def on_order_submitted(self, account_id, order_id_result, order):
"""
处理订单提交事件回调

当订单提交到交易所后,处理交易所返回的确认信息

参数:
account_id: 账户ID
order_id_result: 订单ID结果
order: 订单信息
"""
symbol = order['symbol']
if symbol in self.flight_orders:
del self.flight_orders[symbol] # 从在途订单中移除

if 'Ok' in order_id_result:
self.trader.log(f"订单提交成功: {order_id_result}", level="INFO", color="green")

order_id = order_id_result['Ok']
# 保存订单记录
order = {
"status": "Open",
'symbol': symbol,
'side': order['side'],
'quantity': order['amount'],
'price': order['price'],
'create_time': time.time()
}

# 更新最后触发时间
self.last_trigger_time[symbol] = time.time()

# 记录订单信息
self.orders[order_id] = order
else:
self.trader.log(f"订单提交失败: {order_id_result}", level="ERROR", color="red")

def on_order_and_fill(self, account_id, order):
"""
订单/用户私有成交更新时触发的方法, 订单频道和用户成交频道哪个快推哪个。

Args:
account_id (str): 账户 ID。
order: 订单对象。
"""
self.trader.log(f"订单/用户私有成交更新: {order}", level="INFO", color="blue")

def on_order(self, account_id, order):
"""
处理订单更新事件

当订单状态有变化时(如已成交、已撤销等),更新本地订单记录

参数:
account_id: 账户ID
order: 更新后的订单信息
"""
self.trader.log(f"订单更新: {order}", level="INFO", color="blue")
if order['id'] in self.orders:
# 更新订单状态
self.orders[order['id']]['status'] = order['status']

# 如果订单已完成(成交或取消),记录成交信息
if order['status'] in ['Filled', 'Canceled']:
self.trader.log(f"订单状态更新: {order['symbol']} {order['side']} 状态: {order['status']} 成交均价: {order.get('avg_price', 0)}",
level="INFO", color="blue")

def on_position(self, account_id, positions):
"""
处理持仓更新事件

当持仓有变化时,更新本地持仓记录

参数:
account_id: 账户ID
positions: 更新后的持仓信息
"""
self.trader.log(f"持仓更新: {positions}", level="INFO", color="blue")
if isinstance(positions, list):
for position in positions:
if isinstance(position, dict) and 'symbol' in position:
symbol = position['symbol']
self.positions[symbol] = position

def on_balance(self, account_id, balances):
"""
处理余额更新事件

当账户余额变化时,更新本地余额记录

参数:
account_id: 账户ID
balances: 更新后的余额信息
"""
self.trader.log(f"余额更新: {balances}", level="INFO", color="blue")
for balance in balances:
asset = balance['asset']
self.balances[asset] = balance

def on_timer_subscribe(self, timer_name):
"""
处理定时器事件

根据不同的定时器名称执行相应的定时任务

参数:
timer_name: 定时器名称
"""
if timer_name == "cancel_all_orders":
self.cancel_all_orders() # 定期检查并取消长时间未成交的订单

def on_stop(self):
"""
策略停止时的处理逻辑

执行以下清理工作:
1. 取消所有未完成订单
2. 平掉所有持仓
3. 保存缓存数据
4. 计算策略收益
"""
self.trader.log("策略正在停止...", level="INFO", color="blue")
self.stop_flag = True

# 取消所有订单
self.cancel_all_orders()

# 平仓
self.close_all_positions()

# 保存缓存数据
self.save_cache_data()

# 计算收益
self.calculate_profit()

def on_config_update(self, config):
"""
热更新策略配置

在策略运行期间更新参数,无需重启策略

参数:
config: 新的配置参数
"""
self.trader.log(f"热更新策略配置: {config}", level="INFO", color="blue")
self.update_config(config)

def on_latency(self, latency, account_id):
"""
延迟统计时触发的方法。

Args:
latency (dict): 延迟信息。
account_id (str): 账户 ID。
"""
self.trader.log(f"延迟统计: {latency}, account_id: {account_id}", level="INFO", color="blue")

# -------------------------------------------------------实用工具函数-------------------------------------------------------
def build_trade_order(self, symbol, side):
"""
准备交易订单数据

根据交易信号和市场状况,计算下单参数

参数:
symbol: 交易品种
side: 交易方向(Buy/Sell)

返回:
包含订单和参数的字典,或None表示不下单
"""
# 防止重复下单:同一品种有在途订单时跳过
if symbol in self.flight_orders:
return None

# 检查信号触发的时间间隔
last_trigger_time = self.last_trigger_time.get(symbol, 0)
if time.time() - last_trigger_time < self.min_seconds_between_triggers:
return None

# 确保有交易所行情数据
if symbol not in self.instruments or symbol not in self.bbos:
self.trader.log(f"未找到交易对信息: {symbol}", level="ERROR", color="red")
return None

# 获取当前市场价格
bbo = self.bbos[symbol]
if side == "Buy":
price = bbo['ask_price'] # 买入时使用卖一价(ask)
else:
price = bbo['bid_price'] # 卖出时使用买一价(bid)

if price <= 0:
self.trader.log(f"无效价格: {price}", level="ERROR", color="red")
return None

# 计算可用的名义交易价值
available_balance = self.balances.get("USDT", {}).get("available_balance", 0)
self.trader.log(f"当前可用余额: {available_balance}", level="INFO", color="blue")

# 应用杠杆
max_notional_value = available_balance * self.leverage
self.trader.log(f"最大名义价值: {max_notional_value}", level="INFO", color="blue")

# 风险控制:只使用3%可用资金
notional_value = max(max_notional_value * 0.03, self.instruments[symbol]['min_notional'])
self.trader.log(f"开仓名义价值: {notional_value}", level="INFO", color="blue")

# 根据开仓名义价值计算开仓数量
amount = max(notional_value / price, self.instruments[symbol]['min_qty'])
self.trader.log(f"开仓数量(单位币数量): {amount}", level="INFO", color="blue")

real_notional_value = amount * price
self.trader.log(f"实际名义价值: {real_notional_value}", level="INFO", color="blue")

if real_notional_value > max_notional_value:
self.trader.log(f"余额不足,实际名义价值超过最大名义价值: {real_notional_value}", level="ERROR", color="red")
return None

# 准备下单参数
order = {
"cid": traderv2.create_cid(self.cex_configs[0]['exchange']), # 客户端订单ID
"symbol": symbol,
"order_type": "Limit", # 限价单
"side": side,
"pos_side": None, # 单向持仓模式
"time_in_force": "IOC", # 一直有效直到取消
"price": price,
"amount": amount,
}

params = {
"is_dual_side": False, # 单向持仓
"margin_mode": "Cross", # 全仓模式(okx需要)
}

return {
"order": order,
"params": params
}

def load_cache_data(self):
"""
加载缓存数据

从本地文件恢复策略运行状态,包括持仓历史和资金信息
"""

cache = self.trader.cache_load()
if cache:
self.position_cache = cache.get('position_cache', {})
self.initial_equity = cache.get('initial_equity', 10000.0)
self.start_time = cache.get('start_time', time.time())
self.trader.log("加载缓存数据成功", level="INFO", color="green")

def save_cache_data(self):
"""
保存缓存数据

将当前策略状态保存到本地文件,用于后续恢复
"""
cache = {
'position_cache': self.position_cache,
'initial_equity': self.initial_equity,
'start_time': self.start_time
}
self.trader.cache_save(cache)
self.trader.log("保存缓存数据成功", level="INFO", color="green")

def get_open_orders(self, symbol: str) -> list:
"""
获取当前所有挂单

查询指定交易对的未完成订单

参数:
symbol: 交易对名称

返回:
list: 挂单列表
"""
result = self.trader.get_open_orders(0, symbol)
if 'Ok' in result:
orders = result['Ok']
if isinstance(orders, list) and len(orders) > 0:
self.trader.log(f"获取挂单成功: {orders}", level="DEBUG")
return orders
else:
self.trader.log(f"获取挂单失败: {result}", level="ERROR")
return []

def cancel_all_orders(self):
"""
取消所有未完成订单

遍历所有交易品种,取消所有挂起的订单
"""
for symbol in self.symbols:
orders = self.get_open_orders(symbol)
for order in orders:
self.trader.log(f"取消订单: {order}", level="INFO", color="blue")
res = self.trader.cancel_order(0, symbol, order['id'])
self.trader.log(f"取消订单结果: {res}", level="INFO", color="blue")


def get_positions(self) -> list:
"""
获取当前持仓信息

查询所有交易品种的持仓状态

返回:
dict: 持仓信息字典
"""
res = self.trader.get_positions(0)
if 'Ok' in res:
return res['Ok']
else:
self.trader.log(f"获取持仓失败: {res}", level="ERROR", color="red")
return []

def close_all_positions(self):
"""
平掉所有持仓

遍历所有持仓,提交市价单平仓
"""
for position in self.get_positions():
symbol = position['symbol']
amount = position['amount']
if amount > 0:
self.trader.log(f"平仓仓位: {position}", level="INFO", color="blue")
pos_side = position['side'] # 持仓方向
# 确定平仓方向:持多仓卖出,持空仓买入
side = 'Sell' if pos_side == 'Long' else 'Buy'

# 准备市价平仓订单
order = {
"symbol": symbol,
"order_type": "Market", # 市价单,确保能立即成交
"side": side,
"pos_side": pos_side,
"time_in_force": "IOC", # 立即成交否则取消
"price": None, # 市价单无需指定价格
"amount": amount,
}

params = {
"is_dual_side": False,
"margin_mode": "Cross", # 全仓模式
"market_order_mode": "Normal" # 普通市价单
}

self.trader.log(f"平仓订单: {order}", level="INFO", color="blue")

# 提交市价平仓订单
res = self.trader.place_order(0, order, params)
self.trader.log(f"平仓结果: {res}", level="INFO", color="blue")

def calculate_profit(self):
"""
计算策略收益

计算策略运行期间的盈亏情况
"""
balance = self.trader.get_usdt_balance(0).get('Ok', {}).get("balance", 0.0)
profit = balance - self.initial_equity
self.trader.log(f"收益: {profit}", level="INFO", color="blue")

def upload_tables(self):
"""上传表格示例方法"""

spread_table = {
"title": "价差",
"cols": ["交易对", "价差(‰)"],
"rows": [
["BTC_USDT", "1.25"],
["ETH_USDT", "2.10"],
["SOL_USDT", "3.45"]
]
}

position_table = {
"title": "持仓",
"cols": ["交易对", "持仓量", "持仓价值", "浮动盈亏", "持仓时间"],
"rows": [
["BTC_USDT", "1.25", "10000", "100", "2小时15分"],
["ETH_USDT", "2.10", "20000", "200", "1小时30分"],
["SOL_USDT", "3.45", "30000", "300", "3小时45分"]
]
}

# 上传表格
self.trader.upload_tables([spread_table, position_table])
完整代码说明

以上是深度不平衡策略的完整代码示例(约 800 行)。代码包含:

  • ✅ 策略初始化和配置管理
  • ✅ 数据订阅配置(深度、BBO、订单、持仓等)
  • ✅ 核心交易逻辑(深度不平衡检测)
  • ✅ 订单管理和风险控制
  • ✅ Web 客户端集成和数据上传

如需查看更多策略示例或了解特定功能的实现,请参考:

策略代码结构

基本结构

每个策略必须包含以下基本结构:

import traderv2  # 交易执行器
import base_strategy # 基础策略类

class Strategy(base_strategy.BaseStrategy):
"""策略类必须命名为 Strategy,并继承 BaseStrategy"""

def __init__(self, cex_configs, dex_configs, config, trader: traderv2.TraderV2):
# 初始化代码
pass

def start(self):
# 必须实现:策略启动逻辑
pass

def subscribes(self):
# 必须实现:数据订阅配置
return []
重要
  • 策略类名必须为 Strategy
  • 必须继承 base_strategy.BaseStrategy
  • 必须实现 start()subscribes() 方法
方法实现说明

重要提示:您不需要实现所有方法!

  • 必须实现__init__(), start(), subscribes() 这三个方法
  • 按需实现:其他回调方法(如 on_bbo(), on_order(), on_position() 等)根据您的策略需求选择性实现即可

上面的完整示例代码包含了所有可能用到的回调方法,但这只是示例。在实际开发中,您只需要:

  1. 继承 BaseStrategy
  2. 实现必须的三个方法
  3. 根据策略逻辑需要,选择性复写相关的回调方法

例如,如果您的策略只需要处理深度数据,那么只需要实现 on_depth() 方法即可,不需要实现其他回调方法。

详细的回调方法说明请参考:策略基类文档

初始化参数说明

__init__ 方法接收以下参数:

参数类型说明
cex_configsList[Dict]中心化交易所配置列表,在新建实盘时选择
dex_configsList[Dict]去中心化交易所配置(暂未使用)
configDict策略参数配置,通过策略参数界面配置
traderTraderV2交易执行器,用于执行交易和查询数据

参数说明

  • cex_configs:包含交易所的 API Key、Secret 等信息,详细参数请参考交易所配置
  • config:策略运行时使用的参数,通过“策略参数”界面配置(见下方说明)
  • trader:提供交易、查询、日志等功能,详细 API 请参考核心功能

必须实现的方法

start() 方法

策略启动时自动调用,用于初始化工作:

  • 查询账户余额和持仓
  • 初始化交易对信息
  • 设置持仓模式和杠杆
  • 初始化 Web 客户端
def start(self):
"""策略启动时的初始化工作"""
# 查询余额
balance = self.trader.get_usdt_balance(0)

# 初始化交易对信息
for symbol in self.symbols:
instrument = self.trader.get_instrument(0, symbol)
# ...

# 初始化 Web 客户端
self.trader.init_web_client(web_config)
self.trader.start_web_client(upload_interval=5)

subscribes() 方法

返回策略需要订阅的数据源配置:

  • 行情数据(深度、BBO、K线等)
  • 账户数据(订单、持仓、余额等)
  • 定时器任务
def subscribes(self):
"""返回订阅配置"""
return [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{"Depth": {"symbols": self.symbols, "levels": 5}},
{"Bbo": self.symbols}
]
}
}
]

详细订阅配置请参考数据源订阅

可选回调函数

除了必须实现的方法外,还可以实现以下回调函数来处理各种事件:

回调函数说明触发时机
on_depth()处理深度数据收到订单簿深度更新
on_bbo()处理最优买卖价收到 BBO 更新
on_order()处理订单更新订单状态变化
on_position()处理持仓更新持仓数量变化
on_balance()处理余额更新账户余额变化
on_timer_subscribe()处理定时器事件定时器触发
on_stop()策略停止时调用策略停止时

详细说明请参考执行流程和回调函数

策略参数配置

策略参数允许您在创建实盘时动态配置策略的运行参数,无需修改代码。

添加策略参数

  1. 在策略编辑页面,滚动到“策略参数”区域
  2. 点击右上角“新增参数”按钮
  3. 填写参数信息:
    • 参数标题:显示在界面上的名称
    • 参数变量名称:代码中使用的变量名(必须与代码中的 config 键名一致)
    • 数据类型:选择参数类型
    • 默认值:参数的默认值

新增参数

支持的数据类型

  • 字符串:文本类型参数
  • 数字:整数或浮点数
  • 字典:JSON 对象格式
  • 数组:列表格式
  • 布尔:true/false
  • 交易配置数量:用于多交易所配置,设置后会在实盘页面显示多个交易所配置选项

新增参数详情

查看和编辑参数

策略参数列表会显示在策略编辑页面底部:

策略参数列表

在创建实盘时,也可以查看和编辑这些参数:

编辑策略参数

参数使用

在策略代码中,通过 config 字典访问参数值:

self.symbols = config['symbols']
self.leverage = config['leverage']

策略部署与调试

部署策略

  1. 查看策略详情

    • 在策略库页面,点击策略卡片进入策略详情页面
    • 可以查看策略代码、参数配置等信息

    策略详情

  2. 创建实盘

    • 点击菜单栏的“实盘”,进入实盘页面
    • 点击“添加实盘”按钮
    • 填写实盘信息:
      • 选择运行策略
      • 选择托管者(确保托管者在线)
      • 选择交易所配置
      • 配置策略参数
    • 点击“保存实盘”按钮
    重要

    在保存实盘时,必须确保托管者处于正常运行状态,否则会导致实盘无法保存。

    新增实盘完整流程

运行和监控

  1. 启动策略

    • 在实盘页面,找到您创建的实盘
    • 点击“操作”列中的“运行”按钮
    • 策略开始运行
  2. 停止策略

    • 在运行过程中,可以点击“停止”按钮停止策略
    • 策略会执行清理工作(取消订单、平仓等)
  3. 查看运行状态

    • 点击实盘名称进入实盘详情页面
    • 查看策略运行状态、日志、持仓等信息

    实盘详情

  4. 查看日志

    • 在实盘详情页面查看实时日志
    • 或在服务器上查看托管者的日志文件

常见问题

策略无法启动

  • 检查托管者是否在线
  • 检查交易所配置是否正确
  • 查看日志中的错误信息

策略参数不生效

  • 确保参数变量名称与代码中的键名一致
  • 检查参数数据类型是否正确
  • 确认参数已正确保存

订单无法提交

  • 检查账户余额是否充足
  • 确认交易对名称格式正确(如 BTC_USDT
  • 查看交易所 API 权限设置

数据订阅失败

  • 检查 subscribes() 方法返回格式是否正确
  • 确认交易对名称在交易所存在
  • 查看订阅配置文档:数据源订阅

相关文档