跳到主要内容

完整策略示例

完整策略示例

以下是一个跨交易所套利策略示例,展示了如何使用Trader API v2的核心功能:

"""
arbitrage_strategy_v2.py - 跨交易所套利策略示例 (v2版本)
"""

import json
import time
import base_strategy

class ArbitrageStrategyV2(base_strategy.BaseStrategy):
def __init__(self, cex_configs, dex_configs, config, trader):
"""初始化策略"""
self.cex_configs = cex_configs
self.has_account = len(cex_configs) > 0
self.dex_configs = dex_configs
self.trader = trader
self.config = config or {
"send_to_web": True,
"min_spread": 1.0, # 最小开仓价差(‰)
"symbols": ["BTC_USDT", "ETH_USDT", "SOL_USDT"] # 监控的交易对
}

# 初始化数据
self.positions = {}
self.running = False
self.primary_balance = 10000.0 # 默认值
self.secondary_balance = 5000.0 # 默认值
self.start_time = time.time() # 策略启动时间
self.last_data_time = time.time() # 最后数据时间

# 加载缓存
self.load_state()

def _init_fee_rates(self):
"""查询费率信息"""
# 默认费率
self.primary_maker_fee = 0.0002
self.primary_taker_fee = 0.0005
self.secondary_maker_fee = 0.0002
self.secondary_taker_fee = 0.0005
self.primary_rebate_rate = 0
self.secondary_rebate_rate = 0

if not self.has_account or not self.config.get("symbols"):
return

symbol = self.config["symbols"][0]

# 查询主账户费率 - 使用v2格式
try:
result = self.trader.get_fee_rate(0, symbol)
if "Ok" in result:
fee_rate = result["Ok"]
self.trader.log(f"主账户费率: {fee_rate}", level="DEBUG", web=self.config.get("send_to_web", True))
self.primary_maker_fee = fee_rate.get('maker_fee_rate', 0.0002)
self.primary_taker_fee = fee_rate.get('taker_fee_rate', 0.0005)
self.primary_rebate_rate = self.cex_configs[0].get('rebate_rate', 0)
except Exception as e:
self.trader.log(f"获取主账户费率失败: {e}", level="WARN", web=self.config.get("send_to_web", True))

# 查询次账户费率
if len(self.cex_configs) > 1:
try:
result = self.trader.get_fee_rate(1, symbol)
if "Ok" in result:
fee_rate = result["Ok"]
self.trader.log(f"次账户费率: {fee_rate}", level="DEBUG", web=self.config.get("send_to_web", True))
self.secondary_maker_fee = fee_rate.get('maker_fee_rate', 0.0002)
self.secondary_taker_fee = fee_rate.get('taker_fee_rate', 0.0005)
self.secondary_rebate_rate = self.cex_configs[1].get('rebate_rate', 0)
except Exception as e:
self.trader.log(f"获取次账户费率失败: {e}", level="WARN", web=self.config.get("send_to_web", True))

# 计算成本,两边都用taker
self.cost = (self.primary_taker_fee + self.secondary_taker_fee) * 2

# 查询账户余额 - 使用v2格式
try:
result = self.trader.get_usdt_balance(0)
if "Ok" in result:
balance = result["Ok"]
self.primary_balance = balance.get('balance', 10000.0)
except Exception:
self.trader.log("获取主账户余额失败", level="WARN", web=self.config.get("send_to_web", True))

# 查询次账户余额
if len(self.cex_configs) > 1:
try:
result = self.trader.get_usdt_balance(1)
if "Ok" in result:
balance = result["Ok"]
self.secondary_balance = balance.get('balance', 5000.0)
except Exception:
self.trader.log("获取次账户余额失败", level="WARN", web=self.config.get("send_to_web", True))

