跳到主要内容

策略基类

本节介绍策略基类 BaseStrategy,该类定义了策略开发所需的基本接口和回调方法。所有自定义策略都应该继承这个基类。

初始化方法

init

def __init__(self, cex_configs, dex_configs, config, trader: trader.Trader)

初始化策略对象。

参数:

  • cex_configs (list): CEX交易所配置列表
  • dex_configs (list): DEX交易所配置列表
  • config (dict): 策略配置
  • trader (trader.Trader): 消息总线的包装器对象,主要提供以下方法,详情请参考Trader API 指令
    • publish(cmd): 发布交易指令

      • 参数: cmd (dict) - 交易指令
      • 返回: 指令执行结果
      • 说明: 用于发送交易相关指令,如下单、撤单等
    • log(msg, level=None, event=None): 记录日志

      • 参数:
        • msg (str): 日志消息
        • level (str, 可选): 日志级别,可选值:
          • "TRACE": 追踪级别
          • "DEBUG": 调试级别
          • "INFO": 信息级别(默认)
          • "WARN": 警告级别
          • "ERROR": 错误级别
        • event (str, 可选): 事件标识
      • 说明: 用于记录策略运行过程中的各类日志信息

示例:

def __init__(self, cex_configs, dex_configs, config, trader):
# 记录普通日志
trader.log("策略初始化开始")

# 记录带级别的日志
trader.log("调试信息", level="DEBUG")

# 记录带事件标识的日志
trader.log("订单提交", level="INFO", event="ORDER_SUBMIT")

# 发布交易指令
order_cmd = {
"type": "limit",
"symbol": "BTC_USDT",
"price": 50000,
"quantity": 0.1
}
result = trader.publish(order_cmd)

基础方法

name

def name(self)

返回策略的名称。

返回:

  • str: 策略名称

start

def start(self)

启动策略执行。在这里执行策略初始化的操作,比如查询交易币种信息、设置杠杆等。

subscribes

def subscribes(self)

返回策略订阅的事件列表。

订阅分为三种类型:

  1. SubscribeWs: 订阅交易所相关websocket频道,接收到消息会推到策略回调

    • 支持的订阅频道:
      • MarkPrice: 标记价格
      • Bbo: 最佳买卖价
      • Depth: 市场深度
      • Funding: 资金费率
      • Trade: 成交
      • Order: 订单
      • Position: 仓位
      • Balance: 余额
      • FundingFee: 结算资金费
  2. SubscribeRest: 会起异步任务使用HTTP定期轮询某些接口并推送到策略回调

    • 支持轮询的接口:
      • Funding: 资金费率
      • Balance: 余额
      • Position: 仓位
      • Instrument: 合约信息
  3. SubscribeTimer: 会起异步任务定时执行on_timer_subscribe回调

返回:

  • list: 订阅的事件列表

示例:

# 订阅账户0 exchange为BinanceSwap Websocket的BTC_USDT交易对的Bbo(最佳买一卖一)、仓位、订单信息
# 以及账户1 exchange为CoinexSwap 每隔10s获取一次余额信息
return [
{
"exchange": "GateSwap",
"account_id": 0,
"sub": {
"Ws": {
"symbols": [
"BTC_USDT",
"SOL_USDT"
],
"subs": [
"Bbo",
"Position",
"Order"
]
}
}
},
{
"exchange": "CoinexSwap",
"account_id": 1,
"sub": {
"Rest": {
"interval": {
"secs": 10,
"nanos": 0
},
"sub": "Balance"
}
}
}
]

连接事件回调

on_ws_connected

def on_ws_connected(self, exchange, account_id)

WebSocket连接建立时触发。

参数:

  • exchange (str): 交易所名称
  • account_id (str): 账户ID

on_ws_disconnected

def on_ws_disconnected(self, exchange, account_id)

WebSocket连接断开时触发。

参数:

  • exchange (str): 交易所名称
  • account_id (str): 账户ID

系统事件回调

on_latency

def on_latency(self, latency, req_id)

延迟统计时触发。

参数:

  • latency (dict): 延迟信息
  • req_id (str): 请求ID

on_cmd

def on_cmd(self, cmd)

当收到服务端下发的命令时触发。命令通过 AsyncMsg 结构体传递,包含命令ID和具体命令内容。

参数:

  • cmd (dict): 命令消息,包含以下字段:
    • id (str): 命令ID
    • cmd (dict): 命令内容,可能是标准命令(StandCmd)或自定义命令(CustomCmd)

标准命令(StandCmd)类型:

  1. Close: 平仓命令

    • 参数: Vec<String> 交易对列表,如 ["BTCUSDT", "ETHUSDT"]
    • 作用: 立即平掉指定交易对的所有仓位
  2. SlowClose: 缓慢平仓命令

    • 参数: Vec<String> 交易对列表,如 ["BTCUSDT", "ETHUSDT"]
    • 作用: 通过小单逐步平掉指定交易对的所有仓位
  3. Pause: 暂停交易命令

    • 参数: Vec<String> 交易对列表,如 ["BTCUSDT", "ETHUSDT"]
    • 作用: 暂停指定交易对的交易操作
  4. Resume: 恢复交易命令

    • 参数: Vec<String> 交易对列表,如 ["BTCUSDT", "ETHUSDT"]
    • 作用: 恢复指定交易对的交易操作

