跳到主要内容

策略基类

本文档介绍策略基类 BaseStrategy,该类定义了策略开发所需的基本接口和回调方法。所有自定义策略都应该继承这个基类。

目录

快速开始

如果您是第一次使用,建议先阅读:

概述

BaseStrategy 是策略开发的基类,提供了:

  • 必须实现的方法name(), start(), subscribes()
  • 可选的回调方法:处理各种事件(订单、持仓、行情等)
  • 生命周期管理:策略启动、停止、配置更新等

初始化方法

init

def __init__(self, cex_configs, dex_configs, config, trader: traderv2.TraderV2)

初始化策略对象。在策略实例化时自动调用。

参数:

参数类型说明
cex_configslist[dict]中心化交易所配置列表,包含 API Key、Secret 等信息
dex_configslist[dict]去中心化交易所配置列表(暂未使用)
configdict策略配置参数,通过策略参数界面或配置文件传入
traderTraderV2交易执行器对象,提供交易、查询、日志等功能

示例:

def __init__(self, cex_configs, dex_configs, config, trader):
self.cex_configs = cex_configs
self.dex_configs = dex_configs
self.trader = trader
self.config = config

# 初始化策略状态
self.positions = {}
self.orders = {}
self.balances = {}
Trader 对象

trader 对象提供了丰富的功能,详情请参考:

基础方法

name

def name(self)

返回策略的名称。

返回:

  • str: 策略名称

start

def start(self)

启动策略执行。在这里执行策略初始化的操作,比如查询交易币种信息、设置杠杆等。

subscribes

def subscribes(self)

返回策略订阅的事件列表。

订阅分为三种类型:

  1. SubscribeWs: 订阅交易所相关 WebSocket 频道,接收到消息会推送到策略回调

    • 支持的订阅频道:
      • MarkPrice: 标记价格
      • Bbo: 最佳买卖价
      • Depth: 市场深度
      • Kline: K线数据
      • Funding: 资金费率
      • Trade: 成交
      • Order: 订单
      • Position: 仓位
      • Balance: 余额
      • FundingFee: 结算资金费
      • OrderAndFill: 订单和用户私有成交(订单频道和用户成交频道哪个快推哪个)
  2. SubscribeRest: 会启动异步任务使用 HTTP 定期轮询某些接口并推送到策略回调

    • 支持轮询的接口:
      • Funding: 资金费率
      • Balance: 余额
      • Position: 仓位
      • Instrument: 合约信息
  3. SubscribeTimer: 会启动异步任务定时执行 on_timer_subscribe 回调

返回:

  • list: 订阅的事件列表

详细订阅配置请参考数据源订阅

连接事件回调

on_ws_connected

def on_ws_connected(self, exchange, account_id)

WebSocket连接建立时触发。

参数:

  • exchange (str): 交易所名称
  • account_id (str): 账户ID

on_ws_disconnected

def on_ws_disconnected(self, exchange, account_id)

WebSocket连接断开时触发。

参数:

  • exchange (str): 交易所名称
  • account_id (str): 账户ID

系统事件回调

on_latency

def on_latency(self, latency, account_id)

延迟统计时触发。

参数:

  • latency (dict): 延迟信息
  • account_id (str): 账户ID

on_cmd

def on_cmd(self, cmd)

当收到服务端下发的命令时触发。命令通过 AsyncMsg 结构体传递,包含命令ID和具体命令内容。

参数:

  • cmd (dict): 命令消息,包含以下字段:
    • id (str): 命令ID
    • cmd (dict): 命令内容,可能是标准命令(StandCmd)或自定义命令(CustomCmd)

标准命令(StandCmd)类型:

  1. Close: 平仓命令

    • 参数: Vec<String> 交易对列表,如 ["BTCUSDT", "ETHUSDT"]
    • 作用: 立即平掉指定交易对的所有仓位
  2. SlowClose: 缓慢平仓命令

    • 参数: Vec<String> 交易对列表,如 ["BTCUSDT", "ETHUSDT"]
    • 作用: 通过小单逐步平掉指定交易对的所有仓位
  3. Pause: 暂停交易命令

    • 参数: Vec<String> 交易对列表,如 ["BTCUSDT", "ETHUSDT"]
    • 作用: 暂停指定交易对的交易操作
  4. Resume: 恢复交易命令

    • 参数: Vec<String> 交易对列表,如 ["BTCUSDT", "ETHUSDT"]
    • 作用: 恢复指定交易对的交易操作