def _init_web_client(self):
"""初始化Web客户端"""
web_config = {
"server_name": "跨所套利策略v2",
"primary_balance": self.primary_balance,
"secondary_balance": self.secondary_balance,
"is_production": True,
"open_threshold": self.config.get("min_spread", 1.0) / 10, # 转换为小数
"max_position_ratio": 80,
"max_leverage": 2,
"funding_rate_threshold": 0.1, # 资金费率阈值
"cost": self.cost, # 交易成本
"primary_maker_fee_rate": self.primary_maker_fee,
"primary_taker_fee_rate": self.primary_taker_fee,
"primary_rebate_rate": self.primary_rebate_rate,
"secondary_maker_fee_rate": self.secondary_maker_fee,
"secondary_taker_fee_rate": self.secondary_taker_fee,
"secondary_rebate_rate": self.secondary_rebate_rate
}
self.trader.init_web_client(web_config)

def name(self):
"""返回策略名称"""
return "跨所套利策略v2"

def load_state(self):
"""从缓存加载策略状态"""
cached_data = self.trader.cache_load()
if cached_data:
self.positions = cached_data.get("positions", {})
self.trader.log("已加载缓存状态", web=self.config.get("send_to_web", True))

def save_state(self):
"""保存策略状态到缓存"""
state = {"positions": self.positions, "last_update": int(time.time())}
self.trader.cache_save(state)

def start(self):
"""启动策略"""

# 初始化费率信息
self._init_fee_rates()

# 初始化Web客户端
if self.config.get("send_to_web", True):
self._init_web_client()
self.trader.start_web_client(upload_interval=5)

self.trader.log("启动跨所套利策略v2", web=self.config.get("send_to_web", True))

# 上传表格示例
self.upload_tables()

# tlog示例 频繁日志使用tlog限频
self.trader.tlog("价差", "BTC_USDT: 1.25‰, ETH_USDT: 2.10‰, SOL_USDT: 3.45‰", color="green")

# 初始化检查并启动交易
if self.has_account:
self.scan_opportunities() # 扫描套利机会
else:
self.trader.log("没有配置账户,无法交易", level="WARN")

def upload_tables(self):
"""上传表格示例方法"""

self.spread_table = {
"title": "价差",
"cols": ["交易对", "价差(‰)"],
"rows": [
["BTC_USDT", "1.25"],
["ETH_USDT", "2.10"],
["SOL_USDT", "3.45"]
]
}

self.position_table = {
"title": "持仓",
"cols": ["交易对", "持仓量", "持仓价值", "浮动盈亏", "持仓时间"],
"rows": [
["BTC_USDT", "1.25", "10000", "100", "2小时15分"],
["ETH_USDT", "2.10", "20000", "200", "1小时30分"],
["SOL_USDT", "3.45", "30000", "300", "3小时45分"]
]
}

# 上传表格
self.trader.upload_tables([self.spread_table, self.position_table])

def place_arbitrage_orders(self, symbol, spread):
"""下套利订单 - 使用v2 API"""
try:
# 获取当前价格
primary_result = self.trader.get_ticker(0, symbol)
secondary_result = self.trader.get_ticker(1, symbol)

if "Ok" not in primary_result or "Ok" not in secondary_result:
return False

primary_ticker = primary_result["Ok"]
secondary_ticker = secondary_result["Ok"]

primary_price = float(primary_ticker.get('price', 0))
secondary_price = float(secondary_ticker.get('price', 0))

if primary_price == 0 or secondary_price == 0:
return False

# 计算交易数量
amount = 0.01 # 示例数量

# 生成订单ID
primary_cid_result = self.trader.create_cid()
secondary_cid_result = self.trader.create_cid()

if "Ok" not in primary_cid_result or "Ok" not in secondary_cid_result:
return False

primary_cid = primary_cid_result["Ok"]
secondary_cid = secondary_cid_result["Ok"]

# 主账户做多,次账户做空
primary_order = {
"symbol": symbol,
"side": "Buy",
"order_type": "Market",
"amount": amount,
"cid": primary_cid
}

secondary_order = {
"symbol": symbol,
"side": "Sell",
"order_type": "Market",
"amount": amount,
"cid": secondary_cid
}

# 下单
primary_result = self.trader.place_order(0, primary_order, sync=True)
secondary_result = self.trader.place_order(1, secondary_order, sync=True)

