Web可视化
📌 概述
Web可视化功能提供了与NB8 Web平台的集成,使策略能够实时展示交易数据、统计信息和图表。通过Web可视化,您可以:
- 实时监控策略运行状态
- 查看交易统计和盈亏数据
- 展示市场数据和持仓信息
- 接收平台控制指令
- 管理策略风险
📑 API分类索引
| 分类 | 功能模块 | 主要用途 |
|---|---|---|
| 一、Web客户端管理 | Web客户端初始化、启动和停止 | 管理Web客户端生命周期 |
| 二、策略状态检查 | 平台控制指令检查 | 响应平台控制指令 |
| 三、账户与余额数据 | 余额和资金费管理 | 更新账户状态 |
| 四、持仓与盈亏数据 | 持仓价值和盈亏统计 | 展示持仓和盈亏信息 |
| 五、交易统计数据 | 交易量、胜率等统计 | 展示交易表现 |
| 六、数据可视化 | 表格数据上传 | 展示结构化数据 |
| 七、账户强停功能 | 强停状态管理 | 控制策略执行 |
完整示例代码请参考示例代码
一、Web客户端管理
1.1 Web客户端初始化
🚀 init_web_client(config)
功能: 创建并初始化WebClient,用于与Web平台通信
参数:
config: WebClient配置对象,包含以下可选字段:server_name: 字符串,策略服务器名称,用于在Web平台上标识该策略primary_balance: 浮点数,主账户初始余额,用于计算策略收益率。仅在第一次实盘初始化时需要传递,后续启动会从缓存中读取secondary_balance: 浮点数,次账户初始余额,用于双账户策略。仅在第一次实盘初始化时需要传递,后续启动会从缓存中读取open_threshold: 浮点数,开仓阈值,例如0.14表示千1.4价差开仓max_position_ratio: 浮点数,单个币种最大持仓比例,例如100表示100%,1x杠杆max_leverage: 整数,最大可用杠杆倍数funding_rate_threshold: 浮点数,资金费率阈值,例如0.1表示千1资金费率开仓cost: 浮点数,成本,主所开平 副所开平,4次交易手续费,如0.0824 代表万8.24primary_maker_fee_rate: 浮点数,主所maker手续费率primary_taker_fee_rate: 浮点数,主所taker手续费率primary_rebate_rate: 浮点数,主所返佣率secondary_maker_fee_rate: 浮点数,次所maker手续费率secondary_taker_fee_rate: 浮点数,次所taker手续费率secondary_rebate_rate: 浮点数,次所返佣率
返回值: 无
示例:
# 初始化Web客户端
web_config = {
"server_name": "BTC-ETH套利策略",
"primary_balance": 10000.0, # 主账户初始余额
"secondary_balance": 5000.0, # 次账户初始余额
"open_threshold": 0.14, # 开仓阈值 (千1.4价差)
"max_position_ratio": 100, # 最大持仓比例 (100%)
"max_leverage": 3, # 最大杠杆倍数
"funding_rate_threshold": 0.1, # 资金费率阈值 (千1)
"cost": 0.0824, # 交易成本 (万8.24)
"primary_maker_fee_rate": 0.0002, # 主所maker费率
"primary_taker_fee_rate": 0.0005, # 主所taker费率
"primary_rebate_rate": 0.3, # 主所返佣率
"secondary_maker_fee_rate": 0.0002, # 次所maker费率
"secondary_taker_fee_rate": 0.0005, # 次所taker费率
"secondary_rebate_rate": 0.7, # 次所返佣率
}
trader.init_web_client(web_config)
1.2 Web客户端控制
▶️ start_web_client(upload_interval=None)
功能: 启动WebClient,开始定期向Web平台上传数据
参数:
upload_interval: 上传间隔(秒,s),默认为5秒。此参数控制向Web平台发送数据的频率。
返回值: 无
示例:
# 启动Web客户端,设置10秒上传间隔
trader.start_web_client(upload_interval=10)
⏹️ stop_web_client()
功能: 停止WebClient,确保最后一批数据上传
参数: 无
返回值: 无
示例:
# 策略结束时停止Web客户端
trader.stop_web_client()
二、策略状态检查
这些方法用于检查NB8 Web平台下发的策略控制指令,可用于策略程序中实现对平台实时指令的响应。
🚫 is_web_soft_stopped()
功能: 检查平台是否下发了缓停指令
参数: 无
返回值:
- 布尔值,为true时表示平台要求策略缓慢停止交易
说明:
- 此状态表示平台要求策略在合适的时机停止交易
- 策略可以完成当前交易流程,但不应开始新的交易周期
🔒 is_web_opening_stopped()
功能: 检查平台是否下发了停止开仓指令
参数: 无
返回值:
- 布尔值,为true时表示平台要求策略停止开新仓
说明:
- 此状态表示平台要求策略不再开设新的仓位
- 策略可以继续管理现有仓位,包括平仓操作
🔓 is_web_force_closing()
功能: 检查平台是否下发了强平指令
参数: 无
返回值:
- 布尔值,为true时表示平台要求策略强制平仓
说明:
- 此状态表示平台要求策略立即平掉所有持仓
- 策略应在检测到此状态为true时执行平仓逻辑
示例:
# 检查平台控制指令
def check_platform_control():
# 检查是否需要缓停交易
if trader.is_web_soft_stopped():
print("平台已发出缓停指令,将不再开始新的交易周期")
return "soft_stop"
# 检查是否禁止开仓
if trader.is_web_opening_stopped():
print("平台已禁止开仓,仅允许管理现有仓位")
return "no_open"
# 检查是否需要强制平仓
if trader.is_web_force_closing():
print("平台已发出强平指令,执行紧急平仓流程")
close_all_positions()
return "force_close"
return "normal"
# 交易主循环
def trading_loop():
import time
while True:
# 检查平台状态
status = check_platform_control()
if status == "force_close":
# 等待平仓完成后退出
break
elif status == "soft_stop":
# 完成当前周期后退出
time.sleep(60)
break
elif status == "no_open":
# 只管理现有仓位,不开新仓
manage_positions()
else:
# 正常交易
scan_opportunities()
time.sleep(5)
三、账户与余额数据
💰 update_total_balance(primary_balance, secondary_balance=None, available_primary=None, available_secondary=None)
功能: 更新账户余额信息到Web平台
参数:
primary_balance: 主账户总余额secondary_balance: 次账户总余额,可选available_primary: 主账户可用余额,可选available_secondary: 次账户可用余额,可选
返回值: 无
示例:
# 更新账户余额信息
def update_balance_info():
# 查询主账户余额
primary_cmd = {
"account_id": 0,
"method": "UsdtBalance",
"sync": True
}
primary_result = trader.publish(primary_cmd)
primary_balance = primary_result.get("balance", 0)
available_primary = primary_result.get("available", 0)
# 查询次账户余额
secondary_cmd = {
"account_id": 1,
"method": "UsdtBalance",
"sync": True
}
secondary_result = trader.publish(secondary_cmd)
secondary_balance = secondary_result.get("balance", 0)
available_secondary = secondary_result.get("available", 0)
# 更新到Web平台
trader.update_total_balance(
primary_balance,
secondary_balance,
available_primary,
available_secondary
)
print(f"已更新余额信息 - 主账户: {primary_balance} (可用: {available_primary}), 次账户: {secondary_balance} (可用: {available_secondary})")
💱 add_funding_fee(primary_fee=None, secondary_fee=None)
功能: 添加已结算的资金费用记录到Web平台
参数:
primary_fee: 主账户已结算资金费,可选secondary_fee: 次账户已结算资金费,可选
返回值: 无
💱 update_pred_funding(primary_fee=None, secondary_fee=None)
功能: 更新未结算的预测资金费用记录到Web平台
参数:
primary_fee: 主账户未结算预测资金费,可选secondary_fee: 次账户未结算预测资金费,可选
返回值: 无
示例:
# 记录资金费
def record_funding_fee():
# 添加已结算的资金费
trader.add_funding_fee(primary_fee=-1.25, secondary_fee=-0.85)
# 更新未结算的预测资金费
trader.update_pred_funding(primary_fee=-0.33, secondary_fee=-0.17)
print("已记录资金费信息")
# 资金费率定时结算处理函数示例
def handle_funding_settlement():
import time
# 获取已结算的资金费数据(示例)
# 实际使用时应从交易所API获取真实数据
primary_settled_fee = -2.35
secondary_settled_fee = -1.15
# 记录已结算资金费
trader.add_funding_fee(
primary_fee=primary_settled_fee,
secondary_fee=secondary_settled_fee
)
# 获取未结算的预测资金费(示例)
primary_predicted_fee = -0.45
secondary_predicted_fee = -0.21
# 更新未结算预测资金费
trader.update_pred_funding(
primary_fee=primary_predicted_fee,
secondary_fee=secondary_predicted_fee
)
# 记录结算时间
settlement_time = int(time.time())
print(f"在 {settlement_time} 处理了资金费结算")
四、持仓与盈亏数据
📈 update_total_position_value(total_value, long_position_value, short_position_value)
功能: 更新所有节点持仓价值信息到Web平台
参数:
total_value: 所有节点总持仓价值long_position_value: 所有节点多头持仓价值short_position_value: 所有节点空头持仓价值
返回值: 无
📈 update_current_position_value(total_value, long_position_value, short_position_value)
功能: 更新当前节点持仓价值信息到Web平台
参数:
total_value: 当前节点总持仓价值long_position_value: 当前节点多头持仓价值short_position_value: 当前节点空头持仓价值
返回值: 无
📊 update_floating_profit(floating_profit)
功能: 更新浮动盈亏信息到Web平台
参数:
floating_profit: 浮动盈亏金额
返回值: 无
💹 log_profit(profit)
功能: 上传利润,前端实盘页用来展示利润曲线
参数:
profit: 利润金额
返回值: 无
示例:
# 更新持仓信息
def update_position_info():
# 计算当前节点持仓价值
current_total = 15000.0
current_long = 10000.0
current_short = 5000.0
trader.update_current_position_value(current_total, current_long, current_short)
# 更新所有节点总持仓价值
total_value = 25000.0
total_long = 15000.0
total_short = 10000.0
trader.update_total_position_value(total_value, total_long, total_short)
# 更新浮动盈亏
floating_profit = 320.5
trader.update_floating_profit(floating_profit)
# 记录策略利润曲线数据点
profit = 1580.75
trader.log_profit(profit)
print("已更新持仓与盈亏信息")
# 从实际持仓数据计算持仓价值
def calculate_position_values(positions):
total_value = 0
long_value = 0
short_value = 0
# 获取当前市场价格(示例)
# 实际使用时应从交易所API获取最新价格
market_prices = {
"BTC_USDT": 51200.5,
"ETH_USDT": 3250.8,
"SOL_USDT": 115.25
}
# 计算持仓价值
for pos in positions:
symbol = pos["symbol"]
amount = pos["amount"]
side = pos["side"]
# 获取当前价格
current_price = market_prices.get(symbol, 0)
if current_price == 0:
continue
# 计算持仓价值
position_value = amount * current_price
total_value += position_value
# 根据方向累加多空持仓价值
if side == "long":
long_value += position_value
else:
short_value += position_value
# 更新到Web平台
trader.update_current_position_value(total_value, long_value, short_value)
# 返回计算结果
return {
"total": total_value,
"long": long_value,
"short": short_value
}
五、交易统计数据
📉 update_trade_stats(maker_volume, taker_volume, profit, is_single_close=False)
功能: 更新交易统计数据到Web平台
参数:
maker_volume: 挂单量 不传默认0taker_volume: 吃单量 不传默认0profit: 总利润。注意: 未平仓时传入0,此时系统只会统计交易量,不会统计盈利次数is_single_close: 是否单腿平仓,默认False
💡 提示: 当策略仅开仓但未平仓时,应调用此方法并将
profit参数设为0。这样系统会记录交易量,但不会增加盈利/亏损次数统计。当交易完全结束(开仓并平仓)后,再传入实际的盈亏金额进行完整统计。
返回值:
- 总利润
示例:
# 开仓时,只记录交易量,不计算盈亏
def record_opening_trade(maker_vol, taker_vol):
result = trader.update_trade_stats(maker_vol, taker_vol, 0)
if "Ok" in result:
total_profit = result["Ok"]
print(f"已记录开仓交易量,不计入盈亏,当前总利润: {total_profit}")
return total_profit
else:
error = result.get("Err", "未知错误")
print(f"记录开仓交易失败: {error}")
return None
# 平仓时,记录交易量并更新盈亏
def record_closing_trade(maker_vol, taker_vol, profit, is_single=False):
result = trader.update_trade_stats(maker_vol, taker_vol, profit, is_single)
if "Ok" in result:
total_profit = result["Ok"]
print(f"已记录平仓交易,利润: {profit},当前总利润: {total_profit}")
return total_profit
else:
error = result.get("Err", "未知错误")
print(f"记录平仓交易失败: {error}")
return None
# 实际使用示例
def handle_trade_execution(trade_type, volumes, profit=None):
maker_volume = volumes.get("maker", 0)
taker_volume = volumes.get("taker", 0)
if trade_type == "open":
# 开仓只记录交易量
record_opening_trade(maker_volume, taker_volume)
elif trade_type == "close":
# 平仓记录交易量和盈亏
record_closing_trade(maker_volume, taker_volume, profit or 0)
elif trade_type == "single_close":
# 单腿平仓
record_closing_trade(maker_volume, taker_volume, profit or 0, True)
📊 get_stats()
功能: 获取当前完整的统计数据
参数: 无
返回值:
- 包含统计数据的Python字典,使用原始字段名(而非序列化别名)
返回字段说明:
-
time: 起始时间 -
server_name: 服务器名称 -
账户余额信息
initial_balance: 主所起始余额current_balance: 主所当前余额secondary_initial_balance: 次所初始余额secondary_current_balance: 次所实时余额available_balance: 主所可用余额secondary_available_balance: 次所可用余额
-
交易统计
volume: 累计交易量maker_volume: 累计maker交易量taker_volume: 累计taker交易量count: 总套利次数win_rate: 胜率total_profit: 总利润success_count: 成功次数success_profit: 成功利润failure_count: 失败次数failure_loss: 失败亏损unrealized_pnl: 未实现盈亏single_close_count: 单腿平仓次数single_close_profit: 单腿平仓金额
-
持仓和杠杆信息
total_position_value: 所有节点持仓杠杆total_long_position_value: 所有节点多仓杠杆total_short_position_value: 所有节点空仓杠杆current_position_value: 当前节点持仓杠杆current_long_position_value: 当前节点多仓杠杆current_short_position_value: 当前节点空仓杠杆
-
资金费率与回报率
funding_fee: 当日已结算的资金费总额primary_funding_fee: 当日已结算的主所资金费secondary_funding_fee: 当日已结算的次所资金费
-
交易参数
cost: 成本open_threshold: 开仓阈值,如0.14代表千1.4价差开仓funding_rate_threshold: 资金费率阈值,如0.1代表千1资金费率开仓max_position_ratio: 单个币种最大持仓比例,如100代表100%,1x杠杆fund_transfer: 资金划转max_leverage: 最大可开杠杆primary_maker_fee_rate: 主所maker手续费率primary_taker_fee_rate: 主所taker手续费率primary_rebate_rate: 主所返佣率secondary_maker_fee_rate: 次所maker手续费率secondary_taker_fee_rate: 次所taker手续费率secondary_rebate_rate: 次所返佣率
-
今日数据
today: 今日统计数据对象,包含以下字段:time: 当日起始时间戳initial_balance: 当日初始主所余额current_balance: 当日当前主所余额available_balance: 当日主所可用余额secondary_initial_balance: 当日初始次所余额secondary_current_balance: 当日当前次所余额secondary_available_balance: 当日次所可用余额volume: 当日交易量maker_volume: 当日maker交易量taker_volume: 当日taker交易量count: 当日套利次数win_rate: 当日胜率total_profit: 当日总利润success_count: 当日成功次数success_profit: 当日成功利润failure_count: 当日失败次数failure_loss: 当日失败亏损funding_fee: 未结算的预测资金费总额primary_funding_fee: 未结算的预测主所资金费secondary_funding_fee: 未结算的预测次所资金费
示例:
# 获取并分析统计数据
def analyze_trading_stats():
# 获取完整统计数据
result = trader.get_stats()
if "Ok" in result:
stats = result["Ok"]
else:
error = result.get("Err", "未知错误")
print(f"获取统计数据失败: {error}")
return None
# 提取关键指标
server_name = stats.get("server_name", "未命名策略")
win_rate = stats.get("win_rate", 0) * 100 # 转换为百分比
total_trades = stats.get("count", 0)
success_trades = stats.get("success_count", 0)
failure_trades = stats.get("failure_count", 0)
total_profit = stats.get("total_profit", 0)
total_volume = stats.get("volume", 0)
# 分析统计数据
print(f"策略 '{server_name}' 统计分析:")
print(f"总交易次数: {total_trades} (成功: {success_trades}, 失败: {failure_trades})")
print(f"胜率: {win_rate:.2f}%")
print(f"总盈利: {total_profit:.2f} USDT")
print(f"总交易量: {total_volume:.2f} USDT")
# 计算平均每笔交易盈亏
if total_trades > 0:
avg_profit_per_trade = total_profit / total_trades
print(f"平均每笔交易盈亏: {avg_profit_per_trade:.2f} USDT")
# 获取当日数据
today_stats = stats.get("today", {})
today_profit = today_stats.get("total_profit", 0)
today_trades = today_stats.get("count", 0)
today_win_rate = today_stats.get("win_rate", 0) * 100
print(f"\n今日统计:")
print(f"今日交易次数: {today_trades}")
print(f"今日胜率: {today_win_rate:.2f}%")
print(f"今日盈利: {today_profit:.2f} USDT")
# 返回关键指标
return {
"win_rate": win_rate,
"total_profit": total_profit,
"total_trades": total_trades,
"today_profit": today_profit
}
六、数据可视化
📊 upload_tables(tables)
功能: 上传多个表格数据到Web平台显示
参数:
tables: 表格数据列表,每个表格包含标题、列和行数据
返回值: 无
示例:
# 上传表格数据到Web平台显示
def upload_market_data():
# 价差表
spread_table = {
"title": "交易所价差表",
"cols": ["交易对", "交易所A价格", "交易所B价格", "价差(‰)", "资金费率A", "资金费率B"],
"rows": [
["BTC_USDT", "50120.5", "50080.2", "0.81", "0.01%", "-0.025%"],
["ETH_USDT", "3245.8", "3240.5", "1.63", "0.008%", "-0.01%"],
["SOL_USDT", "110.25", "109.85", "3.64", "0.015%", "-0.02%"],
["BNB_USDT", "415.8", "415.2", "1.44", "0.005%", "-0.015%"]
]
}
# 持仓表
position_table = {
"title": "当前持仓",
"cols": ["交易对", "方向", "数量", "开仓价", "当前价", "浮动盈亏", "持仓时间"],
"rows": [
["BTC_USDT", "多/空", "0.15", "49850", "50100", "+$37.5", "2小时15分"],
["ETH_USDT", "多/空", "1.2", "3210", "3245", "+$42", "1小时30分"]
]
}
# 上传表格数据到Web平台
trader.upload_tables([spread_table, position_table])
print("已更新Web平台数据表格")
# 创建交易机会表格
def create_opportunity_table(opportunities):
table = {
"title": "套利机会",
"cols": ["交易对", "交易所A", "交易所B", "价差(‰)", "预期收益", "资金费差"],
"rows": []
}
# 添加行数据
for opp in opportunities:
row = [
opp["symbol"],
opp["exchange_a"],
opp["exchange_b"],
f"{opp['spread']:.2f}",
f"{opp['expected_profit']:.2f} USDT",
f"{opp['funding_diff']:.4f}%"
]
table["rows"].append(row)
return table
# 示例:定期更新多个表格
def update_dashboard_tables():
import time
# 获取套利机会(示例数据)
opportunities = [
{"symbol": "BTC_USDT", "exchange_a": "Binance", "exchange_b": "OKX",
"spread": 1.25, "expected_profit": 18.5, "funding_diff": 0.035},
{"symbol": "ETH_USDT", "exchange_a": "Binance", "exchange_b": "OKX",
"spread": 2.10, "expected_profit": 12.8, "funding_diff": 0.025}
]
# 获取持仓情况(示例数据)
positions = [
{"symbol": "BTC_USDT", "side": "多/空", "amount": 0.15,
"entry_price": 49850, "current_price": 50100, "pnl": 37.5, "duration": "2小时15分"},
{"symbol": "ETH_USDT", "side": "多/空", "amount": 1.2,
"entry_price": 3210, "current_price": 3245, "pnl": 42, "duration": "1小时30分"}
]
# 创建机会表格
opportunity_table = create_opportunity_table(opportunities)
# 创建持仓表格
position_table = {
"title": "当前持仓",
"cols": ["交易对", "方向", "数量", "开仓价", "当前价", "浮动盈亏", "持仓时间"],
"rows": []
}
for pos in positions:
row = [
pos["symbol"],
pos["side"],
str(pos["amount"]),
str(pos["entry_price"]),
str(pos["current_price"]),
f"+${pos['pnl']}" if pos['pnl'] >= 0 else f"-${abs(pos['pnl'])}",
pos["duration"]
]
position_table["rows"].append(row)
# 上传到Web平台
trader.upload_tables([opportunity_table, position_table])
print(f"已在 {int(time.time())} 更新仪表盘数据")
七、账户强停功能
🛑 set_force_stop(force_stop)
功能: 直接设置实盘强停状态
参数:
force_stop: 布尔值,True表示启用强停状态,False表示解除强停状态
返回值: 无
使用场景:
- 策略亏损过多需要暂停开仓一段时间进行风控
- 发现异常行情,需要紧急暂停交易
- 系统检测到风险预警,需要暂停策略执行
使用示例:
# 启用强停状态
def enable_force_stop():
"""设置实盘强停状态"""
self.trader.set_force_stop(True)
self.trader.log("已启用账户强停状态", level="WARN", color="red")
# 解除强停状态
def disable_force_stop():
"""解除实盘强停状态"""
self.trader.set_force_stop(False)
self.trader.log("已解除账户强停状态", level="INFO", color="green")
# 根据条件自动管理强停状态
def manage_force_stop():
"""根据策略状态自动管理强停状态"""
# 获取统计数据
result = trader.get_stats()
if "Ok" in result:
stats = result["Ok"]
# 检查最大回撤是否超过阈值
max_drawdown = stats.get('max_drawdown', 0.0)
if max_drawdown < -15.0: # 回撤超过15%
trader.set_force_stop(True)
trader.log(f"检测到回撤{max_drawdown:.2f}%超过阈值,已启用强停状态", level="WARN", color="red")
else:
# 如果回撤恢复正常,可以解除强停
trader.set_force_stop(False)
trader.log("回撤状态正常,已解除强停状态", level="INFO")
else:
error = result.get("Err", "未知错误")
trader.log(f"获取统计数据失败: {error}", level="ERROR", color="red")
注意事项:
- 此方法直接设置实盘强停状态,比通过表格内容中添加"账户强停"字段更加直接高效
- 强停状态只影响Web平台上的显示,不会自动停止策略的实际执行,需要在策略代码中主动检测并处理强停状态
- 建议在策略的主循环中定期检查并管理强停状态
效果图:

八、示例代码
import time
import json
import random
import threading
import base_strategy
class Strategy(base_strategy.BaseStrategy):
"""
一个演示 Trader API 所有功能的示例策略。
它模拟了一个简单的跨交易所套利逻辑(非真实交易),
并展示了如何使用日志、缓存、Web集成等功能。
"""
def __init__(self, cex_configs, dex_configs, config, trader):
"""初始化策略"""
self.cex_configs = cex_configs
self.has_primary_account = len(cex_configs) > 0
self.has_secondary_account = len(cex_configs) > 1
self.dex_configs = dex_configs # 本示例未使用 DEX
self.trader = trader
self.config = config or {
'strategy_options': {
'send_to_web': True,
'check_interval_seconds': 10,
'min_spread_threshold': 0.5,
'trade_amount': 0.01,
'symbols': ['BTC_USDT']
}
} # 加载 config.toml 中的配置
self.strategy_options = self.config.get('strategy_options', {})
self.send_to_web = self.strategy_options.get("send_to_web", False)
# 策略状态变量
self.running = False
self.positions = {}
self.orders = {}
self.simulated_prices = {}
self.last_check_time = 0
self.check_interval = self.strategy_options.get("check_interval_seconds", 10)
self.min_spread = self.strategy_options.get("min_spread_threshold", 0.5) / 1000 # 转为小数
self.trade_amount = self.strategy_options.get("trade_amount", 0.01) # 设置为0.01,符合OKX的最小手数要求
self.symbols = self.strategy_options.get("symbols", ["BTC_USDT"])
# 费率和余额信息 (将在 start 中初始化)
self.primary_balance = 10000.0
self.secondary_balance = 5000.0 if self.has_secondary_account else 0.0
self.primary_available = 10000.0
self.secondary_available = 5000.0 if self.has_secondary_account else 0.0
self.primary_maker_fee = 0.0002
self.primary_taker_fee = 0.0005
self.primary_rebate_rate = 0.0
self.secondary_maker_fee = 0.0002
self.secondary_taker_fee = 0.0005
self.secondary_rebate_rate = 0.0
self.cost = 0.0008 # 预估成本
self.trader.log("FullFeatureDemoStrategy 初始化...", web=self.send_to_web)
# 1. 加载缓存状态
self.load_strategy_cache()
def name(self):
"""返回策略名称"""
return "FullFeatureDemoStrategy"
def subscribes(self):
"""定义需要订阅的数据流"""
subscriptions = []
symbols = self.strategy_options.get("symbols", ["BTC_USDT", "ETH_USDT"])
# 使用正确的SubscribeWs格式订阅BBO数据
subscriptions.append({
"sub": {
"SubscribeWs": [
{"Bbo": symbols}
]
}
})
self.trader.log(f"准备订阅BBO行情数据: {symbols}", level="DEBUG", web=self.send_to_web)
return subscriptions
def start(self):
"""启动策略主逻辑"""
self.running = True
self.trader.log("策略启动...", level="INFO", color="green", web=self.send_to_web)
# 2. 初始化费率和账户余额
if self.has_primary_account:
self._init_account_data()
# 3. 初始化Web客户端 (如果启用)
if self.send_to_web:
self._init_web_client()
self.trader.start_web_client(upload_interval=5) # 启动Web客户端,5秒上传一次
self.trader.log("Web客户端已启动", web=self.send_to_web)
# 演示 get_stats
self._log_current_stats()
# 进入订阅模式,等待回调
self.trader.log("策略已启动,等待BBO回调...", web=self.send_to_web)
# 保存初始策略状态
self.save_strategy_cache()
def on_bbo(self, exchange, context, bbo):
"""处理BBO行情更新"""
symbol = bbo['symbol']
# 打印原始数据以进行调试
self.trader.log(f"收到BBO数据: symbol={symbol}, bbo={bbo}", level="DEBUG")
# 如果symbol是字典类型,则提取symbol名称
if isinstance(symbol, dict):
self.trader.log(f"收到字典类型的symbol: {symbol}", level="DEBUG")
if 'symbol' in symbol:
symbol = symbol['symbol']
elif 'instrument_id' in symbol:
symbol = symbol['instrument_id']
elif 'instId' in symbol:
symbol = symbol['instId']
else:
# 尝试找到任何可能是交易对的键
for key, value in symbol.items():
if isinstance(value, str) and ('_' in value or '-' in value or '/' in value):
symbol = value
self.trader.log(f"从字典提取symbol: {symbol}", level="DEBUG")
break
else:
self.trader.log(f"无法从字典中提取symbol: {symbol}", level="WARN")
return
# 确保symbol是字符串
if not isinstance(symbol, str):
self.trader.log(f"symbol类型不是字符串: {type(symbol)}", level="WARN")
symbol = str(symbol)
# 尝试从BBO中获取价格数据
bid_price = 0
ask_price = 0
# 直接处理各种可能的BBO格式
if isinstance(bbo, dict):
# 尝试不同的键名以获取价格
if 'bid_price' in bbo and 'ask_price' in bbo:
bid_price = bbo['bid_price']
ask_price = bbo['ask_price']
elif 'b' in bbo and 'a' in bbo:
bid_price = bbo['b']
ask_price = bbo['a']
elif 'bid' in bbo and 'ask' in bbo:
bid_price = bbo['bid']
ask_price = bbo['ask']
elif 'bids' in bbo and 'asks' in bbo:
bids = bbo['bids']
asks = bbo['asks']
if bids and isinstance(bids, list) and asks and isinstance(asks, list):
if bids[0] and isinstance(bids[0], list) and len(bids[0]) > 0:
bid_price = float(bids[0][0])
if asks[0] and isinstance(asks[0], list) and len(asks[0]) > 0:
ask_price = float(asks[0][0])
# 检查价格是否有效
if bid_price == 0 or ask_price == 0:
self.trader.log(f"无法从BBO中获取有效价格: {bbo}", level="WARN")
return
# 生成模拟的两个交易所价格
primary_price = bid_price
secondary_price = ask_price * (1 + random.uniform(-0.005, 0.005)) # 模拟轻微差异
self.trader.log(f"BBO价格: {symbol} - 买: {primary_price:.2f}, 卖: {secondary_price:.2f}", level="INFO")
# 更新本地价格记录
self.simulated_prices[symbol] = {
'primary': primary_price,
'secondary': secondary_price,
'timestamp': time.time()
}
# 检查Web平台控制指令
if self._check_web_controls():
return
# 检查套利机会并执行交易
self._check_arbitrage_opportunity(symbol, primary_price, secondary_price)
# 定期更新Web平台数据 (如果启用)
if self.send_to_web:
self._update_web_data()
self._upload_demo_tables()
# 定期保存策略状态到缓存
if time.time() % 10 < 1: # 大约每10秒保存一次
# self.trader.publish({"cmd": {"Sync": "SaveStrategyCache"}})
self.save_strategy_cache()
def on_stop(self):
"""停止策略时的清理工作"""
self.running = False
self.trader.log("收到停止信号,开始清理...", level="WARN", color="yellow", web=self.send_to_web)
# 11. 停止Web客户端 (如果启用)
if self.send_to_web and hasattr(self.trader, 'stop_web_client'):
self.trader.stop_web_client()
self.trader.log("Web客户端已停止", web=self.send_to_web)
# 12. 保存最终状态
self.save_strategy_cache()
self.trader.log("最终状态已保存", web=self.send_to_web)
self.trader.log("策略已完全停止。", color="red", web=self.send_to_web)
# --- 核心功能实现 ---
def _init_account_data(self):
"""查询账户初始信息:余额、费率"""
self.trader.log("开始初始化账户数据...", web=self.send_to_web)
cmds = []
# 查询主账户余额
cmds.append({"account_id": 0, "cmd": {"Sync": "UsdtBalance"}})
# 查询主账户费率 (以第一个 symbol 为例)
cmds.append({"account_id": 0, "cmd": {"Sync": {"FeeRate": self.symbols[0]}}})
if self.has_secondary_account:
# 查询次账户余额
cmds.append({"account_id": 1, "cmd": {"Sync": "UsdtBalance"}})
# 查询次账户费率
cmds.append({"account_id": 1, "cmd": {"Sync": {"FeeRate": self.symbols[0]}}})
try:
# 使用 batch_publish 批量查询
results = self.trader.batch_publish(cmds)
self.trader.log(f"账户初始化查询结果: {results}", level="DEBUG", web=self.send_to_web)
# 解析主账户结果
if len(results) > 0 and results[0].get('Ok'):
balance_info = results[0]['Ok']
self.primary_balance = balance_info.get('balance', self.primary_balance)
self.primary_available = balance_info.get('available_balance', self.primary_available)
self.trader.log(f"主账户余额: {self.primary_balance}, 可用: {self.primary_available}", web=self.send_to_web)
if len(results) > 1 and results[1].get('Ok'):
fee_info = results[1]['Ok']
self.primary_maker_fee = fee_info.get('maker_fee_rate', self.primary_maker_fee)
self.primary_taker_fee = fee_info.get('taker_fee_rate', self.primary_taker_fee)
self.primary_rebate_rate = self.cex_configs[0].get('rebate_rate', 0)
self.trader.log(f"主账户费率: Maker {self.primary_maker_fee}, Taker {self.primary_taker_fee}, Rebate {self.primary_rebate_rate}", web=self.send_to_web)
# 解析次账户结果
if self.has_secondary_account and len(results) > 2 and results[2].get('Ok'):
balance_info = results[2]['Ok']
self.secondary_balance = balance_info.get('balance', self.secondary_balance)
self.secondary_available = balance_info.get('available_balance', self.secondary_available)
self.trader.log(f"次账户余额: {self.secondary_balance}, 可用: {self.secondary_available}", web=self.send_to_web)
if self.has_secondary_account and len(results) > 3 and results[3].get('Ok'):
fee_info = results[3]['Ok']
self.secondary_maker_fee = fee_info.get('maker_fee_rate', self.secondary_maker_fee)
self.secondary_taker_fee = fee_info.get('taker_fee_rate', self.secondary_taker_fee)
self.secondary_rebate_rate = self.cex_configs[1].get('rebate_rate', 0)
self.trader.log(f"次账户费率: Maker {self.secondary_maker_fee}, Taker {self.secondary_taker_fee}, Rebate {self.secondary_rebate_rate}", web=self.send_to_web)
# 重新计算成本 (假设套利总是在两个账户间发生,且都用 Taker)
if self.has_secondary_account:
self.cost = (self.primary_taker_fee + self.secondary_taker_fee) * 2 * (1-self.primary_rebate_rate) * (1-self.secondary_rebate_rate) # 考虑返佣
else:
self.cost = self.primary_taker_fee * 2 * (1-self.primary_rebate_rate) # 单账户成本 (如果逻辑需要)
self.trader.log(f"根据查询结果,重新计算交易成本 (Taker*2): {self.cost * 10000:.2f} 万分之", web=self.send_to_web)
except Exception as e:
self.trader.log(f"初始化账户数据时出错: {e}", level="ERROR", color="red", web=self.send_to_web)
def _init_web_client(self):
"""初始化Web客户端配置"""
web_config = self.config.get('web_client', {}) # 从 config.toml 读取基础配置
web_config.update({
"server_name": web_config.get("server_name", "均衡_测试"),
"primary_balance": self.primary_balance,
"secondary_balance": self.secondary_balance if self.has_secondary_account else None,
"is_production": web_config.get("is_production", True),
"open_threshold": self.min_spread * 1000, # API 使用千分比
"max_position_ratio": self.strategy_options.get("max_position_value_ratio", 0.5) * 100, # API 使用百分比
"max_leverage": web_config.get("max_leverage", 5),
"funding_rate_threshold": web_config.get("funding_rate_threshold", 0.1),
"cost": self.cost * 10000, # API 使用万分比
"primary_maker_fee_rate": self.primary_maker_fee,
"primary_taker_fee_rate": self.primary_taker_fee,
"primary_rebate_rate": self.primary_rebate_rate,
"secondary_maker_fee_rate": self.secondary_maker_fee if self.has_secondary_account else None,
"secondary_taker_fee_rate": self.secondary_taker_fee if self.has_secondary_account else None,
"secondary_rebate_rate": self.secondary_rebate_rate if self.has_secondary_account else None,
})
# 清理 None 值
web_config = {k: v for k, v in web_config.items() if v is not None}
try:
self.trader.init_web_client(web_config)
self.trader.log(f"Web客户端配置完成: {web_config}", level="DEBUG", web=self.send_to_web)
except Exception as e:
self.trader.log(f"初始化Web客户端失败: {e}", level="ERROR", color="red", web=self.send_to_web)
self.send_to_web = False # 初始化失败则禁用Web功能
def _check_web_controls(self):
"""检查Web平台下发的控制指令"""
if not self.send_to_web:
return False # Web功能未启用
try:
if self.trader.is_web_force_closing():
self.trader.log("收到Web强平指令!执行强平逻辑...", level="WARN", color="red", web=self.send_to_web)
self._force_close_all_positions() # 实现强平逻辑
return True # 停止策略
if self.trader.is_web_soft_stopped():
self.trader.log("收到Web缓停指令,不再开新仓,等待现有流程结束...", level="WARN", color="yellow", web=self.send_to_web)
# 在 _check_and_execute_trades 中会阻止开仓
# 这里可以添加更复杂的逻辑,比如等待特定条件满足后退出
# 为简单起见,这里也直接返回 True 停止
return True # 停止策略
if self.trader.is_web_opening_stopped():
self.trader.tlog("Web平台", "禁止开仓指令生效中", interval=60, level="INFO", color="yellow") # 移除 web 参数
# 开仓逻辑将在 _check_and_execute_trades 中被阻止
except Exception as e:
self.trader.log(f"检查Web控制指令时出错: {e}", level="ERROR", color="red", web=self.send_to_web)
return False # 继续运行
def _check_and_execute_trades(self):
"""检查套利机会并执行交易(模拟)"""
if not self.has_primary_account or not self.has_secondary_account:
self.trader.tlog("交易检查", "账户不足,无法执行双边套利", interval=60) # 移除 web 参数
return
can_open = True
if self.send_to_web:
try:
# 检查是否禁止开仓
if self.trader.is_web_opening_stopped() or self.trader.is_web_soft_stopped():
can_open = False
except Exception as e:
self.trader.log(f"检查Web开仓状态时出错: {e}", level="ERROR", web=self.send_to_web)
for symbol in self.symbols:
if symbol not in self.simulated_prices:
continue
primary_price = self.simulated_prices[symbol]["primary"]
secondary_price = self.simulated_prices[symbol]["secondary"]
spread = (secondary_price - primary_price) / primary_price
# 简单套利逻辑: B比A贵 -> 在A买,在B卖 (做空)
if spread > self.min_spread and can_open:
self.trader.log(f"发现套利机会 {symbol}: Spread={spread*1000:.3f}‰ > {self.min_spread*1000:.3f}‰. 尝试开仓.", color="green", web=self.send_to_web)
self._execute_arbitrage_open(symbol, primary_price, secondary_price)
# 可以在成功开仓后退出循环,或者继续检查其他币种
# break # 简单处理,一次只做一个
# 反向套利: A比B贵 -> 在B买,在A卖 (做空)
elif spread < -self.min_spread and can_open:
self.trader.log(f"发现反向套利机会 {symbol}: Spread={spread*1000:.3f}‰ < {-self.min_spread*1000:.3f}‰. 尝试开仓.", color="blue", web=self.send_to_web)
self._execute_arbitrage_open(symbol, primary_price, secondary_price, reverse=True)
# break # 简单处理,一次只做一个
# 检查是否需要平仓 (简单逻辑:价差回归或反转一定程度)
elif symbol in self.positions:
position_info = self.positions[symbol]
# 假设 position_info 存储了开仓方向
opened_long_on_primary = position_info.get("opened_long_on_primary", True) # 假设默认是 A买 B卖
needs_close = False
if opened_long_on_primary and spread <= 0: # 价差回归或反转
needs_close = True
self.trader.log(f"{symbol} 价差回归 ({spread*1000:.3f}‰),考虑平仓.", color="green", web=self.send_to_web)
elif not opened_long_on_primary and spread >= 0: # 反向套利的价差回归
needs_close = True
self.trader.log(f"{symbol} 反向套利价差回归 ({spread*1000:.3f}‰),考虑平仓.", color="green", web=self.send_to_web)
if needs_close:
self._execute_arbitrage_close(symbol, primary_price, secondary_price)
# break # 简单处理,一次只做一个
def _execute_arbitrage_open(self, symbol, primary_price, secondary_price, reverse=False):
"""执行套利开仓(模拟下单)"""
self.trader.log(f"执行 {symbol} {'反向' if reverse else ''} 套利开仓...", web=self.send_to_web)
# Get exchange names from config
primary_exchange_name = self.cex_configs[0]['exchange'] if self.has_primary_account else "BinanceSwap"
secondary_exchange_name = self.cex_configs[1]['exchange'] if self.has_secondary_account else "OkxSwap"
# 3. 使用 create_cid 生成唯一订单 ID
cid_buy = self.trader.create_cid(primary_exchange_name if not reverse else secondary_exchange_name) # Pass exchange name
cid_sell = self.trader.create_cid(secondary_exchange_name if not reverse else primary_exchange_name) # Pass exchange name
amount = self.trade_amount
# 构建订单指令
cmds = []
if not reverse: # A买 B卖
# 主账户买入
cmd_buy = {
"account_id": 0,
"cmd": {
"Sync": {
"PlaceOrder": [
{
"symbol": symbol,
"order_type": "Market",
"side": "Buy",
"pos_side": "Long",
"amount": amount,
"cid": cid_buy,
"time_in_force": "GTC"
},
{
"is_dual_side": False,
"market_order_mode": "Normal"
}
]
}
}
}
# 次账户卖出 (做空)
cmd_sell = {
"account_id": 1,
"cmd": {
"Sync": {
"PlaceOrder": [
{
"symbol": symbol,
"order_type": "Market",
"side": "Sell",
"pos_side": "Short",
"amount": amount,
"cid": cid_sell,
"time_in_force": "GTC"
},
{
"is_dual_side": False,
"market_order_mode": "Normal"
}
]
}
}
}
cmds = [cmd_buy, cmd_sell]
opened_long_on_primary = True
else: # B买 A卖
# 次账户买入
cmd_buy = {
"account_id": 1,
"cmd": {
"Sync": {
"PlaceOrder": [
{
"symbol": symbol,
"order_type": "Market",
"side": "Buy",
"pos_side": "Long",
"amount": amount,
"cid": cid_buy,
"time_in_force": "GTC"
},
{
"is_dual_side": False,
"market_order_mode": "Normal"
}
]
}
}
}
# 主账户卖出 (做空)
cmd_sell = {
"account_id": 0,
"cmd": {
"Sync": {
"PlaceOrder": [
{
"symbol": symbol,
"order_type": "Market",
"side": "Sell",
"pos_side": "Short",
"amount": amount,
"cid": cid_sell,
"time_in_force": "GTC"
},
{
"is_dual_side": False,
"market_order_mode": "Normal"
}
]
}
}
}
cmds = [cmd_buy, cmd_sell]
opened_long_on_primary = False
try:
# 4. 使用 batch_publish 批量下单
results = self.trader.batch_publish(cmds)
self.trader.log(f"{symbol} 开仓下单结果: {results}", level="DEBUG", web=self.send_to_web)
# 模拟处理下单结果 (假设都成功)
# 实际应检查 results
if symbol not in self.positions:
self.positions[symbol] = {"long": 0, "short": 0, "entry_price_primary": 0, "entry_price_secondary": 0, "amount": 0}
pos = self.positions[symbol]
pos["amount"] = amount # 简化处理,直接认为成交
pos["entry_price_primary"] = primary_price
pos["entry_price_secondary"] = secondary_price
pos["opened_long_on_primary"] = opened_long_on_primary
pos["open_time"] = time.time()
# 记录订单信息 (简化)
self.orders[cid_buy] = {"status": "filled", "symbol": symbol, "side": "Buy", "amount": amount, "price": primary_price if not reverse else secondary_price}
self.orders[cid_sell] = {"status": "filled", "symbol": symbol, "side": "Sell", "amount": amount, "price": secondary_price if not reverse else primary_price}
# 5. 使用 update_trade_stats 更新开仓统计 (profit=0)
# 假设都是 Taker 成交
taker_volume_primary = amount * primary_price
taker_volume_secondary = amount * secondary_price
self.trader.update_trade_stats(maker_volume=0, taker_volume=taker_volume_primary + taker_volume_secondary, profit=0)
self.trader.log(f"{symbol} 开仓统计已更新 (仅交易量)", color="blue", web=self.send_to_web)
except Exception as e:
self.trader.log(f"执行 {symbol} 开仓时出错: {e}", level="ERROR", color="red", web=self.send_to_web)
def _execute_arbitrage_close(self, symbol, primary_price, secondary_price):
"""执行套利平仓(模拟下单)"""
if symbol not in self.positions:
return
self.trader.log(f"执行 {symbol} 套利平仓...", web=self.send_to_web)
position_info = self.positions[symbol]
amount = position_info["amount"]
opened_long_on_primary = position_info["opened_long_on_primary"]
# Get exchange names from config
primary_exchange_name = self.cex_configs[0]['exchange'] if self.has_primary_account else "BinanceSwap"
secondary_exchange_name = self.cex_configs[1]['exchange'] if self.has_secondary_account else "OkxSwap"
# 3. 使用 create_cid
cid_close_1 = self.trader.create_cid(primary_exchange_name if opened_long_on_primary else secondary_exchange_name) # Pass exchange name
cid_close_2 = self.trader.create_cid(secondary_exchange_name if opened_long_on_primary else primary_exchange_name) # Pass exchange name
cmds = []
if opened_long_on_primary: # 开仓是 A买 B卖 -> 平仓是 A卖 B买
# 主账户卖出 (平多)
cmd_sell = {
"account_id": 0,
"cmd": {"Sync": {"PlaceOrder": [
{
"symbol": symbol,
"order_type": "Market",
"side": "Sell",
"pos_side": "Long",
"amount": amount,
"cid": cid_close_1,
"time_in_force": "GTC"
},
{
"is_dual_side": False,
"market_order_mode": "Normal"
}
]}}
}
# 次账户买入 (平空)
cmd_buy = {
"account_id": 1,
"cmd": {"Sync": {"PlaceOrder": [
{
"symbol": symbol,
"order_type": "Market",
"side": "Buy",
"pos_side": "Short",
"amount": amount,
"cid": cid_close_2,
"time_in_force": "GTC"
},
{
"is_dual_side": False,
"market_order_mode": "Normal"
}
]}}
}
cmds = [cmd_sell, cmd_buy]
else: # 开仓是 B买 A卖 -> 平仓是 B卖 A买
# 次账户卖出 (平多)
cmd_sell = {
"account_id": 1,
"cmd": {"Sync": {"PlaceOrder": [
{
"symbol": symbol,
"order_type": "Market",
"side": "Sell",
"pos_side": "Long",
"amount": amount,
"cid": cid_close_1,
"time_in_force": "GTC"
},
{
"is_dual_side": False,
"market_order_mode": "Normal"
}
]}}
}
# 主账户买入 (平空)
cmd_buy = {
"account_id": 0,
"cmd": {"Sync": {"PlaceOrder": [
{
"symbol": symbol,
"order_type": "Market",
"side": "Buy",
"pos_side": "Short",
"amount": amount,
"cid": cid_close_2,
"time_in_force": "GTC"
},
{
"is_dual_side": False,
"market_order_mode": "Normal"
}
]}}
}
cmds = [cmd_buy, cmd_sell]
try:
# 4. 使用 batch_publish 批量下单
results = self.trader.batch_publish(cmds)
self.trader.log(f"{symbol} 平仓下单结果: {results}", level="DEBUG", web=self.send_to_web)
# 计算利润 (简化,忽略手续费和滑点)
entry_primary = position_info["entry_price_primary"]
entry_secondary = position_info["entry_price_secondary"]
profit = 0
if opened_long_on_primary: # A买B卖 -> A卖B买
profit = (primary_price - entry_primary) * amount + (entry_secondary - secondary_price) * amount
else: # B买A卖 -> B卖A买
profit = (secondary_price - entry_secondary) * amount + (entry_primary - primary_price) * amount
# 减去预估成本
trade_value = amount * (primary_price + secondary_price) / 2 # 估算交易额
estimated_cost = trade_value * self.cost
net_profit = profit - estimated_cost
self.trader.log(f"{symbol} 平仓完成。毛利: {profit:.4f}, 预估成本: {estimated_cost:.4f}, 净利: {net_profit:.4f}", color="green" if net_profit > 0 else "red", web=self.send_to_web)
# 移除持仓
del self.positions[symbol]
# 更新订单状态 (简化)
self.orders[cid_close_1] = {"status": "filled", "symbol": symbol, "side": "Sell" if opened_long_on_primary else "Buy", "amount": amount, "price": primary_price if opened_long_on_primary else secondary_price}
self.orders[cid_close_2] = {"status": "filled", "symbol": symbol, "side": "Buy" if opened_long_on_primary else "Sell", "amount": amount, "price": secondary_price if opened_long_on_primary else primary_price}
# 5. 使用 update_trade_stats 更新平仓统计 (profit != 0)
taker_volume_primary = amount * primary_price
taker_volume_secondary = amount * secondary_price
# 注意:这里的 profit 是净利润
self.trader.update_trade_stats(maker_volume=0, taker_volume=taker_volume_primary + taker_volume_secondary, profit=net_profit)
self.trader.log(f"{symbol} 平仓统计已更新 (含利润)", color="blue", web=self.send_to_web)
# 6. 使用 log_profit 上传利润点 (用于曲线图)
if self.send_to_web:
# 通常上传累计总利润,需要先获取当前总利润
current_stats = self.trader.get_stats()
total_profit_now = current_stats.get("total_profit", 0)
self.trader.log_profit(total_profit_now)
self.trader.log(f"上传利润点: {total_profit_now:.4f}", level="DEBUG", web=self.send_to_web)
except Exception as e:
self.trader.log(f"执行 {symbol} 平仓时出错: {e}", level="ERROR", color="red", web=self.send_to_web)
def _force_close_all_positions(self):
"""强制平掉所有持仓(模拟)"""
self.trader.log("!!! 正在执行强制平仓 !!!", level="CRITICAL", color="red", web=self.send_to_web)
symbols_to_close = list(self.positions.keys())
for symbol in symbols_to_close:
if symbol in self.simulated_prices:
primary_price = self.simulated_prices[symbol]["primary"]
secondary_price = self.simulated_prices[symbol]["secondary"]
self._execute_arbitrage_close(symbol, primary_price, secondary_price)
else:
# 如果没有最新价格,可能需要用其他方式下单或记录错误
self.trader.log(f"无法获取 {symbol} 的价格,无法执行强平!", level="ERROR", web=self.send_to_web)
self.trader.log("强制平仓流程执行完毕。", level="WARN", color="red", web=self.send_to_web)
def _update_web_data(self):
"""定期更新Web平台上的各项数据"""
try:
# 7. 更新账户余额 (模拟变化)
# 实际应重新查询API获取最新余额
self.primary_balance += random.uniform(-1, 1) # 模拟微小变动
self.primary_available = self.primary_balance * random.uniform(0.8, 0.95) # 模拟可用余额
if self.has_secondary_account:
self.secondary_balance += random.uniform(-0.5, 0.5)
self.secondary_available = self.secondary_balance * random.uniform(0.7, 0.9)
self.trader.update_total_balance(
primary_balance=self.primary_balance,
secondary_balance=self.secondary_balance if self.has_secondary_account else None,
available_primary=self.primary_available,
available_secondary=self.secondary_available if self.has_secondary_account else None
)
# 8. 更新持仓价值和浮动盈亏 (模拟计算)
current_total_value = 0
current_long_value = 0
current_short_value = 0
floating_pnl = 0
for symbol, pos_info in self.positions.items():
if symbol in self.simulated_prices:
current_primary = self.simulated_prices[symbol]["primary"]
current_secondary = self.simulated_prices[symbol]["secondary"]
amount = pos_info["amount"]
entry_primary = pos_info["entry_price_primary"]
entry_secondary = pos_info["entry_price_secondary"]
opened_long_on_primary = pos_info["opened_long_on_primary"]
# 简单计算当前价值 (忽略多空方向对价值计算的影响,仅示意)
value = amount * (current_primary + current_secondary) / 2
current_total_value += value
if opened_long_on_primary:
current_long_value += amount * current_primary # 主账户多头价值
current_short_value += amount * current_secondary # 次账户空头价值 (绝对值)
else:
current_long_value += amount * current_secondary # 次账户多头价值
current_short_value += amount * current_primary # 主账户空头价值 (绝对值)
# 计算浮动盈亏
pnl = 0
if opened_long_on_primary:
pnl = (current_primary - entry_primary) * amount + (entry_secondary - current_secondary) * amount
else:
pnl = (current_secondary - entry_secondary) * amount + (entry_primary - current_primary) * amount
floating_pnl += pnl
# 假设这是单个节点,所以 total 和 current 一样
self.trader.update_current_position_value(current_total_value, current_long_value, current_short_value)
self.trader.update_total_position_value(current_total_value, current_long_value, current_short_value)
self.trader.update_floating_profit(floating_pnl)
# 9. 更新资金费 (模拟)
pred_funding_primary = random.uniform(-0.5, 0.1)
pred_funding_secondary = random.uniform(-0.2, 0.2)
self.trader.update_pred_funding(primary_fee=pred_funding_primary, secondary_fee=pred_funding_secondary)
# 模拟结算了一笔资金费
if random.random() < 0.1: # 10% 的概率结算
settled_primary = random.uniform(-1, 0)
settled_secondary = random.uniform(-0.5, 0)
self.trader.add_funding_fee(primary_fee=settled_primary, secondary_fee=settled_secondary)
self.trader.log(f"模拟结算资金费: 主={settled_primary:.4f}, 次={settled_secondary:.4f}", level="DEBUG", web=self.send_to_web)
except Exception as e:
self.trader.log(f"更新Web数据时出错: {e}", level="ERROR", web=self.send_to_web)
def _log_current_stats(self):
"""获取并记录当前统计数据"""
if not self.send_to_web:
return
try:
stats = self.trader.get_stats()
# 使用 tlog 限频打印,避免过多日志
self.trader.tlog("当前统计", f"总利润: {stats.get('total_profit', 0):.2f}, "
f"胜率: {stats.get('win_rate', 0)*100:.1f}%, "
f"交易次数: {stats.get('count', 0)}, "
f"持仓价值: {stats.get('current_position_value', 0):.2f}",
interval=60, # 每60秒最多打印一次
level="INFO",
color="blue") # 移除 web 参数
except Exception as e:
self.trader.log(f"获取统计数据时出错: {e}", level="ERROR", web=self.send_to_web)
def _upload_demo_tables(self):
"""上传示例表格数据到Web平台"""
if not self.send_to_web:
return
try:
# 表格1: 当前持仓详情
position_rows = []
for symbol, pos_info in self.positions.items():
open_time = pos_info.get('open_time', 0)
duration_minutes = (time.time() - open_time) / 60 if open_time > 0 else 0
direction = "A多/B空" if pos_info.get("opened_long_on_primary") else "B多/A空"
amount = pos_info.get("amount", 0)
entry_a = pos_info.get("entry_price_primary", 0)
entry_b = pos_info.get("entry_price_secondary", 0)
# 计算简单 PNL (需要当前价格)
pnl_str = "N/A"
if symbol in self.simulated_prices:
current_a = self.simulated_prices[symbol]["primary"]
current_b = self.simulated_prices[symbol]["secondary"]
pnl = 0
if pos_info.get("opened_long_on_primary"):
pnl = (current_a - entry_a) * amount + (entry_b - current_b) * amount
else:
pnl = (current_b - entry_b) * amount + (entry_a - current_a) * amount
pnl_str = f"{pnl:.4f}"
position_rows.append([
symbol,
direction,
f"{amount:.6f}",
f"{entry_a:.2f} / {entry_b:.2f}",
pnl_str,
f"{duration_minutes:.1f} min"
])
position_table = {
"title": "模拟持仓详情",
"cols": ["交易对", "方向(主/次)", "数量", "开仓价(主/次)", "浮动盈亏", "持仓时间"],
"rows": position_rows
}
# 表格2: 模拟价差监控
spread_rows = []
for symbol in self.symbols:
if symbol in self.simulated_prices:
price_a = self.simulated_prices[symbol]["primary"]
price_b = self.simulated_prices[symbol]["secondary"]
spread = (price_b - price_a) / price_a * 1000 # 千分比
spread_rows.append([symbol, f"{price_a:.2f}", f"{price_b:.2f}", f"{spread:.3f}‰"])
else:
spread_rows.append([symbol, "N/A", "N/A", "N/A"])
spread_table = {
"title": "模拟市场价差 (‰)",
"cols": ["交易对", "价格A (主)", "价格B (次)", "价差 (B-A)/A"],
"rows": spread_rows
}
# 10. 使用 upload_tables 上传
self.trader.upload_tables([position_table, spread_table])
self.trader.tlog("Web表格", "持仓和价差表格已更新", interval=15) # 移除 web 参数
except Exception as e:
self.trader.log(f"上传表格时出错: {e}", level="ERROR", web=self.send_to_web)
# --- 缓存管理 ---
def save_strategy_cache(self):
"""保存策略状态到缓存"""
try:
state = {
"positions": self.positions,
"orders": self.orders, # 保存订单历史 (可能很大,考虑是否必要)
"last_check_time": self.last_check_time,
# 可以添加其他需要持久化的状态
}
# 11. 使用 cache_save 保存
self.trader.cache_save(json.dumps(state))
self.trader.tlog("缓存", "策略状态已保存", interval=60, level="DEBUG") # 移除 web 参数
# 演示 cache_update (通常用于增量更新,这里仅作演示)
update_data = {"last_save_timestamp": time.time()}
# 12. 使用 cache_update 更新
self.trader.cache_update(json.dumps(update_data))
except Exception as e:
self.trader.log(f"保存策略缓存失败: {e}", level="ERROR", web=self.send_to_web)
def load_strategy_cache(self):
"""从缓存加载策略状态"""
try:
# 13. 使用 cache_load 加载
cached_data = self.trader.cache_load()
if cached_data:
state = json.loads(cached_data)
self.positions = state.get("positions", {})
self.orders = state.get("orders", {})
self.last_check_time = state.get("last_check_time", 0)
# 恢复其他状态...
self.trader.log(f"成功从缓存加载策略状态。恢复了 {len(self.positions)} 个持仓, {len(self.orders)} 个订单记录。", color="blue", web=self.send_to_web)
else:
self.trader.log("未找到缓存数据,使用初始状态。", web=self.send_to_web)
except Exception as e:
self.trader.log(f"加载策略缓存失败: {e}", level="ERROR", web=self.send_to_web)
# --- 其他API演示 ---
def _demonstrate_other_apis(self):
"""在主循环中演示其他非核心功能的API"""
# 14. 演示 logt: 记录带时间戳的日志
if random.random() < 0.05: # 低概率触发
event_time = int(time.time()) - random.randint(60, 3600) # 过去1分钟到1小时内
self.trader.logt(f"模拟历史事件日志", event_time, color="navy", level="DEBUG") # 移除 web 参数
# 15. 演示 request: 调用外部API (示例: 获取Binance行情)
if random.random() < 0.02: # 更低概率触发
try:
symbol_req = "BTCUSDT" # Binance 使用无下划线格式
url = f"https://api.binance.com/api/v3/ticker/price?symbol={symbol_req}"
self.trader.log(f"尝试使用 request 获取 {url}...", level="DEBUG", web=self.send_to_web)
response = self.trader.request(url, "GET", None)
if response and 'price' in response:
self.trader.log(f"外部API ({url}) 成功: {response}", color="teal", web=self.send_to_web)
else:
self.trader.log(f"外部API ({url}) 返回无效: {response}", level="WARN", web=self.send_to_web)
except Exception as e:
self.trader.log(f"调用 request API 失败: {e}", level="ERROR", web=self.send_to_web)
# 16. 演示 set_force_stop: 根据条件触发强停 (模拟: 亏损过大)
if self.send_to_web and random.random() < 0.03:
try:
stats = self.trader.get_stats()
total_profit = stats.get('total_profit', 0)
# 假设亏损超过初始总余额的 5% 就触发强停 (仅为演示)
initial_total_balance = self.primary_balance + self.secondary_balance
if initial_total_balance > 0 and total_profit < -0.05 * initial_total_balance:
self.trader.log(f"模拟检测到亏损过大 ({total_profit:.2f}),触发 set_force_stop(True)", level="WARN", color="orangered", web=self.send_to_web)
self.trader.set_force_stop(True)
# else:
# # 可以在条件恢复时解除强停
# if self.trader.is_web_force_stopped(): # 假设有这个API检查状态
# self.trader.log("模拟检测到状态恢复,解除强停 set_force_stop(False)", level="INFO", color="lightgreen", web=self.send_to_web)
# self.trader.set_force_stop(False)
except Exception as e:
self.trader.log(f"执行 set_force_stop 演示时出错: {e}", level="ERROR", web=self.send_to_web)
# --- 回调处理 (如果需要) ---
def on_order(self, account_id, context, order):
"""处理订单更新回调"""
# 注意:在同步下单模式 (Sync) 下,结果直接在 publish/batch_publish 返回
# 异步下单模式 (Async) 或交易所主动推送的更新会触发此回调
self.trader.log(f"收到订单回调 (Acc: {account_id}, Ctx: {context}): {order}", level="DEBUG", web=self.send_to_web)
# 可以在这里更新 self.orders 状态
cid = order.get('cid')
if cid and cid in self.orders:
self.orders[cid].update(order) # 更新订单信息
else:
# 处理没有本地记录的订单更新 (可能是手工下的单或其他策略的)
pass
def on_position(self, account_id, positions):
"""处理持仓更新回调"""
# 交易所主动推送的持仓变化会触发此回调
self.trader.log(f"收到持仓回调 (Acc: {account_id}): {positions}", level="DEBUG", web=self.send_to_web)
# 可以在这里更精确地更新 self.positions
# 注意:这里的 positions 结构可能与 self.positions 不同,需要适配
# 例如,需要合并来自两个账户的持仓信息到 self.positions[symbol] 中
def _calculate_position_profit(self, position, primary_price, secondary_price):
"""计算当前持仓的盈亏"""
# 提取持仓信息
amount = position.get('amount', 0)
open_primary_price = position.get('open_primary_price', 0)
open_secondary_price = position.get('open_secondary_price', 0)
is_reverse = position.get('is_reverse', False)
if is_reverse:
# 反向套利: 主所卖, 次所买
open_spread = open_secondary_price - open_primary_price
current_spread = secondary_price - primary_price
else:
# 正向套利: 主所买, 次所卖
open_spread = open_primary_price - open_secondary_price
current_spread = primary_price - secondary_price
# 计算盈亏 (价差收敛带来的利润)
profit = (open_spread - current_spread) * amount
# 减去交易成本
total_cost = (self.primary_taker_fee + self.secondary_taker_fee) * 2 * amount * (primary_price + secondary_price) / 2
return profit - total_cost
def _check_arbitrage_opportunity(self, symbol, primary_price, secondary_price):
"""检查套利机会并执行交易"""
# 计算价差 (‰)
spread = abs(primary_price - secondary_price) / min(primary_price, secondary_price) * 1000
# 判断方向:哪边买哪边卖
reverse = secondary_price < primary_price
# 检查价差是否足够大
min_threshold = self.min_spread * 1000 # 转为千分比
if spread >= min_threshold:
# 价差足够大,可以开仓
if symbol not in self.positions:
self.trader.log(f"发现套利机会! {symbol} 价差: {spread:.2f}‰", color="green", web=self.send_to_web)
# 执行开仓
self._execute_arbitrage_open(symbol, primary_price, secondary_price, reverse)
else:
# 已有仓位,检查是否可以平仓
position = self.positions.get(symbol)
if position:
current_profit = self._calculate_position_profit(position, primary_price, secondary_price)
self.trader.tlog(
f"持仓状态",
f"{symbol} 当前价差: {spread:.2f}‰, 利润: {current_profit:.2f} USDT",
interval=5
)
# 如果价差缩小或利润足够大,平仓
if spread < min_threshold * 0.5 or current_profit > 0:
self._execute_arbitrage_close(symbol, primary_price, secondary_price)