跳到主要内容

📊 Trader API v2 文档

📌 概述

Trader v2 是一个高性能量化交易引擎的升级版本,基于Rust开发并通过PyO3绑定为Python接口。它在v1版本的基础上增加了更多直接访问交易所API的功能,使策略开发更加灵活强大。

信息

使用版本0.9.0之前的版本请参考Trader API[旧版]。可通过config.toml中设置trader_version = "V2"启用(默认为V2),若需使用旧版请设置trader_version = "V1"


⚠️ 重要:Result返回值处理说明

由于本API基于PyO3构建,所有返回数据的方法在Rust内部都返回Result类型。在Python中调用时,您需要特别注意以下几点:

🔍 返回值结构

大部分API方法返回的结果具有以下结构:

# 成功的返回值结构
{
"Ok": {
# 实际的数据内容
"data": "...",
"code": 200,
"msg": "success"
}
}

# 错误的返回值结构
{
"Err": {
"error": "错误描述",
"code": 400
}
}

💡 正确的使用方式

所有返回数据的方法都应该按以下方式处理:

# ✅ 正确的处理方式
result = trader.http_request("https://api.example.com/data", "GET", None)

if "Ok" in result:
# 成功情况 - 提取实际数据
data = result["Ok"]
print(f"请求成功: {data}")
else:
# 错误情况 - 处理错误信息
error = result.get("Err", "未知错误")
print(f"请求失败: {error}")



📑 API分类索引

分类功能模块主要用途
一、核心交易功能交易所api指令执行、标识符生成执行交易所api、生成唯一ID
二、日志管理日志记录记录系统和交易信息
三、缓存管理缓存操作、缓存系统说明保存和恢复策略状态
四、外部通信HTTP请求获取外部数据
五、NB8 Web平台集成Web客户端管理等六个模块与Web平台交互和数据展示
六、交易所API直接访问账户查询、订单管理、止损订单、市场数据直接访问交易所API
七、数据结构说明K线数据结构、表格格式等理解数据结构和格式
八、数据源订阅WebSocket、REST、定时器三种数据源订阅接收实时和定时数据

一、核心交易功能

1.1 交易指令执行

▶️ publish(cmd)

功能: 向交易引擎发送并执行单个交易指令

参数:

  • cmd: 交易指令对象,符合ExecutionCommand结构

返回值:

  • Result结构: 包含OkErr字段的字典
  • 成功时: {"Ok": execution_result} - 指令执行结果
  • 失败时: {"Err": error_info} - 包含错误信息

示例:

# ✅ 正确的使用方式 - 查询USDT余额
cmd = {
"account_id": 0,
"method": "UsdtBalance",
"sync": True
}
result = trader.publish(cmd)

if "Ok" in result:
balance_data = result["Ok"]
print(f"USDT余额查询成功: {balance_data}")
# 提取具体余额信息
if isinstance(balance_data, dict) and "balance" in balance_data:
print(f"可用余额: {balance_data['balance']}")
else:
error = result.get("Err", "未知错误")
print(f"余额查询失败: {error}")

# 下单指令示例 - 带完整错误处理
place_order_cmd = {
"account_id": 0,
"method": "PlaceOrder",
"order": {
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"price": 50000.0,
"amount": 0.01,
"time_in_force": "GTC",
"cid": "client_order_123"
},
"params": {
"is_dual_side": False,
"market_order_mode": "Normal"
},
"sync": True
}

result = trader.publish(place_order_cmd)

if "Ok" in result:
order_result = result["Ok"]
print(f"下单成功: {order_result}")
# 提取订单ID
if isinstance(order_result, dict):
order_id = order_result.get("order_id")
client_order_id = order_result.get("client_order_id")
print(f"订单ID: {order_id}, 客户端ID: {client_order_id}")
else:
error = result.get("Err")
print(f"下单失败: {error}")
# 根据错误类型进行处理
if "insufficient balance" in str(error).lower():
print("余额不足,请检查账户余额")
elif "invalid symbol" in str(error).lower():
print("交易对不存在,请检查symbol参数")

# 另一个查询示例
balance_cmd = {"account_id": 0, "method": "UsdtBalance", "sync": True}
result = trader.publish(balance_cmd)

if "Ok" in result:
balance_data = result["Ok"]
print(f"USDT余额查询成功: {balance_data}")
else:
error = result.get("Err", "未知错误")
print(f"USDT余额查询失败: {error}")

▶️ batch_publish(cmds)

功能: 批量执行多个交易指令

参数:

  • cmds: 交易指令对象列表

返回值:

  • Result结构列表: 每个元素都包含OkErr字段的字典
  • 成功时: 每个结果为{"Ok": execution_result}
  • 失败时: 每个结果为{"Err": error_info}

示例:

# ✅ 正确的使用方式 - 批量执行多个交易指令
cmds = [
{
"account_id": 0,
"method": "UsdtBalance",
"sync": True
},
{
"account_id": 0,
"method": "BalanceByCoin",
"asset": "BTC",
"sync": True
}
]

results = trader.batch_publish(cmds)

# 处理批量结果
for i, result in enumerate(results):
if "Ok" in result:
data = result["Ok"]
print(f"指令{i+1}执行成功: {data}")
else:
error = result.get("Err", "未知错误")
print(f"指令{i+1}执行失败: {error}")

# 统计执行结果
success_count = 0
failed_count = 0
cmd_names = ["USDT余额查询", "BTC余额查询"]

for i, result in enumerate(results):
cmd_name = cmd_names[i] if i < len(cmd_names) else f"指令{i+1}"

if "Ok" in result:
success_count += 1
data = result["Ok"]
print(f"✅ {cmd_name} 执行成功: {data}")
else:
failed_count += 1
error = result.get("Err", "未知错误")
print(f"❌ {cmd_name} 执行失败: {error}")

print(f"\n总计: 成功 {success_count} 个, 失败 {failed_count} 个")

1.2 唯一标识符生成

🆔 create_cid(exchange)

功能: 创建唯一的客户端标识符

参数:

  • exchange: 交易所对象,用于生成对应交易所的CID格式

返回值:

  • Result结构: 包含OkErr字段的字典
  • 成功时: {"Ok": "unique_cid_string"} - 唯一的标识符字符串
  • 失败时: {"Err": error_info} - 包含错误信息

用途:

  • 用于标识和跟踪交易订单
  • 在高频交易中避免重复订单
  • 关联策略生成的订单与回报信息
  • 作为交易请求的唯一引用

示例:

# 创建唯一的客户端标识符
from exchange import Exchange # 假设有交易所模块

# 需要提供exchange对象
exchange = Exchange.BINANCE # 或其他交易所
result = trader.create_cid(exchange)

if "Ok" in result:
cid = result["Ok"]
print(f"生成的订单ID: {cid}")

# 在下单时使用生成的CID
place_order_cmd = {
"account_id": 0,
"method": "PlaceOrder",
"order": {
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"price": 50000.0,
"amount": 0.01,
"cid": cid # 使用生成的CID
},
"sync": True
}
else:
error = result.get("Err", "未知错误")
print(f"生成订单ID失败: {error}")

1.3 进程管理

⛔ graceful_shutdown()

功能: 优雅退出交易进程,发送 SIGINT 信号(等效于 Ctrl+C)来安全停止程序

参数: 无

返回值: 无

用途:

  • 在需要安全退出交易程序时使用
  • 通过发送 kill -2 信号来优雅停止当前进程
  • 允许程序有序地清理资源和保存状态
  • 适用于程序自动退出、异常处理或远程控制停止

示例:

# 在满足特定条件时优雅退出程序
if should_shutdown:
trader.log("准备优雅退出程序", "INFO")
trader.graceful_shutdown()

# 在异常处理中使用
try:
# 执行交易逻辑
pass
except CriticalError as e:
trader.log(f"遇到严重错误,程序将退出: {e}", "ERROR", color="red")
trader.graceful_shutdown()

# 定时退出示例
import time
start_time = time.time()
while True:
# 交易逻辑...
if time.time() - start_time > 24 * 3600: # 24小时后退出
trader.log("程序运行24小时,准备退出", "INFO")
trader.graceful_shutdown()
break

注意事项:

  • 此方法会立即发送退出信号,程序会在短时间内停止
  • 确保在调用此方法前已完成重要数据的保存
  • 建议在调用前使用日志记录退出原因

二、日志管理

2.1 日志记录

📝 log(msg, level=None, color=None, web=True)

功能: 记录日志到控制台和Web平台

参数:

  • msg: 日志消息内容
  • level: 日志级别,可选值: "TRACE", "DEBUG", "INFO", "WARN", "ERROR",默认为"INFO"
  • color: 日志颜色,可选,默认为None
  • web: 是否发送到Web平台,默认为True

返回值: 无

示例:

# 无色日志,默认发送到web平台
trader.log("交易开始")
# 红色日志,发送到web平台
trader.log("价格异常", "WARN","red")
# 不发送到web平台
trader.log("调试信息", "DEBUG", web=False)
# 发送错误日志到平台
trader.log("错误", "ERROR")
# 多行日志使用\n分割
trader.log("多行日志\n第二行\n第三行")

📝 tlog(tag, msg, color=None, interval=0, level=None, query=False)

功能: 限频日志记录,每N秒最多打印一次指定标签的日志

参数:

  • tag: 日志标识
  • msg: 日志内容
  • color: 日志颜色,可选,默认为None
  • interval: 日志最小间隔(秒),默认为0
  • level: 日志级别,可选,默认为"INFO"
  • query: 是否只查询不更新时间,默认为False

返回值:

  • bool - True表示已发送日志,False表示未发送日志

示例:

# 每60秒最多记录一次市场状态
trader.tlog("市场状态", "市场流动性正常", interval=60)

# 使用特定颜色记录警告信息
trader.tlog("价差警告", "价差超过阈值", color="yellow", level="WARN")

# 不更新时间戳
should_log = trader.tlog("检测点", "价格波动", interval=30, query=True)
if should_log:
# 执行额外操作
pass

# 多行日志记录
trader.tlog("深度分析",
f"BTC-USDT 深度\n买: [(50000, 1.2), (49900, 2.5)]\n卖: [(50100, 1.5), (50200, 3.0)]\n不平衡: 0.8750",
interval=5,
level="DEBUG",
color="royalblue")

📝 logt(message, time, color=None, level=None)

功能: 使用指定时间戳记录日志

参数:

  • message: 日志内容
  • time: 日志时间戳(Unix时间戳)
  • color: 日志颜色,可选,默认为None
  • level: 日志级别,可选,默认为"INFO"

返回值: 无

示例:

import time

# 记录1小时前的事件
past_time = int(time.time()) - 3600
trader.logt("历史事件记录", past_time)

# 使用特定颜色和级别记录日志
event_time = int(time.time())
trader.logt("重要事件", event_time, color="blue", level="INFO")

2.2 内置颜色表

日志相关函数(如logtloglogt)支持通过color参数设置文本颜色。系统支持多种颜色,按功能和使用场景分类如下:

交易状态颜色

颜色名称十六进制代码交易应用场景
green#32CD32交易成功、盈利交易、多头信号、买入信号
red#FF4500交易失败、亏损交易、空头信号、卖出信号
blue#0000FF交易执行、订单提交、中性交易状态
yellow#FFFF00交易警告、风险提示、接近止损点

提示: 系统还支持直接使用十六进制颜色代码,格式为#RRGGBB,例如#FF7F50(珊瑚红)。


三、缓存管理

3.1 缓存操作

💾 cache_save(data)

功能: 将数据保存到缓存系统

参数:

  • data: 要保存的数据对象,可以是Python字典、列表或任何可序列化的数据结构

返回值: 无

示例:

# 保存策略状态到缓存
import time

# 准备要保存的策略状态(直接使用Python字典)
state = {
"positions": [
{"symbol": "BTC_USDT", "amount": 0.05, "entry_price": 49800, "unrealized_pnl": 120},
{"symbol": "ETH_USDT", "amount": 0.8, "entry_price": 3200, "unrealized_pnl": 80}
],
"orders": [
{"symbol": "BTC_USDT", "order_id": "123456", "status": "open"},
{"symbol": "ETH_USDT", "order_id": "789012", "status": "filled"}
],
"statistics": {
"total_profit": 1250.5,
"trade_count": 48,
"win_rate": 0.75
},
"last_update": int(time.time())
}

# 直接保存Python对象,无需手动转换为JSON
trader.cache_save(state)

# 也可以保存简单的数据类型
trader.cache_save({"last_price": 50000.0, "timestamp": int(time.time())})

# 保存列表数据
positions_list = [
{"symbol": "BTC_USDT", "side": "long", "amount": 0.1},
{"symbol": "ETH_USDT", "side": "short", "amount": 2.0}
]
trader.cache_save(positions_list)

📂 cache_load()

功能: 从缓存系统加载数据

参数: 无

返回值:

  • 缓存数据对象,如果没有缓存则返回None。返回的数据保持原始的Python数据类型

示例:

# 从缓存加载策略状态
cached_data = trader.cache_load()

if cached_data is not None:
# 数据已自动转换为Python对象,无需手动解析JSON
print(f"加载的策略状态: {cached_data}")

# 直接使用加载的数据
if isinstance(cached_data, dict):
positions = cached_data.get("positions", [])
orders = cached_data.get("orders", [])
statistics = cached_data.get("statistics", {})

print(f"已恢复 {len(positions)} 个持仓, {len(orders)} 个订单")
print(f"总盈利: {statistics.get('total_profit', 0)}, 胜率: {statistics.get('win_rate', 0)*100}%")

# 如果保存的是列表
elif isinstance(cached_data, list):
print(f"加载的持仓列表: {len(cached_data)} 个持仓")
for pos in cached_data:
print(f" {pos['symbol']}: {pos['side']} {pos['amount']}")

else:
print("缓存为空,使用默认配置")
# 初始化默认状态
positions = []
orders = []
statistics = {"total_profit": 0, "trade_count": 0, "win_rate": 0}

# 安全的缓存加载模式
def load_strategy_state():
"""安全加载策略状态"""
try:
cached_state = trader.cache_load()

if cached_state and isinstance(cached_state, dict):
# 验证缓存数据的完整性
required_keys = ["positions", "orders", "statistics"]
if all(key in cached_state for key in required_keys):
return cached_state
else:
print("缓存数据不完整,使用默认状态")

# 返回默认状态
return {
"positions": [],
"orders": [],
"statistics": {"total_profit": 0, "trade_count": 0, "win_rate": 0},
"last_update": int(time.time())
}

except Exception as e:
print(f"加载缓存失败: {e}")
return {
"positions": [],
"orders": [],
"statistics": {"total_profit": 0, "trade_count": 0, "win_rate": 0},
"last_update": int(time.time())
}

# 使用安全加载函数
strategy_state = load_strategy_state()

3.2 缓存系统说明

Trader提供的缓存系统分为两部分,它们是互相独立的机制:

  1. 策略缓存 📊: 通过cache_savecache_load方法管理的缓存,主要用于保存策略状态,如交易记录、统计数据等。

    • cache_save: 保存任意Python数据结构(字典、列表等)到缓存
    • cache_load: 从缓存加载数据,返回原始的Python数据类型
    • 数据格式: 支持自动序列化/反序列化,无需手动转换JSON
  2. WebClient缓存 🌐: WebClient有独立的缓存机制,不使用上述的cache_savecache_load方法。WebClient的配置数据和状态会自动缓存,不仅包括primary_balancesecondary_balance参数,还包括所有的统计数据,如总成交量、成交次数、失败次数、成功次数、胜率、利润等。这些数据在首次设置后会被自动缓存,后续启动时自动读取。另外,当通过相关API更新数据时(如update_trade_stats、update_total_balance等),缓存文件也会自动更新,确保数据持久性。

WebClient缓存文件位置:

  • WebClient缓存文件以{server_name}.json命名
  • 默认存储在当前工作目录中
  • 不同策略应使用不同的server_name以避免缓存冲突
  • 如果需要重置初始余额或统计数据,可以删除对应的缓存文件,或使用新的server_name

缓存机制说明: WebClient的配置数据(包括初始余额、交易统计数据如总成交量、成交次数、失败次数、成功次数、胜率、利润等)会被缓存到本地文件系统。缓存文件以 {server_name}.json 的格式命名,存储在当前工作目录中。当调用统计和余额相关的API(如update_trade_stats、update_total_balance等)更新数据时,缓存文件会实时自动更新,无需手动操作。这就是为什么server_name参数非常重要,它不仅用于Web界面标识,还用于定位缓存文件。不同的策略应使用不同的server_name以避免缓存冲突。WebClient缓存是自动管理的,与策略缓存(cache_save/cache_load)完全独立。


四、外部通信

4.1 HTTP请求

🌐 http_request(url, method, body, headers=None)

功能: 发送HTTP请求获取外部数据

参数:

  • url: 请求URL
  • method: 请求方法,如"GET", "POST"等
  • body: 请求体内容,可选。可以是Python字典、列表或任何可序列化的Python对象,会自动转换为JSON字符串
  • headers: 请求头字典,可选

返回值:

  • Result结构: 包含OkErr字段的字典
  • 成功时: {"Ok": response_data} - 响应数据已转换为Python对象
  • 失败时: {"Err": error_info} - 包含错误信息

示例:

# ✅ GET请求示例 - 获取交易所合约信息
url = "https://api.gateio.ws/api/v4/futures/usdt/contracts/BTC_USDT"
result = trader.http_request(url, "GET", None, None)

if "Ok" in result:
contract_info = result["Ok"]
print(f"BTC_USDT合约信息: {contract_info}")
else:
error = result.get("Err", "未知错误")
print(f"获取合约信息失败: {error}")

# ✅ POST请求示例 - 使用JSONPlaceholder测试API
post_url = "https://jsonplaceholder.typicode.com/posts"
post_data = {
"title": "测试POST请求",
"body": "这是一个测试POST请求的内容",
"userId": 1
}
headers = {"Content-Type": "application/json"}

# 直接传递Python字典,Rust端会自动序列化为JSON
result = trader.http_request(post_url, "POST", post_data, headers)
if "Ok" in result:
post_response = result["Ok"]
print(f"POST请求结果: {post_response}")
print(f"创建的文章ID: {post_response.get('id')}")
else:
error = result.get("Err", "未知错误")
print(f"POST请求失败: {error}")

重要说明:

  • 自动序列化: body参数支持Python字典、列表等可序列化对象,会在Rust端自动转换为JSON字符串
  • 无需手动序列化: 不再需要使用json.dumps()手动序列化数据
  • 向后兼容: 仍然支持传递字符串作为body参数
  • Result处理: 返回值需要检查Ok字段来确定是否成功
  • 错误处理: 如果传入的Python对象无法序列化,会返回详细的错误信息

五、NB8 Web平台集成

Web可视化功能提供了与NB8 Web平台的集成,使策略能够实时展示交易数据、统计信息和图表。详细内容请参考Web可视化文档


六、交易所API直接访问

6.1 订单管理

📊 get_orders(account_id, symbol, start, end, extra=None, generate=False)

功能: 获取指定时间范围内的订单列表

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • start: 整数,开始时间戳(毫秒)
  • end: 整数,结束时间戳(毫秒)
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 订单列表数据

📊 get_open_orders(account_id, symbol, extra=None, generate=False)

功能: 获取指定交易对的未成交订单列表

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 未成交订单列表数据

📊 get_all_open_orders(account_id, extra=None, generate=False)

功能: 获取账户下所有交易对的未成交订单

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 所有未成交订单列表数据

📊 get_order_by_id(account_id, symbol, order_id=None, cid=None, extra=None, generate=False)

功能: 通过订单ID或客户端订单ID查询单个订单详情

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • order_id: 字符串,可选,订单ID
  • cid: 字符串,可选,客户端订单ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

注意: order_idcid 不能同时为空,必须提供其中一个

示例:

# 通过订单ID查询
result = trader.get_order_by_id(
account_id=0,
symbol="BTC_USDT",
order_id="123456789"
)

# 通过客户端订单ID查询
result = trader.get_order_by_id(
account_id=0,
symbol="BTC_USDT",
cid="my_order_001"
)

返回值:

  • 订单详情数据

📊 place_order(account_id, order, params=None, extra=None, sync=True, generate=False)

功能: 下单

参数:

  • account_id: 整数,账户ID
  • order: 字典或对象,订单信息,包含以下字段:
    • symbol: 字符串,交易对名称
    • side: 字符串,交易方向,例如"Buy"或"Sell"
    • order_type: 字符串,订单类型,例如"Limit"或"Market"
    • price: 浮点数,限价单价格
    • amount: 浮点数,交易数量
    • pos_side: 字符串,可选,仓位方向,例如"Long"或"Short"。在双向持仓模式下必须指定,单向持仓模式下可选
    • cid: 字符串,可选,客户端订单ID
    • time_in_force: 字符串,可选,订单有效期,例如"GTC"(Good Till Cancel)
  • params: 字典或对象,可选,下单参数,包含以下可选字段:
    • is_dual_side: 布尔值,是否使用双向持仓模式
    • market_order_mode: 字符串,市价单模式,例如"Normal"或"Amount"
  • extra: 字典,可选,额外参数
  • sync: 布尔值,可选,默认为True,是否同步等待订单结果
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