if "Ok" in primary_result and "Ok" in secondary_result:
self.trader.log(f"套利订单已下达: {symbol}, 价差: {spread:.2f}‰", color="green")

# 记录交易统计 - 开仓时只记录交易量
stats_result = self.trader.update_trade_stats(0, amount * primary_price + amount * secondary_price, 0)

return True
else:
self.trader.log(f"套利订单失败: {symbol}", level="ERROR", color="red")
return False

except Exception as e:
self.trader.log(f"下单异常: {e}", level="ERROR", color="red")
return False

def scan_opportunities(self):
"""扫描套利机会"""
for symbol in self.config.get("symbols", []):
try:
# 获取双边价格
primary_result = self.trader.get_bbo(0, symbol)
secondary_result = self.trader.get_bbo(1, symbol)

if "Ok" not in primary_result or "Ok" not in secondary_result:
continue

primary_bbo = primary_result["Ok"]
secondary_bbo = secondary_result["Ok"]

if not primary_bbo or not secondary_bbo:
continue

primary_bid = float(primary_bbo.get('bid_price', 0))
primary_ask = float(primary_bbo.get('ask_price', 0))
secondary_bid = float(secondary_bbo.get('bid_price', 0))
secondary_ask = float(secondary_bbo.get('ask_price', 0))

if primary_bid == 0 or secondary_ask == 0:
continue

# 计算价差
spread = (primary_bid - secondary_ask) / secondary_ask * 1000 # 千分比

# 检查是否满足开仓条件
if spread > self.config.get("min_spread", 1.0):
self.trader.tlog(f"发现机会", f"{symbol}: {spread:.2f}‰", color="yellow", interval=10)

# 检查平台控制状态
if self.trader.is_web_opening_stopped():
self.trader.log("平台禁止开仓", level="WARN")
continue

if self.trader.is_web_force_closing():
self.trader.log("平台要求强平", level="ERROR")
break

# 下套利订单
self.place_arbitrage_orders(symbol, spread)

except Exception as e:
self.trader.log(f"扫描{symbol}异常: {e}", level="ERROR")

def analyze_market_trend(self, symbol):
"""分析市场趋势 - 使用K线数据"""
try:
# 获取1小时K线数据,用于趋势分析
result = self.trader.get_kline(
account_id=0,
symbol=symbol,
interval="1h",
limit=24 # 获取24小时数据
)

if "Ok" not in result:
return None

kline_data = result["Ok"]
if not kline_data or not kline_data.get('candles') or len(kline_data['candles']) < 10:
return None

candles = kline_data['candles']

# 计算简单移动平均线
prices = [float(candle['close']) for candle in candles[-10:]] # 最近10个小时的收盘价
sma_10 = sum(prices) / len(prices)

current_price = float(candles[-1]['close'])

# 判断趋势
if current_price > sma_10 * 1.002: # 价格高于均线0.2%
trend = "上涨"
trend_strength = (current_price - sma_10) / sma_10 * 100
elif current_price < sma_10 * 0.998: # 价格低于均线0.2%
trend = "下跌"
trend_strength = (sma_10 - current_price) / sma_10 * 100
else:
trend = "震荡"
trend_strength = 0

# 计算波动率
high_prices = [float(candle['high']) for candle in candles[-24:]]
low_prices = [float(candle['low']) for candle in candles[-24:]]
volatility = (max(high_prices) - min(low_prices)) / min(low_prices) * 100

trend_info = {
"symbol": symbol,
"trend": trend,
"strength": trend_strength,
"volatility": volatility,
"current_price": current_price,
"sma_10": sma_10
}

# 记录趋势信息
self.trader.tlog(
f"趋势分析_{symbol}",
f"{symbol}: {trend} {trend_strength:.2f}%, 波动率: {volatility:.2f}%",
color="blue",
interval=300 # 5分钟记录一次
)

return trend_info

except Exception as e:
self.trader.log(f"分析{symbol}趋势异常: {e}", level="ERROR")
return None