自定义命令(CustomCmd)格式:

{
"cmd": "命令名称",
"params": "命令参数(JSON字符串)"
}

命令响应: 策略需要通过 AsyncCmdRsp 响应命令执行结果:

{
"id": "命令ID",
"code": 0, // 0表示成功,其他值表示错误码
"msg": "执行结果描述"
}

示例:

def on_cmd(self, cmd):
cmd_id = cmd["id"]
cmd_content = cmd["cmd"]

try:
if cmd_content["type"] == "StandCmd":
# 处理标准命令
command = cmd_content["command"]
symbols = cmd_content["params"]

if command == "Close":
# 处理平仓逻辑
result = self.handle_close(symbols)
elif command == "Pause":
# 处理暂停交易逻辑
result = self.handle_pause(symbols)
# ... 处理其他标准命令

elif cmd_content["type"] == "CustomCmd":
# 处理自定义命令
custom_cmd = cmd_content["cmd"]
params = json.loads(cmd_content["params"])
result = self.handle_custom_command(custom_cmd, params)

# 返回命令执行结果
return {
"id": cmd_id,
"code": 0 if result else 1,
"msg": "success" if result else "failed"
}

except Exception as e:
# 返回错误信息
return {
"id": cmd_id,
"code": 1,
"msg": str(e)
}

on_timer_subscribe

def on_timer_subscribe(self, timer_name)

定时器触发时触发。

参数:

  • timer_name (str): 定时器名称

订单相关回调

on_order_submitted

def on_order_submitted(self, account_id, context, order_id_result, order)

订单提交成功时触发。

参数:

  • account_id (str): 账户ID
  • context: 上下文对象
  • order_id_result: 包含订单ID的Result,可能为Err
  • order (dict): 订单信息

on_batch_order_submitted

def on_batch_order_submitted(self, account_id, context, order_ids_result)

批量订单提交成功时触发。

参数:

  • account_id (str): 账户ID
  • context: 上下文对象
  • order_ids_result: 订单ID列表的Result

on_order_canceled

def on_order_canceled(self, account_id, context, result)

订单取消成功时触发。

参数:

  • account_id (str): 账户ID
  • context: 上下文对象
  • result: 取消结果

on_batch_order_canceled

def on_batch_order_canceled(self, account_id, context, order_ids_result)

批量订单取消成功时触发。

参数:

  • account_id (str): 账户ID
  • context: 上下文对象
  • order_ids_result: 订单ID列表的Result

on_order_amended

def on_order_amended(self, account_id, context, result)

订单修改成功时触发。

参数:

  • account_id (str): 账户ID
  • context: 上下文对象
  • result: 修改结果

on_order

def on_order(self, account_id, context, order)

订单状态更新时触发。

参数:

  • account_id (str): 账户ID
  • context: 上下文对象
  • order: 订单对象

账户相关回调

on_position

def on_position(self, account_id, positions)

持仓更新时触发。

参数:

  • account_id (str): 账户ID
  • positions: 持仓对象列表

on_balance

def on_balance(self, account_id, balances)

账户余额更新时触发。

参数:

  • account_id (str): 账户ID
  • balances: 余额对象列表

市场数据回调

on_bbo

def on_bbo(self, exchange, context, bbo)

最佳买卖价变动时触发。

参数:

  • exchange (str): 交易所名称
  • context: 上下文对象
  • bbo: 最佳买一卖一对象

on_depth

def on_depth(self, exchange, context, depth)

市场深度更新时触发。

参数:

  • exchange (str): 交易所名称
  • context: 上下文对象
  • depth: 市场深度对象

on_ticker

def on_ticker(self, exchange, context, ticker)

最新成交价更新时触发。

参数:

  • exchange (str): 交易所名称
  • context: 上下文对象
  • ticker: 行情数据对象

on_trade

def on_trade(self, exchange, context, trade)

成交更新时触发。

参数:

  • exchange (str): 交易所名称
  • context: 上下文对象
  • trade: 成交对象

on_funding

def on_funding(self, exchange, fundings)

资金费率更新时触发。

参数:

  • exchange (str): 交易所名称
  • fundings: 资金费率对象列表

on_mark_price

def on_mark_price(self, exchange, mark_price)

标记价格更新时触发。

参数:

  • exchange (str): 交易所名称
  • mark_price: 标记价格对象

on_instrument

def on_instrument(self, exchange, instruments)

合约信息更新时触发。

参数:

  • exchange (str): 交易所名称
  • instruments: 合约信息对象列表

其他回调

on_dex_data

def on_dex_data(self, account_id, context, data)

DEX数据更新时触发。

参数:

  • account_id (str): 账户ID
  • context: 上下文对象
  • data: 数据对象

on_stop

def on_stop(self)

停止策略时触发。

on_config_update

def on_config_update(self, config)

热更新策略配置时触发。

参数:

  • config: 配置对象