pos_side参数说明:

  • "Long": 多仓方向
  • "Short": 空仓方向
  • 在双向持仓模式(is_dual_side=True)下,此参数为必填
  • 在单向持仓模式下,此参数可选,但建议填写以提高订单明确性
  • 当需要只减仓(reduce_only)时,必须传递此参数

返回值:

  • Result结构: 包含OkErr字段的字典
  • 成功时: {"Ok": order_result} - 下单成功结果
  • 失败时: {"Err": error_info} - 包含错误信息

示例:

# ✅ 完整的下单示例 - 限价单
order = {
"cid": traderv2.create_cid(exchange),
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"pos_side": "Long", # 指定仓位方向
"price": 50000.0,
"amount": 0.01,
"time_in_force": "GTC"
}

params = {
"is_dual_side": True, # 双向持仓模式
}

result = trader.place_order(0, order, params, sync=True)

if "Ok" in result:
order_result = result["Ok"]
print(f"下单成功: {order_result}")
else:
error = result.get("Err", "未知错误")
print(f"下单失败: {error}")

# ✅ 市价单示例
market_order = {
"cid": traderv2.create_cid(exchange),
"symbol": "ETH_USDT",
"order_type": "Market",
"side": "Sell",
"pos_side": "Short", # 空仓
"amount": 0.5,
"time_in_force": "IOC"
}

params = {
"is_dual_side": True,
"market_order_mode": "Normal"
}

result = trader.place_order(0, market_order, params, sync=True)

# ✅ 单向持仓模式示例(pos_side可选但建议添加)
single_side_order = {
"cid": traderv2.create_cid(exchange),
"symbol": "SOL_USDT",
"order_type": "Limit",
"side": "Buy",
"pos_side": "Long", # 虽然单向持仓可选,但建议添加
"price": 100.0,
"amount": 1.0,
"time_in_force": "PostOnly"
}

params = {
"is_dual_side": False # 单向持仓模式
}

result = trader.place_order(0, single_side_order, params, sync=True)

# ✅ 只减仓订单示例(pos_side必须)
reduce_only_order = {
"cid": traderv2.create_cid(exchange),
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Sell",
"pos_side": "Long", # 减仓时必须指定pos_side
"price": 51000.0,
"amount": 0.01,
"time_in_force": "GTC",
"reduce_only": True # 只减仓标志
}

result = trader.place_order(0, reduce_only_order, sync=True)

# ⚠️ 错误示例 - 双向持仓模式下缺少pos_side
wrong_order = {
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
# "pos_side": "Long", # 缺少此字段在双向持仓下会报错
"price": 50000.0,
"amount": 0.01
}

params = {"is_dual_side": True}
result = trader.place_order(0, wrong_order, params)
# 这会返回错误: {"Err": "pos_side is required in dual side mode"}

# ✅ 批量下单示例(每个订单都需要pos_side)
orders = []
for i in range(3):
order = {
"cid": traderv2.create_cid(exchange),
"symbol": "BTC_USDT",
"order_type": "Limit",
"side": "Buy",
"pos_side": "Long", # 每个订单都要指定
"price": 50000.0 - i * 100, # 不同价格
"amount": 0.01,
"time_in_force": "GTC"
}
orders.append(order)

params = {"is_dual_side": True}
result = trader.batch_place_order(0, orders, params, sync=True)

pos_side使用场景总结:

场景is_dual_sidepos_side是否必需说明
双向持仓开多仓True✅ 必需 "Long"在多空并存模式下必须明确方向
双向持仓开空仓True✅ 必需 "Short"在多空并存模式下必须明确方向
单向持仓开仓False📋 建议添加提高订单明确性,避免歧义
只减仓订单任意✅ 必需减仓时必须明确要平哪个方向的仓位
平仓订单任意✅ 必需平仓时必须指定要平的仓位方向

📊 batch_place_order(account_id, orders, params=None, extra=None, sync=True, generate=False)

功能: 批量下单

参数:

  • account_id: 整数,账户ID
  • orders: 列表,多个订单信息,每个订单包含与place_order中order相同的字段
  • params: 字典或对象,可选,下单参数,与place_order中params相同
  • extra: 字典,可选,额外参数
  • sync: 布尔值,可选,默认为True,是否同步等待订单结果
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 批量下单结果数据列表

📊 amend_order(account_id, order, extra=None, sync=True, generate=False)

功能: 修改订单

参数:

  • account_id: 整数,账户ID
  • order: 字典或对象,订单修改信息,包含以下字段:
    • symbol: 字符串,交易对名称
    • order_id: 字符串,订单ID
    • price: 浮点数,可选,新价格
    • amount: 浮点数,可选,新数量
    • cid: 字符串,可选,客户端订单ID(用于替代order_id)
  • extra: 字典,可选,额外参数
  • sync: 布尔值,可选,默认为True,是否同步等待修改结果
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 订单修改结果数据

📊 cancel_order(account_id, symbol, order_id=None, cid=None, extra=None, sync=True, generate=False)

功能: 取消订单

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • order_id: 字符串,可选,订单ID
  • cid: 字符串,可选,客户端订单ID
  • extra: 字典,可选,额外参数
  • sync: 布尔值,可选,默认为True,是否同步等待取消结果
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

注意: order_idcid 不能同时为空,必须提供其中一个

示例:

# 通过订单ID取消
result = trader.cancel_order(
account_id=0,
symbol="BTC_USDT",
order_id="123456789"
)

# 通过客户端订单ID取消
result = trader.cancel_order(
account_id=0,
symbol="BTC_USDT",
cid="my_order_001"
)

# 异步取消订单
result = trader.cancel_order(
account_id=0,
symbol="BTC_USDT",
order_id="123456789",
sync=False
)

返回值:

  • 取消订单结果数据

📊 batch_cancel_order(account_id, symbol, extra=None, sync=True, generate=False)

功能: 批量取消指定交易对的所有未成交订单

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • extra: 字典,可选,额外参数
  • sync: 布尔值,可选,默认为True,是否同步等待取消结果
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 批量取消订单结果数据

📊 batch_cancel_order_by_id(account_id, symbol=None, order_ids=None, client_order_ids=None, extra=None, sync=True, generate=False)

功能: 批量取消指定ID的订单

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,可选,交易对名称
  • order_ids: 字符串列表,可选,订单ID列表
  • client_order_ids: 字符串列表,可选,客户端订单ID列表
  • extra: 字典,可选,额外参数
  • sync: 布尔值,可选,默认为True,是否同步等待取消结果
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 批量取消订单结果数据

🔧 CloseTrigger结构体说明

止损订单使用CloseTrigger结构体定义触发条件和执行行为,提供了丰富的配置选项:

CloseTrigger结构体字段:

  • id: 可选,订单ID(下单时不填,系统自动生成)
  • cid: 可选,客户自定义订单ID,便于订单跟踪
  • trigger_price: 必填,触发价格配置(TriggerPrice枚举)
  • trigger_action: 必填,触发后的执行行为(TriggerAction结构)

TriggerPrice(触发价格类型):

  • {"MarkPrice": 价格}: 标记价格触发,最稳定,推荐使用
  • {"ContractPrice": 价格}: 最新成交价触发,响应最快
  • {"IndexPrice": 价格}: 指数价格触发,最平滑

TriggerAction(触发行为):

  • quantity: 可选,触发数量,不填则全平持仓
  • trailing_gap: 可选,回调幅度百分比,追踪止盈必填
  • execute_type: 执行类型
    • "Market": 市价执行(默认)
    • {"Limit": 价格}: 限价执行
  • typ: 订单类型
    • "Normal": 一般止损/止盈(默认)
    • "Trailing": 追踪止盈

特殊功能:

  • 追踪止盈: 当typ"Trailing"时,价格朝有利方向移动时止盈线会跟踪调整,直到价格回调达到trailing_gap幅度时触发
  • 部分平仓: 通过quantity字段可以实现部分止盈/止损,其余仓位继续持有
  • 多种触发价格: 支持标记价格、成交价格、指数价格三种触发方式,适应不同交易策略

📊 StopOrder数据结构

以下是止损订单相关的完整数据结构定义:

StopOrder(完整止损订单信息):

{
"symbol": "BTC_USDT", # 交易对符号
"take_profit": { # 止盈订单配置(可选)
"id": "123456789", # 止盈订单ID(系统生成)
"cid": "client_tp_001", # 客户自定义ID(可选)
"trigger_price": {"MarkPrice": 52000.0}, # 触发价格
"trigger_action": { # 触发行为
"quantity": 0.5, # 触发数量(可选,不填则全平)
"trailing_gap": 2.0, # 回调幅度(%)(可选,追踪止盈时必填)
"execute_type": "Market", # 执行类型:"Market"或{"Limit": 价格}
"typ": "Normal" # 订单类型:"Normal"或"Trailing"
}
},
"stop_loss": { # 止损订单配置(可选)
"id": "987654321", # 止损订单ID(系统生成)
"cid": "client_sl_001", # 客户自定义ID(可选)
"trigger_price": {"ContractPrice": 48000.0}, # 触发价格
"trigger_action": { # 触发行为
"quantity": null, # 全平仓位
"trailing_gap": null, # 不使用追踪
"execute_type": "Market", # 市价执行
"typ": "Normal" # 一般止损
}
},
"status": "Open", # 订单状态
"c_time": 1640995200000, # 创建时间(毫秒时间戳)
"u_time": 1640998800000 # 更新时间(毫秒时间戳)
}

StopOrderStatus(止损订单状态枚举):

  • "Open": 未触发(默认状态)
  • "Executed": 已触发执行
  • "Canceled": 已取消
  • "Failed": 触发失败

StopOrderIdRsp(止损订单操作返回结果):

{
"take_profit": { # 止盈订单结果(可选)
"Ok": "123456789" # 成功时返回订单ID
# 或
"Err": "错误信息" # 失败时返回错误信息
},
"stop_loss": { # 止损订单结果(可选)
"Ok": "987654321" # 成功时返回订单ID
# 或
"Err": "错误信息" # 失败时返回错误信息
}
}

ExecuteType(执行类型枚举):

  • "Market": 市价执行(默认)
  • {"Limit": 价格}: 限价执行,如{"Limit": 50000.0}

ActionType(订单类型枚举):

  • "Normal": 一般止损/止盈(默认)
  • "Trailing": 追踪止盈

TriggerPrice(触发价格类型):

  • {"MarkPrice": 价格}: 标记价格触发,如{"MarkPrice": 50000.0}
  • {"ContractPrice": 价格}: 最新成交价触发,如{"ContractPrice": 50000.0}
  • {"IndexPrice": 价格}: 指数价格触发,如{"IndexPrice": 50000.0}

📊 post_stop_order(account_id, symbol, pos_side, take_profit=None, stop_loss=None, is_dual_side=False, extra=None, generate=False)

功能: 下止损订单

参数:

  • account_id: 整数,必需,账户ID
  • symbol: 字符串,必需,交易对名称
  • pos_side: 字符串,必需,仓位方向,例如"Long"或"Short"
  • take_profit: 字典,可选,止盈订单配置(CloseTrigger结构),包含以下字段:
    • cid: 字符串,可选,客户自定义订单ID
    • trigger_price: 字典,触发价格配置,支持以下类型:
      • {"MarkPrice": 价格}: 标记价格触发
      • {"ContractPrice": 价格}: 最新成交价/市场价触发
      • {"IndexPrice": 价格}: 指数价格触发
    • trigger_action: 字典,触发行为配置,包含:
      • quantity: 浮点数,可选,触发数量,不填则全平
      • trailing_gap: 浮点数,可选,回调幅度(%),追踪止盈时必填
      • execute_type: 字符串或字典,执行类型:
        • "Market": 市价平仓
        • {"Limit": 价格}: 限价平仓
      • typ: 字符串,可选,类型:"Normal"(一般)或"Trailing"(追踪),默认"Normal"
  • stop_loss: 字典,可选,止损订单配置(CloseTrigger结构),字段同take_profit
  • is_dual_side: 布尔值,可选,是否使用双向持仓模式,默认为False
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

重要:

  • 此方法仅支持同步执行,无 sync 参数
  • 必需参数: account_idsymbolpos_side 为必需参数
  • 配置要求: take_profitstop_loss 至少需要提供其中一个,否则无法创建止损订单
  • 仓位方向: pos_side 必须正确设置,与要保护的仓位方向一致

返回值:

  • Result结构: 包含OkErr字段的字典
  • 示例:
{
"Ok": {
"stop_loss": {
"Ok": "1326362257269604352"
},
"take_profit": {
"Ok": "1326362256586391552"
}
}
}

示例:

# ✅ 基础止损订单 - 标记价格触发
result = trader.post_stop_order(
account_id=0,
symbol="BTC_USDT",
pos_side="Long",
stop_loss={
"trigger_price": {"MarkPrice": 49000.0}, # 标记价格触发
"trigger_action": {
"execute_type": "Market", # 市价平仓
"typ": "Normal" # 一般止损
}
},
is_dual_side=True
)

if "Ok" in result:
order_result = result["Ok"]
print(f"止损订单下达成功: {order_result}")
else:
error = result.get("Err", "未知错误")
print(f"止损订单失败: {error}")

# ✅ 止盈订单 - 合约价格触发,指定数量
result = trader.post_stop_order(
account_id=0,
symbol="ETH_USDT",
pos_side="Long",
take_profit={
"cid": "my_take_profit_001", # 自定义订单ID
"trigger_price": {"ContractPrice": 3500.0}, # 合约价格触发
"trigger_action": {
"quantity": 0.5, # 止盈数量
"execute_type": {"Limit": 3510.0}, # 限价3510平仓
"typ": "Normal"
}
},
is_dual_side=True
)

# ✅ 追踪止盈订单
result = trader.post_stop_order(
account_id=0,
symbol="SOL_USDT",
pos_side="Long",
take_profit={
"trigger_price": {"MarkPrice": 100.0}, # 标记价格100触发
"trigger_action": {
"trailing_gap": 2.0, # 回调2%时执行
"execute_type": "Market", # 市价平仓
"typ": "Trailing" # 追踪止盈
}
},
is_dual_side=True
)

# ✅ 同时设置止损和止盈
result = trader.post_stop_order(
account_id=0,
symbol="BTC_USDT",
pos_side="Short",
take_profit={
"trigger_price": {"MarkPrice": 48000.0}, # 止盈触发价
"trigger_action": {
"quantity": 0.01, # 部分止盈
"execute_type": "Market",
"typ": "Normal"
}
},
stop_loss={
"trigger_price": {"ContractPrice": 52000.0}, # 止损触发价
"trigger_action": {
"execute_type": "Market", # 市价止损
"typ": "Normal"
}
},
is_dual_side=True
)

# ✅ 指数价格触发的止损订单
result = trader.post_stop_order(
account_id=0,
symbol="BTC_USDT",
pos_side="Long",
stop_loss={
"trigger_price": {"IndexPrice": 49500.0}, # 指数价格触发
"trigger_action": {
"quantity": 0.02, # 指定止损数量
"execute_type": {"Limit": 49000.0}, # 限价49000止损
"typ": "Normal"
}
},
is_dual_side=True
)

# 💡 完整的止损订单管理示例(基于实际策略代码)
def complete_stop_order_lifecycle():
"""完整的止损订单生命周期管理示例"""
account_id = 0
symbol = "XRP_USDT"

print("=== 止损订单完整生命周期示例 ===")

# 1. 检查持仓模式
is_dual_side_result = trader.is_dual_side(account_id)
if "Ok" not in is_dual_side_result:
print("❌ 获取持仓模式失败")
return

is_dual_side = is_dual_side_result['Ok']
print(f"📋 当前持仓模式: {'双向持仓' if is_dual_side else '单向持仓'}")

# 2. 创建止盈止损订单
print("\n📥 创建止盈止损订单...")
result = trader.post_stop_order(
account_id=account_id,
symbol=symbol,
pos_side="Long", # 针对多头仓位设置止盈止损
take_profit={
"cid": "tp_xrp_001", # 客户端止盈订单ID
"trigger_price": {"MarkPrice": 3.0}, # 止盈触发价
"trigger_action": {
"quantity": 2, # 部分止盈数量
"execute_type": "Market", # 市价执行
"typ": "Normal" # 一般止盈
}
},
stop_loss={
"cid": "sl_xrp_001", # 客户端止损订单ID
"trigger_price": {"ContractPrice": 2.0}, # 止损触发价
"trigger_action": {
"execute_type": "Market", # 市价止损
"typ": "Normal" # 一般止损
}
},
is_dual_side=is_dual_side
)

if "Ok" not in result:
print(f"❌ 下止盈止损订单失败: {result.get('Err')}")
return

order_result = result["Ok"]
print(f"✅ 止盈止损订单下达成功:")

# 提取订单ID
take_profit_id = None
stop_loss_id = None

if order_result.get('take_profit') and "Ok" in order_result['take_profit']:
take_profit_id = order_result['take_profit']['Ok']
print(f" 🎯 止盈订单ID: {take_profit_id}")

if order_result.get('stop_loss') and "Ok" in order_result['stop_loss']:
stop_loss_id = order_result['stop_loss']['Ok']
print(f" 🛡️ 止损订单ID: {stop_loss_id}")

# 3. 查询并验证订单
print("\n📊 查询止损订单...")
result = trader.get_stop_orders(account_id, symbol)
if "Ok" in result:
orders = result["Ok"]
open_orders = [order for order in orders if order['status'] == 'Open']
print(f" 📋 当前开放的止损订单数量: {len(open_orders)}")

# 找到我们刚才下的订单
our_orders = []
for order in open_orders:
order_found = False
if order.get('take_profit') and order['take_profit']['id'] == take_profit_id:
order_found = True
if order.get('stop_loss') and order['stop_loss']['id'] == stop_loss_id:
order_found = True
if order_found:
our_orders.append(order)

print(f" ✅ 找到我们的订单: {len(our_orders)} 个")

# 显示订单详情
for i, order in enumerate(our_orders):
print(f" 订单 {i+1}: 状态={order['status']}, 创建时间={order['c_time']}")
if order.get('take_profit'):
tp = order['take_profit']
print(f" 止盈: 触发价={tp['trigger_price']}, 数量={tp['trigger_action'].get('quantity')}")
if order.get('stop_loss'):
sl = order['stop_loss']
print(f" 止损: 触发价={sl['trigger_price']}")
else:
print(f"❌ 查询订单失败: {result.get('Err')}")

# 4. 修改止盈订单(演示修改功能)
if take_profit_id:
print(f"\n🔧 修改止盈订单 {take_profit_id}...")
result = trader.amend_stop_order(
account_id=account_id,
symbol=symbol,
id=take_profit_id,
take_profit={
"trigger_price": {"MarkPrice": 3.1}, # 调整触发价格
"trigger_action": {
"quantity": 3, # 调整数量
"execute_type": "Market",
"typ": "Normal"
}
}
)

if "Ok" in result:
print(f" ✅ 止盈订单修改成功: {result['Ok']}")

# 验证修改结果
verify_result = trader.get_stop_orders(account_id, symbol, id=take_profit_id)
if "Ok" in verify_result and verify_result["Ok"]:
updated_order = verify_result["Ok"][0]
if updated_order.get('take_profit'):
tp = updated_order['take_profit']
new_trigger_price = tp['trigger_price']
new_quantity = tp['trigger_action'].get('quantity')
print(f" 📊 修改验证 - 新触发价: {new_trigger_price}, 新数量: {new_quantity}")
else:
print(f" ❌ 止盈订单修改失败: {result.get('Err')}")

# 5. 演示条件取消(这里模拟条件判断)
print("\n🧹 清理订单...")
should_cleanup = True # 在实际策略中,这里会有具体的条件判断

if should_cleanup:
# 取消止盈订单
if take_profit_id:
result = trader.cancel_stop_order(account_id, symbol, id=take_profit_id)
if "Ok" in result:
print(f" ✅ 止盈订单 {take_profit_id} 取消成功")
else:
print(f" ❌ 止盈订单取消失败: {result.get('Err')}")

# 取消止损订单
if stop_loss_id:
result = trader.cancel_stop_order(account_id, symbol, id=stop_loss_id)
if "Ok" in result:
print(f" ✅ 止损订单 {stop_loss_id} 取消成功")
else:
print(f" ❌ 止损订单取消失败: {result.get('Err')}")

# 6. 最终验证
print("\n🔍 最终验证...")
result = trader.get_stop_orders(account_id, symbol)
if "Ok" in result:
orders = result["Ok"]
open_orders = [order for order in orders if order['status'] == 'Open']
print(f" 📋 剩余开放订单数量: {len(open_orders)}")

print("\n✅ 止损订单完整生命周期示例完成!")

# 🚀 实际策略中的止损订单管理示例
def strategy_stop_order_management():
"""基于实际策略代码的止损订单管理"""
account_id = 0
symbol = "XRP_USDT"

print("=== 策略级止损订单管理 ===")

# 获取开放的止损订单(参考strategy.py代码)
result = trader.get_stop_orders(account_id, symbol)
if "Ok" in result:
orders = result["Ok"]
open_stop_orders = [order for order in orders if order['status'] == 'Open']

print(f"📋 当前开放的止损订单: {len(open_stop_orders)} 个")

# 撤销所有开放的止损订单(实际策略场景)
for order in open_stop_orders:
# 获取止损订单ID(优先取止损,如果没有则取止盈)
order_id = None
order_type = None

if order.get('stop_loss'):
order_id = order['stop_loss']['id']
order_type = "止损"
elif order.get('take_profit'):
order_id = order['take_profit']['id']
order_type = "止盈"