自定义命令(CustomCmd)格式:

{
"cmd": "命令名称",
"params": "命令参数(JSON字符串)"
}

命令响应: 策略需要通过 AsyncCmdRsp 响应命令执行结果:

{
"id": "命令ID",
"code": 0, // 0表示成功,其他值表示错误码
"msg": "执行结果描述"
}

示例:

def on_cmd(self, cmd):
cmd_id = cmd["id"]
cmd_content = cmd["cmd"]

try:
if cmd_content["type"] == "StandCmd":
# 处理标准命令
command = cmd_content["command"]
symbols = cmd_content["params"]

if command == "Close":
# 处理平仓逻辑
result = self.handle_close(symbols)
elif command == "Pause":
# 处理暂停交易逻辑
result = self.handle_pause(symbols)
# ... 处理其他标准命令

elif cmd_content["type"] == "CustomCmd":
# 处理自定义命令
custom_cmd = cmd_content["cmd"]
params = json.loads(cmd_content["params"])
result = self.handle_custom_command(custom_cmd, params)

# 返回命令执行结果
return {
"id": cmd_id,
"code": 0 if result else 1,
"msg": "success" if result else "failed"
}

except Exception as e:
# 返回错误信息
return {
"id": cmd_id,
"code": 1,
"msg": str(e)
}

on_timer_subscribe

def on_timer_subscribe(self, timer_name)

定时器触发时触发。当在 subscribes() 中配置了 SubscribeTimer 时,会根据配置的间隔时间定期调用此方法。

参数:

  • timer_name (str): 定时器名称,与 subscribes() 中配置的名称对应

示例:

def on_timer_subscribe(self, timer_name):
if timer_name == "check_orders":
# 检查订单状态
self.check_order_status()
elif timer_name == "update_stats":
# 更新统计数据
self.update_statistics()

on_subscribe

def on_subscribe(self, ws_id, channels, result)

订阅成功时触发。

参数:

  • ws_id (str): WebSocket ID
  • channels (list): 订阅的频道列表
  • result: 订阅结果

on_unsubscribe

def on_unsubscribe(self, ws_id, channels, result)

取消订阅成功时触发。

参数:

  • ws_id (str): WebSocket ID
  • channels (list): 取消订阅的频道列表
  • result: 取消订阅结果

订单相关回调

on_order_submitted

def on_order_submitted(self, account_id, order_id_result, order)

订单提交成功时触发。当订单提交到交易所后,交易所返回订单ID时触发。

参数:

  • account_id (str): 账户ID
  • order_id_result: 包含订单ID的Result,可能为 Ok(order_id)Err(error)
  • order (dict): 订单信息

示例:

def on_order_submitted(self, account_id, order_id_result, order):
if 'Ok' in order_id_result:
order_id = order_id_result['Ok']
self.trader.log(f"订单提交成功: {order_id}", level="INFO")
# 保存订单记录
self.orders[order_id] = order
else:
error = order_id_result.get('Err', '未知错误')
self.trader.log(f"订单提交失败: {error}", level="ERROR")

on_batch_order_submitted

def on_batch_order_submitted(self, account_id, order_ids_result)

批量订单提交成功时触发。

参数:

  • account_id (str): 账户ID
  • order_ids_result: 订单ID列表的Result,可能为 Ok([order_ids])Err(error)

on_order_canceled

def on_order_canceled(self, account_id, result, id, symbol)

订单取消成功时触发。

参数:

  • account_id (str): 账户ID
  • result: 取消结果
  • id (str): 撤单时传入的订单ID或订单CID
  • symbol (str): 撤单时传入的交易对

on_batch_order_canceled

def on_batch_order_canceled(self, account_id, order_ids_result)

批量订单取消成功时触发。

参数:

  • account_id (str): 账户ID
  • order_ids_result: 订单ID列表的Result

on_batch_order_canceled_by_ids

def on_batch_order_canceled_by_ids(self, account_id, order_ids_result)

通过订单ID批量取消订单成功时触发。

参数:

  • account_id (str): 账户ID
  • order_ids_result: 订单ID列表的Result

on_order_amended

def on_order_amended(self, account_id, result, order)