def get_support_resistance_levels(self, symbol):
"""获取支撑阻力位 - 使用K线数据"""
try:
# 获取日线数据用于计算支撑阻力位
result = self.trader.get_kline(
account_id=0,
symbol=symbol,
interval="1d",
limit=30 # 获取30天数据
)

if "Ok" not in result:
return None

kline_data = result["Ok"]
if not kline_data or not kline_data.get('candles') or len(kline_data['candles']) < 20:
return None

candles = kline_data['candles']

# 提取高点和低点
highs = [float(candle['high']) for candle in candles]
lows = [float(candle['low']) for candle in candles]

# 计算支撑位和阻力位(简化算法)
resistance_level = max(highs[-10:]) # 最近10天最高价作为阻力位
support_level = min(lows[-10:]) # 最近10天最低价作为支撑位

current_price = float(candles[-1]['close'])

# 计算距离支撑阻力位的百分比
resistance_distance = (resistance_level - current_price) / current_price * 100
support_distance = (current_price - support_level) / current_price * 100

levels_info = {
"symbol": symbol,
"resistance": resistance_level,
"support": support_level,
"current_price": current_price,
"resistance_distance": resistance_distance,
"support_distance": support_distance
}

self.trader.tlog(
f"支撑阻力_{symbol}",
f"{symbol}: 阻力位 {resistance_level:.2f} (+{resistance_distance:.2f}%), "
f"支撑位 {support_level:.2f} (-{support_distance:.2f}%)",
color="purple",
interval=600 # 10分钟记录一次
)

return levels_info

except Exception as e:
self.trader.log(f"计算{symbol}支撑阻力位异常: {e}", level="ERROR")
return None

def on_stop(self):
"""停止策略"""
self.save_state()
if self.config.get("send_to_web", True):
self.trader.stop_web_client()
self.trader.log("策略已停止", web=self.config.get("send_to_web", True))

def subscribes(self):
"""返回订阅列表 - 使用三种数据源"""
subscriptions = []
if self.has_account:
symbols = self.config.get("symbols", ["BTC_USDT", "ETH_USDT"])

# 1. WebSocket实时订阅 - 高频数据
subscriptions.append({
"account_id": 0,
"event_handle_mode": "Latest", # 高频数据使用Latest模式
"sub": {"SubscribeWs": [
{"Bbo": symbols}, # 实时最佳买卖价
]}
})

# 2. WebSocket实时订阅 - 重要数据
subscriptions.append({
"account_id": 0,
"event_handle_mode": "Sync", # 重要数据使用Sync模式
"sub": {"SubscribeWs": [
{"Order": symbols}, # 订单状态更新
{"Position": symbols}, # 持仓更新
{"Funding": symbols} # 资金费率更新
]}
})

# 订阅次账户信息(如果有)
if len(self.cex_configs) > 1:
subscriptions.append({
"account_id": 1,
"event_handle_mode": "Sync", # 次账户数据使用Sync模式
"sub": {"SubscribeWs": [
{"Order": symbols},
{"Position": symbols}
]}
})

# 2. REST轮询订阅 - 定期数据
subscriptions.append({
"account_id": 0,
"sub": {"SubscribeRest": {
"update_interval": {"secs": 10, "nanos": 0},
"rest_type": "Balance" # 每10秒检查余额
}}
})

# 3. 定时器订阅 - 策略逻辑
subscriptions.extend([
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 60, "nanos": 0},
"name": "risk_monitor", # 每分钟风控检查
"initial_delay": {"secs": 30, "nanos": 0} # 延迟30秒启动
}}
},
{
"sub": {"SubscribeTimer": {
"update_interval": {"secs": 300, "nanos": 0},
"name": "stats_update", # 每5分钟统计更新
"initial_delay": {"secs": 45, "nanos": 0} # 延迟45秒启动
}}
}
])

return subscriptions

def on_bbo(self, exchange, symbol, bbo):
"""BBO价格更新回调"""
try:
# 记录最新数据时间
self.last_data_time = time.time()

# 获取买卖价
bid_price = bbo.get('bid_price', 0)
ask_price = bbo.get('ask_price', 0)

if bid_price > 0 and ask_price > 0:
# 计算价差
spread = (ask_price - bid_price) / bid_price * 1000 # 千分比

