策略基类
本节介绍策略基类 BaseStrategy,该类定义了策略开发所需的基本接口和回调方法。所有自定义策略都应该继承这个基类。
初始化方法
init
def __init__(self, cex_configs, dex_configs, config, trader: trader.Trader)
初始化策略对象。
参数:
cex_configs(list): CEX交易所配置列表dex_configs(list): DEX交易所配置列表config(dict): 策略配置trader(trader.Trader): 消息总线的包装器对象,主要提供以下方法,详情请参考Trader API 指令:-
publish(cmd): 发布交易指令- 参数:
cmd(dict) - 交易指令 - 返回: 指令执行结果
- 说明: 用于发送交易相关指令,如下单、撤单等
- 参数:
-
log(msg, level=None, event=None): 记录日志- 参数:
msg(str): 日志消息level(str, 可选): 日志级别,可选值:- "TRACE": 追踪级别
- "DEBUG": 调试级别
- "INFO": 信息级别(默认)
- "WARN": 警告级别
- "ERROR": 错误级别
event(str, 可选): 事件标识
- 说明: 用于记录策略运行过程中的各类日志信息
- 参数:
-
示例:
def __init__(self, cex_configs, dex_configs, config, trader):
# 记录普通日志
trader.log("策略初始化开始")
# 记录带级别的日志
trader.log("调试信息", level="DEBUG")
# 记录带事件标识的日志
trader.log("订单提交", level="INFO", event="ORDER_SUBMIT")
# 发布交易指令
order_cmd = {
"type": "limit",
"symbol": "BTC_USDT",
"price": 50000,
"quantity": 0.1
}
result = trader.publish(order_cmd)
基础方法
name
def name(self)
返回策略的名称。
返回:
- str: 策略名称
start
def start(self)
启动策略执行。在这里执行策略初始化的操作,比如查询交易币种信息、设置杠杆等。
subscribes
def subscribes(self)
返回策略订阅的事件列表。
订阅分为三种类型:
-
SubscribeWs: 订阅交易所相关websocket频道,接收到消息会推到策略回调
- 支持的订阅频道:
- MarkPrice: 标记价格
- Bbo: 最佳买卖价
- Depth: 市场深度
- Funding: 资金费率
- Trade: 成交
- Order: 订单
- Position: 仓位
- Balance: 余额
- FundingFee: 结算资金费
- 支持的订阅频道:
-
SubscribeRest: 会起异步任务使用HTTP定期轮询某些接口并推送到策略回调
- 支持轮询的接口:
- Funding: 资金费率
- Balance: 余额
- Position: 仓位
- Instrument: 合约信息
- 支持轮询的接口:
-
SubscribeTimer: 会起异步任务定时执行
on_timer_subscribe回调
返回:
- list: 订阅的事件列表
示例:
# 订阅账户0 exchange为BinanceSwap Websocket的BTC_USDT交易对的Bbo(最佳买一卖一)、仓位、订单信息
# 以及账户1 exchange为CoinexSwap 每隔10s获取一次余额信息
return [
{
"exchange": "GateSwap",
"account_id": 0,
"sub": {
"Ws": {
"symbols": [
"BTC_USDT",
"SOL_USDT"
],
"subs": [
"Bbo",
"Position",
"Order"
]
}
}
},
{
"exchange": "CoinexSwap",
"account_id": 1,
"sub": {
"Rest": {
"interval": {
"secs": 10,
"nanos": 0
},
"sub": "Balance"
}
}
}
]
连接事件回调
on_ws_connected
def on_ws_connected(self, exchange, account_id)
WebSocket连接建立时触发。
参数:
exchange(str): 交易所名称account_id(str): 账户ID
on_ws_disconnected
def on_ws_disconnected(self, exchange, account_id)
WebSocket连接断开时触发。
参数:
exchange(str): 交易所名称account_id(str): 账户ID
系统事件回调
on_latency
def on_latency(self, latency, req_id)
延迟统计时触发。
参数:
latency(dict): 延迟信息req_id(str): 请求ID
on_cmd
def on_cmd(self, cmd)
当收到服务端下发的命令时触发。命令通过 AsyncMsg 结构体传递,包含命令ID和具体命令内容。
参数:
cmd(dict): 命令消息,包含以下字段:id(str): 命令IDcmd(dict): 命令内容,可能是标准命令(StandCmd)或自定义命令(CustomCmd)
标准命令(StandCmd)类型:
-
Close: 平仓命令
- 参数:
Vec<String>交易对列表,如["BTCUSDT", "ETHUSDT"] - 作用: 立即平掉指定交易对的所有仓位
- 参数:
-
SlowClose: 缓慢平仓命令
- 参数:
Vec<String>交易对列表,如["BTCUSDT", "ETHUSDT"] - 作用: 通过小单逐步平掉指定交易对的所有仓位
- 参数:
-
Pause: 暂停交易命令
- 参数:
Vec<String>交易对列表,如["BTCUSDT", "ETHUSDT"] - 作用: 暂停指定交易对的交易操作
- 参数:
-
Resume: 恢复交易命令
- 参数:
Vec<String>交易对列表,如["BTCUSDT", "ETHUSDT"] - 作用: 恢复指定交易对的交易操作
- 参数:
自定义命令(CustomCmd)格式:
{
"cmd": "命令名称",
"params": "命令参数(JSON字符串)"
}
命令响应:
策略需要通过 AsyncCmdRsp 响应命令执行结果:
{
"id": "命令ID",
"code": 0, // 0表示成功,其他值表示错误码
"msg": "执行结果描述"
}
示例:
def on_cmd(self, cmd):
cmd_id = cmd["id"]
cmd_content = cmd["cmd"]
try:
if cmd_content["type"] == "StandCmd":
# 处理标准命令
command = cmd_content["command"]
symbols = cmd_content["params"]
if command == "Close":
# 处理平仓逻辑
result = self.handle_close(symbols)
elif command == "Pause":
# 处理暂停交易逻辑
result = self.handle_pause(symbols)
# ... 处理其他标准命令
elif cmd_content["type"] == "CustomCmd":
# 处理自定义命令
custom_cmd = cmd_content["cmd"]
params = json.loads(cmd_content["params"])
result = self.handle_custom_command(custom_cmd, params)
# 返回命令执行结果
return {
"id": cmd_id,
"code": 0 if result else 1,
"msg": "success" if result else "failed"
}
except Exception as e:
# 返回错误信息
return {
"id": cmd_id,
"code": 1,
"msg": str(e)
}
on_timer_subscribe
def on_timer_subscribe(self, timer_name)
定时器触发时触发。
参数:
timer_name(str): 定时器名称
订单相关回调
on_order_submitted
def on_order_submitted(self, account_id, context, order_id_result, order)
订单提交成功时触发。
参数:
account_id(str): 账户IDcontext: 上下文对象order_id_result: 包含订单ID的Result,可能为Errorder(dict): 订单信息
on_batch_order_submitted
def on_batch_order_submitted(self, account_id, context, order_ids_result)
批量订单提交成功时触发。
参数:
account_id(str): 账户IDcontext: 上下文对象order_ids_result: 订单ID列表的Result
on_order_canceled
def on_order_canceled(self, account_id, context, result)
订单取消成功时触发。
参数:
account_id(str): 账户IDcontext: 上下文对象result: 取消结果
on_batch_order_canceled
def on_batch_order_canceled(self, account_id, context, order_ids_result)
批量订单取消成功时触发。
参数:
account_id(str): 账户IDcontext: 上下文对象order_ids_result: 订单ID列表的Result
on_order_amended
def on_order_amended(self, account_id, context, result)
订单修改成功时触发。
参数:
account_id(str): 账户IDcontext: 上下文对象result: 修改结果
on_order
def on_order(self, account_id, context, order)
订单状态更新时触发。
参数:
account_id(str): 账户IDcontext: 上下文对象order: 订单对象
账户相关回调
on_position
def on_position(self, account_id, positions)
持仓更新时触发。
参数:
account_id(str): 账户IDpositions: 持仓对象列表
on_balance
def on_balance(self, account_id, balances)
账户余额更新时触发。
参数:
account_id(str): 账户IDbalances: 余额对象列表
市场数据回调
on_bbo
def on_bbo(self, exchange, context, bbo)
最佳买卖价变动时触发。
参数:
exchange(str): 交易所名称context: 上下文对象bbo: 最佳买一卖一对象
on_depth
def on_depth(self, exchange, context, depth)
市场深度更新时触发。
参数:
exchange(str): 交易所名称context: 上下文对象depth: 市场深度对象
on_ticker
def on_ticker(self, exchange, context, ticker)
最新成交价更新时触发。
参数:
exchange(str): 交易所名称context: 上下文对象ticker: 行情数据对象
on_trade
def on_trade(self, exchange, context, trade)
成交更新时触发。
参数:
exchange(str): 交易所名称context: 上下文对象trade: 成交对象
on_funding
def on_funding(self, exchange, fundings)
资金费率更新时触发。
参数:
exchange(str): 交易所名称fundings: 资金费率对象列表
on_mark_price
def on_mark_price(self, exchange, mark_price)
标记价格更新时触发。
参数:
exchange(str): 交易所名称mark_price: 标记价格对象
on_instrument
def on_instrument(self, exchange, instruments)
合约信息更新时触发。
参数:
exchange(str): 交易所名称instruments: 合约信息对象列表
其他回调
on_dex_data
def on_dex_data(self, account_id, context, data)
DEX数据更新时触发。
参数:
account_id(str): 账户IDcontext: 上下文对象data: 数据对象
on_stop
def on_stop(self)
停止策略时触发。
on_config_update
def on_config_update(self, config)
热更新策略配置时触发。
参数:
config: 配置对象