订单修改成功时触发。

参数:

  • account_id (str): 账户ID
  • result: 修改结果
  • order (dict): 修改订单时传入的订单信息

on_batch_amend_order

def on_batch_amend_order(self, account_id, order_ids_result)

批量订单修改成功时触发。

参数:

  • account_id (str): 账户ID
  • order_ids_result: 订单ID列表的Result

on_order

def on_order(self, account_id, order)

订单状态更新时触发。当订单状态发生变化(如已成交、已取消等)时触发。

参数:

  • account_id (str): 账户ID
  • order (dict): 订单对象,包含订单状态、成交信息等

示例:

def on_order(self, account_id, order):
order_id = order['id']
status = order['status']

if status == 'Filled':
self.trader.log(f"订单已成交: {order_id}", level="INFO")
elif status == 'Canceled':
self.trader.log(f"订单已取消: {order_id}", level="INFO")

on_order_and_fill

def on_order_and_fill(self, account_id, order)

订单/用户私有成交更新时触发。订单频道和用户成交频道哪个快推哪个。

参数:

  • account_id (str): 账户ID
  • order (dict): 订单对象
使用建议

on_order_and_fillon_order 的区别:

  • on_order: 仅接收订单状态更新
  • on_order_and_fill: 接收订单和成交更新,哪个频道快就推哪个,适合需要最快获取成交信息的场景

账户相关回调

on_position

def on_position(self, account_id, positions)

持仓更新时触发。

参数:

  • account_id (str): 账户ID
  • positions: 持仓对象列表

on_balance

def on_balance(self, account_id, balances)

账户余额更新时触发。当账户余额发生变化时触发。

参数:

  • account_id (str): 账户ID
  • balances (list): 余额对象列表,每个对象包含资产类型、可用余额、总余额等信息

示例:

def on_balance(self, account_id, balances):
for balance in balances:
asset = balance['asset']
available = balance['available_balance']
total = balance['balance']
self.balances[asset] = balance
self.trader.log(f"{asset} 余额: 可用={available}, 总额={total}", level="INFO")

on_funding_fee

def on_funding_fee(self, account_id, funding_fee)

资金费结算时触发。

参数:

  • account_id (str): 账户ID
  • funding_fee (dict): 资金费对象,包含结算金额、时间等信息

市场数据回调

on_bbo

def on_bbo(self, exchange, bbo)

最佳买卖价变动时触发。当最优买一价或卖一价发生变化时触发。

参数:

  • exchange (str): 交易所名称
  • bbo (dict): 最佳买一卖一对象,包含 symbolbid_priceask_pricebid_sizeask_size 等字段

示例:

def on_bbo(self, exchange, bbo):
symbol = bbo['symbol']
bid_price = bbo['bid_price']
ask_price = bbo['ask_price']
spread = ask_price - bid_price
# 更新BBO数据用于交易决策
self.bbos[symbol] = bbo

on_depth

def on_depth(self, exchange, depth)

市场深度更新时触发。当订单簿深度数据更新时触发。

参数:

  • exchange (str): 交易所名称
  • depth (dict): 市场深度对象,包含 symbolbids(买盘)、asks(卖盘)等字段

示例:

def on_depth(self, exchange, depth):
symbol = depth['symbol']
bids = depth['bids'] # 买盘列表,格式: [[price, size], ...]
asks = depth['asks'] # 卖盘列表,格式: [[price, size], ...]

# 计算买卖盘总量
bid_total = sum(level[1] for level in bids[:5])
ask_total = sum(level[1] for level in asks[:5])

# 策略逻辑处理
if bid_total > ask_total * 1.1:
# 买盘压力大,考虑做多
pass

on_ticker

def on_ticker(self, exchange, ticker)

最新成交价更新时触发。

参数:

  • exchange (str): 交易所名称
  • ticker (dict): 行情数据对象,包含最新成交价、24小时涨跌幅等信息

on_trade

def on_trade(self, exchange, trade)

成交更新时触发。当市场有新的成交记录时触发。

参数:

  • exchange (str): 交易所名称
  • trade (dict): 成交对象,包含 symbolpricesizesidetime 等字段

on_funding

def on_funding(self, exchange, fundings)

资金费率更新时触发。

参数:

  • exchange (str): 交易所名称
  • fundings: 资金费率对象列表

