策略开发完整指南
本指南将从策略开发者的角度,详细介绍如何开发一个完整的交易策略,包括数据订阅、回调处理和下单等核心功能。
1. 策略框架概述
一个完整的策略通常包含以下几个核心部分:
- 策略初始化
- 数据订阅
- 行情回调处理
- 交易信号生成
- 订单管理
- 账户管理
- 获取公共行情
- Trader API 指令
2. 创建策略类
首先,我们需要创建一个继承自基类的策略类:
import trader # type: ignore
import base_strategy
## 类名必须为Strategy
class Strategy(base_strategy.BaseStrategy):
def __init__(self, cex_configs, dex_configs, config, trader: trader.Trader):
self.cex_configs = cex_configs # 中心化交易所配置
self.trader = trader # 交易执行器
self.stop_flag = False # 停止标志
# 在这里初始化策略参数
self.symbols = config['symbols'] # 交易品种列表
self.positions = {} # 当前持仓信息
self.balances = {} # 账户余额信息
self.orders = {} # 订单信息
def name(self):
"""返回策略名称"""
return "My Strategy"
def start(self):
"""策略启动函数"""
# 订阅行情数据
self.subscribe_order_book(self.symbol)
self.subscribe_trades(self.symbol)
其中cex_configs和dex_configs是中心化交易所和去中心化交易所的配置,config是策略的配置,trader是交易执行器,以上皆在配置文件中介绍过。
3. 数据订阅
完成初始化后,策略需要订阅行情或账户数据。订阅配置由列表组成,每个元素是一个包含订阅类型和参数的字典。
系统支持三种订阅方式:WebSocket实时数据、REST API定期查询和定时器触发。具体订阅示例见下文,详细说明请参考数据源订阅。
3.1 WebSocket订阅
def subscribes(self):
"""返回策略需要订阅的行情和账户数据"""
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": self.symbols,
"levels": 5 # 订阅5档深度
},
},
{
"Bbo": self.symbols # 订阅最优买卖价
}
]
}
}
]
return subs
支持的WebSocket订阅类型:
MarkPrice: 标记价格Bbo: 最佳买卖价Depth: 市场深度Funding: 资金费率Trade: 成交Order: 订单Position: 仓位Balance: 余额FundingFee: 结算资金费
3.2 REST API订阅
# 定时更新持仓和余额信息
subs.append({
"account_id": 0,
"sub": {
"SubscribeRest": {
# 每秒更新账户余额
"update_interval": {
"secs": 1,
"nanos": 0
},
"rest_type": "Balance"
}
}
})
支持的REST API订阅类型:
Funding: 资金费率Balance: 余额Position: 仓位Instrument: 合约信息
3.3 定时器订阅
# 定时检查订单状态
subs.append({
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 10,
"nanos": 0
},
"name": "check_orders" # 每10秒检查一次订单状态
}
}
})
4. 数据回调函数
在订阅数据后,相应的数据会通过 BaseStrategy 中定义的 on_ 系列回调函数推送到策略。您需要根据业务逻辑在策略类中实现这些回调函数。
主要的回调函数包括:
4.1 行情数据回调
def on_mark_price(self, exchange, mark_price):
"""标记价格回调"""
symbol = mark_price['symbol']
if symbol not in self.symbols:
return
# 更新标记价格数据
self.mark_prices[symbol] = mark_price
def on_bbo(self, exchange, context, bbo):
"""最优买卖价回调"""
symbol = bbo['symbol']
if symbol not in self.symbols:
return
# 更新BBO数据
self.bbos[symbol] = bbo
def on_depth(self, exchange, context, depth):
"""深度数据回调"""
symbol = depth['symbol']
if symbol not in self.symbols:
return
# 处理深度数据
self.process_depth(depth)
def on_trade(self, exchange, context, trade):
"""成交数据回调"""
# 处理成交数据
self.process_trade(trade)
def on_funding(self, exchange, funding):
"""资金费率回调"""
# 处理资金费率数据
self.process_funding(funding)
4.2 订单和持仓回调
def on_order_submitted(self, account_id, context, order_id_result, order):
"""订单提交回调"""
if 'Ok' in order_id_result:
self.trader.log(f"订单提交成功: {order_id_result}")
else:
self.trader.log(f"订单提交失败: {order_id_result}")
def on_order(self, account_id, context, order):
"""订单状态更新回调"""
self.trader.log(f"订单更新: {order}")
# 更新订单状态
self.update_order_status(order)
def on_position(self, account_id, positions):
"""持仓更新回调"""
self.trader.log(f"持仓更新: {positions}")
# 更新持仓信息
self.update_positions(positions)
5. 下单操作
系统支持两种下单方式:
- 直接调用:在函数内部通过
self.trader.publish(cmd)发送交易指令 - 回调返回:在回调函数结尾(非
start和subscribe函数)通过return返回交易指令,更推荐使用
更多下单方式请参考订单管理
5.1 直接调用方式下单
def place_limit_order(self, symbol, side, price, amount):
"""直接调用方式提交限价单"""
# 创建订单对象
order = {
"cid": self.trader.create_cid(self.cex_configs[0]['exchange']), # 客户端订单ID
"symbol": symbol,
"order_type": "Limit", # 限价单
"side": side, # Buy/Sell
"pos_side": None, # 单向持仓模式
"time_in_force": "GTC", # 一直有效直到取消
"price": price,
"amount": amount,
}
# 构建交易指令
cmd = {
"account_id": 0,
"cmd": {
"Async": {
"PlaceOrder": [
order,
{
"is_dual_side": False, # 单向持仓
"margin_mode": "Cross", # 全仓模式
}
]
}
}
}
# 直接调用trader发送指令
result = self.trader.publish(cmd)
self.trader.log(f"下单结果: {result}")
return result
def cancel_order(self, order_id, symbol):
"""直接调用方式撤销订单"""
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"CancelOrder": [{"Id": order_id}, symbol]
}
}
}
result = self.trader.publish(cmd)
self.trader.log(f"撤单结果: {result}")
return result
5.2 回调返回方式下单
推荐使用回调返回方式下单,因为这种方式可以避免阻塞主线程,提高策略的响应速度。
def on_depth(self, exchange, context, depth):
"""深度数据回调"""
symbol = depth['symbol']
if symbol not in self.symbols:
return
# 处理深度数据
self.process_depth(depth)
# 如果满足交易条件,返回下单指令
if self.should_trade(depth):
order = {
"cid": self.create_cid(),
"symbol": symbol,
"order_type": "Limit",
"side": "Buy" if self.is_buy_signal(depth) else "Sell",
"price": self.calculate_price(depth),
"amount": self.calculate_amount(depth),
"time_in_force": "GTC"
}
cmd = {
"account_id": 0,
"cmd": {
"Async": {
"PlaceOrder": [
order,
{
"is_dual_side": False,
"margin_mode": "Cross",
}
]
}
}
}
# 在回调函数结尾返回交易指令
return {
'cmds': [cmd], # 按照该格式返回
}
6. 账户管理
账户管理是策略开发中的重要组成部分,用于查询账户信息、管理持仓和资金。详细API请参考账户管理。
6.1 余额查询
示例:
def query_balance(self):
"""查询账户余额"""
# 查询USDT余额
cmd = {
"account_id": 0,
"cmd": {
"Sync": "UsdtBalance"
}
}
balance = self.trader.publish(cmd).get('Ok', {}).get("balance", 0.0)
self.trader.log(f"USDT余额: {balance}")
# 查询所有币种余额
cmd = {
"account_id": 0,
"cmd": {
"Sync": "Balance"
}
}
balances = self.trader.publish(cmd).get('Ok', [])
for balance in balances:
self.trader.log(f"{balance['asset']}余额: {balance['balance']}")
6.2 持仓管理
- 获取持仓:查询当前持仓信息
- 获取最大持仓:查询最大持仓限制
- 获取最大杠杆:查询最大杠杆限制
- 设置杠杆:设置交易杠杆倍数
- 获取保证金模式:查询保证金模式
- 设置保证金模式:设置全仓/逐仓模式
- 设置双向持仓:设置单向/双向持仓
- 获取手续费率:查询手续费率
示例:
def init_positions(self):
"""初始化持仓信息"""
# 查询当前持仓
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"Position": None # 查询所有持仓
}
}
}
positions = self.trader.publish(cmd).get('Ok', [])
for position in positions:
symbol = position['symbol']
self.positions[symbol] = position
self.trader.log(f"持仓: {symbol} {position['amount']} {position['side']}")
# 设置杠杆倍数
for symbol in self.symbols:
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"SetLeverage": [symbol, self.leverage]
}
}
}
self.trader.publish(cmd)
self.trader.log(f"设置杠杆: {symbol} {self.leverage}x")
6.3 手续费管理
示例:
def check_fee_rate(self, symbol):
"""查询手续费率"""
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"FeeRate": symbol
}
}
}
fee_rate = self.trader.publish(cmd).get('Ok', {})
self.trader.log(f"手续费率: {symbol} maker={fee_rate.get('maker')} taker={fee_rate.get('taker')}")
return fee_rate
6.4 资金划转
- 划转:在不同账户类型间划转资金
- 现货钱包
- U本位合约钱包
- 币本位合约钱包
- 杠杆全仓钱包
- 杠杆逐仓钱包
示例:
def transfer_funds(self, amount):
"""从现货钱包划转到合约钱包"""
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"Transfer": {
"asset": "USDT",
"amount": amount,
"from": "Spot",
"to": "UsdtFuture"
}
}
}
}
result = self.trader.publish(cmd)
self.trader.log(f"资金划转结果: {result}")
6.5 借贷管理
示例:
def borrow_asset(self, coin, amount):
"""借入资产"""
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"Borrow": [coin, amount]
}
}
}
result = self.trader.publish(cmd)
self.trader.log(f"借币结果: {result}")
# 查询当前借币情况
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"GetBorrowed": coin
}
}
}
borrowed = self.trader.publish(cmd).get('Ok', [])
for item in borrowed:
self.trader.log(f"已借: {item['coin']} {item['amount']}")
6.6 账户模式
示例:
def set_account_mode(self):
"""设置账户模式为跨币种保证金模式"""
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"SetAccountMode": "MultiCurrency"
}
}
}
result = self.trader.publish(cmd)
self.trader.log(f"设置账户模式结果: {result}")
7. 获取公共行情
除了订阅行情数据外,系统还支持通过同步命令获取公共行情数据。这些接口可以帮助您获取市场的基本信息,如价格、深度、资金费率等。详情请参考公共行情。
7.1 核心接口
系统提供以下公共行情接口:
| 接口 | 说明 | 常用场景 |
|---|---|---|
| Ticker | 24小时交易信息 | 获取价格、成交量、涨跌幅等 |
| MarkPrice | 标记价格 | 计算未实现盈亏、强平价格 |
| Bbo | 最佳买卖价 | 快速获取当前市场价格 |
| Depth | 市场深度 | 分析市场流动性、计算滑点 |
| Instrument | 合约信息 | 获取交易规则、精度限制 |
| FundingRate | 资金费率 | 计算资金费用、预测下一期费率 |
7.2 使用示例
def get_market_data(self):
"""获取市场数据示例"""
# 获取Ticker数据
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"Ticker": "BTC_USDT"
}
}
}
ticker = self.trader.publish(cmd)
# 获取深度数据
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"Depth": ["BTC_USDT", 10]
}
}
}
depth = self.trader.publish(cmd)
# 获取资金费率
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"FundingRate": "BTC_USDT"
}
}
}
funding_rate = self.trader.publish(cmd)
7.3 最佳实践
- 合理使用缓存
def update_market_data(self):
"""更新市场数据"""
current_time = time.time()
# 每5秒更新一次Ticker数据
if current_time - self.last_ticker_update > 5:
self.update_ticker()
self.last_ticker_update = current_time
# 每1秒更新一次深度数据
if current_time - self.last_depth_update > 1:
self.update_depth()
self.last_depth_update = current_time
- 错误处理
def get_market_data_safe(self):
"""安全获取市场数据"""
try:
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"Ticker": "BTC_USDT"
}
}
}
result = self.trader.publish(cmd)
if 'Ok' in result:
return result['Ok']
else:
self.trader.log(f"获取市场数据失败: {result}")
return None
except Exception as e:
self.trader.log(f"获取市场数据异常: {e}")
return None
- 数据验证
def validate_market_data(self, data):
"""验证市场数据"""
if not data:
return False
# 验证价格合理性
if 'price' in data and (data['price'] <= 0 or data['price'] > 1000000):
return False
# 验证数量合理性
if 'volume' in data and data['volume'] < 0:
return False
return True
8. Trader API 指令
除了知道如何订阅和获取行情数据外,您还需要了解如何使用内置的Trader对象完成更多的功能。详情请参考Trader API 指令。
8.1 核心功能概述
Trader是一个高性能量化交易引擎,基于Rust开发并通过PyO3绑定为Python接口。它提供了以下核心功能:
| 功能模块 | 主要用途 | 关键方法 |
|---|---|---|
| 交易指令执行 | 执行交易所API指令、生成唯一ID | publish(cmd), batch_publish(cmds), create_cid() |
| 日志管理 | 记录系统和交易信息 | log(), tlog(), logt() |
| 缓存管理 | 保存和恢复策略状态 | cache_save(), cache_update(), cache_load() |
| 外部通信 | 获取外部数据 | request() |
| Web平台集成 | 与Web平台交互和数据展示 | init_web_client(), start_web_client(), stop_web_client() |
8.2 交易指令执行
交易指令执行是Trader的核心功能,用于向交易所发送各种操作指令:
# 查询USDT余额
cmd = {
"account_id": 0,
"cmd": {
"Sync": "UsdtBalance" # 查询USDT余额
}
}
result = trader.publish(cmd)
print(f"指令执行结果: {result}")
# 下单指令示例
place_order_cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"PlaceOrder": [
{
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"price": 50000.0,
"amount": 0.01,
"time_in_force": "GTC"
},
{
"is_dual_side": False,
"market_order_mode": "Normal"
}
]
}
}
}
result = trader.publish(place_order_cmd)
8.3 日志管理
Trader提供了多种日志记录方法,用于记录策略运行状态和交易信息:
# 基本日志记录
trader.log("交易开始")
# 带级别和颜色的日志
trader.log("价格异常", "WARN", "red")
# 限频日志记录
trader.tlog("市场状态", "市场流动性正常", interval=60)
# 带时间戳的日志
trader.logt("重要事件", int(time.time()), color="blue", level="INFO")
8.4 缓存管理
缓存管理功能用于保存和恢复策略状态,确保策略在重启后能够恢复之前的运行状态:
# 保存策略状态
state = {
"positions": [...],
"orders": [...],
"statistics": {...},
"last_update": int(time.time())
}
trader.cache_save(json.dumps(state))
# 加载策略状态
cached_data = trader.cache_load()
if cached_data:
state = json.loads(cached_data)
# 使用加载的状态恢复策略
8.5 Web平台集成
Web平台集成功能使策略能够与NB8 Web平台交互,实现实时监控和数据展示。详情请参考Web可视化。
# 初始化Web客户端
web_config = {
"server_name": "BTC-ETH套利策略",
"primary_balance": 10000.0,
"secondary_balance": 5000.0,
"is_production": True,
"open_threshold": 0.14,
"max_position_ratio": 100,
"max_leverage": 3,
"funding_rate_threshold": 0.1,
"cost": 0.0824
}
trader.init_web_client(web_config)
# 启动Web客户端
trader.start_web_client(upload_interval=10)
# 更新账户余额
trader.update_total_balance(primary_balance, secondary_balance)
# 更新交易统计
trader.update_trade_stats(maker_volume, taker_volume, profit)
# 上传表格数据
trader.upload_tables([spread_table, position_table])
8.6 常见指令格式
Trader支持多种指令格式,包括查询指令、下单指令和取消订单指令等。这些指令通过publish方法发送到交易所:
# 查询指令示例
query_cmd = {
"account_id": 0,
"cmd": {
"Sync": "UsdtBalance" # 查询USDT余额
}
}
# 下单指令示例
order_cmd = {
"account_id": 0,
"cmd": {
"Async": {
"PlaceOrder": [
{
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"price": 50000.0,
"amount": 0.01,
"time_in_force": "GTC"
},
{
"is_dual_side": False,
"market_order_mode": "Normal"
}
]
}
}
}
# 取消订单指令示例
cancel_cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"CancelOrder": [{"Id": "order_id"}, "BTC_USDT"]
}
}
}
9. 示例代码
以下是一个完整的订单簿深度不平衡策略示例代码,该示例涵盖了本文档中介绍的大部分核心功能。
"""
depth_imbalance_strategy.py
订单簿深度不平衡策略
实现原理:基于买卖盘深度不平衡检测市场情绪,捕捉短期价格波动
"""
import json
import time
import math
import trader # type: ignore
import base_strategy
class Strategy(base_strategy.BaseStrategy):
"""深度不平衡策略实现类,通过监控买卖盘深度差异捕捉短期价格波动机会"""
def __init__(self, cex_configs, dex_configs, config, trader: trader.Trader):
# 访问配置值
self.cex_configs = cex_configs # 中心化交易所配置
self.has_account = len(cex_configs) > 0 # 是否有可用账户
self.base_index = len(self.cex_configs)
self.dex_configs = dex_configs # 去中心化交易所配置
self.trader = trader # 交易执行器
self.stop_flag = False # 停止标志
# 持仓历史缓存,用于记录开仓价格和跟踪订单状态
self.position_cache = {} # 持仓历史缓存
self.positions = {} # 当前持仓信息
self.balances = {} # 账户余额信息
self.orders = {} # 订单信息
self.last_signal_time = {} # 上次信号触发时间
# 初始化配置
self.update_config(config)
# 记录策略启动时间和初始资产
self.start_time = time.time()
self.initial_equity = 10000.0 # 默认初始资产,会在连接账户后更新
# 尝试加载缓存的交易数据
self.load_cache_data()
# 交易对信息字典
self.instruments = {} # 交易对基本信息
self.bbos = {} # 最优买卖价信息
self.fee_rates = {} # 手续费率
self.flight_orders = {} # 在途订单,尚未收到交易所确认
self.last_trigger_time = {} # 上次触发交易的时间
# 强停状态
self.force_stop = False # 强制停止标志,用于紧急情况
def update_config(self, config):
"""
更新策略配置参数
参数:
config: 包含策略参数的字典
"""
self.min_seconds_between_triggers = config['min_seconds_between_triggers'] # 触发信号最小间隔时间(秒)
self.symbols = config['symbols'] # 交易品种列表
self.trigger_imbalance_ratio = config['trigger_imbalance_ratio'] # 触发信号的深度不平衡比率阈值
self.trigger_min_size = config['trigger_min_size'] # 触发信号的最小深度尺寸
self.order_timeout = config.get('order_timeout', 30) # 订单超时时间(秒)
self.leverage = config['leverage'] # 杠杆倍数
def name(self):
"""返回策略名称"""
return "Depth Imbalance Strategy"
def start(self):
"""
策略启动函数
完成以下初始化工作:
1. 查询账户余额
2. 初始化持仓信息
3. 设置持仓模式(如适用)
4. 初始化交易对信息
5. 设置杠杆倍数
"""
self.trader.log("启动深度不平衡策略", level="INFO", color="blue")
# 查询账户余额
if self.has_account:
cmd = {
"account_id": 0,
"cmd": {
"Sync": "UsdtBalance"
}
}
balances = self.trader.publish(cmd).get('Ok', {})
self.trader.log(f"余额: {balances}", level="DEBUG", color="blue")
self.balances = {
"USDT": {
"available_balance": balances.get('available_balance', 0),
"balance": balances.get('balance', 0)
}
}
self.initial_equity = self.balances.get("USDT", {}).get("balance", 0.0)
# 初始化仓位信息
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"Position": None
}
}
}
res = self.trader.publish(cmd)
self.positions = {}
if 'Ok' in res and isinstance(res['Ok'], list):
for pos in res['Ok']:
if isinstance(pos, dict) and 'symbol' in pos:
self.positions[pos['symbol']] = pos
# 如果交易所是合约交易所,设置单向持仓
if self.cex_configs[0]['exchange'].endswith('Swap'):
try:
# 查询是否双向持仓
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"IsDualSidePositionExt": {
"extra": {
"symbol": "BTC_USDT"
}
}
}
}
}
res = self.trader.publish(cmd)
if 'Ok' in res:
# 如果当前是双向持仓,改为单向持仓
if res['Ok']:
# 设置单向持仓
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"SetDualSidePosition": False
}
}
}
res = self.trader.publish(cmd)
self.trader.log(f"设置单向持仓: {res}", level="INFO", color="green")
except Exception as e:
self.trader.log(f"设置持仓模式时发生错误: {str(e)}", level="ERROR", color="red")
# 初始化交易对信息
for symbol in self.symbols:
# 查询合约信息
cmd = {
"account_id": 0,
"cmd": {"Sync": {"Instrument": symbol}}
}
instrument = self.trader.publish(cmd).get('Ok')
time.sleep(0.1) # 避免请求过快
if instrument:
self.instruments[symbol] = instrument
self.trader.log(f"交易对 {symbol} 信息: {instrument}", level="INFO", color="green")
else:
raise Exception(f"未找到交易对 {symbol} 信息")
# 查询Bbo(最佳买卖价)
cmd['cmd'] = {"Sync": {"Bbo": symbol}}
bbo = self.trader.publish(cmd).get('Ok')
time.sleep(0.1)
if bbo:
self.bbos[symbol] = bbo
else:
raise Exception(f"未找到交易对 {symbol} 的BBO")
# 查询手续费率
cmd['cmd'] = {"Sync": {"FeeRate": symbol}}
fee = self.trader.publish(cmd).get('Ok')
time.sleep(0.1)
if fee:
self.fee_rates[symbol] = fee
else:
raise Exception(f"未找到交易对 {symbol} 的手续费率")
# 如果是合约,设置杠杆
if self.cex_configs[0]['exchange'].endswith('Swap'):
# 设置杠杆倍数
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"SetLeverage": [symbol, self.leverage]
}
}
}
leverage_res = self.trader.publish(cmd)
self.trader.log(f"设置杠杆 {self.leverage}x: {leverage_res}", level="INFO", color="green")
# 记录初始资产
self.initial_equity = self.balances.get("USDT", {}).get("balance", 10000.0)
self.trader.log(f"策略初始资产: {self.initial_equity} USDT", level="INFO", color="green")
def subscribes(self):
"""
返回策略需要订阅的行情和账户数据
订阅内容包括:
1. 深度行情数据
2. 最优买卖价(BBO)
3. 订单和持仓更新(如有账户)
4. 余额定时更新
5. 定时检查订单状态
返回:
包含订阅信息的列表
"""
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": self.symbols,
"levels": 5 # 订阅5档深度
},
},
{
"Bbo": self.symbols
}
]
}
},
]
if self.has_account:
# 订阅交易所websocket
subs.append({
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Order": self.symbols # 订阅订单更新
},
{
"Position": self.symbols # 订阅持仓更新
},
]
}
},)
# 定时更新持仓和余额信息
subs.append({
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 1,
"nanos": 0
},
"rest_type": "Balance" # 每秒更新账户余额
}
}
},)
# 定时检查订单状态
subs.append({
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 10,
"nanos": 0
},
"name": "cancel_all_orders" # 每10秒检查一次订单状态
}
}
},)
return subs
def on_bbo(self, exchange, context, bbo):
"""
处理最优买卖价(BBO)数据事件
更新交易对的最新买卖价信息,用于后续交易决策
参数:
exchange: 交易所名称
context: 上下文信息
bbo: 最优买卖价数据
"""
if self.force_stop:
return
symbol = bbo['symbol']
if symbol not in self.symbols:
return
# 更新BBO数据
self.bbos[symbol] = bbo
def on_depth(self, exchange, context, depth):
"""
处理深度数据事件 - 策略核心逻辑
根据订单簿深度计算买卖盘不平衡程度,在满足条件时生成交易信号
参数:
exchange: 交易所名称
context: 上下文信息
depth: 深度数据,包含买卖盘各档位价格和数量
"""
# 输出context
# self.trader.log(f"context: {context}", level="INFO", color="blue")
if self.force_stop:
return
symbol = depth['symbol']
if symbol not in self.symbols:
return
# 计算买卖深度总量(前5档)
bid_total = sum(level[1] for level in depth['bids'][:5])
ask_total = sum(level[1] for level in depth['asks'][:5])
# 如果深度为空,跳过
if bid_total == 0 or ask_total == 0:
return
# 计算不平衡比例:(买盘量-卖盘量)/(买盘量+卖盘量)
# 范围在[-1,1]之间,正值表示买盘压力更大,负值表示卖盘压力更大
imbalance_ratio = (bid_total - ask_total) / (bid_total + ask_total)
# 检查是否满足触发信号的时间间隔条件
now = time.time()
last_signal_time = self.last_signal_time.get(symbol, 0)
time_since_last_signal = now - last_signal_time
# 如果距离上次信号时间不足,跳过
if time_since_last_signal < self.min_seconds_between_triggers:
return
# 计算深度总量,用于判断市场流动性
total_depth = bid_total + ask_total
# 检查信号触发条件:
# 1. 不平衡比例绝对值超过阈值
# 2. 总深度超过最小要求(确保足够流动性)
if abs(imbalance_ratio) > self.trigger_imbalance_ratio and total_depth > self.trigger_min_size:
# 更新最后信号时间
self.last_signal_time[symbol] = now
# 根据不平衡方向确定交易方向
# 买盘压力大(正比例)时做多,卖盘压力大(负比例)时做空
side = "Buy" if imbalance_ratio > 0 else "Sell"
# 打印信号信息
color = "green" if side == "Buy" else "red"
self.trader.log(f"深度不平衡信号: {symbol} {side} 不平衡比例: {imbalance_ratio:.4f} 总深度: {total_depth:.4f}",
level="INFO", color=color)
# 执行交易
if self.has_account:
cmd = self.build_trade_cmd(symbol, context, side)
if cmd:
self.flight_orders[symbol] = cmd
return {
'cmds': [cmd],
}
def on_order_submitted(self, account_id, context, order_id_result, order):
"""
处理订单提交事件回调
当订单提交到交易所后,处理交易所返回的确认信息
参数:
account_id: 账户ID
context: 上下文信息
order_id_result: 订单ID结果
order: 订单信息
"""
symbol = order['symbol']
if symbol in self.flight_orders:
del self.flight_orders[symbol] # 从在途订单中移除
if 'Ok' in order_id_result:
self.trader.log(f"订单提交成功: {order_id_result}", level="INFO", color="green")
order_id = order_id_result['Ok']
# 保存订单记录
order = {
"status": "Open",
'symbol': symbol,
'side': order['side'],
'quantity': order['amount'],
'price': order['price'],
'create_time': time.time()
}
# 记录为在途订单
self.flight_orders[symbol] = order
# 更新最后触发时间
self.last_trigger_time[symbol] = time.time()
# 记录订单信息
self.orders[order_id] = order
else:
self.trader.log(f"订单提交失败: {order_id_result}", level="ERROR", color="red")
def on_order(self, account_id, context, order):
"""
处理订单更新事件
当订单状态有变化时(如已成交、已撤销等),更新本地订单记录
参数:
account_id: 账户ID
context: 上下文信息
order: 更新后的订单信息
"""
self.trader.log(f"订单更新: {order}", level="INFO", color="blue")
if order['id'] in self.orders:
# 更新订单状态
self.orders[order['id']]['status'] = order['status']
# 如果订单已完成(成交或取消),记录成交信息
if order['status'] in ['Filled', 'Canceled']:
self.trader.log(f"订单状态更新: {order['symbol']} {order['side']} 状态: {order['status']} 成交均价: {order.get('avg_price', 0)}",
level="INFO", color="blue")
def on_position(self, account_id, positions):
"""
处理持仓更新事件
当持仓有变化时,更新本地持仓记录
参数:
account_id: 账户ID
positions: 更新后的持仓信息
"""
self.trader.log(f"持仓更新: {positions}", level="INFO", color="blue")
if isinstance(positions, list):
for position in positions:
if isinstance(position, dict) and 'symbol' in position:
symbol = position['symbol']
self.positions[symbol] = position
def on_balance(self, account_id, balances):
"""
处理余额更新事件
当账户余额变化时,更新本地余额记录
参数:
account_id: 账户ID
balances: 更新后的余额信息
"""
for balance in balances:
asset = balance['asset']
self.balances[asset] = balance
def on_timer_subscribe(self, timer_name):
"""
处理定时器事件
根据不同的定时器名称执行相应的定时任务
参数:
timer_name: 定时器名称
"""
if timer_name == "cancel_all_orders":
self.cancel_all_orders() # 定期检查并取消长时间未成交的订单
def on_stop(self):
"""
策略停止时的处理逻辑
执行以下清理工作:
1. 取消所有未完成订单
2. 平掉所有持仓
3. 保存缓存数据
4. 计算策略收益
"""
self.trader.log("策略正在停止...", level="INFO", color="blue")
self.stop_flag = True
# 取消所有订单
self.cancel_all_orders()
# 平仓
self.close_all_positions()
# 保存缓存数据
self.save_cache_data()
# 计算收益
self.calculate_profit()
def on_config_update(self, config):
"""
热更新策略配置
在策略运行期间更新参数,无需重启策略
参数:
config: 新的配置参数
"""
self.trader.log(f"热更新策略配置: {config}", level="INFO", color="blue")
self.update_config(config)
def on_latency(self, latency, account_id):
"""
延迟统计时触发的方法。
Args:
latency (dict): 延迟信息。
account_id (str): 账户 ID。
"""
self.trader.log(f"延迟统计: {latency}, account_id: {account_id}", level="INFO", color="blue")
# -------------------------------------------------------实用工具函数-------------------------------------------------------
def build_trade_cmd(self, symbol, context, side):
"""
执行交易操作
根据交易信号和市场状况,计算下单参数并提交订单
参数:
symbol: 交易品种
context: 上下文信息
side: 交易方向(Buy/Sell)
"""
# 防止重复下单:同一品种有在途订单时跳过
if symbol in self.flight_orders:
return
# 检查信号触发的时间间隔
last_trigger_time = self.last_trigger_time.get(symbol, 0)
if time.time() - last_trigger_time < self.min_seconds_between_triggers:
return
# 确保有交易所行情数据
if symbol not in self.instruments or symbol not in self.bbos:
self.trader.log(f"未找到交易对信息: {symbol}", level="ERROR", color="red")
return
# 获取当前市场价格
bbo = self.bbos[symbol]
if side == "Buy":
price = bbo['ask_price'] # 买入时使用卖一价(ask)
else:
price = bbo['bid_price'] # 卖出时使用买一价(bid)
if price <= 0:
self.trader.log(f"无效价格: {price}", level="ERROR", color="red")
return
# 计算可用的名义交易价值
notional_value = self.balances.get("USDT", {}).get("available_balance", 0)
self.trader.log(f"当前可用余额: {notional_value}", level="INFO", color="blue")
# 应用杠杆
notional_value = notional_value * self.leverage
# 风险控制:只使用30%可用资金
notional_value = notional_value * 0.3
self.trader.log(f"开仓名义价值: {notional_value}", level="INFO", color="blue")
instrument = self.instruments[symbol]
# 检查是否满足最小下单名义价值要求
if notional_value <= instrument.get('min_notional', 0):
return
# 根据开仓名义价值计算开仓数量
amount = notional_value / price
self.trader.log(f"开仓数量(单位币数量): {amount}", level="INFO", color="blue")
# 处理数量精度
amount_precision = instrument.get('amount_precision', 3) # 数量精度
amount_tick = float(instrument.get('amount_tick', 0.001)) # 数量最小变动单位
amount_multiplier = instrument.get('amount_multiplier', 1) # 数量乘数(合约需要)
# 数量处理:乘以合约乘数换算为交易所接口所需的张数
amount = amount * amount_multiplier
# 数量处理:用数量步长向下取整
if amount_tick > 0:
amount = math.floor(amount / amount_tick) * amount_tick
# 保留精度
amount = round(amount, amount_precision)
self.trader.log(f"开仓数量(合约张数): {amount}", level="INFO", color="blue")
# 检查是否满足最小下单数量要求
if amount < instrument.get('min_qty', 0.000001):
return
# 处理价格精度
price_precision = instrument.get('price_precision', 5) # 价格精度
price_tick = float(instrument.get('price_tick', 0.00001)) # 价格最小变动单位
price_multiplier = instrument.get('price_multiplier', 1) # 价格乘数(合约需要)
# 价格处理:乘以合约乘数换算为交易所接口所需的价格
price = price * price_multiplier
# 价格处理:用价格步长根据买卖方向向上或向下取整
if price_tick > 0:
if side == "Buy":
# 买入时向上取整(确保能成交)
price = math.ceil(price / price_tick) * price_tick
else:
# 卖出时向下取整(确保能成交)
price = math.floor(price / price_tick) * price_tick
# 保留价格精度
price = round(price, price_precision)
# 准备下单参数
order = {
"cid": trader.create_cid(self.cex_configs[0]['exchange']), # 客户端订单ID
"symbol": symbol,
"order_type": "Limit", # 限价单
"side": side,
"pos_side": None, # 单向持仓模式
"time_in_force": "GTC", # 一直有效直到取消
"price": price,
"amount": amount,
}
# 提交限价订单
cmd = {
"account_id": 0,
# "context": context,
"cmd": {
"Async": {
"PlaceOrder": [
order,
{
"is_dual_side": False, # 单向持仓
"margin_mode": "Cross", # 全仓模式(okx需要)
}
]
}
}
}
return cmd
def load_cache_data(self):
"""
加载缓存数据
从本地文件恢复策略运行状态,包括持仓历史和资金信息
"""
try:
with open('depth_imbalance_cache.json', 'r') as f:
cache = json.load(f)
self.position_cache = cache.get('position_cache', {})
self.initial_equity = cache.get('initial_equity', 10000.0)
self.start_time = cache.get('start_time', time.time())
self.trader.log("加载缓存数据成功", level="INFO", color="green")
except:
self.trader.log("加载缓存数据失败,使用默认值", level="WARN", color="yellow")
def save_cache_data(self):
"""
保存缓存数据
将当前策略状态保存到本地文件,用于后续恢复
"""
try:
cache = {
'position_cache': self.position_cache,
'initial_equity': self.initial_equity,
'start_time': self.start_time
}
with open('depth_imbalance_cache.json', 'w') as f:
json.dump(cache, f)
self.trader.log("保存缓存数据成功", level="INFO", color="green")
except:
self.trader.log("保存缓存数据失败", level="ERROR", color="red")
def get_orders(self, symbol: str) -> list:
"""
获取当前所有挂单
查询指定交易对的未完成订单
参数:
symbol: 交易对名称
返回:
list: 挂单列表
"""
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"GetOpenOrders": symbol
}
}
}
result = self.trader.publish(cmd)
if 'Ok' in result :
orders = result['Ok']
if isinstance(orders, list) and len(orders) > 0:
self.trader.log(f"获取挂单成功: {orders}", level="DEBUG")
return orders
else:
self.trader.log(f"获取挂单失败: {result}", level="ERROR")
return []
def cancel_all_orders(self):
"""
取消所有未完成订单
遍历所有交易品种,取消所有挂起的订单
"""
for symbol in self.symbols:
orders = self.get_orders(symbol)
for order in orders:
self.trader.log(f"取消订单: {order}", level="INFO", color="blue")
res = self.trader.publish({"account_id": 0, "cmd": {"Sync": {"CancelOrder": [{"Id": order['id']}, order['symbol']]}}})
self.trader.log(f"取消订单结果: {res}", level="INFO", color="blue")
def get_position(self) -> dict:
"""
获取当前持仓信息
查询所有交易品种的持仓状态
返回:
dict: 持仓信息字典
"""
cmd = {
"account_id": 0,
"cmd": {
"Sync": {"Position": None}
}
}
res = self.trader.publish(cmd)
if 'Ok' in res:
return res['Ok']
else:
self.trader.log(f"获取持仓失败: {res}", level="ERROR", color="red")
return []
def close_all_positions(self):
"""
平掉所有持仓
遍历所有持仓,提交市价单平仓
"""
for position in self.get_position():
symbol = position['symbol']
amount = position['amount']
if amount > 0:
self.trader.log(f"平仓仓位: {position}", level="INFO", color="blue")
pos_side = position['side'] # 持仓方向
# 确定平仓方向:持多仓卖出,持空仓买入
side = 'Sell' if pos_side == 'Long' else 'Buy'
# 处理数量精度
instrument = self.instruments.get(symbol, {})
amount_precision = instrument.get('amount_precision', 3)
amount_multiplier = instrument.get('amount_multiplier', 1)
self.trader.log(f"平仓数量(单位币数量): {amount}", level="INFO", color="blue")
amount = amount * amount_multiplier # 转换为交易所张数
# 保留精度
amount = round(amount, amount_precision)
self.trader.log(f"平仓数量(合约张数): {amount}", level="INFO", color="blue")
# 准备市价平仓订单
order = {
"symbol": symbol,
"order_type": "Market", # 市价单,确保能立即成交
"side": side,
"pos_side": pos_side,
"time_in_force": "IOC", # 立即成交否则取消
"price": None, # 市价单无需指定价格
"amount": amount,
}
self.trader.log(f"平仓订单: {order}", level="INFO", color="blue")
# 提交市价平仓订单
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"PlaceOrder": [
order,
{
"is_dual_side": False,
"margin_mode": "Cross", # 全仓模式
"market_order_mode": "Normal" # 普通市价单
}
]
}
}
}
res = self.trader.publish(cmd)
self.trader.log(f"平仓结果: {res}", level="INFO", color="blue")
def calculate_profit(self):
"""
计算策略收益
计算策略运行期间的盈亏情况
"""
cmd = {
"account_id": 0,
"cmd": {
"Sync": "UsdtBalance"
}
}
balance = self.trader.publish(cmd).get('Ok', {}).get("balance", 0.0)
profit = balance - self.initial_equity
self.trader.log(f"收益: {profit}", level="INFO", color="blue")
10. 下一步
- 了解支持哪些交易所,请参考交易所列表。
- 了解完整的REST API列表,请参考REST API。
- 了解错误码列表,请参考错误码。
- 了解日志限频,请参考日志限频。
- 了解一些交易所的特殊说明,请参考特殊说明。
11. 常见问题
- 如何处理网络延迟?
- 实现心跳检测
- 添加重连机制
- 使用本地时间戳
- 如何处理订单状态?
- 维护订单状态机
- 实现订单超时处理
- 添加订单重试机制
- 如何优化策略性能?
- 使用异步处理
- 实现数据缓存
- 优化数据结构