# 检查套利机会
if spread > self.config.get("min_spread", 1.0):
self.trader.tlog(
f"套利机会_{symbol}",
f"{exchange} {symbol}: 价差 {spread:.2f}‰",
color="green",
interval=30
)

# 检查平台控制状态
if not self.trader.is_web_opening_stopped():
self.place_arbitrage_orders(symbol, spread)

except Exception as e:
self.trader.log(f"BBO处理异常 {symbol}: {e}", level="ERROR")

def on_timer_subscribe(self, timer_name):
"""定时器回调处理"""
try:
if timer_name == "risk_monitor":
self.risk_monitor()
elif timer_name == "stats_update":
self.update_stats()
else:
self.trader.log(f"未知定时器: {timer_name}", level="WARN")
except Exception as e:
self.trader.log(f"定时器{timer_name}异常: {e}", level="ERROR")

def risk_monitor(self):
"""风控监控"""
try:
# 检查总持仓
if hasattr(self, 'positions'):
total_exposure = sum(abs(pos.get('notional_value', 0)) for pos in self.positions.values())
if total_exposure > 50000: # 总敞口超过5万
self.trader.log(f"⚠️ 总敞口过大: {total_exposure}", level="WARN", color="red")

# 检查连接状态
if hasattr(self, 'last_data_time'):
time_since_data = time.time() - self.last_data_time
if time_since_data > 120: # 超过2分钟没有数据
self.trader.log("⚠️ 数据连接可能中断", level="WARN", color="yellow")

except Exception as e:
self.trader.log(f"风控检查异常: {e}", level="ERROR")

def update_stats(self):
"""更新统计数据"""
try:
# 更新表格数据
self.upload_tables()

# 分析市场趋势
for symbol in self.config.get("symbols", []):
trend_info = self.analyze_market_trend(symbol)
if trend_info:
self.trader.tlog(
f"趋势_{symbol}",
f"{symbol}: {trend_info['trend']} {trend_info['strength']:.2f}%",
interval=300,
color="blue"
)

except Exception as e:
self.trader.log(f"统计更新异常: {e}", level="ERROR")

def adjust_strategy_based_on_analysis(self, symbol, trend_info, levels_info):
"""根据技术分析调整策略"""
try:
current_price = trend_info['current_price']
volatility = trend_info['volatility']
trend = trend_info['trend']

# 根据波动率调整最小价差要求
if volatility > 5.0: # 高波动率
adjusted_min_spread = self.config.get("min_spread", 1.0) * 1.5
self.trader.tlog(
f"策略调整_{symbol}",
f"{symbol}: 高波动率{volatility:.2f}%, 调整最小价差至{adjusted_min_spread:.2f}‰",
color="orange",
interval=600
)
elif volatility < 1.0: # 低波动率
adjusted_min_spread = self.config.get("min_spread", 1.0) * 0.8
self.trader.tlog(
f"策略调整_{symbol}",
f"{symbol}: 低波动率{volatility:.2f}%, 调整最小价差至{adjusted_min_spread:.2f}‰",
color="cyan",
interval=600
)

# 根据趋势调整仓位大小
if trend == "上涨" and trend_info['strength'] > 2.0:
self.trader.tlog(
f"趋势信号_{symbol}",
f"{symbol}: 强势上涨{trend_info['strength']:.2f}%, 可考虑增加多头仓位",
color="green",
interval=300
)
elif trend == "下跌" and trend_info['strength'] > 2.0:
self.trader.tlog(
f"趋势信号_{symbol}",
f"{symbol}: 强势下跌{trend_info['strength']:.2f}%, 可考虑增加空头仓位",
color="red",
interval=300
)

except Exception as e:
self.trader.log(f"调整{symbol}策略异常: {e}", level="ERROR")

def check_kline_signals(self, symbol):
"""检查K线信号 - 利用K线完结状态"""
try:
# 获取最新的5分钟K线数据
result = self.trader.get_kline(
account_id=0,
symbol=symbol,
interval="5m",
limit=3 # 获取最近3根K线
)