if order_id:
res = trader.cancel_stop_order(account_id, symbol, id=order_id)
if 'Ok' in res:
print(f" ✅ {order_type}订单 {order_id} 撤单成功")
else:
print(f" ❌ {order_type}订单 {order_id} 撤单失败: {res.get('Err')}")
else:
print(f"❌ 获取止损订单失败: {result.get('Err')}")

# 设置新的风控订单
print("\n📥 设置新的风控订单...")
result = trader.post_stop_order(
account_id=account_id,
symbol=symbol,
pos_side="Long",
take_profit={
"trigger_price": {"MarkPrice": 2.8},
"trigger_action": {
"execute_type": "Market",
"typ": "Normal"
}
},
stop_loss={
"trigger_price": {"MarkPrice": 2.2},
"trigger_action": {
"execute_type": "Market",
"typ": "Normal"
}
},
is_dual_side=True
)

if "Ok" in result:
print(f"✅ 新的风控订单设置成功: {result['Ok']}")
else:
print(f"❌ 风控订单设置失败: {result.get('Err')}")

# 调用示例函数
complete_stop_order_lifecycle()
print("\n" + "="*50 + "\n")
strategy_stop_order_management()

📊 get_stop_orders(account_id, symbol, limit=None, id=None, cid=None, extra=None, generate=False)

功能: 获取止损订单列表

参数:

  • account_id: 整数,必需,账户ID
  • symbol: 字符串,必需,交易对名称
  • limit: 整数,可选,返回订单数量限制
  • id: 字符串,可选,特定订单ID,用于查询指定订单
  • cid: 字符串,可选,客户端订单ID,用于查询指定订单
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

重要:

  • 此方法仅支持同步执行,无 sync 参数
  • 必需参数: account_idsymbol 两个参数为必需
  • 查询模式: 如果不提供idcid,则返回所有止损订单;如果提供其中一个,则返回指定订单

返回值:

  • Result结构: 包含OkErr字段的字典
  • 示例:
{
"Ok": [
{
"c_time": 1751955325429,
"status": "Open",
"stop_loss": {
"cid": "1326362257261215744",
"id": "1326362257269604352",
"trigger_action": {
"execute_type": "Market",
"quantity": null,
"trailing_gap": null,
"typ": "Normal"
},
"trigger_price": {
"ContractPrice": 2.0
}
},
"symbol": "XRP_USDT",
"take_profit": null,
"u_time": 1751955325429
},
{
"c_time": 1751954797851,
"status": "Canceled",
"stop_loss": {
"cid": "1326360044434505728",
"id": "1326360044447088640",
"trigger_action": {
"execute_type": "Market",
"quantity": null,
"trailing_gap": null,
"typ": "Normal"
},
"trigger_price": {
"ContractPrice": 2.0
}
},
"symbol": "XRP_USDT",
"take_profit": null,
"u_time": 1751955183197
},
]
}

示例:

# 获取BTC_USDT的所有止损订单
result = trader.get_stop_orders(
account_id=0,
symbol="BTC_USDT"
)

if "Ok" in result:
stop_orders = result["Ok"]
print(f"止损订单数量: {len(stop_orders)}")

for order in stop_orders:
symbol = order.get('symbol')
status = order.get('status')
create_time = order.get('c_time')

# 检查止盈订单
if order.get('take_profit'):
tp = order['take_profit']
tp_id = tp.get('id')
tp_price = tp['trigger_price']
print(f"止盈订单 - ID: {tp_id}, 触发价: {tp_price}, 状态: {status}")

# 检查止损订单
if order.get('stop_loss'):
sl = order['stop_loss']
sl_id = sl.get('id')
sl_price = sl['trigger_price']
print(f"止损订单 - ID: {sl_id}, 触发价: {sl_price}, 状态: {status}")
else:
error = result.get("Err", "未知错误")
print(f"获取止损订单失败: {error}")

# 通过订单ID获取特定订单
result = trader.get_stop_orders(
account_id=0,
symbol="BTC_USDT",
id="1326362257269604352" # 具体的订单ID
)

if "Ok" in result:
orders = result["Ok"]
if orders:
print(f"找到订单: {orders[0]}")
else:
print("订单不存在")

# 通过客户端订单ID获取特定订单
result = trader.get_stop_orders(
account_id=0,
symbol="BTC_USDT",
cid="client_stop_order_001" # 客户端自定义ID
)

# 限制返回数量(获取最近10个止损订单)
result = trader.get_stop_orders(
account_id=0,
symbol="BTC_USDT",
limit=10
)

if "Ok" in result:
recent_orders = result["Ok"]
print(f"最近{len(recent_orders)}个止损订单")

# 分析订单状态分布
status_count = {}
for order in recent_orders:
status = order.get('status', 'Unknown')
status_count[status] = status_count.get(status, 0) + 1

print(f"订单状态分布: {status_count}")

📊 amend_stop_order(account_id, symbol, id=None, cid=None, take_profit=None, stop_loss=None, extra=None, generate=False)

功能: 修改止损订单

参数:

  • account_id: 整数,必需,账户ID
  • symbol: 字符串,必需,交易对名称
  • id: 字符串,可选,要修改的订单ID
  • cid: 字符串,可选,要修改的客户端订单ID
  • take_profit: 字典,可选,新的止盈订单配置(CloseTrigger结构),包含以下字段:
    • cid: 字符串,可选,客户自定义订单ID
    • trigger_price: 字典,触发价格配置,支持以下类型:
      • {"MarkPrice": 价格}: 标记价格触发
      • {"ContractPrice": 价格}: 最新成交价/市场价触发
      • {"IndexPrice": 价格}: 指数价格触发
    • trigger_action: 字典,触发行为配置,包含:
      • quantity: 浮点数,可选,触发数量,不填则全平
      • trailing_gap: 浮点数,可选,回调幅度(%),追踪止盈时必填
      • execute_type: 字符串或字典,执行类型:
        • "Market": 市价平仓
        • {"Limit": 价格}: 限价平仓
      • typ: 字符串,可选,类型:"Normal"(一般)或"Trailing"(追踪),默认"Normal"
  • stop_loss: 字典,可选,新的止损订单配置(CloseTrigger结构),字段同take_profit
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

重要:

  • 此方法仅支持同步执行,无 sync 参数
  • 必需参数: account_idsymbol 为必需参数
  • 订单标识: idcid 不能同时为空,必须提供其中一个用于标识要修改的订单
  • 修改内容: take_profitstop_loss 至少需要提供其中一个要修改的配置,否则修改操作无意义

返回值:

  • Result结构: 包含OkErr字段的字典
  • 示例:
{
"Ok": {
"stop_loss": {
"Ok": "1326362257269604352"
},
"take_profit": {
"Ok": "1326362256586391552"
}
}
}

示例:

# 首先获取现有的止损订单
result = trader.get_stop_orders(0, "BTC_USDT")
if "Ok" in result and result["Ok"]:
stop_orders = result["Ok"]
# 找到一个开放状态的订单
open_orders = [order for order in stop_orders if order['status'] == 'Open']

if open_orders:
# 获取第一个开放订单的止损部分ID
stop_loss_id = None
if open_orders[0].get('stop_loss'):
stop_loss_id = open_orders[0]['stop_loss']['id']

if stop_loss_id:
# 修改止损订单的触发价格 - 通过订单ID
result = trader.amend_stop_order(
account_id=0,
symbol="BTC_USDT",
id=stop_loss_id, # 使用id参数
stop_loss={
"trigger_price": {"MarkPrice": 48500.0}, # 修改为标记价格触发
"trigger_action": {
"execute_type": "Market", # 市价执行
"typ": "Normal"
}
}
)

if "Ok" in result:
amend_result = result["Ok"]
print(f"止损订单修改成功: {amend_result}")
else:
error = result.get("Err", "未知错误")
print(f"止损订单修改失败: {error}")

# 通过客户端订单ID修改止盈订单为追踪止盈
result = trader.amend_stop_order(
account_id=0,
symbol="ETH_USDT",
cid="client_tp_001", # 使用客户端订单ID
take_profit={
"trigger_price": {"ContractPrice": 3600.0}, # 合约价格触发
"trigger_action": {
"quantity": 0.8, # 数量
"trailing_gap": 1.5, # 回调1.5%
"execute_type": "Market",
"typ": "Trailing" # 改为追踪止盈
}
}
)

if "Ok" in result:
print(f"追踪止盈修改成功: {result['Ok']}")
else:
print(f"追踪止盈修改失败: {result.get('Err', '未知错误')}")

# 同时修改止盈和止损
result = trader.amend_stop_order(
account_id=0,
symbol="SOL_USDT",
id="1326362257269604352", # 止损订单ID
take_profit={
"trigger_price": {"MarkPrice": 120.0},
"trigger_action": {
"quantity": 1.0,
"execute_type": {"Limit": 119.5}, # 限价执行
"typ": "Normal"
}
},
stop_loss={
"trigger_price": {"MarkPrice": 95.0},
"trigger_action": {
"execute_type": "Market",
"typ": "Normal"
}
}
)

if "Ok" in result:
amend_result = result["Ok"]
# 检查修改结果
if amend_result.get('take_profit') and "Ok" in amend_result['take_profit']:
print(f"止盈修改成功,订单ID: {amend_result['take_profit']['Ok']}")
if amend_result.get('stop_loss') and "Ok" in amend_result['stop_loss']:
print(f"止损修改成功,订单ID: {amend_result['stop_loss']['Ok']}")
else:
print(f"修改失败: {result.get('Err', '未知错误')}")

# 完整的修改流程示例
def modify_stop_order_example():
symbol = "BTC_USDT"
account_id = 0

# 1. 查询现有订单
result = trader.get_stop_orders(account_id, symbol)
if "Ok" not in result:
print("获取止损订单失败")
return

orders = result["Ok"]
open_orders = [order for order in orders if order['status'] == 'Open']

if not open_orders:
print("没有找到开放状态的止损订单")
return

# 2. 选择要修改的订单
target_order = open_orders[0]

# 3. 根据订单类型进行修改
if target_order.get('take_profit'):
tp_id = target_order['take_profit']['id']
print(f"修改止盈订单: {tp_id}")

# 修改止盈触发价格
result = trader.amend_stop_order(
account_id=account_id,
symbol=symbol,
id=tp_id,
take_profit={
"trigger_price": {"MarkPrice": 52000.0}, # 新的触发价格
"trigger_action": {
"quantity": 0.05, # 新的数量
"execute_type": "Market",
"typ": "Normal"
}
}
)

if "Ok" in result:
print(f"止盈修改成功: {result['Ok']}")
else:
print(f"止盈修改失败: {result.get('Err')}")

if target_order.get('stop_loss'):
sl_id = target_order['stop_loss']['id']
print(f"修改止损订单: {sl_id}")

# 修改止损触发价格
result = trader.amend_stop_order(
account_id=account_id,
symbol=symbol,
id=sl_id,
stop_loss={
"trigger_price": {"ContractPrice": 47000.0}, # 新的触发价格
"trigger_action": {
"execute_type": "Market",
"typ": "Normal"
}
}
)

if "Ok" in result:
print(f"止损修改成功: {result['Ok']}")
else:
print(f"止损修改失败: {result.get('Err')}")

# 调用示例函数
modify_stop_order_example()

📊 cancel_stop_order(account_id, symbol, id=None, cid=None, extra=None, generate=False)

功能: 取消止损订单

参数:

  • account_id: 整数,必需,账户ID
  • symbol: 字符串,必需,交易对名称
  • id: 字符串,可选,订单ID
  • cid: 字符串,可选,客户端订单ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

重要:

  • 此方法仅支持同步执行,无 sync 参数
  • 必需参数: account_idsymbol 两个参数为必需
  • 标识参数: idcid 不能同时为空,必须提供其中一个用于标识要取消的订单

返回值:

  • 取消止损订单结果数据

示例:

# 通过订单ID取消止损订单
result = trader.cancel_stop_order(
account_id=0,
symbol="BTC_USDT",
id="1326362257269604352" # 具体的止损订单ID
)

if "Ok" in result:
cancel_result = result["Ok"]
print(f"止损订单取消成功: {cancel_result}")
else:
error = result.get("Err", "未知错误")
print(f"止损订单取消失败: {error}")

# 通过客户端订单ID取消
result = trader.cancel_stop_order(
account_id=0,
symbol="BTC_USDT",
cid="my_stop_order_001" # 客户端自定义ID
)

if "Ok" in result:
print(f"止损订单取消成功: {result['Ok']}")
else:
print(f"止损订单取消失败: {result.get('Err', '未知错误')}")

# 批量取消止损订单示例
def cancel_all_open_stop_orders(account_id, symbol):
"""取消指定交易对的所有开放止损订单"""
# 1. 获取所有开放状态的止损订单
result = trader.get_stop_orders(account_id, symbol)
if "Ok" not in result:
print(f"获取{symbol}止损订单失败: {result.get('Err')}")
return

orders = result["Ok"]
open_orders = [order for order in orders if order['status'] == 'Open']

if not open_orders:
print(f"{symbol}没有开放状态的止损订单")
return

print(f"找到{len(open_orders)}个开放的止损订单,开始批量取消...")

# 2. 逐个取消订单
success_count = 0
failed_count = 0

for order in open_orders:
# 取消止盈订单
if order.get('take_profit'):
tp_id = order['take_profit']['id']
result = trader.cancel_stop_order(account_id, symbol, id=tp_id)

if "Ok" in result:
print(f"✅ 止盈订单 {tp_id} 取消成功")
success_count += 1
else:
print(f"❌ 止盈订单 {tp_id} 取消失败: {result.get('Err')}")
failed_count += 1

# 取消止损订单
if order.get('stop_loss'):
sl_id = order['stop_loss']['id']
result = trader.cancel_stop_order(account_id, symbol, id=sl_id)

if "Ok" in result:
print(f"✅ 止损订单 {sl_id} 取消成功")
success_count += 1
else:
print(f"❌ 止损订单 {sl_id} 取消失败: {result.get('Err')}")
failed_count += 1

print(f"批量取消完成: 成功 {success_count} 个, 失败 {failed_count} 个")

# 调用批量取消函数
cancel_all_open_stop_orders(0, "BTC_USDT")

# 根据条件取消止损订单
def cancel_stop_orders_by_condition():
"""根据特定条件取消止损订单"""
symbol = "ETH_USDT"
account_id = 0

# 获取所有止损订单
result = trader.get_stop_orders(account_id, symbol)
if "Ok" not in result:
return

orders = result["Ok"]

for order in orders:
if order['status'] != 'Open':
continue # 跳过非开放状态的订单

# 示例条件:取消所有标记价格触发价格低于3000的止盈订单
if order.get('take_profit'):
tp = order['take_profit']
trigger_price = tp['trigger_price']

# 检查是否是标记价格触发且价格低于3000
if ('MarkPrice' in trigger_price and
trigger_price['MarkPrice'] < 3000):

tp_id = tp['id']
result = trader.cancel_stop_order(account_id, symbol, id=tp_id)

if "Ok" in result:
print(f"取消低价止盈订单: {tp_id} (触发价: {trigger_price['MarkPrice']})")
else:
print(f"取消订单失败: {tp_id}")

# 使用示例策略中的代码片段
def example_from_strategy():
"""基于策略代码的实际使用示例"""
account_id = 0
symbol = "XRP_USDT"

# 获取开放的止损订单
result = trader.get_stop_orders(account_id, symbol)
if "Ok" in result:
orders = result["Ok"]
open_stop_orders = [order for order in orders if order['status'] == 'Open']

# 撤销所有开放的止损订单
for order in open_stop_orders:
# 获取止损订单ID(优先取止损,如果没有则取止盈)
order_id = None
if order.get('stop_loss'):
order_id = order['stop_loss']['id']
elif order.get('take_profit'):
order_id = order['take_profit']['id']

if order_id:
res = trader.cancel_stop_order(account_id, symbol, id=order_id)
if 'Ok' in res:
print(f"撤单成功: {order_id}")
else:
print(f"撤单失败: {order_id}, 错误: {res.get('Err')}")

# 调用示例函数
example_from_strategy()

止损订单使用最佳实践:

  1. 风险管理: 建议为每个开仓都设置对应的止损订单,控制单笔交易最大亏损

  2. 触发价格类型选择:

    • MarkPrice: 标记价格触发,适用于大多数场景,可避免市场价格异常波动误触发
    • ContractPrice: 最新成交价触发,响应最快,适合高频交易
    • IndexPrice: 指数价格触发,最稳定,适合长期持仓
  3. 执行类型策略:

    • Market: 市价执行,确保能够成交,但可能有滑点
    • Limit: 限价执行,控制成交价格,但可能无法成交
  4. 追踪止盈的使用:

    • 设置合理的trailing_gap回调幅度,通常1-3%
    • 追踪止盈适用于趋势明确的单边行情
    • 震荡行情建议使用一般止盈避免频繁触发
  5. 数量设置:

    • 不设置quantity将全平持仓
    • 部分止盈可设置具体数量,实现分批平仓策略
  6. 仓位方向: 必须正确设置pos_side参数,与要保护的仓位方向一致

  7. 双向持仓: 在双向持仓模式下,is_dual_side应设置为True

  8. 持续监控: 定期检查止损订单状态,根据市场变化及时调整

  9. 参数要求: 严格按照必需参数要求传递参数

    • post_stop_order: account_idsymbolpos_side 为必需
    • get_stop_orders: account_idsymbol 为必需
    • amend_stop_order: account_idsymbol 为必需,idcid二选一
    • cancel_stop_order: account_idsymbol 为必需,idcid二选一
  10. 同步执行: 所有止损订单操作都是同步执行,无 sync 参数,确保操作的即时性和可靠性

6.2 基础请求

🌐 request(account_id, method, path, auth, query=None, body=None, url=None, headers=None, generate=False)

功能: 向交易所发送原始API请求

参数:

  • account_id: 整数,账户ID
  • method: 字符串,HTTP方法(GET, POST等)
  • path: 字符串,API路径
  • auth: 布尔值,是否需要验证
  • query: 字典,可选,URL查询参数
  • body: 任意类型,可选,请求体数据。可以是Python字典、列表或任何可序列化的Python对象,会自动转换为JSON字符串
  • url: 字符串,可选,完整URL(设置时会覆盖默认URL)
  • headers: 字典,可选,HTTP头信息
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • Result结构: 包含OkErr字段的字典
  • 成功时: {"Ok": response_data} - 交易所API返回的响应数据已转换为Python对象
  • 失败时: {"Err": error_info} - 包含错误信息

示例:

# ✅ GET请求示例 - 获取OKX市场行情数据
query_params = {"instId": "BTC-USD-SWAP"}
result = trader.request(
account_id=0,
method="GET",
path="/api/v5/market/ticker",
auth=False, # 公开接口不需要认证
query=query_params,
body=None,
url=None,
headers=None,
generate=False
)

if "Ok" in result:
ticker_data = result["Ok"]
print(f"BTC-USD-SWAP行情数据: {ticker_data}")
else:
error = result.get("Err", "未知错误")
print(f"获取OKX行情失败: {error}")

重要说明:

  • 自动序列化: body参数支持Python字典、列表等可序列化对象,会在Rust端自动转换为JSON字符串
  • URL覆盖: 设置url参数时会覆盖交易所的默认API基础URL,用于调用其他服务的API
  • 认证处理: auth=True时会自动添加交易所要求的签名和认证头
  • Result处理: 所有返回值都需要检查Ok字段来确定是否成功
  • 命令生成: generate=True时只生成命令对象,不立即执行,用于批量操作

6.3 持仓与账户

📊 get_position(account_id, symbol, extra=None, generate=False)

功能: 获取指定交易对的持仓信息

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • Result结构: 包含OkErr字段的字典
  • 成功时: {"Ok": position_data} - 持仓信息数据,如果无持仓返回None
  • 失败时: {"Err": error_info} - 包含错误信息
  • 数据结构: 参见7.2 Position(持仓)数据结构

📊 get_positions(account_id, extra=None, generate=False)

功能: 获取账户下所有持仓信息

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • Result结构: 包含OkErr字段的字典
  • 成功时: {"Ok": positions_list} - 持仓信息列表,如果无持仓返回空列表
  • 失败时: {"Err": error_info} - 包含错误信息
  • 数据结构: 列表中每个元素的结构参见7.2 Position(持仓)数据结构

📊 get_max_position(account_id, symbol, level=None, extra=None, generate=False)

功能: 获取交易对的最大可开仓数量

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • level: 浮点数,可选,杠杆倍数
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

{
"long_notional": 1000.0,
"short_notional": 1000.0,
"long_quantity": 0.001,
"short_quantity": 0.001
}
  • 最大可开仓数量数据字典,包含:
    • long_notional: 浮点数,多仓对应Quote名义价值
    • short_notional: 浮点数,空仓对应Quote名义价值
    • long_quantity: 浮点数,多仓对应Base数量
    • short_quantity: 浮点数,空仓对应Base数量

示例:

# 查询BTC_USDT的最大可开仓数量
result = trader.get_max_position(
account_id=0,
symbol="BTC_USDT"
)