on_mark_price

def on_mark_price(self, exchange, mark_price)

标记价格更新时触发。标记价格用于计算未实现盈亏,在合约交易中非常重要。

参数:

  • exchange (str): 交易所名称
  • mark_price (dict): 标记价格对象,包含 symbolpricetime 等字段

on_kline

def on_kline(self, exchange, kline)

K线数据更新时触发。

参数:

  • exchange (str): 交易所名称
  • kline (dict): K线对象,包含 symbolopenhighlowclosevolumetime 等字段

on_instrument

def on_instrument(self, exchange, instruments)

合约信息更新时触发。当交易对信息发生变化时触发。

参数:

  • exchange (str): 交易所名称
  • instruments (list): 合约信息对象列表,每个对象包含交易对名称、最小下单量、价格精度等信息

on_instrument_added

def on_instrument_added(self, exchange, instruments)

交易对信息新增时触发。当交易所新增交易对时触发。

参数:

  • exchange (str): 交易所名称
  • instruments (list): 新增的合约信息对象列表

on_instrument_updated

def on_instrument_updated(self, exchange, instruments)

交易对信息更新时触发。当交易对的配置信息(如最小下单量、价格精度等)发生变化时触发。

参数:

  • exchange (str): 交易所名称
  • instruments (list): 更新的合约信息对象列表

on_instrument_removed

def on_instrument_removed(self, exchange, instruments)

交易对信息删除时触发。当交易所下架交易对时触发。

参数:

  • exchange (str): 交易所名称
  • instruments (list): 删除的合约信息对象列表

其他回调

on_dex_data

def on_dex_data(self, account_id, data)

DEX(去中心化交易所)数据更新时触发。

参数:

  • account_id (str): 账户ID
  • data (dict): 数据对象

on_stop

def on_stop(self)

停止策略时触发。在策略停止时自动调用,用于执行清理工作,如取消订单、平仓、保存数据等。

示例:

def on_stop(self):
self.trader.log("策略正在停止...", level="INFO")

# 取消所有订单
for symbol in self.symbols:
orders = self.trader.get_open_orders(0, symbol)
for order in orders:
self.trader.cancel_order(0, symbol, order['id'])

# 平掉所有持仓
positions = self.trader.get_positions(0)
for position in positions:
if position['amount'] > 0:
# 平仓逻辑
pass

# 保存缓存数据
self.save_cache()

on_config_update

def on_config_update(self, config)

热更新策略配置时触发。在策略运行期间更新配置参数时触发,无需重启策略。

参数:

  • config (dict): 新的配置对象

示例:

def on_config_update(self, config):
self.trader.log(f"配置更新: {config}", level="INFO")

# 更新策略参数
if 'leverage' in config:
self.leverage = config['leverage']
if 'symbols' in config:
self.symbols = config['symbols']
热更新

配置热更新允许在不停止策略的情况下调整参数,非常适合实盘交易中的参数调优。

方法实现要求

必须实现的方法

以下方法必须在策略类中实现:

方法说明是否必须
name()返回策略名称✅ 必须
start()策略启动时的初始化工作✅ 必须
subscribes()返回数据订阅配置✅ 必须

可选实现的方法

其他所有方法都是可选的,根据策略需求选择性实现。只实现您需要处理的事件回调即可。

使用示例

最小策略示例

import base_strategy
import traderv2

class MyStrategy(base_strategy.BaseStrategy):
def __init__(self, cex_configs, dex_configs, config, trader):
self.trader = trader
self.config = config

def name(self):
return "我的策略"

def start(self):
self.trader.log("策略启动", level="INFO")

def subscribes(self):
return [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{"Bbo": ["BTC_USDT"]}
]
}
}
]

def on_bbo(self, exchange, bbo):
self.trader.log(f"BBO更新: {bbo}", level="DEBUG")

最佳实践

  1. 合理使用回调方法

    • 只在需要时实现相应的回调方法
    • 避免在回调方法中执行耗时操作
  2. 错误处理

    • 在回调方法中添加异常处理
    • 使用 trader.log() 记录错误信息
  3. 状态管理

    • __init__ 中初始化所有状态变量
    • on_stop 中清理资源
  4. 性能优化

    • 避免在回调中执行同步阻塞操作
    • 使用缓存减少重复查询

相关文档