策略基类
本文档介绍策略基类 BaseStrategy,该类定义了策略开发所需的基本接口和回调方法。所有自定义策略都应该继承这个基类。
目录
如果您是第一次使用,建议先阅读:
- Web平台策略开发指南 - 了解如何在Web平台开发策略
- 快速入门 - 了解基本操作流程
概述
BaseStrategy 是策略开发的基类,提供了:
- ✅ 必须实现的方法:
name(),start(),subscribes() - ✅ 可选的回调方法:处理各种事件(订单、持仓、行情等)
- ✅ 生命周期管理:策略启动、停止、配置更新等
初始化方法
init
def __init__(self, cex_configs, dex_configs, config, trader: traderv2.TraderV2)
初始化策略对象。在策略实例化时自动调用。
参数:
| 参数 | 类型 | 说明 |
|---|---|---|
cex_configs | list[dict] | 中心化交易所配置列表,包含 API Key、Secret 等信息 |
dex_configs | list[dict] | 去中心化交易所配置列表(暂未使用) |
config | dict | 策略配置参数,通过策略参数界面或配置文件传入 |
trader | TraderV2 | 交易执行器对象,提供交易、查询、日志等功能 |
示例:
def __init__(self, cex_configs, dex_configs, config, trader):
self.cex_configs = cex_configs
self.dex_configs = dex_configs
self.trader = trader
self.config = config
# 初始化策略状态
self.positions = {}
self.orders = {}
self.balances = {}
基础方法
name
def name(self)
返回策略的名称。
返回:
- str: 策略名称
start
def start(self)
启动策略执行。在这里执行策略初始化的操作,比如查询交易币种信息、设置杠杆等。
subscribes
def subscribes(self)
返回策略订阅的事件列表。
订阅分为三种类型:
-
SubscribeWs: 订阅交易所相关 WebSocket 频道,接收到消息会推送到策略回调
- 支持的订阅频道:
MarkPrice: 标记价格Bbo: 最佳买卖价Depth: 市场深度Kline: K线数据Funding: 资金费率Trade: 成交Order: 订单Position: 仓位Balance: 余额FundingFee: 结算资金费OrderAndFill: 订单和用户私有成交(订单频道和用户成交频道哪个快推哪个)
- 支持的订阅频道:
-
SubscribeRest: 会启动异步任务使用 HTTP 定期轮询某些接口并推送到策略回调
- 支持轮询的接口:
Funding: 资金费率Balance: 余额Position: 仓位Instrument: 合约信息
- 支持轮询的接口:
-
SubscribeTimer: 会启动异步任务定时执行
on_timer_subscribe回调
返回:
list: 订阅的事件列表
详细订阅配置请参考数据源订阅。
连接事件回调
on_ws_connected
def on_ws_connected(self, exchange, account_id)
WebSocket连接建立时触发。
参数:
exchange(str): 交易所名称account_id(str): 账户ID
on_ws_disconnected
def on_ws_disconnected(self, exchange, account_id)
WebSocket连接断开时触发。
参数:
exchange(str): 交易所名称account_id(str): 账户ID
系统事件回调
on_latency
def on_latency(self, latency, account_id)
延迟统计时触发。
参数:
latency(dict): 延迟信息account_id(str): 账户ID
on_cmd
def on_cmd(self, cmd)
当收到服务端下发的命令时触发。命令通过 AsyncMsg 结构体传递,包含命令ID和具体命令内容。
参数:
cmd(dict): 命令消息,包含以下字段:id(str): 命令IDcmd(dict): 命令内容,可能是标准命令(StandCmd)或自定义命令(CustomCmd)
标准命令(StandCmd)类型:
-
Close: 平仓命令
- 参数:
Vec<String>交易对列表,如["BTCUSDT", "ETHUSDT"] - 作用: 立即平掉指定交易对的所有仓位
- 参数:
-
SlowClose: 缓慢平仓命令
- 参数:
Vec<String>交易对列表,如["BTCUSDT", "ETHUSDT"] - 作用: 通过小单逐步平掉指定交易对的所有仓位
- 参数:
-
Pause: 暂停交易命令
- 参数:
Vec<String>交易对列表,如["BTCUSDT", "ETHUSDT"] - 作用: 暂停指定交易对的交易操作
- 参数:
-
Resume: 恢复交易命令
- 参数:
Vec<String>交易对列表,如["BTCUSDT", "ETHUSDT"] - 作用: 恢复指定交易对的交易操作
- 参数:
自定义命令(CustomCmd)格式:
{
"cmd": "命令名称",
"params": "命令参数(JSON字符串)"
}
命令响应:
策略需要通过 AsyncCmdRsp 响应命令执行结果:
{
"id": "命令ID",
"code": 0, // 0表示成功,其他值表示错误码
"msg": "执行结果描述"
}
示例:
def on_cmd(self, cmd):
cmd_id = cmd["id"]
cmd_content = cmd["cmd"]
try:
if cmd_content["type"] == "StandCmd":
# 处理标准命令
command = cmd_content["command"]
symbols = cmd_content["params"]
if command == "Close":
# 处理平仓逻辑
result = self.handle_close(symbols)
elif command == "Pause":
# 处理暂停交易逻辑
result = self.handle_pause(symbols)
# ... 处理其他标准命令
elif cmd_content["type"] == "CustomCmd":
# 处理自定义命令
custom_cmd = cmd_content["cmd"]
params = json.loads(cmd_content["params"])
result = self.handle_custom_command(custom_cmd, params)
# 返回命令执行结果
return {
"id": cmd_id,
"code": 0 if result else 1,
"msg": "success" if result else "failed"
}
except Exception as e:
# 返回错误信息
return {
"id": cmd_id,
"code": 1,
"msg": str(e)
}
on_timer_subscribe
def on_timer_subscribe(self, timer_name)
定时器触发时触发。当在 subscribes() 中配置了 SubscribeTimer 时,会根据配置的间隔时间定期调用此方法。
参数:
timer_name(str): 定时器名称,与subscribes()中配置的名称对应
示例:
def on_timer_subscribe(self, timer_name):
if timer_name == "check_orders":
# 检查订单状态
self.check_order_status()
elif timer_name == "update_stats":
# 更新统计数据
self.update_statistics()
on_subscribe
def on_subscribe(self, ws_id, channels, result)
订阅成功时触发。
参数:
ws_id(str): WebSocket IDchannels(list): 订阅的频道列表result: 订阅结果
on_unsubscribe
def on_unsubscribe(self, ws_id, channels, result)
取消订阅成功时触发。
参数:
ws_id(str): WebSocket IDchannels(list): 取消订阅的频道列表result: 取消订阅结果
订单相关回调
on_order_submitted
def on_order_submitted(self, account_id, order_id_result, order)
订单提交成功时触发。当订单提交到交易所后,交易所返回订单ID时触发。
参数:
account_id(str): 账户IDorder_id_result: 包含订单ID的Result,可能为Ok(order_id)或Err(error)order(dict): 订单信息
示例:
def on_order_submitted(self, account_id, order_id_result, order):
if 'Ok' in order_id_result:
order_id = order_id_result['Ok']
self.trader.log(f"订单提交成功: {order_id}", level="INFO")
# 保存订单记录
self.orders[order_id] = order
else:
error = order_id_result.get('Err', '未知错误')
self.trader.log(f"订单提交失败: {error}", level="ERROR")
on_batch_order_submitted
def on_batch_order_submitted(self, account_id, order_ids_result)
批量订单提交成功时触发。
参数:
account_id(str): 账户IDorder_ids_result: 订单ID列表的Result,可能为Ok([order_ids])或Err(error)
on_order_canceled
def on_order_canceled(self, account_id, result, id, symbol)
订单取消成功时触发。
参数:
account_id(str): 账户IDresult: 取消结果id(str): 撤单时传入的订单ID或订单CIDsymbol(str): 撤单时传入的交易对
on_batch_order_canceled
def on_batch_order_canceled(self, account_id, order_ids_result)
批量订单取消成功时触发。
参数:
account_id(str): 账户IDorder_ids_result: 订单ID列表的Result
on_batch_order_canceled_by_ids
def on_batch_order_canceled_by_ids(self, account_id, order_ids_result)
通过订单ID批量取消订单成功时触发。
参数:
account_id(str): 账户IDorder_ids_result: 订单ID列表的Result
on_order_amended
def on_order_amended(self, account_id, result, order)
订单修改成功时触发。
参数:
account_id(str): 账户IDresult: 修改结果order(dict): 修改订单时传入的订单信息
on_batch_amend_order
def on_batch_amend_order(self, account_id, order_ids_result)
批量订单修改成功时触发。
参数:
account_id(str): 账户IDorder_ids_result: 订单ID列表的Result
on_order
def on_order(self, account_id, order)
订单状态更新时触发。当订单状态发生变化(如已成交、已取消等)时触发。
参数:
account_id(str): 账户IDorder(dict): 订单对象,包含订单状态、成交信息等
示例:
def on_order(self, account_id, order):
order_id = order['id']
status = order['status']
if status == 'Filled':
self.trader.log(f"订单已成交: {order_id}", level="INFO")
elif status == 'Canceled':
self.trader.log(f"订单已取消: {order_id}", level="INFO")
on_order_and_fill
def on_order_and_fill(self, account_id, order)
订单/用户私有成交更新时触发。订单频道和用户成交频道哪个快推哪个。
参数:
account_id(str): 账户IDorder(dict): 订单对象
on_order_and_fill 和 on_order 的区别:
on_order: 仅接收订单状态更新on_order_and_fill: 接收订单和成交更新,哪个频道快就推哪个,适合需要最快获取成交信息的场景
账户相关回调
on_position
def on_position(self, account_id, positions)
持仓更新时触发。
参数:
account_id(str): 账户IDpositions: 持仓对象列表
on_balance
def on_balance(self, account_id, balances)
账户余额更新时触发。当账户余额发生变化时触发。
参数:
account_id(str): 账户IDbalances(list): 余额对象列表,每个对象包含资产类型、可用余额、总余额等信息
示例:
def on_balance(self, account_id, balances):
for balance in balances:
asset = balance['asset']
available = balance['available_balance']
total = balance['balance']
self.balances[asset] = balance
self.trader.log(f"{asset} 余额: 可用={available}, 总额={total}", level="INFO")
on_funding_fee
def on_funding_fee(self, account_id, funding_fee)
资金费结算时触发。
参数:
account_id(str): 账户IDfunding_fee(dict): 资金费对象,包含结算金额、时间等信息
市场数据回调
on_bbo
def on_bbo(self, exchange, bbo)
最佳买卖价变动时触发。当最优买一价或卖一价发生变化时触发。
参数:
exchange(str): 交易所名称bbo(dict): 最佳买一卖一对象,包含symbol、bid_price、ask_price、bid_size、ask_size等字段
示例:
def on_bbo(self, exchange, bbo):
symbol = bbo['symbol']
bid_price = bbo['bid_price']
ask_price = bbo['ask_price']
spread = ask_price - bid_price
# 更新BBO数据用于交易决策
self.bbos[symbol] = bbo
on_depth
def on_depth(self, exchange, depth)
市场深度更新时触发。当订单簿深度数据更新时触发。
参数:
exchange(str): 交易所名称depth(dict): 市场深度对象,包含symbol、bids(买盘)、asks(卖盘)等字段
示例:
def on_depth(self, exchange, depth):
symbol = depth['symbol']
bids = depth['bids'] # 买盘列表,格式: [[price, size], ...]
asks = depth['asks'] # 卖盘列表,格式: [[price, size], ...]
# 计算买卖盘总量
bid_total = sum(level[1] for level in bids[:5])
ask_total = sum(level[1] for level in asks[:5])
# 策略逻辑处理
if bid_total > ask_total * 1.1:
# 买盘压力大,考虑做多
pass
on_ticker
def on_ticker(self, exchange, ticker)
最新成交价更新时触发。
参数:
exchange(str): 交易所名称ticker(dict): 行情数据对象,包含最新成交价、24小时涨跌幅等信息
on_trade
def on_trade(self, exchange, trade)
成交更新时触发。当市场有新的成交记录时触发。
参数:
exchange(str): 交易所名称trade(dict): 成交对象,包含symbol、price、size、side、time等字段
on_funding
def on_funding(self, exchange, fundings)
资金费率更新时触发。
参数:
exchange(str): 交易所名称fundings: 资金费率对象列表
on_mark_price
def on_mark_price(self, exchange, mark_price)
标记价格更新时触发。标记价格用于计算未实现盈亏,在合约交易中非常重要。
参数:
exchange(str): 交易所名称mark_price(dict): 标记价格对象,包含symbol、price、time等字段
on_kline
def on_kline(self, exchange, kline)
K线数据更新时触发。
参数:
exchange(str): 交易所名称kline(dict): K线对象,包含symbol、open、high、low、close、volume、time等字段
on_instrument
def on_instrument(self, exchange, instruments)
合约信息更新时触发。当交易对信息发生变化时触发。
参数:
exchange(str): 交易所名称instruments(list): 合约信息对象列表,每个对象包含交易对名称、最小下单量、价格精度等信息
on_instrument_added
def on_instrument_added(self, exchange, instruments)
交易对信息新增时触发。当交易所新增交易对时触发。
参数:
exchange(str): 交易所名称instruments(list): 新增的合约信息对象列表
on_instrument_updated
def on_instrument_updated(self, exchange, instruments)
交易对信息更新时触发。当交易对的配置信息(如最小下单量、价格精度等)发生变化时触发。
参数:
exchange(str): 交易所名称instruments(list): 更新的合约信息对象列表
on_instrument_removed
def on_instrument_removed(self, exchange, instruments)
交易对信息删除时触发。当交易所下架交易对时触发。
参数:
exchange(str): 交易所名称instruments(list): 删除的合约信息对象列表
其他回调
on_dex_data
def on_dex_data(self, account_id, data)
DEX(去中心化交易所)数据更新时触发。
参数:
account_id(str): 账户IDdata(dict): 数据对象
on_stop
def on_stop(self)
停止策略时触发。在策略停止时自动调用,用于执行清理工作,如取消订单、平仓、保存数据等。
示例:
def on_stop(self):
self.trader.log("策略正在停止...", level="INFO")
# 取消所有订单
for symbol in self.symbols:
orders = self.trader.get_open_orders(0, symbol)
for order in orders:
self.trader.cancel_order(0, symbol, order['id'])
# 平掉所有持仓
positions = self.trader.get_positions(0)
for position in positions:
if position['amount'] > 0:
# 平仓逻辑
pass
# 保存缓存数据
self.save_cache()
on_config_update
def on_config_update(self, config)
热更新策略配置时触发。在策略运行期间更新配置参数时触发,无需重启策略。
参数:
config(dict): 新的配置对象
示例:
def on_config_update(self, config):
self.trader.log(f"配置更新: {config}", level="INFO")
# 更新策略参数
if 'leverage' in config:
self.leverage = config['leverage']
if 'symbols' in config:
self.symbols = config['symbols']
配置热更新允许在不停止策略的情况下调整参数,非常适合实盘交易中的参数调优。
方法实现要求
必须实现的方法
以下方法必须在策略类中实现:
| 方法 | 说明 | 是否必须 |
|---|---|---|
name() | 返回策略名称 | ✅ 必须 |
start() | 策略启动时的初始化工作 | ✅ 必须 |
subscribes() | 返回数据订阅配置 | ✅ 必须 |
可选实现的方法
其他所有方法都是可选的,根据策略需求选择性实现。只实现您需要处理的事件回调即可。
使用示例
最小策略示例
import base_strategy
import traderv2
class MyStrategy(base_strategy.BaseStrategy):
def __init__(self, cex_configs, dex_configs, config, trader):
self.trader = trader
self.config = config
def name(self):
return "我的策略"
def start(self):
self.trader.log("策略启动", level="INFO")
def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{"Bbo": ["BTC_USDT"]}
]
}
}
]
def on_bbo(self, exchange, bbo):
self.trader.log(f"BBO更新: {bbo}", level="DEBUG")
最佳实践
-
合理使用回调方法
- 只在需要时实现相应的回调方法
- 避免在回调方法中执行耗时操作
-
错误处理
- 在回调方法中添加异常处理
- 使用
trader.log()记录错误信息
-
状态管理
- 在
__init__中初始化所有状态变量 - 在
on_stop中清理资源
- 在
-
性能优化
- 避免在回调中执行同步阻塞操作
- 使用缓存减少重复查询
- Web平台策略开发指南 - 了解如何在Web平台开发策略
- 数据源订阅 - 了解数据订阅配置
- 核心功能 - 了解 Trader API 使用方法
- 执行流程 - 了解策略执行机制