if "Ok" in result:
max_position = result["Ok"]
print(f"最大可开仓数量: {max_position}")
print(f"多仓最大名义价值: {max_position['long_notional']} USDT")
print(f"空仓最大名义价值: {max_position['short_notional']} USDT")
print(f"多仓最大数量: {max_position['long_quantity']} BTC")
print(f"空仓最大数量: {max_position['short_quantity']} BTC")
else:
error = result.get("Err", "未知错误")
print(f"查询最大可开仓数量失败: {error}")

# 指定杠杆倍数查询
result = trader.get_max_position(
account_id=0,
symbol="BTC_USDT",
level=5.0 # 5倍杠杆
)

if "Ok" in result:
max_position_5x = result["Ok"]
print(f"5倍杠杆下最大可开仓: {max_position_5x}")
else:
error = result.get("Err", "未知错误")
print(f"查询5倍杠杆最大可开仓失败: {error}")

# 使用publish方法调用
cmd = {
"account_id": 0,
"method": "GetMaxPosition",
"symbol": "BTC_USDT",
"level": 3.0,
"sync": True
}
result = trader.publish(cmd)
print(f"最大可开仓结果: {result}")

💰 get_usdt_balance(account_id, extra=None, generate=False)

功能: 获取账户USDT余额

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • USDT余额数据字典,包含:
    • asset: 字符串,资产名称("USDT")
    • balance: 浮点数,总余额+浮动盈亏
    • available_balance: 浮点数,可用余额
    • unrealized_pnl: 浮点数,浮动盈亏

💰 get_balances(account_id, extra=None, generate=False)

功能: 获取账户所有币种余额

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 所有币种余额数据列表,每个元素为字典,包含:
    • asset: 字符串,资产名称(如"USDT", "BTC"等)
    • balance: 浮点数,总余额+浮动盈亏
    • available_balance: 浮点数,可用余额
    • unrealized_pnl: 浮点数,浮动盈亏

💰 get_balance_by_coin(account_id, asset, extra=None, generate=False)

功能: 获取指定币种的余额

参数:

  • account_id: 整数,账户ID
  • asset: 字符串,币种名称
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 指定币种余额数据字典,包含:
    • asset: 字符串,资产名称(与请求的asset相同)
    • balance: 浮点数,总余额+浮动盈亏
    • available_balance: 浮点数,可用余额
    • unrealized_pnl: 浮点数,浮动盈亏

💰 get_fee_rate(account_id, symbol, extra=None, generate=False)

功能: 获取交易对的手续费率

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 手续费率数据

💰 get_fee_discount_info(account_id, extra=None, generate=False)

功能: 获取费用折扣信息

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 费用折扣信息数据

💰 is_fee_discount_enabled(account_id, extra=None, generate=False)

功能: 检查费用折扣是否已启用

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 布尔值,表示费用折扣是否已启用

💰 set_fee_discount_enabled(account_id, enabled, extra=None, generate=False)

功能: 设置费用折扣启用状态

参数:

  • account_id: 整数,账户ID
  • enabled: 布尔值,是否启用费用折扣
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 设置结果

6.4 市场数据

📈 get_ticker(account_id, symbol, extra=None, generate=False)

功能: 获取指定交易对的行情数据

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 交易对行情数据

📈 get_tickers(account_id, extra=None, generate=False)

功能: 获取所有交易对的行情数据

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 所有交易对行情数据列表

📉 get_bbo(account_id, symbol, extra=None, generate=False)

功能: 获取交易对的最优买卖报价(Best Bid & Offer)

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 最优买卖报价数据

📉 get_bbo_tickers(account_id, extra=None, generate=False)

功能: 获取所有交易对的最优买卖报价

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 所有交易对最优买卖报价数据列表

📊 get_depth(account_id, symbol, limit=None, extra=None, generate=False)

功能: 获取交易对的深度数据(订单簿)

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • limit: 整数,可选,深度条数限制
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 深度数据

📊 get_instrument(account_id, symbol, extra=None, generate=False)

功能: 获取交易对的详细信息

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 交易对详细信息

📊 get_instruments(account_id, extra=None, generate=False)

功能: 获取所有可交易的交易对信息

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 所有交易对信息列表

📊 get_mark_price(account_id, symbol=None, extra=None, generate=False)

功能: 获取标记价格

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,可选,交易对名称,不指定则获取所有交易对
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 标记价格数据

📊 get_funding_rates(account_id, extra=None, generate=False)

功能: 获取所有交易对的资金费率

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 所有交易对资金费率数据列表,每个元素为字典,包含:
    • symbol: 字符串,交易对符号
    • funding_rate: 浮点数,当前资金费率(小数形式,如0.0001表示0.01%)
    • next_funding_at: 整数,下次资金费结算时间,Unix时间戳(毫秒)
    • min_funding_rate: 浮点数,资金费率最小值(交易所设定的下限)
    • max_funding_rate: 浮点数,资金费率最大值(交易所设定的上限)
    • funding_interval: 整数或None,资金费结算周期,以小时为单位(如8表示8小时结算一次)

📊 get_funding_rate_by_symbol(account_id, symbol, extra=None, generate=False)

功能: 获取指定交易对的资金费率

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 指定交易对资金费率数据字典,包含:
    • symbol: 字符串,交易对符号
    • funding_rate: 浮点数,当前资金费率(小数形式,如0.0001表示0.01%)
    • next_funding_at: 整数,下次资金费结算时间,Unix时间戳(毫秒)
    • min_funding_rate: 浮点数,资金费率最小值(交易所设定的下限)
    • max_funding_rate: 浮点数,资金费率最大值(交易所设定的上限)
    • funding_interval: 整数或None,资金费结算周期,以小时为单位(如8表示8小时结算一次)

示例:

# 获取BTC_USDT的资金费率
result = trader.get_funding_rate_by_symbol(
account_id=0,
symbol="BTC_USDT"
)

if "Ok" in result:
funding_rate_data = result["Ok"]
current_rate = funding_rate_data['funding_rate']
min_rate = funding_rate_data['min_funding_rate']
max_rate = funding_rate_data['max_funding_rate']
interval = funding_rate_data.get('funding_interval', 8) # 默认8小时

print(f"BTC_USDT 当前资金费率: {current_rate:.6f} ({current_rate*100:.4f}%)")
print(f"费率范围: {min_rate:.6f} ~ {max_rate:.6f}")
print(f"结算周期: 每{interval}小时")
print(f"下次结算时间: {funding_rate_data['next_funding_at']}")
else:
error = result.get("Err", "未知错误")
print(f"获取资金费率失败: {error}")

# 获取所有交易对的资金费率并分析
result = trader.get_funding_rates(account_id=0)

if "Ok" in result:
all_funding_rates = result["Ok"]

for rate_data in all_funding_rates:
symbol = rate_data['symbol']
current_rate = rate_data['funding_rate']
min_rate = rate_data['min_funding_rate']
max_rate = rate_data['max_funding_rate']

# 计算当前费率在允许范围内的位置(百分比)
if max_rate != min_rate:
position_pct = (current_rate - min_rate) / (max_rate - min_rate) * 100
print(f"{symbol}: 当前 {current_rate:.6f}, 位置 {position_pct:.1f}% (范围: {min_rate:.6f} ~ {max_rate:.6f})")
else:
print(f"{symbol}: 当前 {current_rate:.6f} (固定费率)")
else:
error = result.get("Err", "未知错误")
print(f"获取所有资金费率失败: {error}")

📊 get_funding_rate_history(account_id, symbol=None, since_secs=None, limit=100, extra=None, generate=False)

功能: 获取资金费率历史记录

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,可选,交易对名称,不传则获取全部交易对
  • since_secs: 整数,可选,开始时间,Unix时间戳(秒),不传则返回最近limit条数据
  • limit: 整数,可选,返回数据条数限制,默认100
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 资金费率历史记录列表,每条记录为字典,包含:
    • symbol: 字符串,交易对符号
    • funding_rate: 浮点数,资金费率(小数形式,如0.0001表示0.01%)
    • funding_time: 整数,结算时间,Unix时间戳(毫秒)

示例:

# 获取BTC_USDT的最近100条资金费率历史
result = trader.get_funding_rate_history(
account_id=0,
symbol="BTC_USDT",
limit=100
)

if "Ok" in result:
funding_history = result["Ok"]
print(f"获取到{len(funding_history)}条资金费率历史记录")

for record in funding_history:
print(f"时间: {record['funding_time']}, "
f"费率: {record['funding_rate']:.6f} ({record['funding_rate']*100:.4f}%), "
f"交易对: {record['symbol']}")
else:
error = result.get("Err", "未知错误")
print(f"获取资金费率历史失败: {error}")

# 获取指定时间范围的资金费率历史
import time
since_time = int(time.time()) - 7 * 24 * 60 * 60 # 7天前的时间戳(秒)

result = trader.get_funding_rate_history(
account_id=0,
symbol="BTC_USDT",
since_secs=since_time,
limit=50
)

if "Ok" in result:
funding_history = result["Ok"]
print(f"获取7天内的资金费率历史: {len(funding_history)}条")

# 计算平均资金费率
if funding_history:
avg_rate = sum(record['funding_rate'] for record in funding_history) / len(funding_history)
print(f"平均资金费率: {avg_rate:.6f} ({avg_rate*100:.4f}%)")
else:
error = result.get("Err", "未知错误")
print(f"获取指定时间范围资金费率历史失败: {error}")

# 获取所有交易对的最近资金费率历史
result = trader.get_funding_rate_history(
account_id=0,
limit=20 # 每个交易对最近20条
)

if "Ok" in result:
all_funding_history = result["Ok"]
print(f"获取所有交易对资金费率历史: {len(all_funding_history)}条")

# 按交易对分组统计
symbol_stats = {}
for record in all_funding_history:
symbol = record['symbol']
if symbol not in symbol_stats:
symbol_stats[symbol] = []
symbol_stats[symbol].append(record['funding_rate'])

# 显示各交易对的资金费率统计
for symbol, rates in symbol_stats.items():
avg_rate = sum(rates) / len(rates)
max_rate = max(rates)
min_rate = min(rates)
print(f"{symbol}: 平均 {avg_rate:.6f}, 最高 {max_rate:.6f}, 最低 {min_rate:.6f}")
else:
error = result.get("Err", "未知错误")
print(f"获取所有交易对资金费率历史失败: {error}")

# 使用publish方法调用
cmd = {
"account_id": 0,
"method": "FundingRateHistory",
"symbol": "BTC_USDT",
"since_secs": 1672531200, # 2023-01-01的时间戳
"limit": 100,
"sync": True
}
result = trader.publish(cmd)
print(f"资金费率历史: {result}")

📊 get_funding_fee(account_id, symbol, start_time=None, end_time=None, extra=None, generate=False)

功能: 查询指定交易对的历史资金费用记录

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • start_time: 整数,可选,开始时间,Unix时间戳(毫秒)
  • end_time: 整数,可选,结束时间,Unix时间戳(毫秒)
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 资金费用历史记录列表,每条记录为字典,包含:
    • symbol: 字符串,交易对符号
    • funding_fee: 浮点数,资金费用金额(正数表示收入,负数表示支出)
    • timestamp: 整数,结算时间,Unix时间戳(毫秒)

示例:

# 查询BTC_USDT的资金费用记录
result = trader.get_funding_fee(
account_id=0,
symbol="BTC_USDT"
)

if "Ok" in result:
funding_fee_data = result["Ok"]
print(f"资金费用记录: {funding_fee_data}")
else:
error = result.get("Err", "未知错误")
print(f"获取资金费用记录失败: {error}")
funding_fee_data = None

# 查询指定时间范围的资金费用
import time
end_time = int(time.time() * 1000) # 当前时间戳(毫秒)
start_time = end_time - 7 * 24 * 60 * 60 * 1000 # 7天前

result = trader.get_funding_fee(
account_id=0,
symbol="BTC_USDT",
start_time=start_time,
end_time=end_time
)

if "Ok" in result:
funding_fee_data = result["Ok"]

# 处理资金费用数据
total_fee = 0
for record in funding_fee_data:
fee_type = "收入" if record['funding_fee'] > 0 else "支出"
total_fee += record['funding_fee']
print(f"时间: {record['timestamp']}, "
f"费用: {record['funding_fee']:.6f} ({fee_type}), "
f"交易对: {record['symbol']}")

print(f"总资金费用: {total_fee:.6f}")
else:
error = result.get("Err", "未知错误")
print(f"获取指定时间范围资金费用失败: {error}")

# 使用publish方法调用
cmd = {
"account_id": 1,
"method": "FundingFee",
"symbol": "BTC_USDT",
"start_time": 1672531200000,
"end_time": 1672617600000,
"sync": True
}
result = trader.publish(cmd)

📈 get_kline(account_id, symbol, interval, start_time=None, end_time=None, limit=None, extra=None, generate=False)

功能: 获取指定交易对的K线数据

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对符号
  • interval: 字符串,K线时间粒度,支持的值包括:
    • "1m": 1分钟
    • "3m": 3分钟
    • "5m": 5分钟
    • "15m": 15分钟
    • "30m": 30分钟
    • "1h": 1小时
    • "2h": 2小时
    • "4h": 4小时
    • "6h": 6小时
    • "8h": 8小时
    • "12h": 12小时
    • "1d": 1天
    • "3d": 3天
    • "1w": 1周
    • "1M": 1月
  • start_time: 整数,可选,开始时间,Unix时间戳(毫秒)
  • end_time: 整数,可选,结束时间,Unix时间戳(毫秒)
  • limit: 整数,可选,返回数据条数限制,默认100,最大1000
  • extra: 字典,可选,扩展参数,用于特定交易所的额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • K线数据对象,包含以下字段:
    • symbol: 交易对符号
    • interval: K线时间粒度
    • candles: 蜡烛图数据列表,每个元素包含:
      • timestamp: 开盘时间,Unix时间戳(毫秒)
      • open: 开盘价
      • high: 最高价
      • low: 最低价
      • close: 收盘价
      • volume: 成交量(基础货币)
      • quote_volume: 成交额(报价货币)
      • trades: 成交笔数(可选)
      • taker_buy_volume: taker买入成交量(基础货币,可选)
      • taker_buy_quote_volume: taker买入成交额(报价货币,可选)
      • confirm: K线状态,false代表K线未完结,true代表K线已完结

示例:

# 获取BTC_USDT的1分钟K线数据,使用默认限制100条
result = trader.get_kline(
account_id=0,
symbol="BTC_USDT",
interval="1m",
limit=100
)

if "Ok" in result:
kline_data = result["Ok"]
print(f"交易对: {kline_data['symbol']}")
print(f"时间粒度: {kline_data['interval']}")
print(f"获取到{len(kline_data['candles'])}条K线数据")
else:
error = result.get("Err", "未知错误")
print(f"获取K线数据失败: {error}")
kline_data = None

# 获取指定时间范围的K线数据
import time
end_time = int(time.time() * 1000) # 当前时间戳(毫秒)
start_time = end_time - 24 * 60 * 60 * 1000 # 24小时前

result = trader.get_kline(
account_id=0,
symbol="BTC_USDT",
interval="1h",
start_time=start_time,
end_time=end_time,
limit=24 # 24小时数据
)

if "Ok" in result:
kline_data = result["Ok"]
print(f"获取24小时K线数据成功: {len(kline_data['candles'])}条")
else:
error = result.get("Err", "未知错误")
print(f"获取24小时K线数据失败: {error}")
kline_data = None

# 使用扩展参数(特定交易所的额外参数)
result = trader.get_kline(
account_id=0,
symbol="BTC_USDT",
interval="5m",
limit=200,
extra={"custom_param": "value"} # 扩展参数
)

if "Ok" in result:
kline_data = result["Ok"]
print(f"使用扩展参数获取K线数据成功")
else:
error = result.get("Err", "未知错误")
print(f"使用扩展参数获取K线数据失败: {error}")
kline_data = None

# 处理K线数据
if kline_data:
candles = kline_data['candles']
for candle in candles:
status = "已完结" if candle['confirm'] else "未完结"

# 处理可选字段
trades_info = f", 成交笔数: {candle['trades']}" if candle.get('trades') else ""
taker_buy_info = ""
if candle.get('taker_buy_volume'):
buy_ratio = candle['taker_buy_volume'] / candle['volume'] * 100 if candle['volume'] > 0 else 0
taker_buy_info = f", 主动买入占比: {buy_ratio:.1f}%"

print(f"时间: {candle['timestamp']}, 开盘: {candle['open']}, "
f"最高: {candle['high']}, 最低: {candle['low']}, "
f"收盘: {candle['close']}, 成交量: {candle['volume']}, "
f"成交额: {candle['quote_volume']}{trades_info}{taker_buy_info}, 状态: {status}")

# 检查最新K线是否已完结
if candles:
latest_candle = candles[-1]
if latest_candle['confirm']:
print("最新K线已完结,可以用于技术分析")
else:
print("最新K线未完结,数据可能还在变化")
else:
print("无法处理K线数据,数据获取失败")

# 使用publish方法调用K线API
cmd = {
"account_id": 1,
"method": "Kline",
"symbol": "BTC_USDT",
"interval": "1m",
"start_time": 1672531200000,
"end_time": 1672617600000,
"limit": 100,
"extra": {}, # 扩展参数字典
"sync": True
}
result = trader.publish(cmd)
print(f"K线数据: {result}")
print(f"返回的蜡烛图数量: {len(result['candles'])}")

6.5 账户设置与杠杆管理

📊 get_max_leverage(account_id, symbol, extra=None, generate=False)

功能: 获取交易对的最大可用杠杆倍数

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 最大可用杠杆倍数数据

📊 set_leverage(account_id, symbol, leverage, extra=None, generate=False)

功能: 设置交易对的杠杆倍数

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • leverage: 整数,杠杆倍数
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 设置杠杆倍数结果

📊 get_margin_mode(account_id, symbol, margin_coin, extra=None, generate=False)

功能: 获取交易对的保证金模式

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • margin_coin: 字符串,保证金币种
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 保证金模式数据

📊 set_margin_mode(account_id, symbol, margin_coin, margin_mode, extra=None, generate=False)

功能: 设置交易对的保证金模式

参数:

  • account_id: 整数,账户ID
  • symbol: 字符串,交易对名称
  • margin_coin: 字符串,保证金币种
  • margin_mode: 字符串或对象,保证金模式,例如"isolated"(逐仓)或"cross"(全仓)
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 设置保证金模式结果

📊 is_dual_side(account_id, extra=None, generate=False)

功能: 查询账户是否使用双向持仓模式

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 是否使用双向持仓模式

📊 set_dual_side(account_id, dual_side, extra=None, generate=False)

功能: 设置账户的持仓模式

参数:

  • account_id: 整数,账户ID
  • dual_side: 布尔值,是否使用双向持仓模式
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 设置持仓模式结果

6.6 资金划转与借贷

📊 transfer(account_id, transfer, extra=None, generate=False)

功能: 在账户内部或不同账户之间划转资金

参数:

  • account_id: 整数,账户ID
  • transfer: 字典或对象,划转信息,包含以下字段:
    • asset: 字符串,币种名称
    • amount: 浮点数,划转金额
    • from: 字符串,转出账户类型,支持的值:
      • "Spot": 现货钱包
      • "UsdtFuture": U本位合约钱包(默认)
      • "CoinFuture": 币本位合约钱包
      • "Margin": 杠杆全仓钱包
      • "IsolatedMargin": 杠杆逐仓钱包
    • to: 字符串,转入账户类型,支持的值同上
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 资金划转结果

示例:

# 从现货钱包转账到U本位合约钱包
transfer_data = {
"asset": "USDT",
"amount": 1000.0,
"from": "Spot",
"to": "UsdtFuture"
}
result = trader.transfer(0, transfer_data)