if "Ok" not in result:
return None

kline_data = result["Ok"]
if not kline_data or not kline_data.get('candles') or len(kline_data['candles']) < 3:
return None

candles = kline_data['candles']

# 检查最新K线是否已完结
latest_candle = candles[-1]
if not latest_candle['confirm']:
# 最新K线未完结,不进行信号判断
return None

# 分析最近3根已完结的K线
prev_candle = candles[-2]
current_candle = candles[-1]

# 检查突破信号(考虑成交量放大)
volume_amplification = current_candle['volume'] / prev_candle['volume'] if prev_candle['volume'] > 0 else 1
if (current_candle['close'] > prev_candle['high'] and volume_amplification > 1.5):

signal = {
"type": "突破信号",
"direction": "看涨",
"price": current_candle['close'],
"volume": current_candle['volume'],
"timestamp": current_candle['timestamp']
}

self.trader.tlog(
f"K线信号_{symbol}",
f"{symbol}: 检测到向上突破信号,价格: {current_candle['close']}, "
f"成交量放大: {volume_amplification:.2f}倍",
color="green",
interval=60
)

return signal

# 检查下跌信号
elif (current_candle['close'] < prev_candle['low'] and volume_amplification > 1.5):

signal = {
"type": "突破信号",
"direction": "看跌",
"price": current_candle['close'],
"volume": current_candle['volume'],
"timestamp": current_candle['timestamp']
}

self.trader.tlog(
f"K线信号_{symbol}",
f"{symbol}: 检测到向下突破信号,价格: {current_candle['close']}, "
f"成交量放大: {volume_amplification:.2f}倍",
color="red",
interval=60
)

return signal

return None

except Exception as e:
self.trader.log(f"检查{symbol}K线信号异常: {e}", level="ERROR")
return None

def on_order(self, account_id, context, order):
"""订单回调处理"""
# 处理订单状态更新
symbol = order.get('symbol')
status = order.get('status')
side = order.get('side')

if status == 'Filled':
self.trader.log(f"订单成交: {symbol} {side}", color="green")

# 更新持仓信息到Web平台
self.update_position_display()

def on_position(self, account_id, positions):
"""持仓回调处理

参数:
account_id: 账户ID
positions: 持仓数据列表,每个元素的数据结构参见[7.2 Position(持仓)数据结构](#72-position持仓数据结构)
"""
# 更新持仓信息
self.update_position_display()

def on_balance(self, exchange, balances):
"""余额轮询回调处理"""
try:
for balance in balances:
asset = balance.get('asset')
available = balance.get('available_balance', 0)
total = balance.get('balance', 0)

if asset == 'USDT' and total > 0:
# 更新主要账户余额
if exchange == self.cex_configs[0].get('exchange'):
self.primary_balance = total
elif len(self.cex_configs) > 1 and exchange == self.cex_configs[1].get('exchange'):
self.secondary_balance = total

# 余额风控检查
if available < total * 0.1: # 可用余额不足10%
self.trader.log(f"⚠️ {asset} 可用余额不足: {available}/{total}", level="WARN")

except Exception as e:
self.trader.log(f"余额处理异常: {e}", level="ERROR")

def update_position_display(self):
"""更新持仓显示"""
try:
# 获取所有持仓
primary_result = self.trader.get_positions(0)
primary_positions = primary_result["Ok"] if "Ok" in primary_result else []

secondary_positions = []
if len(self.cex_configs) > 1:
secondary_result = self.trader.get_positions(1)
secondary_positions = secondary_result["Ok"] if "Ok" in secondary_result else []

# 计算持仓价值
total_value = 0
long_value = 0
short_value = 0

for pos in primary_positions + secondary_positions:
value = float(pos.get('notional_value', 0))
side = pos.get('side', '')

total_value += abs(value)
if side.lower() == 'long':
long_value += value
else:
short_value += abs(value)

# 更新到Web平台
self.trader.update_current_position_value(total_value, long_value, short_value)

except Exception as e:
self.trader.log(f"更新持仓显示失败: {e}", level="ERROR")