📊 Trader API v2 文档
📌 概述
Trader v2 是一个高性能量化交易引擎的升级版本,基于Rust开发并通过PyO3绑定为Python接口。它在v1版本的基础上增加了更多直接访问交易所API的功能,使策略开发更加灵活强大。
使用版本0.9.0之前的版本请参考Trader API[旧版]。可通过config.toml中设置trader_version = "V2"启用(默认为V2),若需使用旧版请设置trader_version = "V1"。
⚠️ 重要:Result返回值处理说明
由于本API基于PyO3构建,所有返回数据的方法在Rust内部都返回Result类型。在Python中调用时,您需要特别注意以下几点:
🔍 返回值结构
大部分API方法返回的结果具有以下结构:
# 成功的返回值结构
{
"Ok": {
# 实际的数据内容
"data": "...",
"code": 200,
"msg": "success"
}
}
# 错误的返回值结构
{
"Err": {
"error": "错误描述",
"code": 400
}
}
💡 正确的使用方式
所有返回数据的方法都应该按以下方式处理:
# ✅ 正确的处理方式
result = trader.http_request("https://api.example.com/data", "GET", None)
if "Ok" in result:
# 成功情况 - 提取实际数据
data = result["Ok"]
print(f"请求成功: {data}")
else:
# 错误情况 - 处理错误信息
error = result.get("Err", "未知错误")
print(f"请求失败: {error}")
📑 API分类索引
| 分类 | 功能模块 | 主要用途 |
|---|---|---|
| 一、核心交易功能 | 交易所api指令执行、标识符生成 | 执行交易所api、生成唯一ID |
| 二、日志管理 | 日志记录 | 记录系统和交易信息 |
| 三、缓存管理 | 缓存操作、缓存系统说明 | 保存和恢复策略状态 |
| 四、外部通信 | HTTP请求 | 获取外部数据 |
| 五、NB8 Web平台集成 | Web客户端管理等六个模块 | 与Web平台交互和数据展示 |
| 六、交易所API直接访问 | 账户查询、订单管理、止损订单、市场数据 | 直接访问交易所API |
| 七、数据结构说明 | K线数据结构、表格格式等 | 理解数据结构和格式 |
| 八、数据源订阅 | WebSocket、REST、定时器三种数据源订阅 | 接收实时和定时数据 |
一、核心交易功能
1.1 交易指令执行
▶️ publish(cmd)
功能: 向交易引擎发送并执行单个交易指令
参数:
cmd: 交易指令对象,符合ExecutionCommand结构
返回值:
- Result结构: 包含
Ok或Err字段的字典 - 成功时:
{"Ok": execution_result}- 指令执行结果 - 失败时:
{"Err": error_info}- 包含错误信息
示例:
# ✅ 正确的使用方式 - 查询USDT余额
cmd = {
"account_id": 0,
"method": "UsdtBalance",
"sync": True
}
result = trader.publish(cmd)
if "Ok" in result:
balance_data = result["Ok"]
print(f"USDT余额查询成功: {balance_data}")
# 提取具体余额信息
if isinstance(balance_data, dict) and "balance" in balance_data:
print(f"可用余额: {balance_data['balance']}")
else:
error = result.get("Err", "未知错误")
print(f"余额查询失败: {error}")
# 下单指令示例 - 带完整错误处理
place_order_cmd = {
"account_id": 0,
"method": "PlaceOrder",
"order": {
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"price": 50000.0,
"amount": 0.01,
"time_in_force": "GTC",
"cid": "client_order_123"
},
"params": {
"is_dual_side": False,
"market_order_mode": "Normal"
},
"sync": True
}
result = trader.publish(place_order_cmd)
if "Ok" in result:
order_result = result["Ok"]
print(f"下单成功: {order_result}")
# 提取订单ID
if isinstance(order_result, dict):
order_id = order_result.get("order_id")
client_order_id = order_result.get("client_order_id")
print(f"订单ID: {order_id}, 客户端ID: {client_order_id}")
else:
error = result.get("Err")
print(f"下单失败: {error}")
# 根据错误类型进行处理
if "insufficient balance" in str(error).lower():
print("余额不足,请检查账户余额")
elif "invalid symbol" in str(error).lower():
print("交易对不存在,请检查symbol参数")
# 另一个查询示例
balance_cmd = {"account_id": 0, "method": "UsdtBalance", "sync": True}
result = trader.publish(balance_cmd)
if "Ok" in result:
balance_data = result["Ok"]
print(f"USDT余额查询成功: {balance_data}")
else:
error = result.get("Err", "未知错误")
print(f"USDT余额查询失败: {error}")
▶️ batch_publish(cmds)
功能: 批量执行多个交易指令
参数:
cmds: 交易指令对象列表
返回值:
- Result结构列表: 每个元素都包含
Ok或Err字段的字典 - 成功时: 每个结果为
{"Ok": execution_result} - 失败时: 每个结果为
{"Err": error_info}
示例:
# ✅ 正确的使用方式 - 批量执行多个交易指令
cmds = [
{
"account_id": 0,
"method": "UsdtBalance",
"sync": True
},
{
"account_id": 0,
"method": "BalanceByCoin",
"asset": "BTC",
"sync": True
}
]
results = trader.batch_publish(cmds)
# 处理批量结果
for i, result in enumerate(results):
if "Ok" in result:
data = result["Ok"]
print(f"指令{i+1}执行成功: {data}")
else:
error = result.get("Err", "未知错误")
print(f"指令{i+1}执行失败: {error}")
# 统计执行结果
success_count = 0
failed_count = 0
cmd_names = ["USDT余额查询", "BTC余额查询"]
for i, result in enumerate(results):
cmd_name = cmd_names[i] if i < len(cmd_names) else f"指令{i+1}"
if "Ok" in result:
success_count += 1
data = result["Ok"]
print(f"✅ {cmd_name} 执行成功: {data}")
else:
failed_count += 1
error = result.get("Err", "未知错误")
print(f"❌ {cmd_name} 执行失败: {error}")
print(f"\n总计: 成功 {success_count} 个, 失败 {failed_count} 个")
1.2 唯一标识符生成
🆔 create_cid(exchange)
功能: 创建唯一的客户端标识符
参数:
exchange: 交易所对象,用于生成对应交易所的CID格式
返回值:
- Result结构: 包含
Ok或Err字段的字典 - 成功时:
{"Ok": "unique_cid_string"}- 唯一的标识符字符串 - 失败时:
{"Err": error_info}- 包含错误信息
用途:
- 用于标识和跟踪交易订单
- 在高频交易中避免重复订单
- 关联策略生成的订单与回报信息
- 作为交易请求的唯一引用
示例:
# 创建唯一的客户端标识符
from exchange import Exchange # 假设有交易所模块
# 需要提供exchange对象
exchange = Exchange.BINANCE # 或其他交易所
result = trader.create_cid(exchange)
if "Ok" in result:
cid = result["Ok"]
print(f"生成的订单ID: {cid}")
# 在下单时使用生成的CID
place_order_cmd = {
"account_id": 0,
"method": "PlaceOrder",
"order": {
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"price": 50000.0,
"amount": 0.01,
"cid": cid # 使用生成的CID
},
"sync": True
}
else:
error = result.get("Err", "未知错误")
print(f"生成订单ID失败: {error}")
1.3 进程管理
⛔ graceful_shutdown()
功能: 优雅退出交易进程,发送 SIGINT 信号(等效于 Ctrl+C)来安全停止程序
参数: 无
返回值: 无
用途:
- 在需要安全退出交易程序时使用
- 通过发送 kill -2 信号来优雅停止当前进程
- 允许程序有序地清理资源和保存状态
- 适用于程序自动退出、异常处理或远程控制停止
示例:
# 在满足特定条件时优雅退出程序
if should_shutdown:
trader.log("准备优雅退出程序", "INFO")
trader.graceful_shutdown()
# 在异常处理中使用
try:
# 执行交易逻辑
pass
except CriticalError as e:
trader.log(f"遇到严重错误,程序将退出: {e}", "ERROR", color="red")
trader.graceful_shutdown()
# 定时退出示例
import time
start_time = time.time()
while True:
# 交易逻辑...
if time.time() - start_time > 24 * 3600: # 24小时后退出
trader.log("程序运行24小时,准备退出", "INFO")
trader.graceful_shutdown()
break
注意事项:
- 此方法会立即发送退出信号,程序会在短时间内停止
- 确保在调用此方法前已完成重要数据的保存
- 建议在调用前使用日志记录退出原因
二、日志管理
2.1 日志记录
📝 log(msg, level=None, color=None, web=True)
功能: 记录日志到控制台和Web平台
参数:
msg: 日志消息内容level: 日志级别,可选值: "TRACE", "DEBUG", "INFO", "WARN", "ERROR",默认为"INFO"color: 日志颜色,可选,默认为Noneweb: 是否发送到Web平台,默认为True
返回值: 无
示例:
# 无色日志,默认发送到web平台
trader.log("交易开始")
# 红色日志,发送到web平台
trader.log("价格异常", "WARN","red")
# 不发送到web平台
trader.log("调试信息", "DEBUG", web=False)
# 发送错误日志到平台
trader.log("错误", "ERROR")
# 多行日志使用\n分割
trader.log("多行日志\n第二行\n第三行")
📝 tlog(tag, msg, color=None, interval=0, level=None, query=False)
功能: 限频日志记录,每N秒最多打印一次指定标签的日志
参数:
tag: 日志标识msg: 日志内容color: 日志颜色,可选,默认为Noneinterval: 日志最小间隔(秒),默认为0level: 日志级别,可选,默认为"INFO"query: 是否只查询不更新时间,默认为False
返回值:
bool- True表示已发送日志,False表示未发送日志
示例:
# 每60秒最多记录一次市场状态
trader.tlog("市场状态", "市场流动性正常", interval=60)
# 使用特定颜色记录警告信息
trader.tlog("价差警告", "价差超过阈值", color="yellow", level="WARN")
# 不更新时间戳
should_log = trader.tlog("检测点", "价格波动", interval=30, query=True)
if should_log:
# 执行额外操作
pass
# 多行日志记录
trader.tlog("深度分析",
f"BTC-USDT 深度\n买: [(50000, 1.2), (49900, 2.5)]\n卖: [(50100, 1.5), (50200, 3.0)]\n不平衡: 0.8750",
interval=5,
level="DEBUG",
color="royalblue")
📝 logt(message, time, color=None, level=None)
功能: 使用指定时间戳记录日志
参数:
message: 日志内容time: 日志时间戳(Unix时间戳)color: 日志颜色,可选,默认为Nonelevel: 日志级别,可选,默认为"INFO"
返回值: 无
示例:
import time
# 记录1小时前的事件
past_time = int(time.time()) - 3600
trader.logt("历史事件记录", past_time)
# 使用特定颜色和级别记录日志
event_time = int(time.time())
trader.logt("重要事件", event_time, color="blue", level="INFO")
2.2 内置颜色表
日志相关函数(如log、tlog和logt)支持通过color参数设置文本颜色。系统支持多种颜色,按功能和使用场景分类如下:
交易状态颜色
| 颜色名称 | 十六进制代码 | 交易应用场景 |
|---|---|---|
| green | #32CD32 | 交易成功、盈利交易、多头信号、买入信号 |
| red | #FF4500 | 交易失败、亏损交易、空头信号、卖出信号 |
| blue | #0000FF | 交易执行、订单提交、中性交易状态 |
| yellow | #FFFF00 | 交易警告、风险提示、接近止损点 |
提示: 系统还支持直接使用十六进制颜色代码,格式为
#RRGGBB,例如#FF7F50(珊瑚红)。
三、缓存管理
3.1 缓存操作
💾 cache_save(data)
功能: 将数据保存到缓存系统
参数:
data: 要保存的数据对象,可以是Python字典、列表或任何可序列化的数据结构
返回值: 无
示例:
# 保存策略状态到缓存
import time
# 准备要保存的策略状态(直接使用Python字典)
state = {
"positions": [
{"symbol": "BTC_USDT", "amount": 0.05, "entry_price": 49800, "unrealized_pnl": 120},
{"symbol": "ETH_USDT", "amount": 0.8, "entry_price": 3200, "unrealized_pnl": 80}
],
"orders": [
{"symbol": "BTC_USDT", "order_id": "123456", "status": "open"},
{"symbol": "ETH_USDT", "order_id": "789012", "status": "filled"}
],
"statistics": {
"total_profit": 1250.5,
"trade_count": 48,
"win_rate": 0.75
},
"last_update": int(time.time())
}
# 直接保存Python对象,无需手动转换为JSON
trader.cache_save(state)
# 也可以保存简单的数据类型
trader.cache_save({"last_price": 50000.0, "timestamp": int(time.time())})
# 保存列表数据
positions_list = [
{"symbol": "BTC_USDT", "side": "long", "amount": 0.1},
{"symbol": "ETH_USDT", "side": "short", "amount": 2.0}
]
trader.cache_save(positions_list)
📂 cache_load()
功能: 从缓存系统加载数据
参数: 无
返回值:
- 缓存数据对象,如果没有缓存则返回None。返回的数据保持原始的Python数据类型
示例:
# 从缓存加载策略状态
cached_data = trader.cache_load()
if cached_data is not None:
# 数据已自动转换为Python对象,无需手动解析JSON
print(f"加载的策略状态: {cached_data}")
# 直接使用加载的数据
if isinstance(cached_data, dict):
positions = cached_data.get("positions", [])
orders = cached_data.get("orders", [])
statistics = cached_data.get("statistics", {})
print(f"已恢复 {len(positions)} 个持仓, {len(orders)} 个订单")
print(f"总盈利: {statistics.get('total_profit', 0)}, 胜率: {statistics.get('win_rate', 0)*100}%")
# 如果保存的是列表
elif isinstance(cached_data, list):
print(f"加载的持仓列表: {len(cached_data)} 个持仓")
for pos in cached_data:
print(f" {pos['symbol']}: {pos['side']} {pos['amount']}")
else:
print("缓存为空,使用默认配置")
# 初始化默认状态
positions = []
orders = []
statistics = {"total_profit": 0, "trade_count": 0, "win_rate": 0}
# 安全的缓存加载模式
def load_strategy_state():
"""安全加载策略状态"""
try:
cached_state = trader.cache_load()
if cached_state and isinstance(cached_state, dict):
# 验证缓存数据的完整性
required_keys = ["positions", "orders", "statistics"]
if all(key in cached_state for key in required_keys):
return cached_state
else:
print("缓存数据不完整,使用默认状态")
# 返回默认状态
return {
"positions": [],
"orders": [],
"statistics": {"total_profit": 0, "trade_count": 0, "win_rate": 0},
"last_update": int(time.time())
}
except Exception as e:
print(f"加载缓存失败: {e}")
return {
"positions": [],
"orders": [],
"statistics": {"total_profit": 0, "trade_count": 0, "win_rate": 0},
"last_update": int(time.time())
}
# 使用安全加载函数
strategy_state = load_strategy_state()
3.2 缓存系统说明
Trader提供的缓存系统分为两部分,它们是互相独立的机制:
-
策略缓存 📊: 通过
cache_save和cache_load方法管理的缓存,主要用于保存策略状态,如交易记录、统计数据等。cache_save: 保存任意Python数据结构(字典、列表等)到缓存cache_load: 从缓存加载数据,返回原始的Python数据类型- 数据格式: 支持自动序列化/反序列化,无需手动转换JSON
-
WebClient缓存 🌐: WebClient有独立的缓存机制,不使用上述的
cache_save和cache_load方法。WebClient的配置数据和状态会自动缓存,不仅包括primary_balance和secondary_balance参数,还包括所有的统计数据,如总成交量、成交次数、失败次数、成功次数、胜率、利润等。这些数据在首次设置后会被自动缓存,后续启动时自动读取。另外,当通过相关API更新数据时(如update_trade_stats、update_total_balance等),缓存文件也会自动更新,确保数据持久性。
WebClient缓存文件位置:
- WebClient缓存文件以
{server_name}.json命名 - 默认存储在当前工作目录中
- 不同策略应使用不同的
server_name以避免缓存冲突 - 如果需要重置初始余额或统计数据,可以删除对应的缓存文件,或使用新的
server_name
缓存机制说明: WebClient的配置数据(包括初始余额、交易统计数据如总成交量、成交次数、失败次数、成功次数、胜率、利润等)会被缓存到本地文件系统。缓存文件以
{server_name}.json的格式命名,存储在当前工作目录中。当调用统计和余额相关的API(如update_trade_stats、update_total_balance等)更新数据时,缓存文件会实时自动更新,无需手动操作。这就是为什么server_name参数非常重要,它不仅用于Web界面标识,还用于定位缓存文件。不同的策略应使用不同的server_name以避免缓存冲突。WebClient缓存是自动管理的,与策略缓存(cache_save/cache_load)完全独立。
四、外部通信
4.1 HTTP请求
🌐 http_request(url, method, body, headers=None)
功能: 发送HTTP请求获取外部数据
参数:
url: 请求URLmethod: 请求方法,如"GET", "POST"等body: 请求体内容,可选。可以是Python字典、列表或任何可序列化的Python对象,会自动转换为JSON字符串headers: 请求头字典,可选
返回值:
- Result结构: 包含
Ok或Err字段的字典 - 成功时:
{"Ok": response_data}- 响应数据已转换为Python对象 - 失败时:
{"Err": error_info}- 包含错误信息
示例:
# ✅ GET请求示例 - 获取交易所合约信息
url = "https://api.gateio.ws/api/v4/futures/usdt/contracts/BTC_USDT"
result = trader.http_request(url, "GET", None, None)
if "Ok" in result:
contract_info = result["Ok"]
print(f"BTC_USDT合约信息: {contract_info}")
else:
error = result.get("Err", "未知错误")
print(f"获取合约信息失败: {error}")
# ✅ POST请求示例 - 使用JSONPlaceholder测试API
post_url = "https://jsonplaceholder.typicode.com/posts"
post_data = {
"title": "测试POST请求",
"body": "这是一个测试POST请求的内容",
"userId": 1
}
headers = {"Content-Type": "application/json"}
# 直接传递Python字典,Rust端会自动序列化为JSON
result = trader.http_request(post_url, "POST", post_data, headers)
if "Ok" in result:
post_response = result["Ok"]
print(f"POST请求结果: {post_response}")
print(f"创建的文章ID: {post_response.get('id')}")
else:
error = result.get("Err", "未知错误")
print(f"POST请求失败: {error}")
重要说明:
- 自动序列化:
body参数支持Python字典、列表等可序列化对象,会在Rust端自动转换为JSON字符串 - 无需手动序列化: 不再需要使用
json.dumps()手动序列化数据 - 向后兼容: 仍然支持传递字符串作为body参数
- Result处理: 返回值需要检查
Ok字段来确定是否成功 - 错误处理: 如果传入的Python对象无法序列化,会返回详细的错误信息
五、NB8 Web平台集成
Web可视化功能提供了与NB8 Web平台的集成,使策略能够实时展示交易数据、统计信息和图表。详细内容请参考Web可视化文档。
六、交易所API直接访问
6.1 订单管理
📊 get_orders(account_id, symbol, start, end, extra=None, generate=False)
功能: 获取指定时间范围内的订单列表
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称start: 整数,开始时间戳(毫秒)end: 整数,结束时间戳(毫秒)extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 订单列表数据
📊 get_open_orders(account_id, symbol, extra=None, generate=False)
功能: 获取指定交易对的未成交订单列表
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 未成交订单列表数据
📊 get_all_open_orders(account_id, extra=None, generate=False)
功能: 获取账户下所有交易对的未成交订单
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 所有未成交订单列表数据
📊 get_order_by_id(account_id, symbol, order_id=None, cid=None, extra=None, generate=False)
功能: 通过订单ID或客户端订单ID查询单个订单详情
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称order_id: 字符串,可选,订单IDcid: 字符串,可选,客户端订单IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
注意: order_id 和 cid 不能同时为空,必须提供其中一个
示例:
# 通过订单ID查询
result = trader.get_order_by_id(
account_id=0,
symbol="BTC_USDT",
order_id="123456789"
)
# 通过客户端订单ID查询
result = trader.get_order_by_id(
account_id=0,
symbol="BTC_USDT",
cid="my_order_001"
)
返回值:
- 订单详情数据
📊 place_order(account_id, order, params=None, extra=None, sync=True, generate=False)
功能: 下单
参数:
account_id: 整数,账户IDorder: 字典或对象,订单信息,包含以下字段:symbol: 字符串,交易对名称side: 字符串,交易方向,例如"Buy"或"Sell"order_type: 字符串,订单类型,例如"Limit"或"Market"price: 浮点数,限价单价格amount: 浮点数,交易数量pos_side: 字符串,可选,仓位方向,例如"Long"或"Short"。在双向持仓模式下必须指定,单向持仓模式下可选cid: 字符串,可选,客户端订单IDtime_in_force: 字符串,可选,订单有效期,例如"GTC"(Good Till Cancel)
params: 字典或对象,可选,下单参数,包含以下可选字段:is_dual_side: 布尔值,是否使用双向持仓模式market_order_mode: 字符串,市价单模式,例如"Normal"或"Amount"
extra: 字典,可选,额外参数sync: 布尔值,可选,默认为True,是否同步等待订单结果generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
pos_side参数说明:
"Long": 多仓方向"Short": 空仓方向- 在双向持仓模式(
is_dual_side=True)下,此参数为必填 - 在单向持仓模式下,此参数可选,但建议填写以提高订单明确性
- 当需要只减仓(reduce_only)时,必须传递此参数
返回值:
- Result结构: 包含
Ok或Err字段的字典 - 成功时:
{"Ok": order_result}- 下单成功结果 - 失败时:
{"Err": error_info}- 包含错误信息
示例:
# ✅ 完整的下单示例 - 限价单
order = {
"cid": traderv2.create_cid(exchange),
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"pos_side": "Long", # 指定仓位方向
"price": 50000.0,
"amount": 0.01,
"time_in_force": "GTC"
}
params = {
"is_dual_side": True, # 双向持仓模式
}
result = trader.place_order(0, order, params, sync=True)
if "Ok" in result:
order_result = result["Ok"]
print(f"下单成功: {order_result}")
else:
error = result.get("Err", "未知错误")
print(f"下单失败: {error}")
# ✅ 市价单示例
market_order = {
"cid": traderv2.create_cid(exchange),
"symbol": "ETH_USDT",
"order_type": "Market",
"side": "Sell",
"pos_side": "Short", # 空仓
"amount": 0.5,
"time_in_force": "IOC"
}
params = {
"is_dual_side": True,
"market_order_mode": "Normal"
}
result = trader.place_order(0, market_order, params, sync=True)
# ✅ 单向持仓模式示例(pos_side可选但建议添加)
single_side_order = {
"cid": traderv2.create_cid(exchange),
"symbol": "SOL_USDT",
"order_type": "Limit",
"side": "Buy",
"pos_side": "Long", # 虽然单向持仓可选,但建议添加
"price": 100.0,
"amount": 1.0,
"time_in_force": "PostOnly"
}
params = {
"is_dual_side": False # 单向持仓模式
}
result = trader.place_order(0, single_side_order, params, sync=True)
# ✅ 只减仓订单示例(pos_side必须)
reduce_only_order = {
"cid": traderv2.create_cid(exchange),
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Sell",
"pos_side": "Long", # 减仓时必须指定pos_side
"price": 51000.0,
"amount": 0.01,
"time_in_force": "GTC",
"reduce_only": True # 只减仓标志
}
result = trader.place_order(0, reduce_only_order, sync=True)
# ⚠️ 错误示例 - 双向持仓模式下缺少pos_side
wrong_order = {
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
# "pos_side": "Long", # 缺少此字段在双向持仓下会报错
"price": 50000.0,
"amount": 0.01
}
params = {"is_dual_side": True}
result = trader.place_order(0, wrong_order, params)
# 这会返回错误: {"Err": "pos_side is required in dual side mode"}
# ✅ 批量下单示例(每个订单都需要pos_side)
orders = []
for i in range(3):
order = {
"cid": traderv2.create_cid(exchange),
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"pos_side": "Long", # 每个订单都要指定
"price": 50000.0 - i * 100, # 不同价格
"amount": 0.01,
"time_in_force": "GTC"
}
orders.append(order)
params = {"is_dual_side": True}
result = trader.batch_place_order(0, orders, params, sync=True)
pos_side使用场景总结:
| 场景 | is_dual_side | pos_side是否必需 | 说明 |
|---|---|---|---|
| 双向持仓开多仓 | True | ✅ 必需 "Long" | 在多空并存模式下必须明确方向 |
| 双向持仓开空仓 | True | ✅ 必需 "Short" | 在多空并存模式下必须明确方向 |
| 单向持仓开仓 | False | 📋 建议添加 | 提高订单明确性,避免歧义 |
| 只减仓订单 | 任意 | ✅ 必需 | 减仓时必须明确要平哪个方向的仓位 |
| 平仓订单 | 任意 | ✅ 必需 | 平仓时必须指定要平的仓位方向 |
📊 batch_place_order(account_id, orders, params=None, extra=None, sync=True, generate=False)
功能: 批量下单
参数:
account_id: 整数,账户IDorders: 列表,多个订单信息,每个订单包含与place_order中order相同的字段params: 字典或对象,可选,下单参数,与place_order中params相同extra: 字典,可选,额外参数sync: 布尔值,可选,默认为True,是否同步等待订单结果generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 批量下单结果数据列表
📊 amend_order(account_id, order, extra=None, sync=True, generate=False)
功能: 修改订单
参数:
account_id: 整数,账户IDorder: 字典或对象,订单修改信息,包含以下字段:symbol: 字符串,交易对名称order_id: 字符串,订单IDprice: 浮点数,可选,新价格amount: 浮点数,可选,新数量cid: 字符串,可选,客户端订单ID(用于替代order_id)
extra: 字典,可选,额外参数sync: 布尔值,可选,默认为True,是否同步等待修改结果generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 订单修改结果数据
📊 cancel_order(account_id, symbol, order_id=None, cid=None, extra=None, sync=True, generate=False)
功能: 取消订单
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称order_id: 字符串,可选,订单IDcid: 字符串,可选,客户端订单IDextra: 字典,可选,额外参数sync: 布尔值,可选,默认为True,是否同步等待取消结果generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
注意: order_id 和 cid 不能同时为空,必须提供其中一个
示例:
# 通过订单ID取消
result = trader.cancel_order(
account_id=0,
symbol="BTC_USDT",
order_id="123456789"
)
# 通过客户端订单ID取消
result = trader.cancel_order(
account_id=0,
symbol="BTC_USDT",
cid="my_order_001"
)
# 异步取消订单
result = trader.cancel_order(
account_id=0,
symbol="BTC_USDT",
order_id="123456789",
sync=False
)
返回值:
- 取消订单结果数据
📊 batch_cancel_order(account_id, symbol, extra=None, sync=True, generate=False)
功能: 批量取消指定交易对的所有未成交订单
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称extra: 字典,可选,额外参数sync: 布尔值,可选,默认为True,是否同步等待取消结果generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 批量取消订单结果数据
📊 batch_cancel_order_by_id(account_id, symbol=None, order_ids=None, client_order_ids=None, extra=None, sync=True, generate=False)
功能: 批量取消指定ID的订单
参数:
account_id: 整数,账户IDsymbol: 字符串,可选,交易对名称order_ids: 字符串列表,可选,订单ID列表client_order_ids: 字符串列表,可选,客户端订单ID列表extra: 字典,可选,额外参数sync: 布尔值,可选,默认为True,是否同步等待取消结果generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 批量取消订单结果数据
🔧 CloseTrigger结构体说明
止损订单使用CloseTrigger结构体定义触发条件和执行行为,提供了丰富的配置选项:
CloseTrigger结构体字段:
id: 可选,订单ID(下单时不填,系统自动生成)cid: 可选,客户自定义订单ID,便于订单跟踪trigger_price: 必填,触发价格配置(TriggerPrice枚举)trigger_action: 必填,触发后的执行行为(TriggerAction结构)
TriggerPrice(触发价格类型):
{"MarkPrice": 价格}: 标记价格触发,最稳定,推荐使用{"ContractPrice": 价格}: 最新成交价触发,响应最快{"IndexPrice": 价格}: 指数价格触发,最平滑
TriggerAction(触发行为):
quantity: 可选,触发数量,不填则全平持仓trailing_gap: 可选,回调幅度百分比,追踪止盈必填execute_type: 执行类型"Market": 市价执行(默认){"Limit": 价格}: 限价执行
typ: 订单类型"Normal": 一般止损/止盈(默认)"Trailing": 追踪止盈
特殊功能:
- 追踪止盈: 当
typ为"Trailing"时,价格朝有利方向移动时止盈线会跟踪调整,直到价格回调达到trailing_gap幅度时触发 - 部分平仓: 通过
quantity字段可以实现部分止盈/止损,其余仓位继续持有 - 多种触发价格: 支持标记价格、成交价格、指数价格三种触发方式,适应不同交易策略
📊 StopOrder数据结构
以下是止损订单相关的完整数据结构定义:
StopOrder(完整止损订单信息):
{
"symbol": "BTC_USDT", # 交易对符号
"take_profit": { # 止盈订单配置(可选)
"id": "123456789", # 止盈订单ID(系统生成)
"cid": "client_tp_001", # 客户自定义ID(可选)
"trigger_price": {"MarkPrice": 52000.0}, # 触发价格
"trigger_action": { # 触发行为
"quantity": 0.5, # 触发数量(可选,不填则全平)
"trailing_gap": 2.0, # 回调幅度(%)(可选,追踪止盈时必填)
"execute_type": "Market", # 执行类型:"Market"或{"Limit": 价格}
"typ": "Normal" # 订单类型:"Normal"或"Trailing"
}
},
"stop_loss": { # 止损订单配置(可选)
"id": "987654321", # 止损订单ID(系统生成)
"cid": "client_sl_001", # 客户自定义ID(可选)
"trigger_price": {"ContractPrice": 48000.0}, # 触发价格
"trigger_action": { # 触发行为
"quantity": null, # 全平仓位
"trailing_gap": null, # 不使用追踪
"execute_type": "Market", # 市价执行
"typ": "Normal" # 一般止损
}
},
"status": "Open", # 订单状态
"c_time": 1640995200000, # 创建时间(毫秒时间戳)
"u_time": 1640998800000 # 更新时间(毫秒时间戳)
}
StopOrderStatus(止损订单状态枚举):
"Open": 未触发(默认状态)"Executed": 已触发执行"Canceled": 已取消"Failed": 触发失败
StopOrderIdRsp(止损订单操作返回结果):
{
"take_profit": { # 止盈订单结果(可选)
"Ok": "123456789" # 成功时返回订单ID
# 或
"Err": "错误信息" # 失败时返回错误信息
},
"stop_loss": { # 止损订单结果(可选)
"Ok": "987654321" # 成功时返回订单ID
# 或
"Err": "错误信息" # 失败时返回错误信息
}
}
ExecuteType(执行类型枚举):
"Market": 市价执行(默认){"Limit": 价格}: 限价执行,如{"Limit": 50000.0}
ActionType(订单类型枚举):
"Normal": 一般止损/止盈(默认)"Trailing": 追踪止盈
TriggerPrice(触发价格类型):
{"MarkPrice": 价格}: 标记价格触发,如{"MarkPrice": 50000.0}{"ContractPrice": 价格}: 最新成交价触发,如{"ContractPrice": 50000.0}{"IndexPrice": 价格}: 指数价格触发,如{"IndexPrice": 50000.0}
📊 post_stop_order(account_id, symbol, pos_side, take_profit=None, stop_loss=None, is_dual_side=False, extra=None, generate=False)
功能: 下止损订单
参数:
account_id: 整数,必需,账户IDsymbol: 字符串,必需,交易对名称pos_side: 字符串,必需,仓位方向,例如"Long"或"Short"take_profit: 字典,可选,止盈订单配置(CloseTrigger结构),包含以下字段:cid: 字符串,可选,客户自定义订单IDtrigger_price: 字典,触发价格配置,支持以下类型:{"MarkPrice": 价格}: 标记价格触发{"ContractPrice": 价格}: 最新成交价/市场价触发{"IndexPrice": 价格}: 指数价格触发
trigger_action: 字典,触发行为配置,包含:quantity: 浮点数,可选,触发数量,不填则全平trailing_gap: 浮点数,可选,回调幅度(%),追踪止盈时必填execute_type: 字符串或字典,执行类型:"Market": 市价平仓{"Limit": 价格}: 限价平仓
typ: 字符串,可选,类型:"Normal"(一般)或"Trailing"(追踪),默认"Normal"
stop_loss: 字典,可选,止损订单配置(CloseTrigger结构),字段同take_profitis_dual_side: 布尔值,可选,是否使用双向持仓模式,默认为Falseextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
重要:
- 此方法仅支持同步执行,无
sync参数 - 必需参数:
account_id、symbol、pos_side为必需参数 - 配置要求:
take_profit和stop_loss至少需要提供其中一个,否则无法创建止损订单 - 仓位方向:
pos_side必须正确设置,与要保护的仓位方向一致
返回值:
- Result结构: 包含
Ok或Err字段的字典 - 示例:
{
"Ok": {
"stop_loss": {
"Ok": "1326362257269604352"
},
"take_profit": {
"Ok": "1326362256586391552"
}
}
}
示例:
# ✅ 基础止损订单 - 标记价格触发
result = trader.post_stop_order(
account_id=0,
symbol="BTC_USDT",
pos_side="Long",
stop_loss={
"trigger_price": {"MarkPrice": 49000.0}, # 标记价格触发
"trigger_action": {
"execute_type": "Market", # 市价平仓
"typ": "Normal" # 一般止损
}
},
is_dual_side=True
)
if "Ok" in result:
order_result = result["Ok"]
print(f"止损订单下达成功: {order_result}")
else:
error = result.get("Err", "未知错误")
print(f"止损订单失败: {error}")
# ✅ 止盈订单 - 合约价格触发,指定数量
result = trader.post_stop_order(
account_id=0,
symbol="ETH_USDT",
pos_side="Long",
take_profit={
"cid": "my_take_profit_001", # 自定义订单ID
"trigger_price": {"ContractPrice": 3500.0}, # 合约价格触发
"trigger_action": {
"quantity": 0.5, # 止盈数量
"execute_type": {"Limit": 3510.0}, # 限价3510平仓
"typ": "Normal"
}
},
is_dual_side=True
)
# ✅ 追踪止盈订单
result = trader.post_stop_order(
account_id=0,
symbol="SOL_USDT",
pos_side="Long",
take_profit={
"trigger_price": {"MarkPrice": 100.0}, # 标记价格100触发
"trigger_action": {
"trailing_gap": 2.0, # 回调2%时执行
"execute_type": "Market", # 市价平仓
"typ": "Trailing" # 追踪止盈
}
},
is_dual_side=True
)
# ✅ 同时设置止损和止盈
result = trader.post_stop_order(
account_id=0,
symbol="BTC_USDT",
pos_side="Short",
take_profit={
"trigger_price": {"MarkPrice": 48000.0}, # 止盈触发价
"trigger_action": {
"quantity": 0.01, # 部分止盈
"execute_type": "Market",
"typ": "Normal"
}
},
stop_loss={
"trigger_price": {"ContractPrice": 52000.0}, # 止损触发价
"trigger_action": {
"execute_type": "Market", # 市价止损
"typ": "Normal"
}
},
is_dual_side=True
)
# ✅ 指数价格触发的止损订单
result = trader.post_stop_order(
account_id=0,
symbol="BTC_USDT",
pos_side="Long",
stop_loss={
"trigger_price": {"IndexPrice": 49500.0}, # 指数价格触发
"trigger_action": {
"quantity": 0.02, # 指定止损数量
"execute_type": {"Limit": 49000.0}, # 限价49000止损
"typ": "Normal"
}
},
is_dual_side=True
)
# 💡 完整的止损订单管理示例(基于实际策略代码)
def complete_stop_order_lifecycle():
"""完整的止损订单生命周期管理示例"""
account_id = 0
symbol = "XRP_USDT"
print("=== 止损订单完整生命周期示例 ===")
# 1. 检查持仓模式
is_dual_side_result = trader.is_dual_side(account_id)
if "Ok" not in is_dual_side_result:
print("❌ 获取持仓模式失败")
return
is_dual_side = is_dual_side_result['Ok']
print(f"📋 当前持仓模式: {'双向持仓' if is_dual_side else '单向持仓'}")
# 2. 创建止盈止损订单
print("\n📥 创建止盈止损订单...")
result = trader.post_stop_order(
account_id=account_id,
symbol=symbol,
pos_side="Long", # 针对多头仓位设置止盈止损
take_profit={
"cid": "tp_xrp_001", # 客户端止盈订单ID
"trigger_price": {"MarkPrice": 3.0}, # 止盈触发价
"trigger_action": {
"quantity": 2, # 部分止盈数量
"execute_type": "Market", # 市价执行
"typ": "Normal" # 一般止盈
}
},
stop_loss={
"cid": "sl_xrp_001", # 客户端止损订单ID
"trigger_price": {"ContractPrice": 2.0}, # 止损触发价
"trigger_action": {
"execute_type": "Market", # 市价止损
"typ": "Normal" # 一般止损
}
},
is_dual_side=is_dual_side
)
if "Ok" not in result:
print(f"❌ 下止盈止损订单失败: {result.get('Err')}")
return
order_result = result["Ok"]
print(f"✅ 止盈止损订单下达成功:")
# 提取订单ID
take_profit_id = None
stop_loss_id = None
if order_result.get('take_profit') and "Ok" in order_result['take_profit']:
take_profit_id = order_result['take_profit']['Ok']
print(f" 🎯 止盈订单ID: {take_profit_id}")
if order_result.get('stop_loss') and "Ok" in order_result['stop_loss']:
stop_loss_id = order_result['stop_loss']['Ok']
print(f" 🛡️ 止损订单ID: {stop_loss_id}")
# 3. 查询并验证订单
print("\n📊 查询止损订单...")
result = trader.get_stop_orders(account_id, symbol)
if "Ok" in result:
orders = result["Ok"]
open_orders = [order for order in orders if order['status'] == 'Open']
print(f" 📋 当前开放的止损订单数量: {len(open_orders)}")
# 找到我们刚才下的订单
our_orders = []
for order in open_orders:
order_found = False
if order.get('take_profit') and order['take_profit']['id'] == take_profit_id:
order_found = True
if order.get('stop_loss') and order['stop_loss']['id'] == stop_loss_id:
order_found = True
if order_found:
our_orders.append(order)
print(f" ✅ 找到我们的订单: {len(our_orders)} 个")
# 显示订单详情
for i, order in enumerate(our_orders):
print(f" 订单 {i+1}: 状态={order['status']}, 创建时间={order['c_time']}")
if order.get('take_profit'):
tp = order['take_profit']
print(f" 止盈: 触发价={tp['trigger_price']}, 数量={tp['trigger_action'].get('quantity')}")
if order.get('stop_loss'):
sl = order['stop_loss']
print(f" 止损: 触发价={sl['trigger_price']}")
else:
print(f"❌ 查询订单失败: {result.get('Err')}")
# 4. 修改止盈订单(演示修改功能)
if take_profit_id:
print(f"\n🔧 修改止盈订单 {take_profit_id}...")
result = trader.amend_stop_order(
account_id=account_id,
symbol=symbol,
id=take_profit_id,
take_profit={
"trigger_price": {"MarkPrice": 3.1}, # 调整触发价格
"trigger_action": {
"quantity": 3, # 调整数量
"execute_type": "Market",
"typ": "Normal"
}
}
)
if "Ok" in result:
print(f" ✅ 止盈订单修改成功: {result['Ok']}")
# 验证修改结果
verify_result = trader.get_stop_orders(account_id, symbol, id=take_profit_id)
if "Ok" in verify_result and verify_result["Ok"]:
updated_order = verify_result["Ok"][0]
if updated_order.get('take_profit'):
tp = updated_order['take_profit']
new_trigger_price = tp['trigger_price']
new_quantity = tp['trigger_action'].get('quantity')
print(f" 📊 修改验证 - 新触发价: {new_trigger_price}, 新数量: {new_quantity}")
else:
print(f" ❌ 止盈订单修改失败: {result.get('Err')}")
# 5. 演示条件取消(这里模拟条件判断)
print("\n🧹 清理订单...")
should_cleanup = True # 在实际策略中,这里会有具体的条件判断
if should_cleanup:
# 取消止盈订单
if take_profit_id:
result = trader.cancel_stop_order(account_id, symbol, id=take_profit_id)
if "Ok" in result:
print(f" ✅ 止盈订单 {take_profit_id} 取消成功")
else:
print(f" ❌ 止盈订单取消失败: {result.get('Err')}")
# 取消止损订单
if stop_loss_id:
result = trader.cancel_stop_order(account_id, symbol, id=stop_loss_id)
if "Ok" in result:
print(f" ✅ 止损订单 {stop_loss_id} 取消成功")
else:
print(f" ❌ 止损订单取消失败: {result.get('Err')}")
# 6. 最终验证
print("\n🔍 最终验证...")
result = trader.get_stop_orders(account_id, symbol)
if "Ok" in result:
orders = result["Ok"]
open_orders = [order for order in orders if order['status'] == 'Open']
print(f" 📋 剩余开放订单数量: {len(open_orders)}")
print("\n✅ 止损订单完整生命周期示例完成!")
# 🚀 实际策略中的止损订单管理示例
def strategy_stop_order_management():
"""基于实际策略代码的止损订单管理"""
account_id = 0
symbol = "XRP_USDT"
print("=== 策略级止损订单管理 ===")
# 获取开放的止损订单(参考strategy.py代码)
result = trader.get_stop_orders(account_id, symbol)
if "Ok" in result:
orders = result["Ok"]
open_stop_orders = [order for order in orders if order['status'] == 'Open']
print(f"📋 当前开放的止损订单: {len(open_stop_orders)} 个")
# 撤销所有开放的止损订单(实际策略场景)
for order in open_stop_orders:
# 获取止损订单ID(优先取止损,如果没有则取止盈)
order_id = None
order_type = None
if order.get('stop_loss'):
order_id = order['stop_loss']['id']
order_type = "止损"
elif order.get('take_profit'):
order_id = order['take_profit']['id']
order_type = "止盈"
if order_id:
res = trader.cancel_stop_order(account_id, symbol, id=order_id)
if 'Ok' in res:
print(f" ✅ {order_type}订单 {order_id} 撤单成功")
else:
print(f" ❌ {order_type}订单 {order_id} 撤单失败: {res.get('Err')}")
else:
print(f"❌ 获取止损订单失败: {result.get('Err')}")
# 设置新的风控订单
print("\n📥 设置新的风控订单...")
result = trader.post_stop_order(
account_id=account_id,
symbol=symbol,
pos_side="Long",
take_profit={
"trigger_price": {"MarkPrice": 2.8},
"trigger_action": {
"execute_type": "Market",
"typ": "Normal"
}
},
stop_loss={
"trigger_price": {"MarkPrice": 2.2},
"trigger_action": {
"execute_type": "Market",
"typ": "Normal"
}
},
is_dual_side=True
)
if "Ok" in result:
print(f"✅ 新的风控订单设置成功: {result['Ok']}")
else:
print(f"❌ 风控订单设置失败: {result.get('Err')}")
# 调用示例函数
complete_stop_order_lifecycle()
print("\n" + "="*50 + "\n")
strategy_stop_order_management()
📊 get_stop_orders(account_id, symbol, limit=None, id=None, cid=None, extra=None, generate=False)
功能: 获取止损订单列表
参数:
account_id: 整数,必需,账户IDsymbol: 字符串,必需,交易对名称limit: 整数,可选,返回订单数量限制id: 字符串,可选,特定订单ID,用于查询指定订单cid: 字符串,可选,客户端订单ID,用于查询指定订单extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
重要:
- 此方法仅支持同步执行,无
sync参数 - 必需参数:
account_id、symbol两个参数为必需 - 查询模式: 如果不提供
id或cid,则返回所有止损订单;如果提供其中一个,则返回指定订单
返回值:
- Result结构: 包含
Ok或Err字段的字典 - 示例:
{
"Ok": [
{
"c_time": 1751955325429,
"status": "Open",
"stop_loss": {
"cid": "1326362257261215744",
"id": "1326362257269604352",
"trigger_action": {
"execute_type": "Market",
"quantity": null,
"trailing_gap": null,
"typ": "Normal"
},
"trigger_price": {
"ContractPrice": 2.0
}
},
"symbol": "XRP_USDT",
"take_profit": null,
"u_time": 1751955325429
},
{
"c_time": 1751954797851,
"status": "Canceled",
"stop_loss": {
"cid": "1326360044434505728",
"id": "1326360044447088640",
"trigger_action": {
"execute_type": "Market",
"quantity": null,
"trailing_gap": null,
"typ": "Normal"
},
"trigger_price": {
"ContractPrice": 2.0
}
},
"symbol": "XRP_USDT",
"take_profit": null,
"u_time": 1751955183197
},
]
}
示例:
# 获取BTC_USDT的所有止损订单
result = trader.get_stop_orders(
account_id=0,
symbol="BTC_USDT"
)
if "Ok" in result:
stop_orders = result["Ok"]
print(f"止损订单数量: {len(stop_orders)}")
for order in stop_orders:
symbol = order.get('symbol')
status = order.get('status')
create_time = order.get('c_time')
# 检查止盈订单
if order.get('take_profit'):
tp = order['take_profit']
tp_id = tp.get('id')
tp_price = tp['trigger_price']
print(f"止盈订单 - ID: {tp_id}, 触发价: {tp_price}, 状态: {status}")
# 检查止损订单
if order.get('stop_loss'):
sl = order['stop_loss']
sl_id = sl.get('id')
sl_price = sl['trigger_price']
print(f"止损订单 - ID: {sl_id}, 触发价: {sl_price}, 状态: {status}")
else:
error = result.get("Err", "未知错误")
print(f"获取止损订单失败: {error}")
# 通过订单ID获取特定订单
result = trader.get_stop_orders(
account_id=0,
symbol="BTC_USDT",
id="1326362257269604352" # 具体的订单ID
)
if "Ok" in result:
orders = result["Ok"]
if orders:
print(f"找到订单: {orders[0]}")
else:
print("订单不存在")
# 通过客户端订单ID获取特定订单
result = trader.get_stop_orders(
account_id=0,
symbol="BTC_USDT",
cid="client_stop_order_001" # 客户端自定义ID
)
# 限制返回数量(获取最近10个止损订单)
result = trader.get_stop_orders(
account_id=0,
symbol="BTC_USDT",
limit=10
)
if "Ok" in result:
recent_orders = result["Ok"]
print(f"最近{len(recent_orders)}个止损订单")
# 分析订单状态分布
status_count = {}
for order in recent_orders:
status = order.get('status', 'Unknown')
status_count[status] = status_count.get(status, 0) + 1
print(f"订单状态分布: {status_count}")
📊 amend_stop_order(account_id, symbol, id=None, cid=None, take_profit=None, stop_loss=None, extra=None, generate=False)
功能: 修改止损订单
参数:
account_id: 整数,必需,账户IDsymbol: 字符串,必需,交易对名称id: 字符串,可选,要修改的订单IDcid: 字符串,可选,要修改的客户端订单IDtake_profit: 字典,可选,新的止盈订单配置(CloseTrigger结构),包含以下字段:cid: 字符串,可选,客户自定义订单IDtrigger_price: 字典,触发价格配置,支持以下类型:{"MarkPrice": 价格}: 标记价格触发{"ContractPrice": 价格}: 最新成交价/市场价触发{"IndexPrice": 价格}: 指数价格触发
trigger_action: 字典,触发行为配置,包含:quantity: 浮点数,可选,触发数量,不填则全平trailing_gap: 浮点数,可选,回调幅度(%),追踪止盈时必填execute_type: 字符串或字典,执行类型:"Market": 市价平仓{"Limit": 价格}: 限价平仓
typ: 字符串,可选,类型:"Normal"(一般)或"Trailing"(追踪),默认"Normal"
stop_loss: 字典,可选,新的止损订单配置(CloseTrigger结构),字段同take_profitextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
重要:
- 此方法仅支持同步执行,无
sync参数 - 必需参数:
account_id、symbol为必需参数 - 订单标识:
id和cid不能同时为空,必须提供其中一个用于标识要修改的订单 - 修改内容:
take_profit和stop_loss至少需要提供其中一个要修改的配置,否则修改操作无意义
返回值:
- Result结构: 包含
Ok或Err字段的字典 - 示例:
{
"Ok": {
"stop_loss": {
"Ok": "1326362257269604352"
},
"take_profit": {
"Ok": "1326362256586391552"
}
}
}
示例:
# 首先获取现有的止损订单
result = trader.get_stop_orders(0, "BTC_USDT")
if "Ok" in result and result["Ok"]:
stop_orders = result["Ok"]
# 找到一个开放状态的订单
open_orders = [order for order in stop_orders if order['status'] == 'Open']
if open_orders:
# 获取第一个开放订单的止损部分ID
stop_loss_id = None
if open_orders[0].get('stop_loss'):
stop_loss_id = open_orders[0]['stop_loss']['id']
if stop_loss_id:
# 修改止损订单的触发价格 - 通过订单ID
result = trader.amend_stop_order(
account_id=0,
symbol="BTC_USDT",
id=stop_loss_id, # 使用id参数
stop_loss={
"trigger_price": {"MarkPrice": 48500.0}, # 修改为标记价格触发
"trigger_action": {
"execute_type": "Market", # 市价执行
"typ": "Normal"
}
}
)
if "Ok" in result:
amend_result = result["Ok"]
print(f"止损订单修改成功: {amend_result}")
else:
error = result.get("Err", "未知错误")
print(f"止损订单修改失败: {error}")
# 通过客户端订单ID修改止盈订单为追踪止盈
result = trader.amend_stop_order(
account_id=0,
symbol="ETH_USDT",
cid="client_tp_001", # 使用客户端订单ID
take_profit={
"trigger_price": {"ContractPrice": 3600.0}, # 合约价格触发
"trigger_action": {
"quantity": 0.8, # 数量
"trailing_gap": 1.5, # 回调1.5%
"execute_type": "Market",
"typ": "Trailing" # 改为追踪止盈
}
}
)
if "Ok" in result:
print(f"追踪止盈修改成功: {result['Ok']}")
else:
print(f"追踪止盈修改失败: {result.get('Err', '未知错误')}")
# 同时修改止盈和止损
result = trader.amend_stop_order(
account_id=0,
symbol="SOL_USDT",
id="1326362257269604352", # 止损订单ID
take_profit={
"trigger_price": {"MarkPrice": 120.0},
"trigger_action": {
"quantity": 1.0,
"execute_type": {"Limit": 119.5}, # 限价执行
"typ": "Normal"
}
},
stop_loss={
"trigger_price": {"MarkPrice": 95.0},
"trigger_action": {
"execute_type": "Market",
"typ": "Normal"
}
}
)
if "Ok" in result:
amend_result = result["Ok"]
# 检查修改结果
if amend_result.get('take_profit') and "Ok" in amend_result['take_profit']:
print(f"止盈修改成功,订单ID: {amend_result['take_profit']['Ok']}")
if amend_result.get('stop_loss') and "Ok" in amend_result['stop_loss']:
print(f"止损修改成功,订单ID: {amend_result['stop_loss']['Ok']}")
else:
print(f"修改失败: {result.get('Err', '未知错误')}")
# 完整的修改流程示例
def modify_stop_order_example():
symbol = "BTC_USDT"
account_id = 0
# 1. 查询现有订单
result = trader.get_stop_orders(account_id, symbol)
if "Ok" not in result:
print("获取止损订单失败")
return
orders = result["Ok"]
open_orders = [order for order in orders if order['status'] == 'Open']
if not open_orders:
print("没有找到开放状态的止损订单")
return
# 2. 选择要修改的订单
target_order = open_orders[0]
# 3. 根据订单类型进行修改
if target_order.get('take_profit'):
tp_id = target_order['take_profit']['id']
print(f"修改止盈订单: {tp_id}")
# 修改止盈触发价格
result = trader.amend_stop_order(
account_id=account_id,
symbol=symbol,
id=tp_id,
take_profit={
"trigger_price": {"MarkPrice": 52000.0}, # 新的触发价格
"trigger_action": {
"quantity": 0.05, # 新的数量
"execute_type": "Market",
"typ": "Normal"
}
}
)
if "Ok" in result:
print(f"止盈修改成功: {result['Ok']}")
else:
print(f"止盈修改失败: {result.get('Err')}")
if target_order.get('stop_loss'):
sl_id = target_order['stop_loss']['id']
print(f"修改止损订单: {sl_id}")
# 修改止损触发价格
result = trader.amend_stop_order(
account_id=account_id,
symbol=symbol,
id=sl_id,
stop_loss={
"trigger_price": {"ContractPrice": 47000.0}, # 新的触发价格
"trigger_action": {
"execute_type": "Market",
"typ": "Normal"
}
}
)
if "Ok" in result:
print(f"止损修改成功: {result['Ok']}")
else:
print(f"止损修改失败: {result.get('Err')}")
# 调用示例函数
modify_stop_order_example()
📊 cancel_stop_order(account_id, symbol, id=None, cid=None, extra=None, generate=False)
功能: 取消止损订单
参数:
account_id: 整数,必需,账户IDsymbol: 字符串,必需,交易对名称id: 字符串,可选,订单IDcid: 字符串,可选,客户端订单IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
重要:
- 此方法仅支持同步执行,无
sync参数 - 必需参数:
account_id、symbol两个参数为必需 - 标识参数:
id和cid不能同时为空,必须提供其中一个用于标识要取消的订单
返回值:
- 取消止损订单结果数据
示例:
# 通过订单ID取消止损订单
result = trader.cancel_stop_order(
account_id=0,
symbol="BTC_USDT",
id="1326362257269604352" # 具体的止损订单ID
)
if "Ok" in result:
cancel_result = result["Ok"]
print(f"止损订单取消成功: {cancel_result}")
else:
error = result.get("Err", "未知错误")
print(f"止损订单取消失败: {error}")
# 通过客户端订单ID取消
result = trader.cancel_stop_order(
account_id=0,
symbol="BTC_USDT",
cid="my_stop_order_001" # 客户端自定义ID
)
if "Ok" in result:
print(f"止损订单取消成功: {result['Ok']}")
else:
print(f"止损订单取消失败: {result.get('Err', '未知错误')}")
# 批量取消止损订单示例
def cancel_all_open_stop_orders(account_id, symbol):
"""取消指定交易对的所有开放止损订单"""
# 1. 获取所有开放状态的止损订单
result = trader.get_stop_orders(account_id, symbol)
if "Ok" not in result:
print(f"获取{symbol}止损订单失败: {result.get('Err')}")
return
orders = result["Ok"]
open_orders = [order for order in orders if order['status'] == 'Open']
if not open_orders:
print(f"{symbol}没有开放状态的止损订单")
return
print(f"找到{len(open_orders)}个开放的止损订单,开始批量取消...")
# 2. 逐个取消订单
success_count = 0
failed_count = 0
for order in open_orders:
# 取消止盈订单
if order.get('take_profit'):
tp_id = order['take_profit']['id']
result = trader.cancel_stop_order(account_id, symbol, id=tp_id)
if "Ok" in result:
print(f"✅ 止盈订单 {tp_id} 取消成功")
success_count += 1
else:
print(f"❌ 止盈订单 {tp_id} 取消失败: {result.get('Err')}")
failed_count += 1
# 取消止损订单
if order.get('stop_loss'):
sl_id = order['stop_loss']['id']
result = trader.cancel_stop_order(account_id, symbol, id=sl_id)
if "Ok" in result:
print(f"✅ 止损订单 {sl_id} 取消成功")
success_count += 1
else:
print(f"❌ 止损订单 {sl_id} 取消失败: {result.get('Err')}")
failed_count += 1
print(f"批量取消完成: 成功 {success_count} 个, 失败 {failed_count} 个")
# 调用批量取消函数
cancel_all_open_stop_orders(0, "BTC_USDT")
# 根据条件取消止损订单
def cancel_stop_orders_by_condition():
"""根据特定条件取消止损订单"""
symbol = "ETH_USDT"
account_id = 0
# 获取所有止损订单
result = trader.get_stop_orders(account_id, symbol)
if "Ok" not in result:
return
orders = result["Ok"]
for order in orders:
if order['status'] != 'Open':
continue # 跳过非开放状态的订单
# 示例条件:取消所有标记价格触发价格低于3000的止盈订单
if order.get('take_profit'):
tp = order['take_profit']
trigger_price = tp['trigger_price']
# 检查是否是标记价格触发且价格低于3000
if ('MarkPrice' in trigger_price and
trigger_price['MarkPrice'] < 3000):
tp_id = tp['id']
result = trader.cancel_stop_order(account_id, symbol, id=tp_id)
if "Ok" in result:
print(f"取消低价止盈订单: {tp_id} (触发价: {trigger_price['MarkPrice']})")
else:
print(f"取消订单失败: {tp_id}")
# 使用示例策略中的代码片段
def example_from_strategy():
"""基于策略代码的实际使用示例"""
account_id = 0
symbol = "XRP_USDT"
# 获取开放的止损订单
result = trader.get_stop_orders(account_id, symbol)
if "Ok" in result:
orders = result["Ok"]
open_stop_orders = [order for order in orders if order['status'] == 'Open']
# 撤销所有开放的止损订单
for order in open_stop_orders:
# 获取止损订单ID(优先取止损,如果没有则取止盈)
order_id = None
if order.get('stop_loss'):
order_id = order['stop_loss']['id']
elif order.get('take_profit'):
order_id = order['take_profit']['id']
if order_id:
res = trader.cancel_stop_order(account_id, symbol, id=order_id)
if 'Ok' in res:
print(f"撤单成功: {order_id}")
else:
print(f"撤单失败: {order_id}, 错误: {res.get('Err')}")
# 调用示例函数
example_from_strategy()
止损订单使用最佳实践:
-
风险管理: 建议为每个开仓都设置对应的止损订单,控制单笔交易最大亏损
-
触发价格类型选择:
MarkPrice: 标记价格触发,适用于大多数场景,可避免市场价格异常波动误触发ContractPrice: 最新成交价触发,响应最快,适合高频交易IndexPrice: 指数价格触发,最稳定,适合长期持仓
-
执行类型策略:
Market: 市价执行,确保能够成交,但可能有滑点Limit: 限价执行,控制成交价格,但可能无法成交
-
追踪止盈的使用:
- 设置合理的
trailing_gap回调幅度,通常1-3% - 追踪止盈适用于趋势明确的单边行情
- 震荡行情建议使用一般止盈避免频繁触发
- 设置合理的
-
数量设置:
- 不设置
quantity将全平持仓 - 部分止盈可设置具体数量,实现分批平仓策略
- 不设置
-
仓位方向: 必须正确设置
pos_side参数,与要保护的仓位方向一致 -
双向持仓: 在双向持仓模式下,
is_dual_side应设置为True -
持续监控: 定期检查止损订单状态,根据市场变化及时调整
-
参数要求: 严格按照必需参数要求传递参数
post_stop_order:account_id、symbol、pos_side为必需get_stop_orders:account_id、symbol为必需amend_stop_order:account_id、symbol为必需,id或cid二选一cancel_stop_order:account_id、symbol为必需,id或cid二选一
-
同步执行: 所有止损订单操作都是同步执行,无
sync参数,确保操作的即时性和可靠性
6.2 基础请求
🌐 request(account_id, method, path, auth, query=None, body=None, url=None, headers=None, generate=False)
功能: 向交易所发送原始API请求
参数:
account_id: 整数,账户IDmethod: 字符串,HTTP方法(GET, POST等)path: 字符串,API路径auth: 布尔值,是否需要验证query: 字典,可选,URL查询参数body: 任意类型,可选,请求体数据。可以是Python字典、列表或任何可序列化的Python对象,会自动转换为JSON字符串url: 字符串,可选,完整URL(设置时会覆盖默认URL)headers: 字典,可选,HTTP头信息generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- Result结构: 包含
Ok或Err字段的字典 - 成功时:
{"Ok": response_data}- 交易所API返回的响应数据已转换为Python对象 - 失败时:
{"Err": error_info}- 包含错误信息
示例:
# ✅ GET请求示例 - 获取OKX市场行情数据
query_params = {"instId": "BTC-USD-SWAP"}
result = trader.request(
account_id=0,
method="GET",
path="/api/v5/market/ticker",
auth=False, # 公开接口不需要认证
query=query_params,
body=None,
url=None,
headers=None,
generate=False
)
if "Ok" in result:
ticker_data = result["Ok"]
print(f"BTC-USD-SWAP行情数据: {ticker_data}")
else:
error = result.get("Err", "未知错误")
print(f"获取OKX行情失败: {error}")
重要说明:
- 自动序列化:
body参数支持Python字典、列表等可序列化对象,会在Rust端自动转换为JSON字符串 - URL覆盖: 设置
url参数时会覆盖交易所的默认API基础URL,用于调用其他服务的API - 认证处理:
auth=True时会自动添加交易所要求的签名和认证头 - Result处理: 所有返回值都需要检查
Ok字段来确定是否成功 - 命令生成:
generate=True时只生成命令对象,不立即执行,用于批量操作
6.3 持仓与账户
📊 get_position(account_id, symbol, extra=None, generate=False)
功能: 获取指定交易对的持仓信息
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- Result结构: 包含
Ok或Err字段的字典 - 成功时:
{"Ok": position_data}- 持仓信息数据,如果无持仓返回None - 失败时:
{"Err": error_info}- 包含错误信息 - 数据结构: 参见7.2 Position(持仓)数据结构
📊 get_positions(account_id, extra=None, generate=False)
功能: 获取账户下所有持仓信息
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- Result结构: 包含
Ok或Err字段的字典 - 成功时:
{"Ok": positions_list}- 持仓信息列表,如果无持仓返回空列表 - 失败时:
{"Err": error_info}- 包含错误信息 - 数据结构: 列表中每个元素的结构参见7.2 Position(持仓)数据结构
📊 get_max_position(account_id, symbol, level=None, extra=None, generate=False)
功能: 获取交易对的最大可开仓数量
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称level: 浮点数,可选,杠杆倍数extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
{
"long_notional": 1000.0,
"short_notional": 1000.0,
"long_quantity": 0.001,
"short_quantity": 0.001
}
- 最大可开仓数量数据字典,包含:
long_notional: 浮点数,多仓对应Quote名义价值short_notional: 浮点数,空仓对应Quote名义价值long_quantity: 浮点数,多仓对应Base数量short_quantity: 浮点数,空仓对应Base数量
示例:
# 查询BTC_USDT的最大可开仓数量
result = trader.get_max_position(
account_id=0,
symbol="BTC_USDT"
)
if "Ok" in result:
max_position = result["Ok"]
print(f"最大可开仓数量: {max_position}")
print(f"多仓最大名义价值: {max_position['long_notional']} USDT")
print(f"空仓最大名义价值: {max_position['short_notional']} USDT")
print(f"多仓最大数量: {max_position['long_quantity']} BTC")
print(f"空仓最大数量: {max_position['short_quantity']} BTC")
else:
error = result.get("Err", "未知错误")
print(f"查询最大可开仓数量失败: {error}")
# 指定杠杆倍数查询
result = trader.get_max_position(
account_id=0,
symbol="BTC_USDT",
level=5.0 # 5倍杠杆
)
if "Ok" in result:
max_position_5x = result["Ok"]
print(f"5倍杠杆下最大可开仓: {max_position_5x}")
else:
error = result.get("Err", "未知错误")
print(f"查询5倍杠杆最大可开仓失败: {error}")
# 使用publish方法调用
cmd = {
"account_id": 0,
"method": "GetMaxPosition",
"symbol": "BTC_USDT",
"level": 3.0,
"sync": True
}
result = trader.publish(cmd)
print(f"最大可开仓结果: {result}")
💰 get_usdt_balance(account_id, extra=None, generate=False)
功能: 获取账户USDT余额
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- USDT余额数据字典,包含:
asset: 字符串,资产名称("USDT")balance: 浮点数,总余额+浮动盈亏available_balance: 浮点数,可用余额unrealized_pnl: 浮点数,浮动盈亏
💰 get_balances(account_id, extra=None, generate=False)
功能: 获取账户所有币种余额
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 所有币种余额数据列表,每个元素为字典,包含:
asset: 字符串,资产名称(如"USDT", "BTC"等)balance: 浮点数,总余额+浮动盈亏available_balance: 浮点数,可用余额unrealized_pnl: 浮点数,浮动盈亏
💰 get_balance_by_coin(account_id, asset, extra=None, generate=False)
功能: 获取指定币种的余额
参数:
account_id: 整数,账户IDasset: 字符串,币种名称extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 指定币种余额数据字典,包含:
asset: 字符串,资产名称(与请求的asset相同)balance: 浮点数,总余额+浮动盈亏available_balance: 浮点数,可用余额unrealized_pnl: 浮点数,浮动盈亏
💰 get_fee_rate(account_id, symbol, extra=None, generate=False)
功能: 获取交易对的手续费率
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 手续费率数据
💰 get_fee_discount_info(account_id, extra=None, generate=False)
功能: 获取费用折扣信息
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 费用折扣信息数据
💰 is_fee_discount_enabled(account_id, extra=None, generate=False)
功能: 检查费用折扣是否已启用
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 布尔值,表示费用折扣是否已启用
💰 set_fee_discount_enabled(account_id, enabled, extra=None, generate=False)
功能: 设置费用折扣启用状态
参数:
account_id: 整数,账户IDenabled: 布尔值,是否启用费用折扣extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 设置结果
6.4 市场数据
📈 get_ticker(account_id, symbol, extra=None, generate=False)
功能: 获取指定交易对的行情数据
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 交易对行情数据
📈 get_tickers(account_id, extra=None, generate=False)
功能: 获取所有交易对的行情数据
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 所有交易对行情数据列表
📉 get_bbo(account_id, symbol, extra=None, generate=False)
功能: 获取交易对的最优买卖报价(Best Bid & Offer)
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 最优买卖报价数据
📉 get_bbo_tickers(account_id, extra=None, generate=False)
功能: 获取所有交易对的最优买卖报价
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 所有交易对最优买卖报价数据列表
📊 get_depth(account_id, symbol, limit=None, extra=None, generate=False)
功能: 获取交易对的深度数据(订单簿)
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称limit: 整数,可选,深度条数限制extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 深度数据
📊 get_instrument(account_id, symbol, extra=None, generate=False)
功能: 获取交易对的详细信息
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 交易对详细信息
📊 get_instruments(account_id, extra=None, generate=False)
功能: 获取所有可交易的交易对信息
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 所有交易对信息列表
📊 get_mark_price(account_id, symbol=None, extra=None, generate=False)
功能: 获取标记价格
参数:
account_id: 整数,账户IDsymbol: 字符串,可选,交易对名称,不指定则获取所有交易对extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 标记价格数据
📊 get_funding_rates(account_id, extra=None, generate=False)
功能: 获取所有交易对的资金费率
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 所有交易对资金费率数据列表,每个元素为字典,包含:
symbol: 字符串,交易对符号funding_rate: 浮点数,当前资金费率(小数形式,如0.0001表示0.01%)next_funding_at: 整数,下次资金费结算时间,Unix时间戳(毫秒)min_funding_rate: 浮点数,资金费率最小值(交易所设定的下限)max_funding_rate: 浮点数,资金费率最大值(交易所设定的上限)funding_interval: 整数或None,资金费结算周期,以小时为单位(如8表示8小时结算一次)
📊 get_funding_rate_by_symbol(account_id, symbol, extra=None, generate=False)
功能: 获取指定交易对的资金费率
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 指定交易对资金费率数据字典,包含:
symbol: 字符串,交易对符号funding_rate: 浮点数,当前资金费率(小数形式,如0.0001表示0.01%)next_funding_at: 整数,下次资金费结算时间,Unix时间戳(毫秒)min_funding_rate: 浮点数,资金费率最小值(交易所设定的下限)max_funding_rate: 浮点数,资金费率最大值(交易所设定的上限)funding_interval: 整数或None,资金费结算周期,以小时为单位(如8表示8小时结算一次)
示例:
# 获取BTC_USDT的资金费率
result = trader.get_funding_rate_by_symbol(
account_id=0,
symbol="BTC_USDT"
)
if "Ok" in result:
funding_rate_data = result["Ok"]
current_rate = funding_rate_data['funding_rate']
min_rate = funding_rate_data['min_funding_rate']
max_rate = funding_rate_data['max_funding_rate']
interval = funding_rate_data.get('funding_interval', 8) # 默认8小时
print(f"BTC_USDT 当前资金费率: {current_rate:.6f} ({current_rate*100:.4f}%)")
print(f"费率范围: {min_rate:.6f} ~ {max_rate:.6f}")
print(f"结算周期: 每{interval}小时")
print(f"下次结算时间: {funding_rate_data['next_funding_at']}")
else:
error = result.get("Err", "未知错误")
print(f"获取资金费率失败: {error}")
# 获取所有交易对的资金费率并分析
result = trader.get_funding_rates(account_id=0)
if "Ok" in result:
all_funding_rates = result["Ok"]
for rate_data in all_funding_rates:
symbol = rate_data['symbol']
current_rate = rate_data['funding_rate']
min_rate = rate_data['min_funding_rate']
max_rate = rate_data['max_funding_rate']
# 计算当前费率在允许范围内的位置(百分比)
if max_rate != min_rate:
position_pct = (current_rate - min_rate) / (max_rate - min_rate) * 100
print(f"{symbol}: 当前 {current_rate:.6f}, 位置 {position_pct:.1f}% (范围: {min_rate:.6f} ~ {max_rate:.6f})")
else:
print(f"{symbol}: 当前 {current_rate:.6f} (固定费率)")
else:
error = result.get("Err", "未知错误")
print(f"获取所有资金费率失败: {error}")
📊 get_funding_rate_history(account_id, symbol=None, since_secs=None, limit=100, extra=None, generate=False)
功能: 获取资金费率历史记录
参数:
account_id: 整数,账户IDsymbol: 字符串,可选,交易对名称,不传则获取全部交易对since_secs: 整数,可选,开始时间,Unix时间戳(秒),不传则返回最近limit条数据limit: 整数,可选,返回数据条数限制,默认100extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 资金费率历史记录列表,每条记录为字典,包含:
symbol: 字符串,交易对符号funding_rate: 浮点数,资金费率(小数形式,如0.0001表示0.01%)funding_time: 整数,结算时间,Unix时间戳(毫秒)
示例:
# 获取BTC_USDT的最近100条资金费率历史
result = trader.get_funding_rate_history(
account_id=0,
symbol="BTC_USDT",
limit=100
)
if "Ok" in result:
funding_history = result["Ok"]
print(f"获取到{len(funding_history)}条资金费率历史记录")
for record in funding_history:
print(f"时间: {record['funding_time']}, "
f"费率: {record['funding_rate']:.6f} ({record['funding_rate']*100:.4f}%), "
f"交易对: {record['symbol']}")
else:
error = result.get("Err", "未知错误")
print(f"获取资金费率历史失败: {error}")
# 获取指定时间范围的资金费率历史
import time
since_time = int(time.time()) - 7 * 24 * 60 * 60 # 7天前的时间戳(秒)
result = trader.get_funding_rate_history(
account_id=0,
symbol="BTC_USDT",
since_secs=since_time,
limit=50
)
if "Ok" in result:
funding_history = result["Ok"]
print(f"获取7天内的资金费率历史: {len(funding_history)}条")
# 计算平均资金费率
if funding_history:
avg_rate = sum(record['funding_rate'] for record in funding_history) / len(funding_history)
print(f"平均资金费率: {avg_rate:.6f} ({avg_rate*100:.4f}%)")
else:
error = result.get("Err", "未知错误")
print(f"获取指定时间范围资金费率历史失败: {error}")
# 获取所有交易对的最近资金费率历史
result = trader.get_funding_rate_history(
account_id=0,
limit=20 # 每个交易对最近20条
)
if "Ok" in result:
all_funding_history = result["Ok"]
print(f"获取所有交易对资金费率历史: {len(all_funding_history)}条")
# 按交易对分组统计
symbol_stats = {}
for record in all_funding_history:
symbol = record['symbol']
if symbol not in symbol_stats:
symbol_stats[symbol] = []
symbol_stats[symbol].append(record['funding_rate'])
# 显示各交易对的资金费率统计
for symbol, rates in symbol_stats.items():
avg_rate = sum(rates) / len(rates)
max_rate = max(rates)
min_rate = min(rates)
print(f"{symbol}: 平均 {avg_rate:.6f}, 最高 {max_rate:.6f}, 最低 {min_rate:.6f}")
else:
error = result.get("Err", "未知错误")
print(f"获取所有交易对资金费率历史失败: {error}")
# 使用publish方法调用
cmd = {
"account_id": 0,
"method": "FundingRateHistory",
"symbol": "BTC_USDT",
"since_secs": 1672531200, # 2023-01-01的时间戳
"limit": 100,
"sync": True
}
result = trader.publish(cmd)
print(f"资金费率历史: {result}")
📊 get_funding_fee(account_id, symbol, start_time=None, end_time=None, extra=None, generate=False)
功能: 查询指定交易对的历史资金费用记录
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称start_time: 整数,可选,开始时间,Unix时间戳(毫秒)end_time: 整数,可选,结束时间,Unix时间戳(毫秒)extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 资金费用历史记录列表,每条记录为字典,包含:
symbol: 字符串,交易对符号funding_fee: 浮点数,资金费用金额(正数表示收入,负数表示支出)timestamp: 整数,结算时间,Unix时间戳(毫秒)
示例:
# 查询BTC_USDT的资金费用记录
result = trader.get_funding_fee(
account_id=0,
symbol="BTC_USDT"
)
if "Ok" in result:
funding_fee_data = result["Ok"]
print(f"资金费用记录: {funding_fee_data}")
else:
error = result.get("Err", "未知错误")
print(f"获取资金费用记录失败: {error}")
funding_fee_data = None
# 查询指定时间范围的资金费用
import time
end_time = int(time.time() * 1000) # 当前时间戳(毫秒)
start_time = end_time - 7 * 24 * 60 * 60 * 1000 # 7天前
result = trader.get_funding_fee(
account_id=0,
symbol="BTC_USDT",
start_time=start_time,
end_time=end_time
)
if "Ok" in result:
funding_fee_data = result["Ok"]
# 处理资金费用数据
total_fee = 0
for record in funding_fee_data:
fee_type = "收入" if record['funding_fee'] > 0 else "支出"
total_fee += record['funding_fee']
print(f"时间: {record['timestamp']}, "
f"费用: {record['funding_fee']:.6f} ({fee_type}), "
f"交易对: {record['symbol']}")
print(f"总资金费用: {total_fee:.6f}")
else:
error = result.get("Err", "未知错误")
print(f"获取指定时间范围资金费用失败: {error}")
# 使用publish方法调用
cmd = {
"account_id": 1,
"method": "FundingFee",
"symbol": "BTC_USDT",
"start_time": 1672531200000,
"end_time": 1672617600000,
"sync": True
}
result = trader.publish(cmd)
📈 get_kline(account_id, symbol, interval, start_time=None, end_time=None, limit=None, extra=None, generate=False)
功能: 获取指定交易对的K线数据
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对符号interval: 字符串,K线时间粒度,支持的值包括:"1m": 1分钟"3m": 3分钟"5m": 5分钟"15m": 15分钟"30m": 30分钟"1h": 1小时"2h": 2小时"4h": 4小时"6h": 6小时"8h": 8小时"12h": 12小时"1d": 1天"3d": 3天"1w": 1周"1M": 1月
start_time: 整数,可选,开始时间,Unix时间戳(毫秒)end_time: 整数,可选,结束时间,Unix时间戳(毫秒)limit: 整数,可选,返回数据条数限制,默认100,最大1000extra: 字典,可选,扩展参数,用于特定交易所的额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- K线数据对象,包含以下字段:
symbol: 交易对符号interval: K线时间粒度candles: 蜡烛图数据列表,每个元素包含:timestamp: 开盘时间,Unix时间戳(毫秒)open: 开盘价high: 最高价low: 最低价close: 收盘价volume: 成交量(基础货币)quote_volume: 成交额(报价货币)trades: 成交笔数(可选)taker_buy_volume: taker买入成交量(基础货币,可选)taker_buy_quote_volume: taker买入成交额(报价货币,可选)confirm: K线状态,false代表K线未完结,true代表K线已完结
示例:
# 获取BTC_USDT的1分钟K线数据,使用默认限制100条
result = trader.get_kline(
account_id=0,
symbol="BTC_USDT",
interval="1m",
limit=100
)
if "Ok" in result:
kline_data = result["Ok"]
print(f"交易对: {kline_data['symbol']}")
print(f"时间粒度: {kline_data['interval']}")
print(f"获取到{len(kline_data['candles'])}条K线数据")
else:
error = result.get("Err", "未知错误")
print(f"获取K线数据失败: {error}")
kline_data = None
# 获取指定时间范围的K线数据
import time
end_time = int(time.time() * 1000) # 当前时间戳(毫秒)
start_time = end_time - 24 * 60 * 60 * 1000 # 24小时前
result = trader.get_kline(
account_id=0,
symbol="BTC_USDT",
interval="1h",
start_time=start_time,
end_time=end_time,
limit=24 # 24小时数据
)
if "Ok" in result:
kline_data = result["Ok"]
print(f"获取24小时K线数据成功: {len(kline_data['candles'])}条")
else:
error = result.get("Err", "未知错误")
print(f"获取24小时K线数据失败: {error}")
kline_data = None
# 使用扩展参数(特定交易所的额外参数)
result = trader.get_kline(
account_id=0,
symbol="BTC_USDT",
interval="5m",
limit=200,
extra={"custom_param": "value"} # 扩展参数
)
if "Ok" in result:
kline_data = result["Ok"]
print(f"使用扩展参数获取K线数据成功")
else:
error = result.get("Err", "未知错误")
print(f"使用扩展参数获取K线数据失败: {error}")
kline_data = None
# 处理K线数据
if kline_data:
candles = kline_data['candles']
for candle in candles:
status = "已完结" if candle['confirm'] else "未完结"
# 处理可选字段
trades_info = f", 成交笔数: {candle['trades']}" if candle.get('trades') else ""
taker_buy_info = ""
if candle.get('taker_buy_volume'):
buy_ratio = candle['taker_buy_volume'] / candle['volume'] * 100 if candle['volume'] > 0 else 0
taker_buy_info = f", 主动买入占比: {buy_ratio:.1f}%"
print(f"时间: {candle['timestamp']}, 开盘: {candle['open']}, "
f"最高: {candle['high']}, 最低: {candle['low']}, "
f"收盘: {candle['close']}, 成交量: {candle['volume']}, "
f"成交额: {candle['quote_volume']}{trades_info}{taker_buy_info}, 状态: {status}")
# 检查最新K线是否已完结
if candles:
latest_candle = candles[-1]
if latest_candle['confirm']:
print("最新K线已完结,可以用于技术分析")
else:
print("最新K线未完结,数据可能还在变化")
else:
print("无法处理K线数据,数据获取失败")
# 使用publish方法调用K线API
cmd = {
"account_id": 1,
"method": "Kline",
"symbol": "BTC_USDT",
"interval": "1m",
"start_time": 1672531200000,
"end_time": 1672617600000,
"limit": 100,
"extra": {}, # 扩展参数字典
"sync": True
}
result = trader.publish(cmd)
print(f"K线数据: {result}")
print(f"返回的蜡烛图数量: {len(result['candles'])}")
6.5 账户设置与杠杆管理
📊 get_max_leverage(account_id, symbol, extra=None, generate=False)
功能: 获取交易对的最大可用杠杆倍数
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 最大可用杠杆倍数数据
📊 set_leverage(account_id, symbol, leverage, extra=None, generate=False)
功能: 设置交易对的杠杆倍数
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称leverage: 整数,杠杆倍数extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 设置杠杆倍数结果
📊 get_margin_mode(account_id, symbol, margin_coin, extra=None, generate=False)
功能: 获取交易对的保证金模式
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称margin_coin: 字符串,保证金币种extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 保证金模式数据
📊 set_margin_mode(account_id, symbol, margin_coin, margin_mode, extra=None, generate=False)
功能: 设置交易对的保证金模式
参数:
account_id: 整数,账户IDsymbol: 字符串,交易对名称margin_coin: 字符串,保证金币种margin_mode: 字符串或对象,保证金模式,例如"isolated"(逐仓)或"cross"(全仓)extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 设置保证金模式结果
📊 is_dual_side(account_id, extra=None, generate=False)
功能: 查询账户是否使用双向持仓模式
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 是否使用双向持仓模式
📊 set_dual_side(account_id, dual_side, extra=None, generate=False)
功能: 设置账户的持仓模式
参数:
account_id: 整数,账户IDdual_side: 布尔值,是否使用双向持仓模式extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 设置持仓模式结果
6.6 资金划转与借贷
📊 transfer(account_id, transfer, extra=None, generate=False)
功能: 在账户内部或不同账户之间划转资金
参数:
account_id: 整数,账户IDtransfer: 字典或对象,划转信息,包含以下字段:asset: 字符串,币种名称amount: 浮点数,划转金额from: 字符串,转出账户类型,支持的值:"Spot": 现货钱包"UsdtFuture": U本位合约钱包(默认)"CoinFuture": 币本位合约钱包"Margin": 杠杆全仓钱包"IsolatedMargin": 杠杆逐仓钱包
to: 字符串,转入账户类型,支持的值同上
extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 资金划转结果
示例:
# 从现货钱包转账到U本位合约钱包
transfer_data = {
"asset": "USDT",
"amount": 1000.0,
"from": "Spot",
"to": "UsdtFuture"
}
result = trader.transfer(0, transfer_data)
if "Ok" in result:
success_data = result["Ok"]
print(f"划转成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"划转失败: {error}")
# 从U本位合约钱包转账到杠杆全仓钱包
transfer_data = {
"asset": "BTC",
"amount": 0.1,
"from": "UsdtFuture",
"to": "Margin"
}
result = trader.transfer(0, transfer_data)
if "Ok" in result:
success_data = result["Ok"]
print(f"BTC划转成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"BTC划转失败: {error}")
📊 sub_transfer(account_id, sub_transfer, extra=None, generate=False)
功能: 主账户和子账户之间划转资金
参数:
account_id: 整数,账户IDsub_transfer: 字典或对象,划转信息,包含以下字段:cid: 字符串,可选,客户端自定义IDasset: 字符串,币种名称amount: 浮点数,划转金额from: 字符串,转出账户类型,支持的值:"Spot": 现货钱包"UsdtFuture": U本位合约钱包(默认)"CoinFuture": 币本位合约钱包"Margin": 杠杆全仓钱包"IsolatedMargin": 杠杆逐仓钱包
to: 字符串,转入账户类型,支持的值同上from_account: 字符串,可选,转出账户ID,为空时默认为母账户。注意: 当交易所为Bitget时,母账户不能为空to_account: 字符串,可选,转入账户ID,为空时默认为母账户。注意: 当交易所为Bitget时,母账户不能为空direction: 字符串,划转方向,支持的值:"MasterToSub": 母转子(默认)"SubToMaster": 子转母"SubToSub": 子转子
extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 子账户资金划转结果
示例:
# 母账户向子账户划转USDT
sub_transfer_data = {
"cid": "transfer_001",
"asset": "USDT",
"amount": 500.0,
"from": "UsdtFuture",
"to": "UsdtFuture",
"from_account": None, # 母账户
"to_account": "sub_account_123", # 子账户ID
"direction": "MasterToSub"
}
result = trader.sub_transfer(0, sub_transfer_data)
if "Ok" in result:
success_data = result["Ok"]
print(f"子账户划转成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"子账户划转失败: {error}")
# 子账户向母账户划转BTC
sub_transfer_data = {
"asset": "BTC",
"amount": 0.05,
"from": "Spot",
"to": "Spot",
"from_account": "sub_account_456",
"to_account": None, # 母账户
"direction": "SubToMaster"
}
result = trader.sub_transfer(0, sub_transfer_data)
if "Ok" in result:
success_data = result["Ok"]
print(f"BTC子转母成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"BTC子转母失败: {error}")
# 子账户之间划转(需要指定两个子账户)
sub_transfer_data = {
"asset": "ETH",
"amount": 1.0,
"from": "UsdtFuture",
"to": "UsdtFuture",
"from_account": "sub_account_123",
"to_account": "sub_account_456",
"direction": "SubToSub"
}
result = trader.sub_transfer(0, sub_transfer_data)
if "Ok" in result:
success_data = result["Ok"]
print(f"ETH子转子成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"ETH子转子失败: {error}")
# 使用publish方法调用
cmd = {
"account_id": 0,
"method": "Transfer",
"transfer": {
"asset": "USDT",
"amount": 1000.0,
"from": "Spot",
"to": "UsdtFuture"
},
"sync": True
}
result = trader.publish(cmd)
# 子账户划转的publish方法调用
cmd = {
"account_id": 0,
"method": "SubTransfer",
"sub_transfer": {
"cid": "transfer_002",
"asset": "USDT",
"amount": 500.0,
"from": "UsdtFuture",
"to": "UsdtFuture",
"from_account": None,
"to_account": "sub_account_123",
"direction": "MasterToSub"
},
"sync": True
}
result = trader.publish(cmd)
钱包类型说明:
| 钱包类型 | 说明 | 适用场景 |
|---|---|---|
Spot | 现货钱包 | 现货交易、充值提现 |
UsdtFuture | U本位合约钱包 | USDT保证金的永续合约交易 |
CoinFuture | 币本位合约钱包 | 币本位保证金的合约交易 |
Margin | 杠杆全仓钱包 | 杠杆交易(全仓模式) |
IsolatedMargin | 杠杆逐仓钱包 | 杠杆交易(逐仓模式) |
划转方向说明:
| 方向 | 说明 | 使用场景 |
|---|---|---|
MasterToSub | 母转子 | 母账户向子账户划转资金 |
SubToMaster | 子转母 | 子账户向母账户划转资金 |
SubToSub | 子转子 | 子账户之间直接划转资金 |
注意事项:
- Bitget交易所特殊要求: 当使用Bitget交易所时,
from_account和to_account字段中的母账户不能为空,需要明确指定母账户ID - 账户ID格式: 不同交易所的账户ID格式可能不同,请参考具体交易所的API文档
- 权限要求: 子账户划转功能需要相应的API权限,确保API密钥具有子账户管理权限
- 资金安全: 划转操作不可逆,请仔细核对划转参数,特别是账户ID和金额
📊 get_deposit_address(account_id, ccy, chain=None, amount=None, extra=None, generate=False)
功能: 获取指定币种的充值地址
参数:
account_id: 整数,账户IDccy: 字符串,币种名称,如"USDT"、"BTC"等chain: 字符串,可选,链类型,支持的值:"Erc20": 以太坊链(默认)"Trc20": 波场链"Bep20": 币安智能链"Sol": Solana链"Polygon": Polygon链"ArbitrumOne": Arbitrum链"Optimism": Optimism链"Ton": TON链"AVAXC": Avalanche链
amount: 浮点数,可选,预估充值金额(部分交易所需要)extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 充值地址信息字典,包含:
asset: 字符串,币种名称address: 字符串,充值地址chain: 字符串或None,链类型tag: 字符串或None,地址标签(某些币种如EOS、XRP需要)url: 字符串或None,充值二维码URL(可选)
示例:
# 获取USDT的ERC20充值地址
result = trader.get_deposit_address(
account_id=0,
ccy="USDT",
chain="Erc20"
)
if "Ok" in result:
deposit_info = result["Ok"]
print(f"USDT ERC20充值地址: {deposit_info['address']}")
print(f"币种: {deposit_info['asset']}")
print(f"链类型: {deposit_info['chain']}")
else:
error = result.get("Err", "未知错误")
print(f"获取USDT ERC20充值地址失败: {error}")
# 获取USDT的TRC20充值地址
result = trader.get_deposit_address(
account_id=0,
ccy="USDT",
chain="Trc20"
)
if "Ok" in result:
deposit_info_trc = result["Ok"]
print(f"USDT TRC20充值地址: {deposit_info_trc['address']}")
else:
error = result.get("Err", "未知错误")
print(f"获取USDT TRC20充值地址失败: {error}")
# 获取BTC充值地址(不指定链,使用默认)
result = trader.get_deposit_address(
account_id=0,
ccy="BTC"
)
if "Ok" in result:
btc_deposit = result["Ok"]
print(f"BTC充值地址: {btc_deposit['address']}")
else:
error = result.get("Err", "未知错误")
print(f"获取BTC充值地址失败: {error}")
# 获取需要标签的币种充值地址(如EOS)
result = trader.get_deposit_address(
account_id=0,
ccy="EOS"
)
if "Ok" in result:
eos_deposit = result["Ok"]
print(f"EOS充值地址: {eos_deposit['address']}")
if eos_deposit.get('tag'):
print(f"EOS充值标签: {eos_deposit['tag']}")
else:
error = result.get("Err", "未知错误")
print(f"获取EOS充值地址失败: {error}")
# 使用publish方法调用
cmd = {
"account_id": 0,
"method": "GetDepositAddress",
"ccy": "USDT",
"chain": "Trc20",
"sync": True
}
result = trader.publish(cmd)
print(f"充值地址信息: {result}")
📊 withdrawal(account_id, withdrawal, extra=None, generate=False)
功能: 提币到外部地址或内部转账
参数:
-
account_id: 整数,账户ID -
withdrawal: 字典,提币信息,包含以下字段:-
cid: 字符串,可选,客户端自定义ID -
asset: 字符串,币种名称 -
amt: 浮点数,提币数量 -
addr: 字典,提币地址信息,支持两种类型:链上提币 (
OnChain):OnChain: 字典,包含:chain: 字符串,区块链网络,支持的值:"Erc20": 以太坊链"Trc20": 波场链"Bep20": 币安智能链"Sol": Solana链"Polygon": Polygon链"ArbitrumOne": Arbitrum链"Optimism": Optimism链"Ton": TON链"AVAXC": Avalanche链
address: 字符串,目标钱包地址tag: 字符串或None,地址标签(EOS、XRP等币种需要)
内部转账 (
InternalTransfer):InternalTransfer: 字符串,支持三种格式:{"Email": "user@example.com"}: 邮箱转账{"Phone": ["+86", "13800138000"]}: 手机号转账(区号+手机号){"Uid": "12345678"}: 用户ID转账
-
-
extra: 字典,可选,额外参数 -
generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 提币操作结果
示例:
# 链上提币 - USDT通过TRC20网络提币
withdrawal_data = {
"cid": "withdraw_001",
"asset": "USDT",
"amt": 100.0,
"addr": {
"OnChain": {
"chain": "Trc20",
"address": "TKzxdSv2FZKQrEqkKVgp5DcwEXBEKMg2Ax",
"tag": None
}
}
}
result = trader.withdrawal(0, withdrawal_data)
if "Ok" in result:
success_data = result["Ok"]
print(f"USDT提币成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"USDT提币失败: {error}")
# 链上提币 - BTC提币
btc_withdrawal = {
"cid": "withdraw_btc_001",
"asset": "BTC",
"amt": 0.01,
"addr": {
"OnChain": {
"chain": "Erc20", # BTC通常不需要指定链
"address": "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa",
"tag": None
}
}
}
result = trader.withdrawal(0, btc_withdrawal)
if "Ok" in result:
success_data = result["Ok"]
print(f"BTC提币成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"BTC提币失败: {error}")
# 链上提币 - EOS提币(需要tag)
eos_withdrawal = {
"asset": "EOS",
"amt": 10.0,
"addr": {
"OnChain": {
"chain": "Erc20",
"address": "eosaccount123",
"tag": "1234567890" # EOS需要memo标签
}
}
}
result = trader.withdrawal(0, eos_withdrawal)
if "Ok" in result:
success_data = result["Ok"]
print(f"EOS提币成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"EOS提币失败: {error}")
# 内部转账 - 通过邮箱转账
internal_transfer_email = {
"cid": "internal_001",
"asset": "USDT",
"amt": 50.0,
"addr": {
"InternalTransfer": {
"Email": "recipient@example.com"
}
}
}
result = trader.withdrawal(0, internal_transfer_email)
if "Ok" in result:
success_data = result["Ok"]
print(f"邮箱内部转账成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"邮箱内部转账失败: {error}")
# 内部转账 - 通过手机号转账
internal_transfer_phone = {
"asset": "BTC",
"amt": 0.005,
"addr": {
"InternalTransfer": {
"Phone": ["+86", "13800138000"] # [区号, 手机号]
}
}
}
result = trader.withdrawal(0, internal_transfer_phone)
if "Ok" in result:
success_data = result["Ok"]
print(f"手机号内部转账成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"手机号内部转账失败: {error}")
# 内部转账 - 通过用户ID转账
internal_transfer_uid = {
"asset": "ETH",
"amt": 0.1,
"addr": {
"InternalTransfer": {
"Uid": "12345678"
}
}
}
result = trader.withdrawal(0, internal_transfer_uid)
if "Ok" in result:
success_data = result["Ok"]
print(f"用户ID内部转账成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"用户ID内部转账失败: {error}")
# 使用publish方法调用
cmd = {
"account_id": 0,
"method": "Withdrawal",
"withdrawal": {
"cid": "withdraw_publish_001",
"asset": "USDT",
"amt": 100.0,
"addr": {
"OnChain": {
"chain": "Trc20",
"address": "TKzxdSv2FZKQrEqkKVgp5DcwEXBEKMg2Ax",
"tag": None
}
}
},
"sync": True
}
result = trader.publish(cmd)
print(f"提币结果: {result}")
重要提醒:
- 手续费: 链上提币会产生网络手续费,请确保账户余额充足
- 最小提币量: 每个币种都有最小提币数量限制,请查询交易所规则
- 地址验证: 提币前请仔细核对目标地址,错误地址可能导致资金永久丢失
- 网络确认: 不同链的确认时间不同,TRC20通常较快,ERC20在网络拥堵时可能较慢
- 标签要求: EOS、XRP、XMR等币种需要正确的标签(tag/memo),否则可能丢失
- API权限: 提币功能需要账户开启提币权限,并且API密钥需要具备提币权限
- 风控限制: 大额提币可能触发风控审核,需要额外验证
链类型选择建议:
| 币种 | 推荐链类型 | 特点 |
|---|---|---|
| USDT | Bep20 | 手续费低,确认快 |
| USDT | Erc20 | 兼容性好,但手续费较高 |
| USDT | Trc20 | 手续费适中,速度快 |
| ETH | Erc20 | 原生链 |
| BNB | Bep20 | 币安生态 |
📊 borrow(account_id, coin, amount, extra=None, generate=False)
功能: 借入资金
参数:
account_id: 整数,账户IDcoin: 字符串,币种amount: 浮点数,借入金额extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 借入资金结果
📊 repay(account_id, coin, amount, extra=None, generate=False)
功能: 偿还借入的资金
参数:
account_id: 整数,账户IDcoin: 字符串,币种amount: 浮点数,偿还金额extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 偿还资金结果
📊 get_borrowed(account_id, coin=None, extra=None, generate=False)
功能: 查询已借入的资金
参数:
account_id: 整数,账户IDcoin: 字符串,可选,币种,不指定则查询所有币种extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 已借入资金数据列表,每个元素为字典,包含:
coin: 字符串,币种名称amount: 浮点数,已借入金额
📊 get_borrow_rate(account_id, coin=None, extra=None, generate=False)
功能: 查询借贷利率
参数:
account_id: 整数,账户IDcoin: 字符串,可选,币种,不指定则查询所有币种extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 借贷利率数据列表,每个元素为字典,包含:
ccy: 字符串,币种名称rate: 浮点数,借贷利率
📊 get_borrow_limit(account_id, coin, is_vip=None, extra=None, generate=False)
功能: 查询借贷限额
参数:
account_id: 整数,账户IDcoin: 字符串,币种is_vip: 布尔值,可选,是否为VIP用户extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 借贷限额数据字典,包含:
debt: 浮点数,当前负债interest: 浮点数,当前计息coin: 字符串,借贷币种rate: 浮点数,日利率borrow_limit: 浮点数,可借贷限额vip_detail: 字典或None,尊享用户详情(可选),包含:pos_loan: 浮点数,当前账户负债占用available_loan: 浮点数,当前账户剩余可用used_loan: 浮点数,当前账户已借额度
示例:
# 查询USDT的借贷限额
result = trader.get_borrow_limit(
account_id=0,
coin="USDT"
)
if "Ok" in result:
borrow_limit = result["Ok"]
print(f"USDT借贷限额: {borrow_limit}")
print(f"当前负债: {borrow_limit['debt']}")
print(f"可借贷限额: {borrow_limit['borrow_limit']}")
print(f"日利率: {borrow_limit['rate']:.6f}")
else:
error = result.get("Err", "未知错误")
print(f"查询USDT借贷限额失败: {error}")
# VIP用户查询借贷限额
result = trader.get_borrow_limit(
account_id=0,
coin="BTC",
is_vip=True
)
if "Ok" in result:
borrow_limit_vip = result["Ok"]
print(f"VIP用户BTC借贷限额: {borrow_limit_vip}")
# 检查VIP详情
if borrow_limit_vip.get('vip_detail'):
vip_info = borrow_limit_vip['vip_detail']
print(f"VIP可用额度: {vip_info['available_loan']}")
print(f"VIP已用额度: {vip_info['used_loan']}")
else:
error = result.get("Err", "未知错误")
print(f"查询VIP用户BTC借贷限额失败: {error}")
# 使用publish方法调用
cmd = {
"account_id": 1,
"method": "GetBorrowLimit",
"coin": "USDT",
"is_vip": False,
"sync": True
}
result = trader.publish(cmd)
6.7 账户信息
📊 get_account_info(account_id, extra=None, generate=False)
功能: 获取账户详细信息
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 账户详细信息数据
📊 get_account_mode(account_id, extra=None, generate=False)
功能: 获取账户模式
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 账户模式数据
📊 set_account_mode(account_id, account_mode, extra=None, generate=False)
功能: 设置账户模式
参数:
account_id: 整数,账户IDaccount_mode: 字符串或对象,账户模式extra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 设置账户模式结果
📊 get_user_id(account_id, extra=None, generate=False)
功能: 获取用户ID
参数:
account_id: 整数,账户IDextra: 字典,可选,额外参数generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行
返回值:
- 用户ID数据
七、数据结构说明
7.1 K线数据结构
📈 Candle(蜡烛图数据)
单根蜡烛图数据结构,包含完整的OHLCV信息:
pub struct Candle {
/// 开盘时间,Unix时间戳(毫秒)
pub timestamp: i64,
/// 开盘价
pub open: f64,
/// 最高价
pub high: f64,
/// 最低价
pub low: f64,
/// 收盘价
pub close: f64,
/// 成交量(基础货币)
pub volume: f64,
/// 成交额(报价货币)
pub quote_volume: f64,
/// 成交笔数(可选)
pub trades: Option<u32>,
/// taker买入成交量(基础货币,可选)
pub taker_buy_volume: Option<f64>,
/// taker买入成交额(报价货币,可选)
pub taker_buy_quote_volume: Option<f64>,
/// K线状态:false 代表 K 线未完结,true 代表 K 线已完结
pub confirm: bool,
}
字段说明:
timestamp: 该K线的开盘时间,使用Unix时间戳(毫秒)open/high/low/close: 标准OHLC价格数据volume: 该时间段内的总成交量(以基础货币计算)quote_volume: 该时间段内的总成交额(以报价货币计算)trades: 该时间段内的成交笔数,某些交易所可能不提供此数据taker_buy_volume: 主动买入的成交量,用于分析买卖压力taker_buy_quote_volume: 主动买入的成交额confirm: 关键字段,表示该K线是否已完结。未完结的K线数据可能还在变化
📊 Kline(K线数据集合)
包含多根蜡烛图数据的集合:
pub struct Kline {
/// 交易对符号
pub symbol: Symbol,
/// K线时间粒度
pub interval: KlineInterval,
/// 蜡烛图数据列表
pub candles: Vec<Candle>,
}
⏰ KlineInterval(K线时间粒度)
支持的K线时间粒度枚举:
pub enum KlineInterval {
/// 1分钟
Min1, // "1m"
/// 3分钟
Min3, // "3m"
/// 5分钟
Min5, // "5m"
/// 15分钟
Min15, // "15m"
/// 30分钟
Min30, // "30m"
/// 1小时
Hour1, // "1h"
/// 2小时
Hour2, // "2h"
/// 4小时
Hour4, // "4h"
/// 6小时
Hour6, // "6h"
/// 8小时
Hour8, // "8h"
/// 12小时
Hour12, // "12h"
/// 1天
Day1, // "1d"
/// 3天
Day3, // "3d"
/// 1周
Week1, // "1w"
/// 1月
Month1, // "1M"
}
KlineInterval枚举方法:
as_str(): 转换为字符串表示(如"1m", "1h", "1d"等)from_str(): 从字符串解析为KlineInterval枚举值default(): 返回默认值(Min1,即1分钟)
字符串映射表:
- 支持大小写不敏感的解析(如"1h"和"1H"都可以)
- 月份使用大写"M"("1M")以区别于分钟"m"
使用示例:
# 处理K线数据的完整示例
def analyze_kline_data(kline_data):
"""分析K线数据"""
symbol = kline_data['symbol']
interval = kline_data['interval']
candles = kline_data['candles']
print(f"分析 {symbol} 的 {interval} K线数据,共 {len(candles)} 根")
for i, candle in enumerate(candles):
# 检查K线是否完结
status = "已完结" if candle['confirm'] else "未完结"
# 计算涨跌幅
change_pct = (candle['close'] - candle['open']) / candle['open'] * 100
# 计算买卖压力比(如果有taker数据)
buy_pressure = 0
if candle.get('taker_buy_volume') and candle['volume'] > 0:
buy_pressure = candle['taker_buy_volume'] / candle['volume']
# 处理可选的成交笔数信息
trades_count = candle.get('trades', 0)
print(f"K线 {i+1}: 时间={candle['timestamp']}, "
f"OHLC=({candle['open']}, {candle['high']}, {candle['low']}, {candle['close']}), "
f"涨跌幅={change_pct:.2f}%, 成交量={candle['volume']}, "
f"买压={buy_pressure:.2f}, 成交笔数={trades_count}, 状态={status}")
# 只对已完结的K线进行技术分析
if candle['confirm']:
# 进行技术分析逻辑
pass
# 获取并分析K线数据
result = trader.get_kline(0, "BTC_USDT", "1h", limit=24)
if "Ok" in result:
kline_data = result["Ok"]
analyze_kline_data(kline_data)
else:
error = result.get("Err", "未知错误")
print(f"获取K线数据失败: {error}")
K线数据使用最佳实践:
- 检查K线完结状态: 始终检查
confirm字段,只对已完结的K线进行技术分析 - 处理可选字段:
trades、taker_buy_volume等字段可能为空,需要安全处理 - 时间戳处理:
timestamp为毫秒级Unix时间戳,转换为日期时需要除以1000 - 买卖压力分析: 利用
taker_buy_volume分析市场买卖压力 - 成交活跃度: 通过
trades字段分析市场活跃程度
# K线数据安全处理示例
def safe_process_candle(candle):
"""安全处理K线数据"""
# 基础OHLCV数据(必有)
timestamp = candle['timestamp']
ohlc = (candle['open'], candle['high'], candle['low'], candle['close'])
volume = candle['volume']
quote_volume = candle['quote_volume']
is_confirmed = candle['confirm']
# 可选数据的安全处理
trades = candle.get('trades')
taker_buy_vol = candle.get('taker_buy_volume')
taker_buy_quote_vol = candle.get('taker_buy_quote_volume')
# 计算衍生指标
price_change = (ohlc[3] - ohlc[0]) / ohlc[0] * 100 if ohlc[0] > 0 else 0
# 买卖压力分析(如果有数据)
buy_pressure = None
if taker_buy_vol is not None and volume > 0:
buy_pressure = taker_buy_vol / volume
# 平均每笔交易金额(如果有数据)
avg_trade_size = None
if trades is not None and trades > 0:
avg_trade_size = quote_volume / trades
return {
'timestamp': timestamp,
'ohlc': ohlc,
'volume': volume,
'quote_volume': quote_volume,
'price_change_pct': price_change,
'buy_pressure': buy_pressure,
'avg_trade_size': avg_trade_size,
'trades_count': trades,
'is_confirmed': is_confirmed
}
7.2 Position(持仓)数据结构
📊 Position(持仓数据)
持仓数据结构包含完整的仓位信息:
{
# 仓位id(可选)
"id": "position_123456",
# 交易对符号
"symbol": "BTC_USDT",
# 创建仓位的Unix时间戳,以毫秒为单位(可选)
"ctime": 1640995200000,
# 仓位更新的Unix时间戳,以毫秒为单位(可选)
"utime": 1640998800000,
# 保证金模式
"margin_mode": "Cross", # "Cross"(全仓)或 "Isolated"(逐仓)
# 维持保证金(可选)
"margin": 1000.0,
# 维持保证金率(可选)
"mmr": 0.005,
# 仓位方向
"side": "Long", # "Long"(多头)或 "Short"(空头)
# 仓位的杠杆率
"leverage": 10,
# 仓位数量
"amount": 0.5,
# 仓位的平均开仓价格
"entry_price": 50000.0,
# 未实现盈亏,市场价格和开仓价格之间的差异乘以合约数量,可以为负
"unrealized_pnl": 250.5,
# 爆仓价格(可选)
"liquidation_price": 45000.0,
# 标记价格(可选)
"mark_price": 50500.0
}
字段说明:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
id | 字符串 | 否 | 仓位唯一标识符 |
symbol | 字符串 | 是 | 交易对符号 |
ctime | 整数 | 否 | 创建时间(毫秒时间戳) |
utime | 整数 | 否 | 更新时间(毫秒时间戳) |
margin_mode | 字符串 | 是 | 保证金模式:"Cross"(全仓)或"Isolated"(逐仓) |
margin | 浮点数 | 否 | 维持保证金金额 |
mmr | 浮点数 | 否 | 维持保证金率 |
side | 字符串 | 是 | 仓位方向:"Long"(多头)或"Short"(空头) |
leverage | 整数 | 是 | 杠杆倍数(1-125) |
amount | 浮点数 | 是 | 仓位数量(正数) |
entry_price | 浮点数 | 是 | 平均开仓价格 |
unrealized_pnl | 浮点数 | 是 | 未实现盈亏(可为负) |
liquidation_price | 浮点数 | 否 | 预估爆仓价格 |
mark_price | 浮点数 | 否 | 当前标记价格 |
📊 MaxPosition(最大可开仓数据)
最大可开仓数据结构:
{
# 多仓对应Quote名义价值
"long_notional": 100000.0,
# 空仓对应Quote名义价值
"short_notional": 100000.0,
# 多仓对应Base数量
"long_quantity": 2.0,
# 空仓对应Base数量
"short_quantity": 2.0
}
字段说明:
| 字段 | 类型 | 说明 |
|---|---|---|
long_notional | 浮点数 | 多仓最大名义价值(以报价货币计) |
short_notional | 浮点数 | 空仓最大名义价值(以报价货币计) |
long_quantity | 浮点数 | 多仓最大数量(以基础货币计) |
short_quantity | 浮点数 | 空仓最大数量(以基础货币计) |
使用示例:
# 获取单个持仓信息
result = trader.get_position(0, "BTC_USDT")
if "Ok" in result:
position = result["Ok"]
if position: # 有持仓
print(f"交易对: {position['symbol']}")
print(f"方向: {position['side']}")
print(f"数量: {position['amount']}")
print(f"开仓价: {position['entry_price']}")
print(f"未实现盈亏: {position['unrealized_pnl']}")
print(f"杠杆: {position['leverage']}x")
print(f"保证金模式: {position['margin_mode']}")
# 计算盈亏比例
if position['entry_price'] > 0:
pnl_pct = position['unrealized_pnl'] / (position['amount'] * position['entry_price']) * 100
print(f"盈亏比例: {pnl_pct:.2f}%")
# 风险评估
if position.get('liquidation_price'):
mark = position.get('mark_price', position['entry_price'])
if position['side'] == 'Long':
risk_pct = (mark - position['liquidation_price']) / mark * 100
else:
risk_pct = (position['liquidation_price'] - mark) / mark * 100
print(f"距离爆仓: {risk_pct:.2f}%")
else:
print("无持仓")
else:
error = result.get("Err", "未知错误")
print(f"获取持仓失败: {error}")
# 获取所有持仓信息
result = trader.get_positions(0)
if "Ok" in result:
positions = result["Ok"]
print(f"持仓数量: {len(positions)}")
# 统计持仓信息
total_pnl = 0
long_value = 0
short_value = 0
for pos in positions:
symbol = pos['symbol']
side = pos['side']
amount = pos['amount']
pnl = pos['unrealized_pnl']
mark = pos.get('mark_price', pos['entry_price'])
# 计算持仓价值
position_value = amount * mark
if side == 'Long':
long_value += position_value
else:
short_value += position_value
total_pnl += pnl
# 打印每个持仓
print(f"{symbol} {side}: 数量={amount}, PNL={pnl:.2f}, 价值={position_value:.2f}")
print(f"\n汇总统计:")
print(f"总未实现盈亏: {total_pnl:.2f}")
print(f"多头总价值: {long_value:.2f}")
print(f"空头总价值: {short_value:.2f}")
print(f"净敞口: {long_value - short_value:.2f}")
else:
error = result.get("Err", "未知错误")
print(f"获取所有持仓失败: {error}")
持仓数据处理最佳实践:
- 空仓检查: 返回值可能为空列表或None,表示无持仓
- 可选字段处理:
liquidation_price、mark_price等字段可能不存在 - 风险计算: 利用爆仓价格和标记价格计算风险水平
- 盈亏分析: 结合数量和开仓价格计算盈亏比例
- 保证金监控: 关注维持保证金率,防止爆仓风险
# 持仓风险管理示例
def analyze_position_risk(position):
"""分析持仓风险"""
symbol = position['symbol']
side = position['side']
amount = position['amount']
entry_price = position['entry_price']
unrealized_pnl = position['unrealized_pnl']
leverage = position['leverage']
# 计算持仓价值
position_value = amount * entry_price
# 计算盈亏比例
pnl_percentage = (unrealized_pnl / position_value) * 100 if position_value > 0 else 0
# 风险评级
risk_level = "低"
if leverage > 10:
risk_level = "高"
elif leverage > 5:
risk_level = "中"
# 爆仓风险评估
liquidation_risk = "未知"
if position.get('liquidation_price') and position.get('mark_price'):
liq_price = position['liquidation_price']
mark_price = position['mark_price']
if side == 'Long':
distance_pct = ((mark_price - liq_price) / mark_price) * 100
else:
distance_pct = ((liq_price - mark_price) / mark_price) * 100
if distance_pct < 5:
liquidation_risk = "极高"
elif distance_pct < 10:
liquidation_risk = "高"
elif distance_pct < 20:
liquidation_risk = "中"
else:
liquidation_risk = "低"
return {
'symbol': symbol,
'side': side,
'value': position_value,
'pnl_percentage': pnl_percentage,
'risk_level': risk_level,
'liquidation_risk': liquidation_risk,
'leverage': leverage
}
# 批量分析持仓风险
def analyze_all_positions_risk(positions):
"""批量分析所有持仓风险"""
high_risk_positions = []
total_risk_value = 0
for pos in positions:
risk_info = analyze_position_risk(pos)
# 识别高风险持仓
if risk_info['risk_level'] == '高' or risk_info['liquidation_risk'] in ['高', '极高']:
high_risk_positions.append(risk_info)
total_risk_value += risk_info['value']
return {
'high_risk_count': len(high_risk_positions),
'high_risk_value': total_risk_value,
'high_risk_positions': high_risk_positions
}
7.3 表格数据格式
表格数据格式示例 📋
{
"title": "表格标题",
"cols": ["列1", "列2", "列3", ...],
"rows": [
["行1值1", "行1值2", "行1值3", ...],
["行2值1", "行2值2", "行2值3", ...],
...
]
}
7.4 WebClient配置格式
WebClient配置示例 ⚙️
{
"server_name": "策略名称", // 策略服务器名称,用于在Web平台上标识
"primary_balance": 10000.0, // 主账户初始余额,仅首次初始化需要
"secondary_balance": 5000.0, // 次账户初始余额,仅首次初始化需要
"is_production": true, // 是否为生产环境,影响风控级别
"open_threshold": 0.14, // 开仓阈值,如0.14代表千1.4价差开仓
"max_position_ratio": 100, // 单个币种最大持仓比例,如100代表100%,1x杠杆
"max_leverage": 10, // 最大可用杠杆倍数
"funding_rate_threshold": 0.1, // 资金费率阈值,如0.1代表千1资金费率开仓
"cost": 0.0824, // 成本 主所开平 副所开平,4次交易手续费,如0.0824 代表万8.24
"primary_maker_fee_rate": 0.0002, // 主所maker手续费率
"primary_taker_fee_rate": 0.0005, // 主所taker手续费率
"primary_rebate_rate": 0.3, // 主所返佣率
"secondary_maker_fee_rate": 0.0002, // 次所maker手续费率
"secondary_taker_fee_rate": 0.0005, // 次所taker手续费率
"secondary_rebate_rate": 0.7, // 次所返佣率
}
所有字段均为可选项,但推荐在首次初始化时设置server_name、primary_balance和secondary_balance。初始余额设置后会被缓存,后续启动策略时会自动从缓存中读取,无需再次设置。
缓存机制说明: WebClient的配置数据(包括初始余额、交易统计数据如总成交量、成交次数、失败次数、成功次数、胜率、利润等)会被缓存到本地文件系统。缓存文件以
{server_name}.json的格式命名,存储在当前工作目录中。当调用统计和余额相关的API(如update_trade_stats、update_total_balance等)更新数据时,缓存文件会实时自动更新,无需手动操作。这就是为什么server_name参数非常重要,它不仅用于Web界面标识,还用于定位缓存文件。不同的策略应使用不同的server_name以避免缓存冲突。WebClient缓存是自动管理的,与策略缓存(cache_save/cache_load)完全独立。
7.4 命令格式说明
v2版本的命令格式相比v1版本更加扁平化,直接使用方法名作为顶级字段:
v1格式(嵌套结构):
{
"account_id": 0,
"cmd": {
"Sync": "UsdtBalance"
}
}
v2格式(扁平化结构):
{
"account_id": 0,
"method": "UsdtBalance",
"sync": true
}
八、数据源订阅
8.1 订阅机制概述
Trader API v2提供了完整的数据源订阅机制,通过策略的subscribes()方法定义需要订阅的数据类型。系统支持三种不同的数据源订阅方式,每种方式适用于不同的数据获取场景。
📊 三种数据源类型
| 数据源类型 | 数据来源 | 更新方式 | 适用场景 | 延迟特性 |
|---|---|---|---|---|
| SubscribeWs | 交易所WebSocket | 实时推送 | 高频交易、实时行情 | 极低延迟 |
| SubscribeRest | 交易所REST API | 定时轮询 | 账户状态、合约信息 | 秒级延迟 |
| SubscribeTimer | 系统定时器 | 定时触发 | 策略逻辑、状态检查 | 可配置间隔 |
🔄 数据流向图
┌─────────────────┐ ┌──────────────┐ ┌─────────────────┐
│ 交易所API │ │ 系统定时器 │ │ 策略回调函数 │
│ │ │ │ │ │
│ WebSocket ────┐ │ │ Timer ───┐ │ │ ┌─ on_bbo() │
│ REST API ──┐ │ │ │ │ │ │ ├─ on_kline() │
│ │ │ │ │ │ │ │ ├─ on_order() │
└───────────┼──┼─┘ └────────────┼─┘ │ ├─ on_timer() │
│ │ │ │ └─ on_balance()│
│ └─────────────────┐ │ │ │
└──────────────────┐ │ │ └─────────────────┘
│ │ │ ▲
▼ ▼ ▼ │
┌─────────────────┐ │
│ Trader Engine │──────┘
│ 数据分发中心 │
└─────────────────┘
8.2 WebSocket实时订阅 (SubscribeWs)
WebSocket订阅是最重要的数据源,提供交易所的实时推送数据,具有极低延迟的特性,是高频交易和实时行情分析的核心。
📡 WebSocket支持的频道类型
| 频道分类 | 频道类型 | 说明 | 回调函数 | 延迟级别 |
|---|---|---|---|---|
| 市场行情 | Bbo | 最佳买卖价 | on_bbo | 毫秒级 |
Depth | 市场深度 | on_depth | 毫秒级 | |
Trade | 成交记录 | on_trade | 毫秒级 | |
Kline | K线数据 | on_kline | 秒级 | |
MarkPrice | 标记价格 | on_mark_price | 秒级 | |
Funding | 资金费率 | on_funding | 分钟级 | |
| 账户数据 | Order | 订单更新 | on_order | 毫秒级 |
Position | 持仓更新 | on_position | 毫秒级 | |
Balance | 余额更新 | on_balance | 秒级 | |
FundingFee | 资金费结算 | on_funding_fee | 小时级 | |
OrderAndFill | 订单和成交 | on_order_and_fill | 毫秒级 |
🚀 WebSocket订阅配置
基础配置格式:
{
"account_id": 0, # 账户ID(市场数据可选,账户数据必须)
"sub": {
"SubscribeWs": [
# 市场数据订阅(无需账户ID)
{
"频道类型": 配置参数
},
# 账户数据订阅(需要账户ID)
{
"频道类型": 配置参数
}
]
}
}
📈 Kline数据订阅详解
📈 Kline订阅配置
基础格式:
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"], # 订阅的交易对列表
"interval": "1m" # K线时间间隔
}
}
支持的时间间隔:
"1m","3m","5m","15m","30m": 分钟级别"1h","2h","4h","6h","8h","12h": 小时级别"1d","3d": 天级别"1w": 周级别"1M": 月级别
🔄 Kline回调数据结构
当订阅Kline数据后,每当有新的K线数据时,会触发on_kline(exchange, kline)回调:
回调参数:
exchange: 字符串,交易所名称kline: K线数据对象,包含以下字段:symbol: 交易对符号interval: K线时间粒度candles: 蜡烛图数据列表(通常只包含最新的一根K线)
Candle数据结构:
{
"timestamp": 1640995200000, # 开盘时间,Unix时间戳(毫秒)
"open": 50000.0, # 开盘价
"high": 50500.0, # 最高价
"low": 49800.0, # 最低价
"close": 50200.0, # 收盘价
"volume": 123.45, # 成交量(基础货币)
"quote_volume": 6180000.0, # 成交额(报价货币)
"trades": 1250, # 成交笔数(可选)
"taker_buy_volume": 65.2, # 主动买入成交量(可选)
"taker_buy_quote_volume": 3270000.0, # 主动买入成交额(可选)
"confirm": False # K线状态:false=未完结,true=已完结
}
📊 Kline订阅示例
完整订阅示例:
def subscribes(self):
"""返回订阅配置"""
return [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
# 订阅多个交易对的1分钟K线
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT", "SOL_USDT"],
"interval": "1m"
}
},
# 同时订阅BBO数据
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
}
]
}
},
# 可以订阅不同时间周期的K线
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Kline": {
"symbols": ["BTC_USDT"],
"interval": "5m"
}
}
]
}
}
]
def on_kline(self, exchange, kline):
"""K线数据回调处理"""
symbol = kline['symbol']
interval = kline['interval']
candles = kline['candles']
for candle in candles:
timestamp = candle['timestamp']
open_price = candle['open']
high_price = candle['high']
low_price = candle['low']
close_price = candle['close']
volume = candle['volume']
is_confirmed = candle['confirm']
# 计算涨跌幅
price_change_pct = (close_price - open_price) / open_price * 100
status = "已完结" if is_confirmed else "未完结"
self.trader.log(
f"{exchange} {symbol} {interval} K线更新: "
f"价格 {close_price} (涨跌 {price_change_pct:+.2f}%), "
f"成交量 {volume:.2f}, 状态: {status}",
level="DEBUG"
)
# 只对已完结的K线进行技术分析
if is_confirmed:
self.analyze_kline_signal(symbol, candle)
def analyze_kline_signal(self, symbol, candle):
"""分析K线信号"""
# 检测大阳线/大阴线
body_ratio = abs(candle['close'] - candle['open']) / (candle['high'] - candle['low'])
if body_ratio > 0.7: # 实体超过70%
if candle['close'] > candle['open']:
signal_type = "大阳线"
color = "green"
else:
signal_type = "大阴线"
color = "red"
self.trader.tlog(
f"K线信号_{symbol}",
f"{symbol}: 检测到{signal_type},实体比例 {body_ratio:.1%}",
color=color,
interval=60
)
🔥 其他WebSocket订阅示例
深度数据订阅:
{
"Depth": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"levels": 5 # 订阅5档深度
}
}
账户数据订阅:
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Order": ["BTC_USDT", "ETH_USDT"] # 订单更新
},
{
"Position": ["BTC_USDT", "ETH_USDT"] # 持仓更新
},
{
"OrderAndFill": ["BTC_USDT"] # 订单和成交数据
}
]
}
}
💡 WebSocket完整示例
订阅配置:
def subscribes(self):
"""WebSocket订阅配置示例"""
return [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
# 市场行情数据
{
"Bbo": ["BTC_USDT", "ETH_USDT"] # 最佳买卖价
},
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m" # 1分钟K线
}
},
{
"Depth": {
"symbols": ["BTC_USDT"],
"levels": 10 # 10档深度
}
},
{
"Funding": ["BTC_USDT", "ETH_USDT"] # 资金费率
},
# 账户数据
{
"Order": ["BTC_USDT", "ETH_USDT"] # 订单状态
},
{
"Position": ["BTC_USDT", "ETH_USDT"] # 持仓变化
},
{
"Balance": [] # 账户余额(不指定币种获取所有)
}
]
}
}
]
回调函数处理:
def on_bbo(self, exchange, symbol, bbo):
"""最佳买卖价回调"""
bid_price = bbo.get('bid_price')
ask_price = bbo.get('ask_price')
spread = ask_price - bid_price
self.trader.log(f"{symbol} BBO: 买{bid_price} 卖{ask_price} 价差{spread}")
def on_kline(self, exchange, kline):
"""K线数据回调"""
symbol = kline['symbol']
interval = kline['interval']
for candle in kline['candles']:
if candle['confirm']: # 只处理已完结的K线
close_price = candle['close']
volume = candle['volume']
self.trader.log(f"{symbol} {interval} K线完结: 价格{close_price} 成交量{volume}")
def on_depth(self, exchange, symbol, depth):
"""深度数据回调"""
bids = depth.get('bids', [])
asks = depth.get('asks', [])
if bids and asks:
best_bid = bids[0][0] if bids[0] else 0
best_ask = asks[0][0] if asks[0] else 0
self.trader.log(f"{symbol} 深度: 最优买{best_bid} 最优卖{best_ask}")
def on_order(self, exchange, order):
"""订单状态回调"""
symbol = order.get('symbol')
status = order.get('status')
side = order.get('side')
self.trader.log(f"订单更新: {symbol} {side} {status}")
def on_position(self, exchange, position):
"""持仓变化回调
参数:
exchange: 交易所名称
position: 持仓数据,数据结构参见[7.2 Position(持仓)数据结构](#72-position持仓数据结构)
"""
symbol = position.get('symbol')
amount = position.get('amount', 0)
side = position.get('side')
unrealized_pnl = position.get('unrealized_pnl', 0)
self.trader.log(f"持仓更新: {symbol} {side} 数量{amount} 未实现盈亏{unrealized_pnl}")
8.3 REST轮询订阅 (SubscribeRest)
REST轮询订阅适用于不需要极低延迟但需要定期更新的数据,如账户余额、持仓状态、合约信息等。系统会按照指定的时间间隔主动调用REST API获取数据。
🔄 REST轮询支持的数据类型
| 数据类型 | 说明 | 回调函数 | 更新频率建议 | 适用场景 |
|---|---|---|---|---|
Balance | 账户余额 | on_balance | 1-5秒 | 余额监控、风控 |
Position | 持仓信息 | on_position | 2-10秒 | 持仓管理、盈亏计算 |
Funding | 资金费率 | on_funding | 30-300秒 | 资金费率监控 |
Instrument | 合约信息 | on_instrument | 60-300秒 | 合约规则更新 |
⚙️ REST轮询配置
基础配置格式:
{
"account_id": 0, # 必须指定账户ID
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 5, # 轮询间隔(秒)
"nanos": 0 # 纳秒精度(通常为0)
},
"rest_type": "数据类型" # 要轮询的数据类型
}
}
}
实际示例:
# 轮询账户余额(每5秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
}
# 轮询持仓信息(每10秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
}
# 轮询合约信息(每5分钟一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}
💡 REST轮询完整示例
订阅配置:
def subscribes(self):
"""REST轮询订阅配置示例"""
return [
# 轮询账户余额
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
},
# 轮询持仓信息
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
},
# 轮询资金费率(备用方案,当WebSocket不稳定时)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 60, "nanos": 0},
"rest_type": "Funding"
}
}
},
# 轮询合约信息更新
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}
]
回调函数处理:
def on_balance(self, exchange, balances):
"""余额轮询回调"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)
if total > 0: # 只关注有余额的币种
self.trader.log(f"余额更新: {asset} 可用{available} 总计{total}")
# 风控检查
if available < total * 0.1: # 可用余额不足10%
self.trader.log(f"⚠️ {asset} 可用余额不足: {available}", level="WARN")
def on_position(self, exchange, positions):
"""持仓轮询回调
参数:
exchange: 交易所名称
positions: 持仓数据列表,每个元素的数据结构参见[7.2 Position(持仓)数据结构](#72-position持仓数据结构)
"""
for position in positions:
symbol = position.get('symbol')
amount = position.get('amount', 0)
side = position.get('side')
unrealized_pnl = position.get('unrealized_pnl', 0)
if abs(amount) > 0: # 只关注有持仓的交易对
self.trader.log(f"持仓检查: {symbol} {side} 数量{amount} 浮盈{unrealized_pnl}")
# 持仓风控
if unrealized_pnl < -1000: # 浮亏超过1000
self.trader.log(f"🚨 {symbol} 浮亏过大: {unrealized_pnl}", level="ERROR")
def on_funding(self, exchange, funding_rates):
"""资金费率轮询回调"""
for funding in funding_rates:
symbol = funding.get('symbol')
rate = funding.get('funding_rate', 0)
next_time = funding.get('next_funding_at', 0)
# 资金费率监控
if abs(rate) > 0.001: # 费率超过0.1%
rate_pct = rate * 100
self.trader.log(f"高资金费率: {symbol} {rate_pct:.3f}% 下次结算{next_time}")
def on_instrument(self, exchange, instruments):
"""合约信息轮询回调"""
for instrument in instruments:
symbol = instrument.get('symbol')
status = instrument.get('status')
# 监控交易对状态变化
if status != 'trading':
self.trader.log(f"⚠️ 交易对状态异常: {symbol} 状态为 {status}", level="WARN")
8.4 定时器订阅 (SubscribeTimer)
定时器订阅用于执行策略相关的定时任务,如定期检查订单状态、执行风控逻辑、统计数据更新等。不依赖外部数据源,完全由系统定时器驱动。
⏰ 定时器特点
- 精确定时: 支持秒级和纳秒级精度
- 多定时器: 可以设置多个不同名称的定时器
- 独立执行: 不依赖交易所连接状态
- 灵活配置: 可以动态调整执行间隔
- 错开启动: 支持初始延迟,避免多个定时器同时启动造成资源竞争
🎯 定时器配置
基础配置格式:
{
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 间隔秒数, # 定时器间隔(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
},
"name": "定时器名称", # 定时器标识名称
"initial_delay": { # 可选:初始延迟时间
"secs": 延迟秒数, # 启动前延迟(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
}
}
}
}
字段说明:
update_interval: 定时器执行间隔,必须字段name: 定时器名称,用于标识和日志记录,必须字段initial_delay: 初始延迟时间,可选字段,用于错开多个定时器的启动时间,避免资源竞争
initial_delay使用场景:
| 场景类型 | 建议延迟时间 | 原因说明 | 示例 |
|---|---|---|---|
| 风控检查 | 30-60秒 | 等待数据稳定,避免启动时误报 | 余额检查、持仓风控 |
| 统计计算 | 错开15-60秒 | 避免多个计算任务同时执行 | 收益统计、交易分析 |
| 网络请求 | 错开10-30秒 | 分散API调用,避免频率限制 | 外部数据获取 |
| 文件操作 | 错开30-120秒 | 避免同时写文件造成冲突 | 日志轮转、数据备份 |
| 连接监控 | 0秒(立即) | 需要尽快检测连接状态 | WebSocket状态检查 |
实际应用示例:
# 每30秒检查一次价格预警(立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "price_alert_check"
}
}
}
# 每分钟更新一次策略统计(延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}
# 每10秒检查一次订单状态(延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "order_status_check",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
}
# 风控检查(延迟30秒启动,避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 30, "nanos": 0}
}
}
}
# 对应的回调函数处理
def on_timer_subscribe(self, timer_name):
"""定时器回调处理"""
if timer_name == "price_alert_check":
self.check_price_alerts()
elif timer_name == "stats_update":
self.update_strategy_stats()
elif timer_name == "order_status_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
💡 定时器完整示例
订阅配置:
def subscribes(self):
"""定时器订阅配置示例"""
return [
# 策略监控定时器(每30秒,立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor"
}
}
},
# 风控检查定时器(每5分钟,延迟60秒启动避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0}
}
}
},
# 统计数据更新(每分钟,延迟15秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 15, "nanos": 0}
}
}
},
# 价格预警检查(每10秒,延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "price_alert",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
},
# 订单状态检查(每15秒,延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 15, "nanos": 0},
"name": "order_check",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}
]
回调函数处理:
def on_timer_subscribe(self, timer_name):
"""定时器回调统一处理"""
try:
if timer_name == "strategy_monitor":
self.strategy_monitor()
elif timer_name == "risk_check":
self.risk_management_check()
elif timer_name == "stats_update":
self.update_strategy_statistics()
elif timer_name == "price_alert":
self.check_price_alerts()
elif timer_name == "order_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
except Exception as e:
self.trader.log(f"定时器{timer_name}执行异常: {e}", level="ERROR")
def strategy_monitor(self):
"""策略监控逻辑"""
# 检查策略运行状态
uptime = time.time() - self.start_time
self.trader.tlog(
"策略监控",
f"策略运行时间: {uptime:.0f}秒, 状态: 正常",
interval=300, # 5分钟记录一次
color="green"
)
# 检查WebSocket连接状态
if hasattr(self, 'last_data_time'):
time_since_data = time.time() - self.last_data_time
if time_since_data > 60: # 超过1分钟没有数据
self.trader.log("⚠️ WebSocket数据中断超过1分钟", level="WARN")
def risk_management_check(self):
"""风控检查"""
# 检查总持仓风险
total_exposure = self.calculate_total_exposure()
max_exposure = 100000 # 最大敞口100k
if total_exposure > max_exposure:
self.trader.log(
f"🚨 总敞口超限: {total_exposure} > {max_exposure}",
level="ERROR",
color="red"
)
# 执行风控措施
self.emergency_close_positions()
# 检查浮动盈亏
total_pnl = self.calculate_total_pnl()
if total_pnl < -5000: # 浮亏超过5k
self.trader.log(
f"🚨 浮亏过大: {total_pnl}",
level="ERROR",
color="red"
)
def update_strategy_statistics(self):
"""更新策略统计"""
# 计算成功率
if self.total_trades > 0:
win_rate = self.successful_trades / self.total_trades * 100
self.trader.tlog(
"策略统计",
f"交易次数: {self.total_trades}, 成功率: {win_rate:.1f}%",
interval=300,
color="blue"
)
# 更新到Web平台
if hasattr(self.trader, 'update_trade_stats'):
self.trader.update_trade_stats(
maker_volume=self.total_maker_volume,
taker_volume=self.total_taker_volume,
profit=self.total_profit
)
def check_price_alerts(self):
"""价格预警检查"""
for symbol in self.monitored_symbols:
current_price = self.get_current_price(symbol)
if current_price:
# 检查价格突破
if symbol in self.price_alerts:
alert = self.price_alerts[symbol]
if alert['type'] == 'above' and current_price > alert['price']:
self.trader.log(f"💡 {symbol} 突破预警价格 {alert['price']}", color="yellow")
elif alert['type'] == 'below' and current_price < alert['price']:
self.trader.log(f"💡 {symbol} 跌破预警价格 {alert['price']}", color="yellow")
def check_pending_orders(self):
"""检查挂单状态"""
# 检查长时间未成交的订单
for order_id, order_info in self.pending_orders.items():
order_age = time.time() - order_info['created_time']
if order_age > 600: # 超过10分钟未成交
self.trader.log(
f"⏰ 订单{order_id}长时间未成交: {order_age:.0f}秒",
level="WARN"
)
# 可以选择撤销长时间未成交的订单
if order_age > 1800: # 超过30分钟强制撤销
self.cancel_order(order_id)
self.trader.log(f"🗑️ 强制撤销超时订单: {order_id}")
8.5 综合订阅配置示例
以下是一个完整的策略订阅配置示例,展示如何同时使用三种数据源:
def subscribes(self):
"""综合订阅配置示例"""
return [
# 1. WebSocket实时数据订阅
{
"account_id": 0,
"sub": {
"SubscribeWs": [
# 市场行情数据
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m"
}
},
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
},
{
"Depth": {
"symbols": ["BTC_USDT"],
"levels": 5
}
},
# 账户数据
{
"Order": ["BTC_USDT", "ETH_USDT"]
},
{
"Position": ["BTC_USDT", "ETH_USDT"]
}
]
}
},
# 2. REST轮询数据订阅
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance" # 每5秒更新余额
}
}
},
# 3. 定时器任务订阅
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor" # 每30秒执行策略监控
}
}
},
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check" # 每5分钟执行风控检查
}
}
}
]
8.6 数据源选择与最佳实践
🎯 数据源选择指南
根据不同的应用场景,选择合适的数据源类型:
| 应用场景 | 推荐数据源 | 订阅频道 | 更新频率 | 理由 |
|---|---|---|---|---|
| 高频交易 | WebSocket | Bbo, Depth, Trade | 毫秒级 | 需要极低延迟的实时数据 |
| 技术分析 | WebSocket | Kline, MarkPrice | 秒级 | K线完结状态重要,实时性要求高 |
| 套利策略 | WebSocket | Bbo, Funding | 秒-分钟级 | 价差和资金费率变化敏感 |
| 风控监控 | REST轮询 | Balance, Position | 1-10秒 | 定期检查即可,不需要实时 |
| 策略逻辑 | 定时器 | Timer | 10秒-5分钟 | 独立于市场数据的逻辑执行 |
| 账户管理 | WebSocket + REST | Order + Balance | 混合 | 实时订单状态 + 定期余额检查 |
✅ 最佳实践建议
1. 分层订阅策略
def subscribes(self):
"""分层订阅策略示例"""
return [
# 核心层:高频实时数据
{
"account_id": 0,
"sub": {"SubscribeWs": [
{"Bbo": self.trading_symbols}, # 交易信号生成
{"Order": self.trading_symbols} # 订单状态监控
]}
},
# 支撑层:中频数据
{
"account_id": 0,
"sub": {"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Balance" # 余额监控
}}
},
# 管理层:低频定时任务
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_management", # 风控检查
"initial_delay": {"secs": 30, "nanos": 0} # 延迟启动避免误报
}}
}
]
2. 数据处理优化
def on_bbo(self, exchange, symbol, bbo):
"""优化的BBO处理"""
try:
# 快速数据验证
if not self.validate_bbo_data(bbo):
return
# 核心交易逻辑(最小化处理时间)
signal = self.generate_trading_signal(symbol, bbo)
if signal:
self.execute_trade(signal)
except Exception as e:
# 错误处理不影响主流程
self.trader.log(f"BBO处理异常: {e}", level="ERROR")
def on_kline(self, exchange, kline):
"""K线数据处理最佳实践"""
for candle in kline['candles']:
# 关键:只处理已完结的K线
if candle['confirm']:
self.analyze_completed_candle(candle)
else:
# 未完结K线可用于实时监控,但不做技术分析
self.monitor_current_candle(candle)
3. 资源管理与性能优化
class PerformanceOptimizedStrategy:
def __init__(self):
# 数据缓存,避免重复计算
self.price_cache = {}
self.last_update_time = {}
def subscribes(self):
"""性能优化的订阅配置"""
# 只订阅必要的交易对
core_symbols = ["BTC_USDT", "ETH_USDT"] # 核心交易对
monitor_symbols = ["SOL_USDT", "BNB_USDT"] # 监控交易对
return [
# 核心交易对 - 高频订阅
{
"account_id": 0,
"sub": {"SubscribeWs": [
{"Bbo": core_symbols},
{"Kline": {"symbols": core_symbols, "interval": "1m"}}
]}
},
# 监控交易对 - 低频订阅
{
"account_id": 0,
"sub": {"SubscribeWs": [
{"Bbo": monitor_symbols}
]}
}
]
def on_bbo(self, exchange, symbol, bbo):
"""带缓存的BBO处理"""
current_time = time.time()
last_time = self.last_update_time.get(symbol, 0)
# 限制处理频率,避免过度计算
if current_time - last_time < 0.1: # 100ms内不重复处理
return
self.last_update_time[symbol] = current_time
self.process_bbo_data(symbol, bbo)
✅ 推荐配置模式
高频交易模式:
# 追求极低延迟,最小化订阅
["Bbo", "Order"] # 仅订阅必要数据
技术分析模式:
# 需要完整市场数据
["Kline", "Bbo", "Depth", "MarkPrice"]
稳健套利模式:
# 平衡实时性和稳定性
WebSocket: ["Bbo", "Funding", "Order", "Position"]
REST: ["Balance"] (每5秒)
Timer: ["risk_check"] (每分钟,延迟30秒启动)
❌ 常见误区避免
1. 过度订阅
# ❌ 错误:订阅过多不必要的数据
["Bbo", "Depth", "Trade", "Kline", "MarkPrice", "Funding"] # 太多了
# ✅ 正确:只订阅策略需要的数据
["Bbo", "Order"] # 简单策略只需要这些
2. 忽略数据状态
# ❌ 错误:不检查K线完结状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
self.analyze_signal(candle) # 可能分析未完结K线
# ✅ 正确:检查K线状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
if candle['confirm']: # 只分析已完结的K线
self.analyze_signal(candle)
3. 阻塞回调函数
# ❌ 错误:在回调中执行耗时操作
def on_bbo(self, exchange, symbol, bbo):
time.sleep(0.1) # 阻塞其他数据处理
self.complex_calculation() # 复杂计算
# ✅ 正确:异步处理耗时操作
def on_bbo(self, exchange, symbol, bbo):
# 快速缓存数据
self.bbo_cache[symbol] = bbo
# 复杂处理放在定时器中进行
def on_timer_subscribe(self, timer_name):
if timer_name == "complex_analysis":
self.perform_complex_analysis()
4. 忽略初始延迟配置
# ❌ 错误:多个定时器同时启动,造成资源竞争
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup"}}
]
# ✅ 正确:使用初始延迟错开启动时间
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report",
"initial_delay": {"secs": 60, "nanos": 0}}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup",
"initial_delay": {"secs": 120, "nanos": 0}}}
]
5. 初始延迟最佳实践
def subscribes(self):
"""使用initial_delay的最佳实践"""
return [
# 立即启动的监控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "connection_monitor" # 无延迟,立即监控连接状态
}}
},
# 延迟启动的风控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0} # 延迟1分钟,等数据稳定
}}
},
# 错开启动的统计类定时器(避免同时执行)
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "stats_calc",
"initial_delay": {"secs": 30, "nanos": 0} # 错开30秒
}}
},
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "report_gen",
"initial_delay": {"secs": 90, "nanos": 0} # 错开90秒
}}
}
]
🔧 调试与监控
数据流监控:
def on_bbo(self, exchange, symbol, bbo):
# 统计数据接收频率
self.data_stats[symbol] = self.data_stats.get(symbol, 0) + 1
def on_timer_subscribe(self, timer_name):
if timer_name == "data_monitor":
for symbol, count in self.data_stats.items():
self.trader.tlog(
"数据统计",
f"{symbol}: {count}次/分钟",
interval=60
)
self.data_stats.clear()
通过合理的数据源选择和配置,可以构建高效、稳定的量化交易策略。
九、完整策略示例
以下是一个跨交易所套利策略示例,展示了如何使用Trader API v2的核心功能:
"""
arbitrage_strategy_v2.py - 跨交易所套利策略示例 (v2版本)
"""
import json
import time
import base_strategy
class ArbitrageStrategyV2(base_strategy.BaseStrategy):
def __init__(self, cex_configs, dex_configs, config, trader):
"""初始化策略"""
self.cex_configs = cex_configs
self.has_account = len(cex_configs) > 0
self.dex_configs = dex_configs
self.trader = trader
self.config = config or {
"send_to_web": True,
"min_spread": 1.0, # 最小开仓价差(‰)
"symbols": ["BTC_USDT", "ETH_USDT", "SOL_USDT"] # 监控的交易对
}
# 初始化数据
self.positions = {}
self.running = False
self.primary_balance = 10000.0 # 默认值
self.secondary_balance = 5000.0 # 默认值
self.start_time = time.time() # 策略启动时间
self.last_data_time = time.time() # 最后数据时间
# 加载缓存
self.load_state()
def _init_fee_rates(self):
"""查询费率信息"""
# 默认费率
self.primary_maker_fee = 0.0002
self.primary_taker_fee = 0.0005
self.secondary_maker_fee = 0.0002
self.secondary_taker_fee = 0.0005
self.primary_rebate_rate = 0
self.secondary_rebate_rate = 0
if not self.has_account or not self.config.get("symbols"):
return
symbol = self.config["symbols"][0]
# 查询主账户费率 - 使用v2格式
try:
result = self.trader.get_fee_rate(0, symbol)
if "Ok" in result:
fee_rate = result["Ok"]
self.trader.log(f"主账户费率: {fee_rate}", level="DEBUG", web=self.config.get("send_to_web", True))
self.primary_maker_fee = fee_rate.get('maker_fee_rate', 0.0002)
self.primary_taker_fee = fee_rate.get('taker_fee_rate', 0.0005)
self.primary_rebate_rate = self.cex_configs[0].get('rebate_rate', 0)
except Exception as e:
self.trader.log(f"获取主账户费率失败: {e}", level="WARN", web=self.config.get("send_to_web", True))
# 查询次账户费率
if len(self.cex_configs) > 1:
try:
result = self.trader.get_fee_rate(1, symbol)
if "Ok" in result:
fee_rate = result["Ok"]
self.trader.log(f"次账户费率: {fee_rate}", level="DEBUG", web=self.config.get("send_to_web", True))
self.secondary_maker_fee = fee_rate.get('maker_fee_rate', 0.0002)
self.secondary_taker_fee = fee_rate.get('taker_fee_rate', 0.0005)
self.secondary_rebate_rate = self.cex_configs[1].get('rebate_rate', 0)
except Exception as e:
self.trader.log(f"获取次账户费率失败: {e}", level="WARN", web=self.config.get("send_to_web", True))
# 计算成本,两边都用taker
self.cost = (self.primary_taker_fee + self.secondary_taker_fee) * 2
# 查询账户余额 - 使用v2格式
try:
result = self.trader.get_usdt_balance(0)
if "Ok" in result:
balance = result["Ok"]
self.primary_balance = balance.get('balance', 10000.0)
except Exception:
self.trader.log("获取主账户余额失败", level="WARN", web=self.config.get("send_to_web", True))
# 查询次账户余额
if len(self.cex_configs) > 1:
try:
result = self.trader.get_usdt_balance(1)
if "Ok" in result:
balance = result["Ok"]
self.secondary_balance = balance.get('balance', 5000.0)
except Exception:
self.trader.log("获取次账户余额失败", level="WARN", web=self.config.get("send_to_web", True))
def _init_web_client(self):
"""初始化Web客户端"""
web_config = {
"server_name": "跨所套利策略v2",
"primary_balance": self.primary_balance,
"secondary_balance": self.secondary_balance,
"is_production": True,
"open_threshold": self.config.get("min_spread", 1.0) / 10, # 转换为小数
"max_position_ratio": 80,
"max_leverage": 2,
"funding_rate_threshold": 0.1, # 资金费率阈值
"cost": self.cost, # 交易成本
"primary_maker_fee_rate": self.primary_maker_fee,
"primary_taker_fee_rate": self.primary_taker_fee,
"primary_rebate_rate": self.primary_rebate_rate,
"secondary_maker_fee_rate": self.secondary_maker_fee,
"secondary_taker_fee_rate": self.secondary_taker_fee,
"secondary_rebate_rate": self.secondary_rebate_rate
}
self.trader.init_web_client(web_config)
def name(self):
"""返回策略名称"""
return "跨所套利策略v2"
def load_state(self):
"""从缓存加载策略状态"""
cached_data = self.trader.cache_load()
if cached_data:
self.positions = cached_data.get("positions", {})
self.trader.log("已加载缓存状态", web=self.config.get("send_to_web", True))
def save_state(self):
"""保存策略状态到缓存"""
state = {"positions": self.positions, "last_update": int(time.time())}
self.trader.cache_save(state)
def start(self):
"""启动策略"""
# 初始化费率信息
self._init_fee_rates()
# 初始化Web客户端
if self.config.get("send_to_web", True):
self._init_web_client()
self.trader.start_web_client(upload_interval=5)
self.trader.log("启动跨所套利策略v2", web=self.config.get("send_to_web", True))
# 上传表格示例
self.upload_tables()
# tlog示例 频繁日志使用tlog限频
self.trader.tlog("价差", "BTC_USDT: 1.25‰, ETH_USDT: 2.10‰, SOL_USDT: 3.45‰", color="green")
# 初始化检查并启动交易
if self.has_account:
self.scan_opportunities() # 扫描套利机会
else:
self.trader.log("没有配置账户,无法交易", level="WARN")
def upload_tables(self):
"""上传表格示例方法"""
self.spread_table = {
"title": "价差",
"cols": ["交易对", "价差(‰)"],
"rows": [
["BTC_USDT", "1.25"],
["ETH_USDT", "2.10"],
["SOL_USDT", "3.45"]
]
}
self.position_table = {
"title": "持仓",
"cols": ["交易对", "持仓量", "持仓价值", "浮动盈亏", "持仓时间"],
"rows": [
["BTC_USDT", "1.25", "10000", "100", "2小时15分"],
["ETH_USDT", "2.10", "20000", "200", "1小时30分"],
["SOL_USDT", "3.45", "30000", "300", "3小时45分"]
]
}
# 上传表格
self.trader.upload_tables([self.spread_table, self.position_table])
def place_arbitrage_orders(self, symbol, spread):
"""下套利订单 - 使用v2 API"""
try:
# 获取当前价格
primary_result = self.trader.get_ticker(0, symbol)
secondary_result = self.trader.get_ticker(1, symbol)
if "Ok" not in primary_result or "Ok" not in secondary_result:
return False
primary_ticker = primary_result["Ok"]
secondary_ticker = secondary_result["Ok"]
primary_price = float(primary_ticker.get('price', 0))
secondary_price = float(secondary_ticker.get('price', 0))
if primary_price == 0 or secondary_price == 0:
return False
# 计算交易数量
amount = 0.01 # 示例数量
# 生成订单ID
primary_cid_result = self.trader.create_cid()
secondary_cid_result = self.trader.create_cid()
if "Ok" not in primary_cid_result or "Ok" not in secondary_cid_result:
return False
primary_cid = primary_cid_result["Ok"]
secondary_cid = secondary_cid_result["Ok"]
# 主账户做多,次账户做空
primary_order = {
"symbol": symbol,
"side": "Buy",
"order_type": "Market",
"amount": amount,
"cid": primary_cid
}
secondary_order = {
"symbol": symbol,
"side": "Sell",
"order_type": "Market",
"amount": amount,
"cid": secondary_cid
}
# 下单
primary_result = self.trader.place_order(0, primary_order, sync=True)
secondary_result = self.trader.place_order(1, secondary_order, sync=True)
if "Ok" in primary_result and "Ok" in secondary_result:
self.trader.log(f"套利订单已下达: {symbol}, 价差: {spread:.2f}‰", color="green")
# 记录交易统计 - 开仓时只记录交易量
stats_result = self.trader.update_trade_stats(0, amount * primary_price + amount * secondary_price, 0)
return True
else:
self.trader.log(f"套利订单失败: {symbol}", level="ERROR", color="red")
return False
except Exception as e:
self.trader.log(f"下单异常: {e}", level="ERROR", color="red")
return False
def scan_opportunities(self):
"""扫描套利机会"""
for symbol in self.config.get("symbols", []):
try:
# 获取双边价格
primary_result = self.trader.get_bbo(0, symbol)
secondary_result = self.trader.get_bbo(1, symbol)
if "Ok" not in primary_result or "Ok" not in secondary_result:
continue
primary_bbo = primary_result["Ok"]
secondary_bbo = secondary_result["Ok"]
if not primary_bbo or not secondary_bbo:
continue
primary_bid = float(primary_bbo.get('bid_price', 0))
primary_ask = float(primary_bbo.get('ask_price', 0))
secondary_bid = float(secondary_bbo.get('bid_price', 0))
secondary_ask = float(secondary_bbo.get('ask_price', 0))
if primary_bid == 0 or secondary_ask == 0:
continue
# 计算价差
spread = (primary_bid - secondary_ask) / secondary_ask * 1000 # 千分比
# 检查是否满足开仓条件
if spread > self.config.get("min_spread", 1.0):
self.trader.tlog(f"发现机会", f"{symbol}: {spread:.2f}‰", color="yellow", interval=10)
# 检查平台控制状态
if self.trader.is_web_opening_stopped():
self.trader.log("平台禁止开仓", level="WARN")
continue
if self.trader.is_web_force_closing():
self.trader.log("平台要求强平", level="ERROR")
break
# 下套利订单
self.place_arbitrage_orders(symbol, spread)
except Exception as e:
self.trader.log(f"扫描{symbol}异常: {e}", level="ERROR")
def analyze_market_trend(self, symbol):
"""分析市场趋势 - 使用K线数据"""
try:
# 获取1小时K线数据,用于趋势分析
result = self.trader.get_kline(
account_id=0,
symbol=symbol,
interval="1h",
limit=24 # 获取24小时数据
)
if "Ok" not in result:
return None
kline_data = result["Ok"]
if not kline_data or not kline_data.get('candles') or len(kline_data['candles']) < 10:
return None
candles = kline_data['candles']
# 计算简单移动平均线
prices = [float(candle['close']) for candle in candles[-10:]] # 最近10个小时的收盘价
sma_10 = sum(prices) / len(prices)
current_price = float(candles[-1]['close'])
# 判断趋势
if current_price > sma_10 * 1.002: # 价格高于均线0.2%
trend = "上涨"
trend_strength = (current_price - sma_10) / sma_10 * 100
elif current_price < sma_10 * 0.998: # 价格低于均线0.2%
trend = "下跌"
trend_strength = (sma_10 - current_price) / sma_10 * 100
else:
trend = "震荡"
trend_strength = 0
# 计算波动率
high_prices = [float(candle['high']) for candle in candles[-24:]]
low_prices = [float(candle['low']) for candle in candles[-24:]]
volatility = (max(high_prices) - min(low_prices)) / min(low_prices) * 100
trend_info = {
"symbol": symbol,
"trend": trend,
"strength": trend_strength,
"volatility": volatility,
"current_price": current_price,
"sma_10": sma_10
}
# 记录趋势信息
self.trader.tlog(
f"趋势分析_{symbol}",
f"{symbol}: {trend} {trend_strength:.2f}%, 波动率: {volatility:.2f}%",
color="blue",
interval=300 # 5分钟记录一次
)
return trend_info
except Exception as e:
self.trader.log(f"分析{symbol}趋势异常: {e}", level="ERROR")
return None
def get_support_resistance_levels(self, symbol):
"""获取支撑阻力位 - 使用K线数据"""
try:
# 获取日线数据用于计算支撑阻力位
result = self.trader.get_kline(
account_id=0,
symbol=symbol,
interval="1d",
limit=30 # 获取30天数据
)
if "Ok" not in result:
return None
kline_data = result["Ok"]
if not kline_data or not kline_data.get('candles') or len(kline_data['candles']) < 20:
return None
candles = kline_data['candles']
# 提取高点和低点
highs = [float(candle['high']) for candle in candles]
lows = [float(candle['low']) for candle in candles]
# 计算支撑位和阻力位(简化算法)
resistance_level = max(highs[-10:]) # 最近10天最高价作为阻力位
support_level = min(lows[-10:]) # 最近10天最低价作为支撑位
current_price = float(candles[-1]['close'])
# 计算距离支撑阻力位的百分比
resistance_distance = (resistance_level - current_price) / current_price * 100
support_distance = (current_price - support_level) / current_price * 100
levels_info = {
"symbol": symbol,
"resistance": resistance_level,
"support": support_level,
"current_price": current_price,
"resistance_distance": resistance_distance,
"support_distance": support_distance
}
self.trader.tlog(
f"支撑阻力_{symbol}",
f"{symbol}: 阻力位 {resistance_level:.2f} (+{resistance_distance:.2f}%), "
f"支撑位 {support_level:.2f} (-{support_distance:.2f}%)",
color="purple",
interval=600 # 10分钟记录一次
)
return levels_info
except Exception as e:
self.trader.log(f"计算{symbol}支撑阻力位异常: {e}", level="ERROR")
return None
def on_stop(self):
"""停止策略"""
self.save_state()
if self.config.get("send_to_web", True):
self.trader.stop_web_client()
self.trader.log("策略已停止", web=self.config.get("send_to_web", True))
def subscribes(self):
"""返回订阅列表 - 使用三种数据源"""
subscriptions = []
if self.has_account:
symbols = self.config.get("symbols", ["BTC_USDT", "ETH_USDT"])
# 1. WebSocket实时订阅 - 高频数据
subscriptions.append({
"account_id": 0,
"sub": {"SubscribeWs": [
{"Bbo": symbols}, # 实时最佳买卖价
{"Order": symbols}, # 订单状态更新
{"Position": symbols}, # 持仓更新
{"Funding": symbols} # 资金费率更新
]}
})
# 订阅次账户信息(如果有)
if len(self.cex_configs) > 1:
subscriptions.append({
"account_id": 1,
"sub": {"SubscribeWs": [
{"Order": symbols},
{"Position": symbols}
]}
})
# 2. REST轮询订阅 - 定期数据
subscriptions.append({
"account_id": 0,
"sub": {"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Balance" # 每10秒检查余额
}}
})
# 3. 定时器订阅 - 策略逻辑
subscriptions.extend([
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_monitor", # 每分钟风控检查
"initial_delay": {"secs": 30, "nanos": 0} # 延迟30秒启动
}}
},
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "stats_update", # 每5分钟统计更新
"initial_delay": {"secs": 45, "nanos": 0} # 延迟45秒启动
}}
}
])
return subscriptions
def on_bbo(self, exchange, symbol, bbo):
"""BBO价格更新回调"""
try:
# 记录最新数据时间
self.last_data_time = time.time()
# 获取买卖价
bid_price = bbo.get('bid_price', 0)
ask_price = bbo.get('ask_price', 0)
if bid_price > 0 and ask_price > 0:
# 计算价差
spread = (ask_price - bid_price) / bid_price * 1000 # 千分比
# 检查套利机会
if spread > self.config.get("min_spread", 1.0):
self.trader.tlog(
f"套利机会_{symbol}",
f"{exchange} {symbol}: 价差 {spread:.2f}‰",
color="green",
interval=30
)
# 检查平台控制状态
if not self.trader.is_web_opening_stopped():
self.place_arbitrage_orders(symbol, spread)
except Exception as e:
self.trader.log(f"BBO处理异常 {symbol}: {e}", level="ERROR")
def on_timer_subscribe(self, timer_name):
"""定时器回调处理"""
try:
if timer_name == "risk_monitor":
self.risk_monitor()
elif timer_name == "stats_update":
self.update_stats()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
except Exception as e:
self.trader.log(f"定时器{timer_name}异常: {e}", level="ERROR")
def risk_monitor(self):
"""风控监控"""
try:
# 检查总持仓
if hasattr(self, 'positions'):
total_exposure = sum(abs(pos.get('notional_value', 0)) for pos in self.positions.values())
if total_exposure > 50000: # 总敞口超过5万
self.trader.log(f"⚠️ 总敞口过大: {total_exposure}", level="WARN", color="red")
# 检查连接状态
if hasattr(self, 'last_data_time'):
time_since_data = time.time() - self.last_data_time
if time_since_data > 120: # 超过2分钟没有数据
self.trader.log("⚠️ 数据连接可能中断", level="WARN", color="yellow")
except Exception as e:
self.trader.log(f"风控检查异常: {e}", level="ERROR")
def update_stats(self):
"""更新统计数据"""
try:
# 更新表格数据
self.upload_tables()
# 分析市场趋势
for symbol in self.config.get("symbols", []):
trend_info = self.analyze_market_trend(symbol)
if trend_info:
self.trader.tlog(
f"趋势_{symbol}",
f"{symbol}: {trend_info['trend']} {trend_info['strength']:.2f}%",
interval=300,
color="blue"
)
except Exception as e:
self.trader.log(f"统计更新异常: {e}", level="ERROR")
def adjust_strategy_based_on_analysis(self, symbol, trend_info, levels_info):
"""根据技术分析调整策略"""
try:
current_price = trend_info['current_price']
volatility = trend_info['volatility']
trend = trend_info['trend']
# 根据波动率调整最小价差要求
if volatility > 5.0: # 高波动率
adjusted_min_spread = self.config.get("min_spread", 1.0) * 1.5
self.trader.tlog(
f"策略调整_{symbol}",
f"{symbol}: 高波动率{volatility:.2f}%, 调整最小价差至{adjusted_min_spread:.2f}‰",
color="orange",
interval=600
)
elif volatility < 1.0: # 低波动率
adjusted_min_spread = self.config.get("min_spread", 1.0) * 0.8
self.trader.tlog(
f"策略调整_{symbol}",
f"{symbol}: 低波动率{volatility:.2f}%, 调整最小价差至{adjusted_min_spread:.2f}‰",
color="cyan",
interval=600
)
# 根据趋势调整仓位大小
if trend == "上涨" and trend_info['strength'] > 2.0:
self.trader.tlog(
f"趋势信号_{symbol}",
f"{symbol}: 强势上涨{trend_info['strength']:.2f}%, 可考虑增加多头仓位",
color="green",
interval=300
)
elif trend == "下跌" and trend_info['strength'] > 2.0:
self.trader.tlog(
f"趋势信号_{symbol}",
f"{symbol}: 强势下跌{trend_info['strength']:.2f}%, 可考虑增加空头仓位",
color="red",
interval=300
)
except Exception as e:
self.trader.log(f"调整{symbol}策略异常: {e}", level="ERROR")
def check_kline_signals(self, symbol):
"""检查K线信号 - 利用K线完结状态"""
try:
# 获取最新的5分钟K线数据
result = self.trader.get_kline(
account_id=0,
symbol=symbol,
interval="5m",
limit=3 # 获取最近3根K线
)
if "Ok" not in result:
return None
kline_data = result["Ok"]
if not kline_data or not kline_data.get('candles') or len(kline_data['candles']) < 3:
return None
candles = kline_data['candles']
# 检查最新K线是否已完结
latest_candle = candles[-1]
if not latest_candle['confirm']:
# 最新K线未完结,不进行信号判断
return None
# 分析最近3根已完结的K线
prev_candle = candles[-2]
current_candle = candles[-1]
# 检查突破信号(考虑成交量放大)
volume_amplification = current_candle['volume'] / prev_candle['volume'] if prev_candle['volume'] > 0 else 1
if (current_candle['close'] > prev_candle['high'] and volume_amplification > 1.5):
signal = {
"type": "突破信号",
"direction": "看涨",
"price": current_candle['close'],
"volume": current_candle['volume'],
"timestamp": current_candle['timestamp']
}
self.trader.tlog(
f"K线信号_{symbol}",
f"{symbol}: 检测到向上突破信号,价格: {current_candle['close']}, "
f"成交量放大: {volume_amplification:.2f}倍",
color="green",
interval=60
)
return signal
# 检查下跌信号
elif (current_candle['close'] < prev_candle['low'] and volume_amplification > 1.5):
signal = {
"type": "突破信号",
"direction": "看跌",
"price": current_candle['close'],
"volume": current_candle['volume'],
"timestamp": current_candle['timestamp']
}
self.trader.tlog(
f"K线信号_{symbol}",
f"{symbol}: 检测到向下突破信号,价格: {current_candle['close']}, "
f"成交量放大: {volume_amplification:.2f}倍",
color="red",
interval=60
)
return signal
return None
except Exception as e:
self.trader.log(f"检查{symbol}K线信号异常: {e}", level="ERROR")
return None
def on_order(self, account_id, context, order):
"""订单回调处理"""
# 处理订单状态更新
symbol = order.get('symbol')
status = order.get('status')
side = order.get('side')
if status == 'Filled':
self.trader.log(f"订单成交: {symbol} {side}", color="green")
# 更新持仓信息到Web平台
self.update_position_display()
def on_position(self, account_id, positions):
"""持仓回调处理
参数:
account_id: 账户ID
positions: 持仓数据列表,每个元素的数据结构参见[7.2 Position(持仓)数据结构](#72-position持仓数据结构)
"""
# 更新持仓信息
self.update_position_display()
def on_balance(self, exchange, balances):
"""余额轮询回调处理"""
try:
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)
if asset == 'USDT' and total > 0:
# 更新主要账户余额
if exchange == self.cex_configs[0].get('exchange'):
self.primary_balance = total
elif len(self.cex_configs) > 1 and exchange == self.cex_configs[1].get('exchange'):
self.secondary_balance = total
# 余额风控检查
if available < total * 0.1: # 可用余额不足10%
self.trader.log(f"⚠️ {asset} 可用余额不足: {available}/{total}", level="WARN")
except Exception as e:
self.trader.log(f"余额处理异常: {e}", level="ERROR")
def update_position_display(self):
"""更新持仓显示"""
try:
# 获取所有持仓
primary_result = self.trader.get_positions(0)
primary_positions = primary_result["Ok"] if "Ok" in primary_result else []
secondary_positions = []
if len(self.cex_configs) > 1:
secondary_result = self.trader.get_positions(1)
secondary_positions = secondary_result["Ok"] if "Ok" in secondary_result else []
# 计算持仓价值
total_value = 0
long_value = 0
short_value = 0
for pos in primary_positions + secondary_positions:
value = float(pos.get('notional_value', 0))
side = pos.get('side', '')
total_value += abs(value)
if side.lower() == 'long':
long_value += value
else:
short_value += abs(value)
# 更新到Web平台
self.trader.update_current_position_value(total_value, long_value, short_value)
except Exception as e:
self.trader.log(f"更新持仓显示失败: {e}", level="ERROR")
📝 更新日志
v2.3.0 (2025年6月24日)
- 新增API方法:
post_stop_order: 下止损订单,支持止损单和止盈单get_stop_orders: 获取止损订单列表,支持时间范围查询amend_stop_order: 修改止损订单,支持修改触发价格、执行价格和数量cancel_stop_order: 取消止损订单,支持通过订单ID或客户端ID取消
- 功能增强:
- 完整的止损订单管理体系,涵盖创建、查询、修改、取消全流程
- 基于
CloseTrigger结构体的高级止损订单配置 - 支持三种触发价格类型:标记价格、成交价格、指数价格
- 支持追踪止盈(Trailing Take Profit)功能
- 支持部分止盈/止损,实现分批平仓策略
- 支持市价和限价两种执行方式
- 支持双向持仓模式下的止损订单管理
- 结构体完善:
- 详细的
CloseTrigger结构体说明和字段解释 TriggerPrice枚举:支持MarkPrice、ContractPrice、IndexPriceTriggerAction结构:支持数量控制、追踪间隙、执行类型等ExecuteType枚举:Market市价执行和Limit限价执行ActionType枚举:Normal一般模式和Trailing追踪模式
- 详细的
- 文档完善:
- 根据实际Rust结构体定义更新所有止损订单相关文档
- 明确标记所有止损订单方法的必需参数,提高API使用的准确性
- 提供基于
CloseTrigger结构的完整使用示例 - 添加追踪止盈、部分平仓等高级功能示例
- 完善止损订单最佳实践指南,包含触发类型选择建议和参数要求说明
- 新增错误处理指南,说明缺失必需参数时的错误信息格式
v2.2.0 (2025年6月24日)
- 新增功能:
- 定时器订阅新增
initial_delay字段,支持设置启动前的初始延迟时间,避免资源竞争 - WebSocket支持Kline(K线)数据订阅,可订阅多种时间周期的实时K线数据
- 新增
get_funding_rate_history和get_kline接口,支持历史数据查询 - 资金费率结构体新增字段:
min_funding_rate(最小值)、max_funding_rate(最大值) - 完整的数据源订阅体系:WebSocket、REST轮询、定时器三种订阅方式
- 定时器订阅新增
- 文档重构:
- 重构"数据源订阅"章节,将三种订阅类型(WebSocket、REST、定时器)并列展示
- 每种订阅类型都提供完整的配置示例和回调处理代码
- 新增综合订阅配置示例,展示如何组合使用三种数据源
- 新增数据源选择指南和最佳实践,帮助用户根据场景选择合适的订阅方式
- 完善资金费率和K线相关API文档,包含新增字段的详细描述和使用示例
- 添加初始延迟配置指南和定时器最佳实践
v2.1.2 (2025年6月24日)
- 新增API方法:
get_funding_rate_history: 获取资金费率历史记录,支持按交易对和时间范围查询
- 功能增强:
- 支持获取单个或全部交易对的资金费率历史
- 支持指定时间范围或获取最近N条记录
- 提供完整的使用示例和数据处理方案
v2.1.1 (2025年6月18日)
- 缓存API重构: 根据最新代码实现重构缓存管理部分
- 移除了
cache_update方法(代码中已不存在) - 更新
cache_save和cache_load的使用说明 cache_save现在直接支持Python对象,无需手动JSON序列化cache_load直接返回Python对象,无需手动JSON反序列化- 添加了更完整的缓存使用示例和最佳实践
- 移除了
- 文档准确性提升: 确保文档描述与实际代码实现完全一致
v2.1.0 (2025年5月27)
- K线数据结构完善: 根据最新的Rust结构体定义完善了K线数据结构说明
- 新增字段支持:
trades: 成交笔数(可选字段)taker_buy_volume: taker买入成交量(可选字段)taker_buy_quote_volume: taker买入成交额(可选字段)
- 增强示例代码: 提供了更完整的K线数据处理示例和最佳实践
- 安全处理指南: 添加了可选字段的安全处理方法
- 新增API方法:
get_funding_fee: 查询历史资金费用记录get_fee_discount_info: 获取费用折扣信息is_fee_discount_enabled: 检查费用折扣状态set_fee_discount_enabled: 设置费用折扣状态get_borrow_limit: 查询借贷限额get_deposit_address: 获取指定币种的充值地址withdrawal: 提币到外部地址或内部转账
- API完整性验证: 对照最新代码结构,确保所有公开API方法均已包含在文档中
v2.0.0 (2025年5月26日)
- 初始版本发布
- 完整的API文档和示例
文档版本: 2.3.0
最后更新: 2025年6月24日