if "Ok" in result:
success_data = result["Ok"]
print(f"划转成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"划转失败: {error}")

# 从U本位合约钱包转账到杠杆全仓钱包
transfer_data = {
"asset": "BTC",
"amount": 0.1,
"from": "UsdtFuture",
"to": "Margin"
}
result = trader.transfer(0, transfer_data)

if "Ok" in result:
success_data = result["Ok"]
print(f"BTC划转成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"BTC划转失败: {error}")

📊 sub_transfer(account_id, sub_transfer, extra=None, generate=False)

功能: 主账户和子账户之间划转资金

参数:

  • account_id: 整数,账户ID
  • sub_transfer: 字典或对象,划转信息,包含以下字段:
    • cid: 字符串,可选,客户端自定义ID
    • asset: 字符串,币种名称
    • amount: 浮点数,划转金额
    • from: 字符串,转出账户类型,支持的值:
      • "Spot": 现货钱包
      • "UsdtFuture": U本位合约钱包(默认)
      • "CoinFuture": 币本位合约钱包
      • "Margin": 杠杆全仓钱包
      • "IsolatedMargin": 杠杆逐仓钱包
    • to: 字符串,转入账户类型,支持的值同上
    • from_account: 字符串,可选,转出账户ID,为空时默认为母账户。注意: 当交易所为Bitget时,母账户不能为空
    • to_account: 字符串,可选,转入账户ID,为空时默认为母账户。注意: 当交易所为Bitget时,母账户不能为空
    • direction: 字符串,划转方向,支持的值:
      • "MasterToSub": 母转子(默认)
      • "SubToMaster": 子转母
      • "SubToSub": 子转子
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 子账户资金划转结果

示例:

# 母账户向子账户划转USDT
sub_transfer_data = {
"cid": "transfer_001",
"asset": "USDT",
"amount": 500.0,
"from": "UsdtFuture",
"to": "UsdtFuture",
"from_account": None, # 母账户
"to_account": "sub_account_123", # 子账户ID
"direction": "MasterToSub"
}
result = trader.sub_transfer(0, sub_transfer_data)

if "Ok" in result:
success_data = result["Ok"]
print(f"子账户划转成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"子账户划转失败: {error}")

# 子账户向母账户划转BTC
sub_transfer_data = {
"asset": "BTC",
"amount": 0.05,
"from": "Spot",
"to": "Spot",
"from_account": "sub_account_456",
"to_account": None, # 母账户
"direction": "SubToMaster"
}
result = trader.sub_transfer(0, sub_transfer_data)

if "Ok" in result:
success_data = result["Ok"]
print(f"BTC子转母成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"BTC子转母失败: {error}")

# 子账户之间划转(需要指定两个子账户)
sub_transfer_data = {
"asset": "ETH",
"amount": 1.0,
"from": "UsdtFuture",
"to": "UsdtFuture",
"from_account": "sub_account_123",
"to_account": "sub_account_456",
"direction": "SubToSub"
}
result = trader.sub_transfer(0, sub_transfer_data)

if "Ok" in result:
success_data = result["Ok"]
print(f"ETH子转子成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"ETH子转子失败: {error}")

# 使用publish方法调用
cmd = {
"account_id": 0,
"method": "Transfer",
"transfer": {
"asset": "USDT",
"amount": 1000.0,
"from": "Spot",
"to": "UsdtFuture"
},
"sync": True
}
result = trader.publish(cmd)

# 子账户划转的publish方法调用
cmd = {
"account_id": 0,
"method": "SubTransfer",
"sub_transfer": {
"cid": "transfer_002",
"asset": "USDT",
"amount": 500.0,
"from": "UsdtFuture",
"to": "UsdtFuture",
"from_account": None,
"to_account": "sub_account_123",
"direction": "MasterToSub"
},
"sync": True
}
result = trader.publish(cmd)

钱包类型说明:

钱包类型说明适用场景
Spot现货钱包现货交易、充值提现
UsdtFutureU本位合约钱包USDT保证金的永续合约交易
CoinFuture币本位合约钱包币本位保证金的合约交易
Margin杠杆全仓钱包杠杆交易(全仓模式)
IsolatedMargin杠杆逐仓钱包杠杆交易(逐仓模式)

划转方向说明:

方向说明使用场景
MasterToSub母转子母账户向子账户划转资金
SubToMaster子转母子账户向母账户划转资金
SubToSub子转子子账户之间直接划转资金

注意事项:

  1. Bitget交易所特殊要求: 当使用Bitget交易所时,from_accountto_account字段中的母账户不能为空,需要明确指定母账户ID
  2. 账户ID格式: 不同交易所的账户ID格式可能不同,请参考具体交易所的API文档
  3. 权限要求: 子账户划转功能需要相应的API权限,确保API密钥具有子账户管理权限
  4. 资金安全: 划转操作不可逆,请仔细核对划转参数,特别是账户ID和金额

📊 get_deposit_address(account_id, ccy, chain=None, amount=None, extra=None, generate=False)

功能: 获取指定币种的充值地址

参数:

  • account_id: 整数,账户ID
  • ccy: 字符串,币种名称,如"USDT"、"BTC"等
  • chain: 字符串,可选,链类型,支持的值:
    • "Erc20": 以太坊链(默认)
    • "Trc20": 波场链
    • "Bep20": 币安智能链
    • "Sol": Solana链
    • "Polygon": Polygon链
    • "ArbitrumOne": Arbitrum链
    • "Optimism": Optimism链
    • "Ton": TON链
    • "AVAXC": Avalanche链
  • amount: 浮点数,可选,预估充值金额(部分交易所需要)
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 充值地址信息字典,包含:
    • asset: 字符串,币种名称
    • address: 字符串,充值地址
    • chain: 字符串或None,链类型
    • tag: 字符串或None,地址标签(某些币种如EOS、XRP需要)
    • url: 字符串或None,充值二维码URL(可选)

示例:

# 获取USDT的ERC20充值地址
result = trader.get_deposit_address(
account_id=0,
ccy="USDT",
chain="Erc20"
)

if "Ok" in result:
deposit_info = result["Ok"]
print(f"USDT ERC20充值地址: {deposit_info['address']}")
print(f"币种: {deposit_info['asset']}")
print(f"链类型: {deposit_info['chain']}")
else:
error = result.get("Err", "未知错误")
print(f"获取USDT ERC20充值地址失败: {error}")

# 获取USDT的TRC20充值地址
result = trader.get_deposit_address(
account_id=0,
ccy="USDT",
chain="Trc20"
)

if "Ok" in result:
deposit_info_trc = result["Ok"]
print(f"USDT TRC20充值地址: {deposit_info_trc['address']}")
else:
error = result.get("Err", "未知错误")
print(f"获取USDT TRC20充值地址失败: {error}")

# 获取BTC充值地址(不指定链,使用默认)
result = trader.get_deposit_address(
account_id=0,
ccy="BTC"
)

if "Ok" in result:
btc_deposit = result["Ok"]
print(f"BTC充值地址: {btc_deposit['address']}")
else:
error = result.get("Err", "未知错误")
print(f"获取BTC充值地址失败: {error}")

# 获取需要标签的币种充值地址(如EOS)
result = trader.get_deposit_address(
account_id=0,
ccy="EOS"
)

if "Ok" in result:
eos_deposit = result["Ok"]
print(f"EOS充值地址: {eos_deposit['address']}")
if eos_deposit.get('tag'):
print(f"EOS充值标签: {eos_deposit['tag']}")
else:
error = result.get("Err", "未知错误")
print(f"获取EOS充值地址失败: {error}")

# 使用publish方法调用
cmd = {
"account_id": 0,
"method": "GetDepositAddress",
"ccy": "USDT",
"chain": "Trc20",
"sync": True
}
result = trader.publish(cmd)
print(f"充值地址信息: {result}")

📊 withdrawal(account_id, withdrawal, extra=None, generate=False)

功能: 提币到外部地址或内部转账

参数:

  • account_id: 整数,账户ID

  • withdrawal: 字典,提币信息,包含以下字段:

    • cid: 字符串,可选,客户端自定义ID

    • asset: 字符串,币种名称

    • amt: 浮点数,提币数量

    • addr: 字典,提币地址信息,支持两种类型:

      链上提币 (OnChain):

      • OnChain: 字典,包含:
        • chain: 字符串,区块链网络,支持的值:
          • "Erc20": 以太坊链
          • "Trc20": 波场链
          • "Bep20": 币安智能链
          • "Sol": Solana链
          • "Polygon": Polygon链
          • "ArbitrumOne": Arbitrum链
          • "Optimism": Optimism链
          • "Ton": TON链
          • "AVAXC": Avalanche链
        • address: 字符串,目标钱包地址
        • tag: 字符串或None,地址标签(EOS、XRP等币种需要)

      内部转账 (InternalTransfer):

      • InternalTransfer: 字符串,支持三种格式:
        • {"Email": "user@example.com"}: 邮箱转账
        • {"Phone": ["+86", "13800138000"]}: 手机号转账(区号+手机号)
        • {"Uid": "12345678"}: 用户ID转账
  • extra: 字典,可选,额外参数

  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 提币操作结果

示例:

# 链上提币 - USDT通过TRC20网络提币
withdrawal_data = {
"cid": "withdraw_001",
"asset": "USDT",
"amt": 100.0,
"addr": {
"OnChain": {
"chain": "Trc20",
"address": "TKzxdSv2FZKQrEqkKVgp5DcwEXBEKMg2Ax",
"tag": None
}
}
}
result = trader.withdrawal(0, withdrawal_data)

if "Ok" in result:
success_data = result["Ok"]
print(f"USDT提币成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"USDT提币失败: {error}")

# 链上提币 - BTC提币
btc_withdrawal = {
"cid": "withdraw_btc_001",
"asset": "BTC",
"amt": 0.01,
"addr": {
"OnChain": {
"chain": "Erc20", # BTC通常不需要指定链
"address": "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa",
"tag": None
}
}
}
result = trader.withdrawal(0, btc_withdrawal)

if "Ok" in result:
success_data = result["Ok"]
print(f"BTC提币成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"BTC提币失败: {error}")

# 链上提币 - EOS提币(需要tag)
eos_withdrawal = {
"asset": "EOS",
"amt": 10.0,
"addr": {
"OnChain": {
"chain": "Erc20",
"address": "eosaccount123",
"tag": "1234567890" # EOS需要memo标签
}
}
}
result = trader.withdrawal(0, eos_withdrawal)

if "Ok" in result:
success_data = result["Ok"]
print(f"EOS提币成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"EOS提币失败: {error}")

# 内部转账 - 通过邮箱转账
internal_transfer_email = {
"cid": "internal_001",
"asset": "USDT",
"amt": 50.0,
"addr": {
"InternalTransfer": {
"Email": "recipient@example.com"
}
}
}
result = trader.withdrawal(0, internal_transfer_email)

if "Ok" in result:
success_data = result["Ok"]
print(f"邮箱内部转账成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"邮箱内部转账失败: {error}")

# 内部转账 - 通过手机号转账
internal_transfer_phone = {
"asset": "BTC",
"amt": 0.005,
"addr": {
"InternalTransfer": {
"Phone": ["+86", "13800138000"] # [区号, 手机号]
}
}
}
result = trader.withdrawal(0, internal_transfer_phone)

if "Ok" in result:
success_data = result["Ok"]
print(f"手机号内部转账成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"手机号内部转账失败: {error}")

# 内部转账 - 通过用户ID转账
internal_transfer_uid = {
"asset": "ETH",
"amt": 0.1,
"addr": {
"InternalTransfer": {
"Uid": "12345678"
}
}
}
result = trader.withdrawal(0, internal_transfer_uid)

if "Ok" in result:
success_data = result["Ok"]
print(f"用户ID内部转账成功: {success_data}")
else:
error = result.get("Err", "未知错误")
print(f"用户ID内部转账失败: {error}")

# 使用publish方法调用
cmd = {
"account_id": 0,
"method": "Withdrawal",
"withdrawal": {
"cid": "withdraw_publish_001",
"asset": "USDT",
"amt": 100.0,
"addr": {
"OnChain": {
"chain": "Trc20",
"address": "TKzxdSv2FZKQrEqkKVgp5DcwEXBEKMg2Ax",
"tag": None
}
}
},
"sync": True
}
result = trader.publish(cmd)
print(f"提币结果: {result}")

重要提醒:

  1. 手续费: 链上提币会产生网络手续费,请确保账户余额充足
  2. 最小提币量: 每个币种都有最小提币数量限制,请查询交易所规则
  3. 地址验证: 提币前请仔细核对目标地址,错误地址可能导致资金永久丢失
  4. 网络确认: 不同链的确认时间不同,TRC20通常较快,ERC20在网络拥堵时可能较慢
  5. 标签要求: EOS、XRP、XMR等币种需要正确的标签(tag/memo),否则可能丢失
  6. API权限: 提币功能需要账户开启提币权限,并且API密钥需要具备提币权限
  7. 风控限制: 大额提币可能触发风控审核,需要额外验证

链类型选择建议:

币种推荐链类型特点
USDTBep20手续费低,确认快
USDTErc20兼容性好,但手续费较高
USDTTrc20手续费适中,速度快
ETHErc20原生链
BNBBep20币安生态

📊 borrow(account_id, coin, amount, extra=None, generate=False)

功能: 借入资金

参数:

  • account_id: 整数,账户ID
  • coin: 字符串,币种
  • amount: 浮点数,借入金额
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 借入资金结果

📊 repay(account_id, coin, amount, extra=None, generate=False)

功能: 偿还借入的资金

参数:

  • account_id: 整数,账户ID
  • coin: 字符串,币种
  • amount: 浮点数,偿还金额
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 偿还资金结果

📊 get_borrowed(account_id, coin=None, extra=None, generate=False)

功能: 查询已借入的资金

参数:

  • account_id: 整数,账户ID
  • coin: 字符串,可选,币种,不指定则查询所有币种
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 已借入资金数据列表,每个元素为字典,包含:
    • coin: 字符串,币种名称
    • amount: 浮点数,已借入金额

📊 get_borrow_rate(account_id, coin=None, extra=None, generate=False)

功能: 查询借贷利率

参数:

  • account_id: 整数,账户ID
  • coin: 字符串,可选,币种,不指定则查询所有币种
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 借贷利率数据列表,每个元素为字典,包含:
    • ccy: 字符串,币种名称
    • rate: 浮点数,借贷利率

📊 get_borrow_limit(account_id, coin, is_vip=None, extra=None, generate=False)

功能: 查询借贷限额

参数:

  • account_id: 整数,账户ID
  • coin: 字符串,币种
  • is_vip: 布尔值,可选,是否为VIP用户
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 借贷限额数据字典,包含:
    • debt: 浮点数,当前负债
    • interest: 浮点数,当前计息
    • coin: 字符串,借贷币种
    • rate: 浮点数,日利率
    • borrow_limit: 浮点数,可借贷限额
    • vip_detail: 字典或None,尊享用户详情(可选),包含:
      • pos_loan: 浮点数,当前账户负债占用
      • available_loan: 浮点数,当前账户剩余可用
      • used_loan: 浮点数,当前账户已借额度

示例:

# 查询USDT的借贷限额
result = trader.get_borrow_limit(
account_id=0,
coin="USDT"
)

if "Ok" in result:
borrow_limit = result["Ok"]
print(f"USDT借贷限额: {borrow_limit}")
print(f"当前负债: {borrow_limit['debt']}")
print(f"可借贷限额: {borrow_limit['borrow_limit']}")
print(f"日利率: {borrow_limit['rate']:.6f}")
else:
error = result.get("Err", "未知错误")
print(f"查询USDT借贷限额失败: {error}")

# VIP用户查询借贷限额
result = trader.get_borrow_limit(
account_id=0,
coin="BTC",
is_vip=True
)

if "Ok" in result:
borrow_limit_vip = result["Ok"]
print(f"VIP用户BTC借贷限额: {borrow_limit_vip}")

# 检查VIP详情
if borrow_limit_vip.get('vip_detail'):
vip_info = borrow_limit_vip['vip_detail']
print(f"VIP可用额度: {vip_info['available_loan']}")
print(f"VIP已用额度: {vip_info['used_loan']}")
else:
error = result.get("Err", "未知错误")
print(f"查询VIP用户BTC借贷限额失败: {error}")

# 使用publish方法调用
cmd = {
"account_id": 1,
"method": "GetBorrowLimit",
"coin": "USDT",
"is_vip": False,
"sync": True
}
result = trader.publish(cmd)

6.7 账户信息

📊 get_account_info(account_id, extra=None, generate=False)

功能: 获取账户详细信息

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 账户详细信息数据

📊 get_account_mode(account_id, extra=None, generate=False)

功能: 获取账户模式

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 账户模式数据

📊 set_account_mode(account_id, account_mode, extra=None, generate=False)

功能: 设置账户模式

参数:

  • account_id: 整数,账户ID
  • account_mode: 字符串或对象,账户模式
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 设置账户模式结果

📊 get_user_id(account_id, extra=None, generate=False)

功能: 获取用户ID

参数:

  • account_id: 整数,账户ID
  • extra: 字典,可选,额外参数
  • generate: 布尔值,可选,默认为False,设为True时只生成命令而不执行

返回值:

  • 用户ID数据

七、数据结构说明

7.1 K线数据结构

📈 Candle(蜡烛图数据)

单根蜡烛图数据结构,包含完整的OHLCV信息:

pub struct Candle {
/// 开盘时间,Unix时间戳(毫秒)
pub timestamp: i64,
/// 开盘价
pub open: f64,
/// 最高价
pub high: f64,
/// 最低价
pub low: f64,
/// 收盘价
pub close: f64,
/// 成交量(基础货币)
pub volume: f64,
/// 成交额(报价货币)
pub quote_volume: f64,
/// 成交笔数(可选)
pub trades: Option&lt;u32&gt;,
/// taker买入成交量(基础货币,可选)
pub taker_buy_volume: Option&lt;f64&gt;,
/// taker买入成交额(报价货币,可选)
pub taker_buy_quote_volume: Option&lt;f64&gt;,
/// K线状态:false 代表 K 线未完结,true 代表 K 线已完结
pub confirm: bool,
}

字段说明:

  • timestamp: 该K线的开盘时间,使用Unix时间戳(毫秒)
  • open/high/low/close: 标准OHLC价格数据
  • volume: 该时间段内的总成交量(以基础货币计算)
  • quote_volume: 该时间段内的总成交额(以报价货币计算)
  • trades: 该时间段内的成交笔数,某些交易所可能不提供此数据
  • taker_buy_volume: 主动买入的成交量,用于分析买卖压力
  • taker_buy_quote_volume: 主动买入的成交额
  • confirm: 关键字段,表示该K线是否已完结。未完结的K线数据可能还在变化

📊 Kline(K线数据集合)

包含多根蜡烛图数据的集合:

pub struct Kline {
/// 交易对符号
pub symbol: Symbol,
/// K线时间粒度
pub interval: KlineInterval,
/// 蜡烛图数据列表
pub candles: Vec&lt;Candle&gt;,
}

⏰ KlineInterval(K线时间粒度)

支持的K线时间粒度枚举:

pub enum KlineInterval {
/// 1分钟
Min1, // "1m"
/// 3分钟
Min3, // "3m"
/// 5分钟
Min5, // "5m"
/// 15分钟
Min15, // "15m"
/// 30分钟
Min30, // "30m"
/// 1小时
Hour1, // "1h"
/// 2小时
Hour2, // "2h"
/// 4小时
Hour4, // "4h"
/// 6小时
Hour6, // "6h"
/// 8小时
Hour8, // "8h"
/// 12小时
Hour12, // "12h"
/// 1天
Day1, // "1d"
/// 3天
Day3, // "3d"
/// 1周
Week1, // "1w"
/// 1月
Month1, // "1M"
}

KlineInterval枚举方法:

  • as_str(): 转换为字符串表示(如"1m", "1h", "1d"等)
  • from_str(): 从字符串解析为KlineInterval枚举值
  • default(): 返回默认值(Min1,即1分钟)

字符串映射表:

  • 支持大小写不敏感的解析(如"1h"和"1H"都可以)
  • 月份使用大写"M"("1M")以区别于分钟"m"

使用示例:

# 处理K线数据的完整示例
def analyze_kline_data(kline_data):
"""分析K线数据"""
symbol = kline_data['symbol']
interval = kline_data['interval']
candles = kline_data['candles']

print(f"分析 {symbol}{interval} K线数据,共 {len(candles)} 根")

for i, candle in enumerate(candles):
# 检查K线是否完结
status = "已完结" if candle['confirm'] else "未完结"

# 计算涨跌幅
change_pct = (candle['close'] - candle['open']) / candle['open'] * 100

# 计算买卖压力比(如果有taker数据)
buy_pressure = 0
if candle.get('taker_buy_volume') and candle['volume'] > 0:
buy_pressure = candle['taker_buy_volume'] / candle['volume']

# 处理可选的成交笔数信息
trades_count = candle.get('trades', 0)

print(f"K线 {i+1}: 时间={candle['timestamp']}, "
f"OHLC=({candle['open']}, {candle['high']}, {candle['low']}, {candle['close']}), "
f"涨跌幅={change_pct:.2f}%, 成交量={candle['volume']}, "
f"买压={buy_pressure:.2f}, 成交笔数={trades_count}, 状态={status}")

# 只对已完结的K线进行技术分析
if candle['confirm']:
# 进行技术分析逻辑
pass

# 获取并分析K线数据
result = trader.get_kline(0, "BTC_USDT", "1h", limit=24)

if "Ok" in result:
kline_data = result["Ok"]
analyze_kline_data(kline_data)
else:
error = result.get("Err", "未知错误")
print(f"获取K线数据失败: {error}")

K线数据使用最佳实践:

  1. 检查K线完结状态: 始终检查confirm字段,只对已完结的K线进行技术分析
  2. 处理可选字段: tradestaker_buy_volume等字段可能为空,需要安全处理
  3. 时间戳处理: timestamp为毫秒级Unix时间戳,转换为日期时需要除以1000
  4. 买卖压力分析: 利用taker_buy_volume分析市场买卖压力
  5. 成交活跃度: 通过trades字段分析市场活跃程度
# K线数据安全处理示例
def safe_process_candle(candle):
"""安全处理K线数据"""
# 基础OHLCV数据(必有)
timestamp = candle['timestamp']
ohlc = (candle['open'], candle['high'], candle['low'], candle['close'])
volume = candle['volume']
quote_volume = candle['quote_volume']
is_confirmed = candle['confirm']

# 可选数据的安全处理
trades = candle.get('trades')
taker_buy_vol = candle.get('taker_buy_volume')
taker_buy_quote_vol = candle.get('taker_buy_quote_volume')

# 计算衍生指标
price_change = (ohlc[3] - ohlc[0]) / ohlc[0] * 100 if ohlc[0] > 0 else 0

# 买卖压力分析(如果有数据)
buy_pressure = None
if taker_buy_vol is not None and volume > 0:
buy_pressure = taker_buy_vol / volume

# 平均每笔交易金额(如果有数据)
avg_trade_size = None
if trades is not None and trades > 0:
avg_trade_size = quote_volume / trades

return {
'timestamp': timestamp,
'ohlc': ohlc,
'volume': volume,
'quote_volume': quote_volume,
'price_change_pct': price_change,
'buy_pressure': buy_pressure,
'avg_trade_size': avg_trade_size,
'trades_count': trades,
'is_confirmed': is_confirmed
}

7.2 Position(持仓)数据结构

📊 Position(持仓数据)

持仓数据结构包含完整的仓位信息:

{
# 仓位id(可选)
"id": "position_123456",

# 交易对符号
"symbol": "BTC_USDT",

# 创建仓位的Unix时间戳,以毫秒为单位(可选)
"ctime": 1640995200000,

# 仓位更新的Unix时间戳,以毫秒为单位(可选)
"utime": 1640998800000,

# 保证金模式
"margin_mode": "Cross", # "Cross"(全仓)或 "Isolated"(逐仓)

# 维持保证金(可选)
"margin": 1000.0,

# 维持保证金率(可选)
"mmr": 0.005,

# 仓位方向
"side": "Long", # "Long"(多头)或 "Short"(空头)

# 仓位的杠杆率
"leverage": 10,

# 仓位数量
"amount": 0.5,

# 仓位的平均开仓价格
"entry_price": 50000.0,

# 未实现盈亏,市场价格和开仓价格之间的差异乘以合约数量,可以为负
"unrealized_pnl": 250.5,

# 爆仓价格(可选)
"liquidation_price": 45000.0,

# 标记价格(可选)
"mark_price": 50500.0
}

字段说明:

字段类型必填说明
id字符串仓位唯一标识符
symbol字符串交易对符号
ctime整数创建时间(毫秒时间戳)
utime整数更新时间(毫秒时间戳)
margin_mode字符串保证金模式:"Cross"(全仓)或"Isolated"(逐仓)
margin浮点数维持保证金金额
mmr浮点数维持保证金率
side字符串仓位方向:"Long"(多头)或"Short"(空头)
leverage整数杠杆倍数(1-125)
amount浮点数仓位数量(正数)
entry_price浮点数平均开仓价格
unrealized_pnl浮点数未实现盈亏(可为负)
liquidation_price浮点数预估爆仓价格
mark_price浮点数当前标记价格

📊 MaxPosition(最大可开仓数据)

最大可开仓数据结构:

{
# 多仓对应Quote名义价值
"long_notional": 100000.0,

# 空仓对应Quote名义价值
"short_notional": 100000.0,

# 多仓对应Base数量
"long_quantity": 2.0,

# 空仓对应Base数量
"short_quantity": 2.0
}

字段说明:

字段类型说明
long_notional浮点数多仓最大名义价值(以报价货币计)
short_notional浮点数空仓最大名义价值(以报价货币计)
long_quantity浮点数多仓最大数量(以基础货币计)
short_quantity浮点数空仓最大数量(以基础货币计)

使用示例:

# 获取单个持仓信息
result = trader.get_position(0, "BTC_USDT")

if "Ok" in result:
position = result["Ok"]
if position: # 有持仓
print(f"交易对: {position['symbol']}")
print(f"方向: {position['side']}")
print(f"数量: {position['amount']}")
print(f"开仓价: {position['entry_price']}")
print(f"未实现盈亏: {position['unrealized_pnl']}")
print(f"杠杆: {position['leverage']}x")
print(f"保证金模式: {position['margin_mode']}")

# 计算盈亏比例
if position['entry_price'] > 0:
pnl_pct = position['unrealized_pnl'] / (position['amount'] * position['entry_price']) * 100
print(f"盈亏比例: {pnl_pct:.2f}%")

# 风险评估
if position.get('liquidation_price'):
mark = position.get('mark_price', position['entry_price'])
if position['side'] == 'Long':
risk_pct = (mark - position['liquidation_price']) / mark * 100
else:
risk_pct = (position['liquidation_price'] - mark) / mark * 100
print(f"距离爆仓: {risk_pct:.2f}%")
else:
print("无持仓")
else:
error = result.get("Err", "未知错误")
print(f"获取持仓失败: {error}")

# 获取所有持仓信息
result = trader.get_positions(0)

if "Ok" in result:
positions = result["Ok"]
print(f"持仓数量: {len(positions)}")

# 统计持仓信息
total_pnl = 0
long_value = 0
short_value = 0

for pos in positions:
symbol = pos['symbol']
side = pos['side']
amount = pos['amount']
pnl = pos['unrealized_pnl']
mark = pos.get('mark_price', pos['entry_price'])

# 计算持仓价值
position_value = amount * mark
if side == 'Long':
long_value += position_value
else:
short_value += position_value

total_pnl += pnl

# 打印每个持仓
print(f"{symbol} {side}: 数量={amount}, PNL={pnl:.2f}, 价值={position_value:.2f}")

print(f"\n汇总统计:")
print(f"总未实现盈亏: {total_pnl:.2f}")
print(f"多头总价值: {long_value:.2f}")
print(f"空头总价值: {short_value:.2f}")
print(f"净敞口: {long_value - short_value:.2f}")
else:
error = result.get("Err", "未知错误")
print(f"获取所有持仓失败: {error}")

持仓数据处理最佳实践:

  1. 空仓检查: 返回值可能为空列表或None,表示无持仓
  2. 可选字段处理: liquidation_pricemark_price等字段可能不存在
  3. 风险计算: 利用爆仓价格和标记价格计算风险水平
  4. 盈亏分析: 结合数量和开仓价格计算盈亏比例
  5. 保证金监控: 关注维持保证金率,防止爆仓风险
# 持仓风险管理示例
def analyze_position_risk(position):
"""分析持仓风险"""
symbol = position['symbol']
side = position['side']
amount = position['amount']
entry_price = position['entry_price']
unrealized_pnl = position['unrealized_pnl']
leverage = position['leverage']

# 计算持仓价值
position_value = amount * entry_price

# 计算盈亏比例
pnl_percentage = (unrealized_pnl / position_value) * 100 if position_value > 0 else 0

# 风险评级
risk_level = "低"
if leverage > 10:
risk_level = "高"
elif leverage > 5:
risk_level = "中"

# 爆仓风险评估
liquidation_risk = "未知"
if position.get('liquidation_price') and position.get('mark_price'):
liq_price = position['liquidation_price']
mark_price = position['mark_price']

if side == 'Long':
distance_pct = ((mark_price - liq_price) / mark_price) * 100
else:
distance_pct = ((liq_price - mark_price) / mark_price) * 100

if distance_pct < 5:
liquidation_risk = "极高"
elif distance_pct < 10:
liquidation_risk = "高"
elif distance_pct < 20:
liquidation_risk = "中"
else:
liquidation_risk = "低"

return {
'symbol': symbol,
'side': side,
'value': position_value,
'pnl_percentage': pnl_percentage,
'risk_level': risk_level,
'liquidation_risk': liquidation_risk,
'leverage': leverage
}

# 批量分析持仓风险
def analyze_all_positions_risk(positions):
"""批量分析所有持仓风险"""
high_risk_positions = []
total_risk_value = 0

for pos in positions:
risk_info = analyze_position_risk(pos)

# 识别高风险持仓
if risk_info['risk_level'] == '高' or risk_info['liquidation_risk'] in ['高', '极高']:
high_risk_positions.append(risk_info)
total_risk_value += risk_info['value']

return {
'high_risk_count': len(high_risk_positions),
'high_risk_value': total_risk_value,
'high_risk_positions': high_risk_positions
}

7.3 表格数据格式

表格数据格式示例 📋
{
"title": "表格标题",
"cols": ["列1", "列2", "列3", ...],
"rows": [
["行1值1", "行1值2", "行1值3", ...],
["行2值1", "行2值2", "行2值3", ...],
...
]
}

7.4 WebClient配置格式

WebClient配置示例 ⚙️
{
"server_name": "策略名称", // 策略服务器名称,用于在Web平台上标识
"primary_balance": 10000.0, // 主账户初始余额,仅首次初始化需要
"secondary_balance": 5000.0, // 次账户初始余额,仅首次初始化需要
"is_production": true, // 是否为生产环境,影响风控级别
"open_threshold": 0.14, // 开仓阈值,如0.14代表千1.4价差开仓
"max_position_ratio": 100, // 单个币种最大持仓比例,如100代表100%,1x杠杆
"max_leverage": 10, // 最大可用杠杆倍数
"funding_rate_threshold": 0.1, // 资金费率阈值,如0.1代表千1资金费率开仓
"cost": 0.0824, // 成本 主所开平 副所开平,4次交易手续费,如0.0824 代表万8.24
"primary_maker_fee_rate": 0.0002, // 主所maker手续费率
"primary_taker_fee_rate": 0.0005, // 主所taker手续费率
"primary_rebate_rate": 0.3, // 主所返佣率
"secondary_maker_fee_rate": 0.0002, // 次所maker手续费率
"secondary_taker_fee_rate": 0.0005, // 次所taker手续费率
"secondary_rebate_rate": 0.7, // 次所返佣率
}

所有字段均为可选项,但推荐在首次初始化时设置server_nameprimary_balancesecondary_balance。初始余额设置后会被缓存,后续启动策略时会自动从缓存中读取,无需再次设置。

缓存机制说明: WebClient的配置数据(包括初始余额、交易统计数据如总成交量、成交次数、失败次数、成功次数、胜率、利润等)会被缓存到本地文件系统。缓存文件以 {server_name}.json 的格式命名,存储在当前工作目录中。当调用统计和余额相关的API(如update_trade_stats、update_total_balance等)更新数据时,缓存文件会实时自动更新,无需手动操作。这就是为什么server_name参数非常重要,它不仅用于Web界面标识,还用于定位缓存文件。不同的策略应使用不同的server_name以避免缓存冲突。WebClient缓存是自动管理的,与策略缓存(cache_save/cache_load)完全独立。

7.4 命令格式说明

v2版本的命令格式相比v1版本更加扁平化,直接使用方法名作为顶级字段:

v1格式(嵌套结构):

{
"account_id": 0,
"cmd": {
"Sync": "UsdtBalance"
}
}

v2格式(扁平化结构):

{
"account_id": 0,
"method": "UsdtBalance",
"sync": true
}

八、数据源订阅

8.1 订阅机制概述

Trader API v2提供了完整的数据源订阅机制,通过策略的subscribes()方法定义需要订阅的数据类型。系统支持三种不同的数据源订阅方式,每种方式适用于不同的数据获取场景。

📊 三种数据源类型

数据源类型数据来源更新方式适用场景延迟特性
SubscribeWs交易所WebSocket实时推送高频交易、实时行情极低延迟
SubscribeRest交易所REST API定时轮询账户状态、合约信息秒级延迟
SubscribeTimer系统定时器定时触发策略逻辑、状态检查可配置间隔

🔄 数据流向图

┌─────────────────┐    ┌──────────────┐    ┌─────────────────┐
│ 交易所API │ │ 系统定时器 │ │ 策略回调函数 │
│ │ │ │ │ │
│ WebSocket ────┐ │ │ Timer ───┐ │ │ ┌─ on_bbo() │
│ REST API ──┐ │ │ │ │ │ │ ├─ on_kline() │
│ │ │ │ │ │ │ │ ├─ on_order() │
└───────────┼──┼─┘ └────────────┼─┘ │ ├─ on_timer() │
│ │ │ │ └─ on_balance()│
│ └─────────────────┐ │ │ │
└──────────────────┐ │ │ └─────────────────┘
│ │ │ ▲
▼ ▼ ▼ │
┌─────────────────┐ │
│ Trader Engine │──────┘
│ 数据分发中心 │
└─────────────────┘

8.2 WebSocket实时订阅 (SubscribeWs)

WebSocket订阅是最重要的数据源,提供交易所的实时推送数据,具有极低延迟的特性,是高频交易和实时行情分析的核心。

📡 WebSocket支持的频道类型

频道分类频道类型说明回调函数延迟级别
市场行情Bbo最佳买卖价on_bbo毫秒级
Depth市场深度on_depth毫秒级
Trade成交记录on_trade毫秒级
KlineK线数据on_kline秒级
MarkPrice标记价格on_mark_price秒级
Funding资金费率on_funding分钟级
账户数据Order订单更新on_order毫秒级
Position持仓更新on_position毫秒级
Balance余额更新on_balance秒级
FundingFee资金费结算on_funding_fee小时级
OrderAndFill订单和成交on_order_and_fill毫秒级

🚀 WebSocket订阅配置

基础配置格式:

{
"account_id": 0, # 账户ID(市场数据可选,账户数据必须)
"sub": {
"SubscribeWs": [
# 市场数据订阅(无需账户ID)
{
"频道类型": 配置参数
},
# 账户数据订阅(需要账户ID)
{
"频道类型": 配置参数
}
]
}
}

📈 Kline数据订阅详解

📈 Kline订阅配置

基础格式:

{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"], # 订阅的交易对列表
"interval": "1m" # K线时间间隔
}
}

支持的时间间隔:

  • "1m", "3m", "5m", "15m", "30m": 分钟级别
  • "1h", "2h", "4h", "6h", "8h", "12h": 小时级别
  • "1d", "3d": 天级别
  • "1w": 周级别
  • "1M": 月级别

🔄 Kline回调数据结构

当订阅Kline数据后,每当有新的K线数据时,会触发on_kline(exchange, kline)回调:

回调参数:

  • exchange: 字符串,交易所名称
  • kline: K线数据对象,包含以下字段:
    • symbol: 交易对符号
    • interval: K线时间粒度
    • candles: 蜡烛图数据列表(通常只包含最新的一根K线)

Candle数据结构:

{
"timestamp": 1640995200000, # 开盘时间,Unix时间戳(毫秒)
"open": 50000.0, # 开盘价
"high": 50500.0, # 最高价
"low": 49800.0, # 最低价
"close": 50200.0, # 收盘价
"volume": 123.45, # 成交量(基础货币)
"quote_volume": 6180000.0, # 成交额(报价货币)
"trades": 1250, # 成交笔数(可选)
"taker_buy_volume": 65.2, # 主动买入成交量(可选)
"taker_buy_quote_volume": 3270000.0, # 主动买入成交额(可选)
"confirm": False # K线状态:false=未完结,true=已完结
}

📊 Kline订阅示例

完整订阅示例:

def subscribes(self):
"""返回订阅配置"""
return [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
# 订阅多个交易对的1分钟K线
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT", "SOL_USDT"],
"interval": "1m"
}
},
# 同时订阅BBO数据
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
}
]
}
},
# 可以订阅不同时间周期的K线
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Kline": {
"symbols": ["BTC_USDT"],
"interval": "5m"
}
}
]
}
}
]

def on_kline(self, exchange, kline):
"""K线数据回调处理"""
symbol = kline['symbol']
interval = kline['interval']
candles = kline['candles']

for candle in candles:
timestamp = candle['timestamp']
open_price = candle['open']
high_price = candle['high']
low_price = candle['low']
close_price = candle['close']
volume = candle['volume']
is_confirmed = candle['confirm']

# 计算涨跌幅
price_change_pct = (close_price - open_price) / open_price * 100

status = "已完结" if is_confirmed else "未完结"

self.trader.log(
f"{exchange} {symbol} {interval} K线更新: "
f"价格 {close_price} (涨跌 {price_change_pct:+.2f}%), "
f"成交量 {volume:.2f}, 状态: {status}",
level="DEBUG"
)

# 只对已完结的K线进行技术分析
if is_confirmed:
self.analyze_kline_signal(symbol, candle)

def analyze_kline_signal(self, symbol, candle):
"""分析K线信号"""
# 检测大阳线/大阴线
body_ratio = abs(candle['close'] - candle['open']) / (candle['high'] - candle['low'])

if body_ratio > 0.7: # 实体超过70%
if candle['close'] > candle['open']:
signal_type = "大阳线"
color = "green"
else:
signal_type = "大阴线"
color = "red"

self.trader.tlog(
f"K线信号_{symbol}",
f"{symbol}: 检测到{signal_type},实体比例 {body_ratio:.1%}",
color=color,
interval=60
)

🔥 其他WebSocket订阅示例

深度数据订阅:

{
"Depth": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"levels": 5 # 订阅5档深度
}
}

账户数据订阅:

{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Order": ["BTC_USDT", "ETH_USDT"] # 订单更新
},
{
"Position": ["BTC_USDT", "ETH_USDT"] # 持仓更新
},
{
"OrderAndFill": ["BTC_USDT"] # 订单和成交数据
}
]
}
}

💡 WebSocket完整示例

订阅配置:

def subscribes(self):
"""WebSocket订阅配置示例"""
return [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
# 市场行情数据
{
"Bbo": ["BTC_USDT", "ETH_USDT"] # 最佳买卖价
},
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m" # 1分钟K线
}
},
{
"Depth": {
"symbols": ["BTC_USDT"],
"levels": 10 # 10档深度
}
},
{
"Funding": ["BTC_USDT", "ETH_USDT"] # 资金费率
},

# 账户数据
{
"Order": ["BTC_USDT", "ETH_USDT"] # 订单状态
},
{
"Position": ["BTC_USDT", "ETH_USDT"] # 持仓变化
},
{
"Balance": [] # 账户余额(不指定币种获取所有)
}
]
}
}
]

回调函数处理:

def on_bbo(self, exchange, symbol, bbo):
"""最佳买卖价回调"""
bid_price = bbo.get('bid_price')
ask_price = bbo.get('ask_price')
spread = ask_price - bid_price

self.trader.log(f"{symbol} BBO: 买{bid_price}{ask_price} 价差{spread}")

def on_kline(self, exchange, kline):
"""K线数据回调"""
symbol = kline['symbol']
interval = kline['interval']

for candle in kline['candles']:
if candle['confirm']: # 只处理已完结的K线
close_price = candle['close']
volume = candle['volume']
self.trader.log(f"{symbol} {interval} K线完结: 价格{close_price} 成交量{volume}")

def on_depth(self, exchange, symbol, depth):
"""深度数据回调"""
bids = depth.get('bids', [])
asks = depth.get('asks', [])

if bids and asks:
best_bid = bids[0][0] if bids[0] else 0
best_ask = asks[0][0] if asks[0] else 0
self.trader.log(f"{symbol} 深度: 最优买{best_bid} 最优卖{best_ask}")

def on_order(self, exchange, order):
"""订单状态回调"""
symbol = order.get('symbol')
status = order.get('status')
side = order.get('side')

self.trader.log(f"订单更新: {symbol} {side} {status}")

def on_position(self, exchange, position):
"""持仓变化回调

参数:
exchange: 交易所名称
position: 持仓数据,数据结构参见[7.2 Position(持仓)数据结构](#72-position持仓数据结构)
"""
symbol = position.get('symbol')
amount = position.get('amount', 0)
side = position.get('side')
unrealized_pnl = position.get('unrealized_pnl', 0)

self.trader.log(f"持仓更新: {symbol} {side} 数量{amount} 未实现盈亏{unrealized_pnl}")

8.3 REST轮询订阅 (SubscribeRest)

REST轮询订阅适用于不需要极低延迟但需要定期更新的数据,如账户余额、持仓状态、合约信息等。系统会按照指定的时间间隔主动调用REST API获取数据。

🔄 REST轮询支持的数据类型

数据类型说明回调函数更新频率建议适用场景
Balance账户余额on_balance1-5秒余额监控、风控
Position持仓信息on_position2-10秒持仓管理、盈亏计算
Funding资金费率on_funding30-300秒资金费率监控
Instrument合约信息on_instrument60-300秒合约规则更新

⚙️ REST轮询配置

基础配置格式:

{
"account_id": 0, # 必须指定账户ID
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 5, # 轮询间隔(秒)
"nanos": 0 # 纳秒精度(通常为0)
},
"rest_type": "数据类型" # 要轮询的数据类型
}
}
}

实际示例:

# 轮询账户余额(每5秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
}

# 轮询持仓信息(每10秒一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
}

# 轮询合约信息(每5分钟一次)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}

💡 REST轮询完整示例

订阅配置:

def subscribes(self):
"""REST轮询订阅配置示例"""
return [
# 轮询账户余额
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance"
}
}
},

# 轮询持仓信息
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Position"
}
}
},

# 轮询资金费率(备用方案,当WebSocket不稳定时)
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 60, "nanos": 0},
"rest_type": "Funding"
}
}
},

# 轮询合约信息更新
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 300, "nanos": 0},
"rest_type": "Instrument"
}
}
}
]

回调函数处理:

def on_balance(self, exchange, balances):
"""余额轮询回调"""
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)

if total > 0: # 只关注有余额的币种
self.trader.log(f"余额更新: {asset} 可用{available} 总计{total}")

# 风控检查
if available < total * 0.1: # 可用余额不足10%
self.trader.log(f"⚠️ {asset} 可用余额不足: {available}", level="WARN")

def on_position(self, exchange, positions):
"""持仓轮询回调

参数:
exchange: 交易所名称
positions: 持仓数据列表,每个元素的数据结构参见[7.2 Position(持仓)数据结构](#72-position持仓数据结构)
"""
for position in positions:
symbol = position.get('symbol')
amount = position.get('amount', 0)
side = position.get('side')
unrealized_pnl = position.get('unrealized_pnl', 0)

if abs(amount) > 0: # 只关注有持仓的交易对
self.trader.log(f"持仓检查: {symbol} {side} 数量{amount} 浮盈{unrealized_pnl}")

# 持仓风控
if unrealized_pnl < -1000: # 浮亏超过1000
self.trader.log(f"🚨 {symbol} 浮亏过大: {unrealized_pnl}", level="ERROR")

def on_funding(self, exchange, funding_rates):
"""资金费率轮询回调"""
for funding in funding_rates:
symbol = funding.get('symbol')
rate = funding.get('funding_rate', 0)
next_time = funding.get('next_funding_at', 0)

# 资金费率监控
if abs(rate) > 0.001: # 费率超过0.1%
rate_pct = rate * 100
self.trader.log(f"高资金费率: {symbol} {rate_pct:.3f}% 下次结算{next_time}")

def on_instrument(self, exchange, instruments):
"""合约信息轮询回调"""
for instrument in instruments:
symbol = instrument.get('symbol')
status = instrument.get('status')

# 监控交易对状态变化
if status != 'trading':
self.trader.log(f"⚠️ 交易对状态异常: {symbol} 状态为 {status}", level="WARN")

8.4 定时器订阅 (SubscribeTimer)

定时器订阅用于执行策略相关的定时任务,如定期检查订单状态、执行风控逻辑、统计数据更新等。不依赖外部数据源,完全由系统定时器驱动。

⏰ 定时器特点

  • 精确定时: 支持秒级和纳秒级精度
  • 多定时器: 可以设置多个不同名称的定时器
  • 独立执行: 不依赖交易所连接状态
  • 灵活配置: 可以动态调整执行间隔
  • 错开启动: 支持初始延迟,避免多个定时器同时启动造成资源竞争

🎯 定时器配置

基础配置格式:

{
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 间隔秒数, # 定时器间隔(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
},
"name": "定时器名称", # 定时器标识名称
"initial_delay": { # 可选:初始延迟时间
"secs": 延迟秒数, # 启动前延迟(秒)
"nanos": 纳秒精度 # 纳秒精度(通常为0)
}
}
}
}

字段说明:

  • update_interval: 定时器执行间隔,必须字段
  • name: 定时器名称,用于标识和日志记录,必须字段
  • initial_delay: 初始延迟时间,可选字段,用于错开多个定时器的启动时间,避免资源竞争

initial_delay使用场景:

场景类型建议延迟时间原因说明示例
风控检查30-60秒等待数据稳定,避免启动时误报余额检查、持仓风控
统计计算错开15-60秒避免多个计算任务同时执行收益统计、交易分析
网络请求错开10-30秒分散API调用,避免频率限制外部数据获取
文件操作错开30-120秒避免同时写文件造成冲突日志轮转、数据备份
连接监控0秒(立即)需要尽快检测连接状态WebSocket状态检查

实际应用示例:

# 每30秒检查一次价格预警(立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "price_alert_check"
}
}
}

# 每分钟更新一次策略统计(延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}

# 每10秒检查一次订单状态(延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "order_status_check",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
}

# 风控检查(延迟30秒启动,避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 30, "nanos": 0}
}
}
}

# 对应的回调函数处理
def on_timer_subscribe(self, timer_name):
"""定时器回调处理"""
if timer_name == "price_alert_check":
self.check_price_alerts()
elif timer_name == "stats_update":
self.update_strategy_stats()
elif timer_name == "order_status_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")

💡 定时器完整示例

订阅配置:

def subscribes(self):
"""定时器订阅配置示例"""
return [
# 策略监控定时器(每30秒,立即启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor"
}
}
},

# 风控检查定时器(每5分钟,延迟60秒启动避免启动时误报)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0}
}
}
},

# 统计数据更新(每分钟,延迟15秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "stats_update",
"initial_delay": {"secs": 15, "nanos": 0}
}
}
},

# 价格预警检查(每10秒,延迟5秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 10, "nanos": 0},
"name": "price_alert",
"initial_delay": {"secs": 5, "nanos": 0}
}
}
},

# 订单状态检查(每15秒,延迟10秒启动)
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 15, "nanos": 0},
"name": "order_check",
"initial_delay": {"secs": 10, "nanos": 0}
}
}
}
]

回调函数处理:

def on_timer_subscribe(self, timer_name):
"""定时器回调统一处理"""
try:
if timer_name == "strategy_monitor":
self.strategy_monitor()
elif timer_name == "risk_check":
self.risk_management_check()
elif timer_name == "stats_update":
self.update_strategy_statistics()
elif timer_name == "price_alert":
self.check_price_alerts()
elif timer_name == "order_check":
self.check_pending_orders()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
except Exception as e:
self.trader.log(f"定时器{timer_name}执行异常: {e}", level="ERROR")

def strategy_monitor(self):
"""策略监控逻辑"""
# 检查策略运行状态
uptime = time.time() - self.start_time
self.trader.tlog(
"策略监控",
f"策略运行时间: {uptime:.0f}秒, 状态: 正常",
interval=300, # 5分钟记录一次
color="green"
)

# 检查WebSocket连接状态
if hasattr(self, 'last_data_time'):
time_since_data = time.time() - self.last_data_time
if time_since_data > 60: # 超过1分钟没有数据
self.trader.log("⚠️ WebSocket数据中断超过1分钟", level="WARN")

def risk_management_check(self):
"""风控检查"""
# 检查总持仓风险
total_exposure = self.calculate_total_exposure()
max_exposure = 100000 # 最大敞口100k

if total_exposure > max_exposure:
self.trader.log(
f"🚨 总敞口超限: {total_exposure} > {max_exposure}",
level="ERROR",
color="red"
)
# 执行风控措施
self.emergency_close_positions()

# 检查浮动盈亏
total_pnl = self.calculate_total_pnl()
if total_pnl < -5000: # 浮亏超过5k
self.trader.log(
f"🚨 浮亏过大: {total_pnl}",
level="ERROR",
color="red"
)

def update_strategy_statistics(self):
"""更新策略统计"""
# 计算成功率
if self.total_trades > 0:
win_rate = self.successful_trades / self.total_trades * 100
self.trader.tlog(
"策略统计",
f"交易次数: {self.total_trades}, 成功率: {win_rate:.1f}%",
interval=300,
color="blue"
)

# 更新到Web平台
if hasattr(self.trader, 'update_trade_stats'):
self.trader.update_trade_stats(
maker_volume=self.total_maker_volume,
taker_volume=self.total_taker_volume,
profit=self.total_profit
)

def check_price_alerts(self):
"""价格预警检查"""
for symbol in self.monitored_symbols:
current_price = self.get_current_price(symbol)
if current_price:
# 检查价格突破
if symbol in self.price_alerts:
alert = self.price_alerts[symbol]
if alert['type'] == 'above' and current_price > alert['price']:
self.trader.log(f"💡 {symbol} 突破预警价格 {alert['price']}", color="yellow")
elif alert['type'] == 'below' and current_price < alert['price']:
self.trader.log(f"💡 {symbol} 跌破预警价格 {alert['price']}", color="yellow")

def check_pending_orders(self):
"""检查挂单状态"""
# 检查长时间未成交的订单
for order_id, order_info in self.pending_orders.items():
order_age = time.time() - order_info['created_time']

if order_age > 600: # 超过10分钟未成交
self.trader.log(
f"⏰ 订单{order_id}长时间未成交: {order_age:.0f}秒",
level="WARN"
)

# 可以选择撤销长时间未成交的订单
if order_age > 1800: # 超过30分钟强制撤销
self.cancel_order(order_id)
self.trader.log(f"🗑️ 强制撤销超时订单: {order_id}")

8.5 综合订阅配置示例

以下是一个完整的策略订阅配置示例,展示如何同时使用三种数据源:

def subscribes(self):
"""综合订阅配置示例"""
return [
# 1. WebSocket实时数据订阅
{
"account_id": 0,
"sub": {
"SubscribeWs": [
# 市场行情数据
{
"Kline": {
"symbols": ["BTC_USDT", "ETH_USDT"],
"interval": "1m"
}
},
{
"Bbo": ["BTC_USDT", "ETH_USDT"]
},
{
"Depth": {
"symbols": ["BTC_USDT"],
"levels": 5
}
},
# 账户数据
{
"Order": ["BTC_USDT", "ETH_USDT"]
},
{
"Position": ["BTC_USDT", "ETH_USDT"]
}
]
}
},

# 2. REST轮询数据订阅
{
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {"secs": 5, "nanos": 0},
"rest_type": "Balance" # 每5秒更新余额
}
}
},

# 3. 定时器任务订阅
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "strategy_monitor" # 每30秒执行策略监控
}
}
},
{
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "risk_check" # 每5分钟执行风控检查
}
}
}
]

8.6 数据源选择与最佳实践

🎯 数据源选择指南

根据不同的应用场景,选择合适的数据源类型:

应用场景推荐数据源订阅频道更新频率理由
高频交易WebSocketBbo, Depth, Trade毫秒级需要极低延迟的实时数据
技术分析WebSocketKline, MarkPrice秒级K线完结状态重要,实时性要求高
套利策略WebSocketBbo, Funding秒-分钟级价差和资金费率变化敏感
风控监控REST轮询Balance, Position1-10秒定期检查即可,不需要实时
策略逻辑定时器Timer10秒-5分钟独立于市场数据的逻辑执行
账户管理WebSocket + RESTOrder + Balance混合实时订单状态 + 定期余额检查

✅ 最佳实践建议

1. 分层订阅策略

def subscribes(self):
"""分层订阅策略示例"""
return [
# 核心层:高频实时数据
{
"account_id": 0,
"sub": {"SubscribeWs": [
{"Bbo": self.trading_symbols}, # 交易信号生成
{"Order": self.trading_symbols} # 订单状态监控
]}
},

# 支撑层:中频数据
{
"account_id": 0,
"sub": {"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Balance" # 余额监控
}}
},

# 管理层:低频定时任务
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_management", # 风控检查
"initial_delay": {"secs": 30, "nanos": 0} # 延迟启动避免误报
}}
}
]

2. 数据处理优化

def on_bbo(self, exchange, symbol, bbo):
"""优化的BBO处理"""
try:
# 快速数据验证
if not self.validate_bbo_data(bbo):
return

# 核心交易逻辑(最小化处理时间)
signal = self.generate_trading_signal(symbol, bbo)
if signal:
self.execute_trade(signal)

except Exception as e:
# 错误处理不影响主流程
self.trader.log(f"BBO处理异常: {e}", level="ERROR")

def on_kline(self, exchange, kline):
"""K线数据处理最佳实践"""
for candle in kline['candles']:
# 关键:只处理已完结的K线
if candle['confirm']:
self.analyze_completed_candle(candle)
else:
# 未完结K线可用于实时监控,但不做技术分析
self.monitor_current_candle(candle)

3. 资源管理与性能优化

class PerformanceOptimizedStrategy:
def __init__(self):
# 数据缓存,避免重复计算
self.price_cache = {}
self.last_update_time = {}

def subscribes(self):
"""性能优化的订阅配置"""
# 只订阅必要的交易对
core_symbols = ["BTC_USDT", "ETH_USDT"] # 核心交易对
monitor_symbols = ["SOL_USDT", "BNB_USDT"] # 监控交易对

return [
# 核心交易对 - 高频订阅
{
"account_id": 0,
"sub": {"SubscribeWs": [
{"Bbo": core_symbols},
{"Kline": {"symbols": core_symbols, "interval": "1m"}}
]}
},

# 监控交易对 - 低频订阅
{
"account_id": 0,
"sub": {"SubscribeWs": [
{"Bbo": monitor_symbols}
]}
}
]

def on_bbo(self, exchange, symbol, bbo):
"""带缓存的BBO处理"""
current_time = time.time()
last_time = self.last_update_time.get(symbol, 0)

# 限制处理频率,避免过度计算
if current_time - last_time < 0.1: # 100ms内不重复处理
return

self.last_update_time[symbol] = current_time
self.process_bbo_data(symbol, bbo)

✅ 推荐配置模式

高频交易模式:

# 追求极低延迟,最小化订阅
["Bbo", "Order"] # 仅订阅必要数据

技术分析模式:

# 需要完整市场数据
["Kline", "Bbo", "Depth", "MarkPrice"]

稳健套利模式:

# 平衡实时性和稳定性
WebSocket: ["Bbo", "Funding", "Order", "Position"]
REST: ["Balance"] (5)
Timer: ["risk_check"] (每分钟,延迟30秒启动)

❌ 常见误区避免

1. 过度订阅

# ❌ 错误:订阅过多不必要的数据
["Bbo", "Depth", "Trade", "Kline", "MarkPrice", "Funding"] # 太多了

# ✅ 正确:只订阅策略需要的数据
["Bbo", "Order"] # 简单策略只需要这些

2. 忽略数据状态

# ❌ 错误:不检查K线完结状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
self.analyze_signal(candle) # 可能分析未完结K线

# ✅ 正确:检查K线状态
def on_kline(self, exchange, kline):
candle = kline['candles'][-1]
if candle['confirm']: # 只分析已完结的K线
self.analyze_signal(candle)

3. 阻塞回调函数

# ❌ 错误:在回调中执行耗时操作
def on_bbo(self, exchange, symbol, bbo):
time.sleep(0.1) # 阻塞其他数据处理
self.complex_calculation() # 复杂计算

# ✅ 正确:异步处理耗时操作
def on_bbo(self, exchange, symbol, bbo):
# 快速缓存数据
self.bbo_cache[symbol] = bbo
# 复杂处理放在定时器中进行

def on_timer_subscribe(self, timer_name):
if timer_name == "complex_analysis":
self.perform_complex_analysis()

4. 忽略初始延迟配置

# ❌ 错误:多个定时器同时启动,造成资源竞争
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup"}}
]

# ✅ 正确:使用初始延迟错开启动时间
[
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "stats"}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "report",
"initial_delay": {"secs": 60, "nanos": 0}}},
{"SubscribeTimer": {"update_interval": {"secs": 300, "nanos": 0}, "name": "backup",
"initial_delay": {"secs": 120, "nanos": 0}}}
]

5. 初始延迟最佳实践

def subscribes(self):
"""使用initial_delay的最佳实践"""
return [
# 立即启动的监控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 30, "nanos": 0},
"name": "connection_monitor" # 无延迟,立即监控连接状态
}}
},

# 延迟启动的风控类定时器
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_check",
"initial_delay": {"secs": 60, "nanos": 0} # 延迟1分钟,等数据稳定
}}
},

# 错开启动的统计类定时器(避免同时执行)
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "stats_calc",
"initial_delay": {"secs": 30, "nanos": 0} # 错开30秒
}}
},
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "report_gen",
"initial_delay": {"secs": 90, "nanos": 0} # 错开90秒
}}
}
]

🔧 调试与监控

数据流监控:

def on_bbo(self, exchange, symbol, bbo):
# 统计数据接收频率
self.data_stats[symbol] = self.data_stats.get(symbol, 0) + 1

def on_timer_subscribe(self, timer_name):
if timer_name == "data_monitor":
for symbol, count in self.data_stats.items():
self.trader.tlog(
"数据统计",
f"{symbol}: {count}次/分钟",
interval=60
)
self.data_stats.clear()

通过合理的数据源选择和配置,可以构建高效、稳定的量化交易策略。

九、完整策略示例

以下是一个跨交易所套利策略示例,展示了如何使用Trader API v2的核心功能:

"""
arbitrage_strategy_v2.py - 跨交易所套利策略示例 (v2版本)
"""

import json
import time
import base_strategy

class ArbitrageStrategyV2(base_strategy.BaseStrategy):
def __init__(self, cex_configs, dex_configs, config, trader):
"""初始化策略"""
self.cex_configs = cex_configs
self.has_account = len(cex_configs) > 0
self.dex_configs = dex_configs
self.trader = trader
self.config = config or {
"send_to_web": True,
"min_spread": 1.0, # 最小开仓价差(‰)
"symbols": ["BTC_USDT", "ETH_USDT", "SOL_USDT"] # 监控的交易对
}

# 初始化数据
self.positions = {}
self.running = False
self.primary_balance = 10000.0 # 默认值
self.secondary_balance = 5000.0 # 默认值
self.start_time = time.time() # 策略启动时间
self.last_data_time = time.time() # 最后数据时间

# 加载缓存
self.load_state()

def _init_fee_rates(self):
"""查询费率信息"""
# 默认费率
self.primary_maker_fee = 0.0002
self.primary_taker_fee = 0.0005
self.secondary_maker_fee = 0.0002
self.secondary_taker_fee = 0.0005
self.primary_rebate_rate = 0
self.secondary_rebate_rate = 0

if not self.has_account or not self.config.get("symbols"):
return

symbol = self.config["symbols"][0]

# 查询主账户费率 - 使用v2格式
try:
result = self.trader.get_fee_rate(0, symbol)
if "Ok" in result:
fee_rate = result["Ok"]
self.trader.log(f"主账户费率: {fee_rate}", level="DEBUG", web=self.config.get("send_to_web", True))
self.primary_maker_fee = fee_rate.get('maker_fee_rate', 0.0002)
self.primary_taker_fee = fee_rate.get('taker_fee_rate', 0.0005)
self.primary_rebate_rate = self.cex_configs[0].get('rebate_rate', 0)
except Exception as e:
self.trader.log(f"获取主账户费率失败: {e}", level="WARN", web=self.config.get("send_to_web", True))

# 查询次账户费率
if len(self.cex_configs) > 1:
try:
result = self.trader.get_fee_rate(1, symbol)
if "Ok" in result:
fee_rate = result["Ok"]
self.trader.log(f"次账户费率: {fee_rate}", level="DEBUG", web=self.config.get("send_to_web", True))
self.secondary_maker_fee = fee_rate.get('maker_fee_rate', 0.0002)
self.secondary_taker_fee = fee_rate.get('taker_fee_rate', 0.0005)
self.secondary_rebate_rate = self.cex_configs[1].get('rebate_rate', 0)
except Exception as e:
self.trader.log(f"获取次账户费率失败: {e}", level="WARN", web=self.config.get("send_to_web", True))

# 计算成本,两边都用taker
self.cost = (self.primary_taker_fee + self.secondary_taker_fee) * 2

# 查询账户余额 - 使用v2格式
try:
result = self.trader.get_usdt_balance(0)
if "Ok" in result:
balance = result["Ok"]
self.primary_balance = balance.get('balance', 10000.0)
except Exception:
self.trader.log("获取主账户余额失败", level="WARN", web=self.config.get("send_to_web", True))

# 查询次账户余额
if len(self.cex_configs) > 1:
try:
result = self.trader.get_usdt_balance(1)
if "Ok" in result:
balance = result["Ok"]
self.secondary_balance = balance.get('balance', 5000.0)
except Exception:
self.trader.log("获取次账户余额失败", level="WARN", web=self.config.get("send_to_web", True))

def _init_web_client(self):
"""初始化Web客户端"""
web_config = {
"server_name": "跨所套利策略v2",
"primary_balance": self.primary_balance,
"secondary_balance": self.secondary_balance,
"is_production": True,
"open_threshold": self.config.get("min_spread", 1.0) / 10, # 转换为小数
"max_position_ratio": 80,
"max_leverage": 2,
"funding_rate_threshold": 0.1, # 资金费率阈值
"cost": self.cost, # 交易成本
"primary_maker_fee_rate": self.primary_maker_fee,
"primary_taker_fee_rate": self.primary_taker_fee,
"primary_rebate_rate": self.primary_rebate_rate,
"secondary_maker_fee_rate": self.secondary_maker_fee,
"secondary_taker_fee_rate": self.secondary_taker_fee,
"secondary_rebate_rate": self.secondary_rebate_rate
}
self.trader.init_web_client(web_config)

def name(self):
"""返回策略名称"""
return "跨所套利策略v2"

def load_state(self):
"""从缓存加载策略状态"""
cached_data = self.trader.cache_load()
if cached_data:
self.positions = cached_data.get("positions", {})
self.trader.log("已加载缓存状态", web=self.config.get("send_to_web", True))

def save_state(self):
"""保存策略状态到缓存"""
state = {"positions": self.positions, "last_update": int(time.time())}
self.trader.cache_save(state)

def start(self):
"""启动策略"""

# 初始化费率信息
self._init_fee_rates()

# 初始化Web客户端
if self.config.get("send_to_web", True):
self._init_web_client()
self.trader.start_web_client(upload_interval=5)

self.trader.log("启动跨所套利策略v2", web=self.config.get("send_to_web", True))

# 上传表格示例
self.upload_tables()

# tlog示例 频繁日志使用tlog限频
self.trader.tlog("价差", "BTC_USDT: 1.25‰, ETH_USDT: 2.10‰, SOL_USDT: 3.45‰", color="green")

# 初始化检查并启动交易
if self.has_account:
self.scan_opportunities() # 扫描套利机会
else:
self.trader.log("没有配置账户,无法交易", level="WARN")

def upload_tables(self):
"""上传表格示例方法"""

self.spread_table = {
"title": "价差",
"cols": ["交易对", "价差(‰)"],
"rows": [
["BTC_USDT", "1.25"],
["ETH_USDT", "2.10"],
["SOL_USDT", "3.45"]
]
}

self.position_table = {
"title": "持仓",
"cols": ["交易对", "持仓量", "持仓价值", "浮动盈亏", "持仓时间"],
"rows": [
["BTC_USDT", "1.25", "10000", "100", "2小时15分"],
["ETH_USDT", "2.10", "20000", "200", "1小时30分"],
["SOL_USDT", "3.45", "30000", "300", "3小时45分"]
]
}

# 上传表格
self.trader.upload_tables([self.spread_table, self.position_table])

def place_arbitrage_orders(self, symbol, spread):
"""下套利订单 - 使用v2 API"""
try:
# 获取当前价格
primary_result = self.trader.get_ticker(0, symbol)
secondary_result = self.trader.get_ticker(1, symbol)

if "Ok" not in primary_result or "Ok" not in secondary_result:
return False

primary_ticker = primary_result["Ok"]
secondary_ticker = secondary_result["Ok"]

primary_price = float(primary_ticker.get('price', 0))
secondary_price = float(secondary_ticker.get('price', 0))

if primary_price == 0 or secondary_price == 0:
return False

# 计算交易数量
amount = 0.01 # 示例数量

# 生成订单ID
primary_cid_result = self.trader.create_cid()
secondary_cid_result = self.trader.create_cid()

if "Ok" not in primary_cid_result or "Ok" not in secondary_cid_result:
return False

primary_cid = primary_cid_result["Ok"]
secondary_cid = secondary_cid_result["Ok"]

# 主账户做多,次账户做空
primary_order = {
"symbol": symbol,
"side": "Buy",
"order_type": "Market",
"amount": amount,
"cid": primary_cid
}

secondary_order = {
"symbol": symbol,
"side": "Sell",
"order_type": "Market",
"amount": amount,
"cid": secondary_cid
}

# 下单
primary_result = self.trader.place_order(0, primary_order, sync=True)
secondary_result = self.trader.place_order(1, secondary_order, sync=True)

if "Ok" in primary_result and "Ok" in secondary_result:
self.trader.log(f"套利订单已下达: {symbol}, 价差: {spread:.2f}‰", color="green")

# 记录交易统计 - 开仓时只记录交易量
stats_result = self.trader.update_trade_stats(0, amount * primary_price + amount * secondary_price, 0)

return True
else:
self.trader.log(f"套利订单失败: {symbol}", level="ERROR", color="red")
return False

except Exception as e:
self.trader.log(f"下单异常: {e}", level="ERROR", color="red")
return False

def scan_opportunities(self):
"""扫描套利机会"""
for symbol in self.config.get("symbols", []):
try:
# 获取双边价格
primary_result = self.trader.get_bbo(0, symbol)
secondary_result = self.trader.get_bbo(1, symbol)

if "Ok" not in primary_result or "Ok" not in secondary_result:
continue

primary_bbo = primary_result["Ok"]
secondary_bbo = secondary_result["Ok"]

if not primary_bbo or not secondary_bbo:
continue

primary_bid = float(primary_bbo.get('bid_price', 0))
primary_ask = float(primary_bbo.get('ask_price', 0))
secondary_bid = float(secondary_bbo.get('bid_price', 0))
secondary_ask = float(secondary_bbo.get('ask_price', 0))

if primary_bid == 0 or secondary_ask == 0:
continue

# 计算价差
spread = (primary_bid - secondary_ask) / secondary_ask * 1000 # 千分比

# 检查是否满足开仓条件
if spread > self.config.get("min_spread", 1.0):
self.trader.tlog(f"发现机会", f"{symbol}: {spread:.2f}‰", color="yellow", interval=10)

# 检查平台控制状态
if self.trader.is_web_opening_stopped():
self.trader.log("平台禁止开仓", level="WARN")
continue

if self.trader.is_web_force_closing():
self.trader.log("平台要求强平", level="ERROR")
break

# 下套利订单
self.place_arbitrage_orders(symbol, spread)

except Exception as e:
self.trader.log(f"扫描{symbol}异常: {e}", level="ERROR")

def analyze_market_trend(self, symbol):
"""分析市场趋势 - 使用K线数据"""
try:
# 获取1小时K线数据,用于趋势分析
result = self.trader.get_kline(
account_id=0,
symbol=symbol,
interval="1h",
limit=24 # 获取24小时数据
)

if "Ok" not in result:
return None

kline_data = result["Ok"]
if not kline_data or not kline_data.get('candles') or len(kline_data['candles']) < 10:
return None

candles = kline_data['candles']

# 计算简单移动平均线
prices = [float(candle['close']) for candle in candles[-10:]] # 最近10个小时的收盘价
sma_10 = sum(prices) / len(prices)

current_price = float(candles[-1]['close'])

# 判断趋势
if current_price > sma_10 * 1.002: # 价格高于均线0.2%
trend = "上涨"
trend_strength = (current_price - sma_10) / sma_10 * 100
elif current_price < sma_10 * 0.998: # 价格低于均线0.2%
trend = "下跌"
trend_strength = (sma_10 - current_price) / sma_10 * 100
else:
trend = "震荡"
trend_strength = 0

# 计算波动率
high_prices = [float(candle['high']) for candle in candles[-24:]]
low_prices = [float(candle['low']) for candle in candles[-24:]]
volatility = (max(high_prices) - min(low_prices)) / min(low_prices) * 100

trend_info = {
"symbol": symbol,
"trend": trend,
"strength": trend_strength,
"volatility": volatility,
"current_price": current_price,
"sma_10": sma_10
}

# 记录趋势信息
self.trader.tlog(
f"趋势分析_{symbol}",
f"{symbol}: {trend} {trend_strength:.2f}%, 波动率: {volatility:.2f}%",
color="blue",
interval=300 # 5分钟记录一次
)

return trend_info

except Exception as e:
self.trader.log(f"分析{symbol}趋势异常: {e}", level="ERROR")
return None

def get_support_resistance_levels(self, symbol):
"""获取支撑阻力位 - 使用K线数据"""
try:
# 获取日线数据用于计算支撑阻力位
result = self.trader.get_kline(
account_id=0,
symbol=symbol,
interval="1d",
limit=30 # 获取30天数据
)

if "Ok" not in result:
return None

kline_data = result["Ok"]
if not kline_data or not kline_data.get('candles') or len(kline_data['candles']) < 20:
return None

candles = kline_data['candles']

# 提取高点和低点
highs = [float(candle['high']) for candle in candles]
lows = [float(candle['low']) for candle in candles]

# 计算支撑位和阻力位(简化算法)
resistance_level = max(highs[-10:]) # 最近10天最高价作为阻力位
support_level = min(lows[-10:]) # 最近10天最低价作为支撑位

current_price = float(candles[-1]['close'])

# 计算距离支撑阻力位的百分比
resistance_distance = (resistance_level - current_price) / current_price * 100
support_distance = (current_price - support_level) / current_price * 100

levels_info = {
"symbol": symbol,
"resistance": resistance_level,
"support": support_level,
"current_price": current_price,
"resistance_distance": resistance_distance,
"support_distance": support_distance
}

self.trader.tlog(
f"支撑阻力_{symbol}",
f"{symbol}: 阻力位 {resistance_level:.2f} (+{resistance_distance:.2f}%), "
f"支撑位 {support_level:.2f} (-{support_distance:.2f}%)",
color="purple",
interval=600 # 10分钟记录一次
)

return levels_info

except Exception as e:
self.trader.log(f"计算{symbol}支撑阻力位异常: {e}", level="ERROR")
return None

def on_stop(self):
"""停止策略"""
self.save_state()
if self.config.get("send_to_web", True):
self.trader.stop_web_client()
self.trader.log("策略已停止", web=self.config.get("send_to_web", True))

def subscribes(self):
"""返回订阅列表 - 使用三种数据源"""
subscriptions = []
if self.has_account:
symbols = self.config.get("symbols", ["BTC_USDT", "ETH_USDT"])

# 1. WebSocket实时订阅 - 高频数据
subscriptions.append({
"account_id": 0,
"sub": {"SubscribeWs": [
{"Bbo": symbols}, # 实时最佳买卖价
{"Order": symbols}, # 订单状态更新
{"Position": symbols}, # 持仓更新
{"Funding": symbols} # 资金费率更新
]}
})

# 订阅次账户信息(如果有)
if len(self.cex_configs) > 1:
subscriptions.append({
"account_id": 1,
"sub": {"SubscribeWs": [
{"Order": symbols},
{"Position": symbols}
]}
})

# 2. REST轮询订阅 - 定期数据
subscriptions.append({
"account_id": 0,
"sub": {"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Balance" # 每10秒检查余额
}}
})

# 3. 定时器订阅 - 策略逻辑
subscriptions.extend([
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_monitor", # 每分钟风控检查
"initial_delay": {"secs": 30, "nanos": 0} # 延迟30秒启动
}}
},
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "stats_update", # 每5分钟统计更新
"initial_delay": {"secs": 45, "nanos": 0} # 延迟45秒启动
}}
}
])

return subscriptions

def on_bbo(self, exchange, symbol, bbo):
"""BBO价格更新回调"""
try:
# 记录最新数据时间
self.last_data_time = time.time()

# 获取买卖价
bid_price = bbo.get('bid_price', 0)
ask_price = bbo.get('ask_price', 0)

if bid_price > 0 and ask_price > 0:
# 计算价差
spread = (ask_price - bid_price) / bid_price * 1000 # 千分比

# 检查套利机会
if spread > self.config.get("min_spread", 1.0):
self.trader.tlog(
f"套利机会_{symbol}",
f"{exchange} {symbol}: 价差 {spread:.2f}‰",
color="green",
interval=30
)

# 检查平台控制状态
if not self.trader.is_web_opening_stopped():
self.place_arbitrage_orders(symbol, spread)

except Exception as e:
self.trader.log(f"BBO处理异常 {symbol}: {e}", level="ERROR")

def on_timer_subscribe(self, timer_name):
"""定时器回调处理"""
try:
if timer_name == "risk_monitor":
self.risk_monitor()
elif timer_name == "stats_update":
self.update_stats()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
except Exception as e:
self.trader.log(f"定时器{timer_name}异常: {e}", level="ERROR")

def risk_monitor(self):
"""风控监控"""
try:
# 检查总持仓
if hasattr(self, 'positions'):
total_exposure = sum(abs(pos.get('notional_value', 0)) for pos in self.positions.values())
if total_exposure > 50000: # 总敞口超过5万
self.trader.log(f"⚠️ 总敞口过大: {total_exposure}", level="WARN", color="red")

# 检查连接状态
if hasattr(self, 'last_data_time'):
time_since_data = time.time() - self.last_data_time
if time_since_data > 120: # 超过2分钟没有数据
self.trader.log("⚠️ 数据连接可能中断", level="WARN", color="yellow")

except Exception as e:
self.trader.log(f"风控检查异常: {e}", level="ERROR")

def update_stats(self):
"""更新统计数据"""
try:
# 更新表格数据
self.upload_tables()

# 分析市场趋势
for symbol in self.config.get("symbols", []):
trend_info = self.analyze_market_trend(symbol)
if trend_info:
self.trader.tlog(
f"趋势_{symbol}",
f"{symbol}: {trend_info['trend']} {trend_info['strength']:.2f}%",
interval=300,
color="blue"
)

except Exception as e:
self.trader.log(f"统计更新异常: {e}", level="ERROR")

def adjust_strategy_based_on_analysis(self, symbol, trend_info, levels_info):
"""根据技术分析调整策略"""
try:
current_price = trend_info['current_price']
volatility = trend_info['volatility']
trend = trend_info['trend']

# 根据波动率调整最小价差要求
if volatility > 5.0: # 高波动率
adjusted_min_spread = self.config.get("min_spread", 1.0) * 1.5
self.trader.tlog(
f"策略调整_{symbol}",
f"{symbol}: 高波动率{volatility:.2f}%, 调整最小价差至{adjusted_min_spread:.2f}‰",
color="orange",
interval=600
)
elif volatility < 1.0: # 低波动率
adjusted_min_spread = self.config.get("min_spread", 1.0) * 0.8
self.trader.tlog(
f"策略调整_{symbol}",
f"{symbol}: 低波动率{volatility:.2f}%, 调整最小价差至{adjusted_min_spread:.2f}‰",
color="cyan",
interval=600
)

# 根据趋势调整仓位大小
if trend == "上涨" and trend_info['strength'] > 2.0:
self.trader.tlog(
f"趋势信号_{symbol}",
f"{symbol}: 强势上涨{trend_info['strength']:.2f}%, 可考虑增加多头仓位",
color="green",
interval=300
)
elif trend == "下跌" and trend_info['strength'] > 2.0:
self.trader.tlog(
f"趋势信号_{symbol}",
f"{symbol}: 强势下跌{trend_info['strength']:.2f}%, 可考虑增加空头仓位",
color="red",
interval=300
)

except Exception as e:
self.trader.log(f"调整{symbol}策略异常: {e}", level="ERROR")

def check_kline_signals(self, symbol):
"""检查K线信号 - 利用K线完结状态"""
try:
# 获取最新的5分钟K线数据
result = self.trader.get_kline(
account_id=0,
symbol=symbol,
interval="5m",
limit=3 # 获取最近3根K线
)

if "Ok" not in result:
return None

kline_data = result["Ok"]
if not kline_data or not kline_data.get('candles') or len(kline_data['candles']) < 3:
return None

candles = kline_data['candles']

# 检查最新K线是否已完结
latest_candle = candles[-1]
if not latest_candle['confirm']:
# 最新K线未完结,不进行信号判断
return None

# 分析最近3根已完结的K线
prev_candle = candles[-2]
current_candle = candles[-1]

# 检查突破信号(考虑成交量放大)
volume_amplification = current_candle['volume'] / prev_candle['volume'] if prev_candle['volume'] > 0 else 1
if (current_candle['close'] > prev_candle['high'] and volume_amplification > 1.5):

signal = {
"type": "突破信号",
"direction": "看涨",
"price": current_candle['close'],
"volume": current_candle['volume'],
"timestamp": current_candle['timestamp']
}

self.trader.tlog(
f"K线信号_{symbol}",
f"{symbol}: 检测到向上突破信号,价格: {current_candle['close']}, "
f"成交量放大: {volume_amplification:.2f}倍",
color="green",
interval=60
)

return signal

# 检查下跌信号
elif (current_candle['close'] < prev_candle['low'] and volume_amplification > 1.5):

signal = {
"type": "突破信号",
"direction": "看跌",
"price": current_candle['close'],
"volume": current_candle['volume'],
"timestamp": current_candle['timestamp']
}

self.trader.tlog(
f"K线信号_{symbol}",
f"{symbol}: 检测到向下突破信号,价格: {current_candle['close']}, "
f"成交量放大: {volume_amplification:.2f}倍",
color="red",
interval=60
)

return signal

return None

except Exception as e:
self.trader.log(f"检查{symbol}K线信号异常: {e}", level="ERROR")
return None

def on_order(self, account_id, context, order):
"""订单回调处理"""
# 处理订单状态更新
symbol = order.get('symbol')
status = order.get('status')
side = order.get('side')

if status == 'Filled':
self.trader.log(f"订单成交: {symbol} {side}", color="green")

# 更新持仓信息到Web平台
self.update_position_display()

def on_position(self, account_id, positions):
"""持仓回调处理

参数:
account_id: 账户ID
positions: 持仓数据列表,每个元素的数据结构参见[7.2 Position(持仓)数据结构](#72-position持仓数据结构)
"""
# 更新持仓信息
self.update_position_display()

def on_balance(self, exchange, balances):
"""余额轮询回调处理"""
try:
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)

if asset == 'USDT' and total > 0:
# 更新主要账户余额
if exchange == self.cex_configs[0].get('exchange'):
self.primary_balance = total
elif len(self.cex_configs) > 1 and exchange == self.cex_configs[1].get('exchange'):
self.secondary_balance = total

# 余额风控检查
if available < total * 0.1: # 可用余额不足10%
self.trader.log(f"⚠️ {asset} 可用余额不足: {available}/{total}", level="WARN")

except Exception as e:
self.trader.log(f"余额处理异常: {e}", level="ERROR")

def update_position_display(self):
"""更新持仓显示"""
try:
# 获取所有持仓
primary_result = self.trader.get_positions(0)
primary_positions = primary_result["Ok"] if "Ok" in primary_result else []

secondary_positions = []
if len(self.cex_configs) > 1:
secondary_result = self.trader.get_positions(1)
secondary_positions = secondary_result["Ok"] if "Ok" in secondary_result else []

# 计算持仓价值
total_value = 0
long_value = 0
short_value = 0

for pos in primary_positions + secondary_positions:
value = float(pos.get('notional_value', 0))
side = pos.get('side', '')

total_value += abs(value)
if side.lower() == 'long':
long_value += value
else:
short_value += abs(value)

# 更新到Web平台
self.trader.update_current_position_value(total_value, long_value, short_value)

except Exception as e:
self.trader.log(f"更新持仓显示失败: {e}", level="ERROR")

📝 更新日志

v2.3.0 (2025年6月24日)

  • 新增API方法:
    • post_stop_order: 下止损订单,支持止损单和止盈单
    • get_stop_orders: 获取止损订单列表,支持时间范围查询
    • amend_stop_order: 修改止损订单,支持修改触发价格、执行价格和数量
    • cancel_stop_order: 取消止损订单,支持通过订单ID或客户端ID取消
  • 功能增强:
    • 完整的止损订单管理体系,涵盖创建、查询、修改、取消全流程
    • 基于CloseTrigger结构体的高级止损订单配置
    • 支持三种触发价格类型:标记价格、成交价格、指数价格
    • 支持追踪止盈(Trailing Take Profit)功能
    • 支持部分止盈/止损,实现分批平仓策略
    • 支持市价和限价两种执行方式
    • 支持双向持仓模式下的止损订单管理
  • 结构体完善:
    • 详细的CloseTrigger结构体说明和字段解释
    • TriggerPrice枚举:支持MarkPrice、ContractPrice、IndexPrice
    • TriggerAction结构:支持数量控制、追踪间隙、执行类型等
    • ExecuteType枚举:Market市价执行和Limit限价执行
    • ActionType枚举:Normal一般模式和Trailing追踪模式
  • 文档完善:
    • 根据实际Rust结构体定义更新所有止损订单相关文档
    • 明确标记所有止损订单方法的必需参数,提高API使用的准确性
    • 提供基于CloseTrigger结构的完整使用示例
    • 添加追踪止盈、部分平仓等高级功能示例
    • 完善止损订单最佳实践指南,包含触发类型选择建议和参数要求说明
    • 新增错误处理指南,说明缺失必需参数时的错误信息格式

v2.2.0 (2025年6月24日)

  • 新增功能:
    • 定时器订阅新增initial_delay字段,支持设置启动前的初始延迟时间,避免资源竞争
    • WebSocket支持Kline(K线)数据订阅,可订阅多种时间周期的实时K线数据
    • 新增get_funding_rate_historyget_kline接口,支持历史数据查询
    • 资金费率结构体新增字段:min_funding_rate(最小值)、max_funding_rate(最大值)
    • 完整的数据源订阅体系:WebSocket、REST轮询、定时器三种订阅方式
  • 文档重构:
    • 重构"数据源订阅"章节,将三种订阅类型(WebSocket、REST、定时器)并列展示
    • 每种订阅类型都提供完整的配置示例和回调处理代码
    • 新增综合订阅配置示例,展示如何组合使用三种数据源
    • 新增数据源选择指南和最佳实践,帮助用户根据场景选择合适的订阅方式
    • 完善资金费率和K线相关API文档,包含新增字段的详细描述和使用示例
    • 添加初始延迟配置指南和定时器最佳实践

v2.1.2 (2025年6月24日)

  • 新增API方法:
    • get_funding_rate_history: 获取资金费率历史记录,支持按交易对和时间范围查询
  • 功能增强:
    • 支持获取单个或全部交易对的资金费率历史
    • 支持指定时间范围或获取最近N条记录
    • 提供完整的使用示例和数据处理方案

v2.1.1 (2025年6月18日)

  • 缓存API重构: 根据最新代码实现重构缓存管理部分
    • 移除了cache_update方法(代码中已不存在)
    • 更新cache_savecache_load的使用说明
    • cache_save现在直接支持Python对象,无需手动JSON序列化
    • cache_load直接返回Python对象,无需手动JSON反序列化
    • 添加了更完整的缓存使用示例和最佳实践
  • 文档准确性提升: 确保文档描述与实际代码实现完全一致

v2.1.0 (2025年5月27)

  • K线数据结构完善: 根据最新的Rust结构体定义完善了K线数据结构说明
  • 新增字段支持:
    • trades: 成交笔数(可选字段)
    • taker_buy_volume: taker买入成交量(可选字段)
    • taker_buy_quote_volume: taker买入成交额(可选字段)
  • 增强示例代码: 提供了更完整的K线数据处理示例和最佳实践
  • 安全处理指南: 添加了可选字段的安全处理方法
  • 新增API方法:
    • get_funding_fee: 查询历史资金费用记录
    • get_fee_discount_info: 获取费用折扣信息
    • is_fee_discount_enabled: 检查费用折扣状态
    • set_fee_discount_enabled: 设置费用折扣状态
    • get_borrow_limit: 查询借贷限额
    • get_deposit_address: 获取指定币种的充值地址
    • withdrawal: 提币到外部地址或内部转账
  • API完整性验证: 对照最新代码结构,确保所有公开API方法均已包含在文档中

v2.0.0 (2025年5月26日)

  • 初始版本发布
  • 完整的API文档和示例

文档版本: 2.3.0

最后更新: 2025年6月24日