ETH/BTC统计套利策略 - 支持同时多头和空头 基于Z-score进行均值回归交易
Timeframe
15m
Direction
Long Only
Stoploss
-2.0%
Trailing Stop
No
ROI
0m: 10.0%, 1440m: 5.0%, 2880m: 2.0%, 4320m: 1.0%
Interface Version
3
Startup Candles
N/A
Indicators
1
freqtrade/freqtrade-strategies
freqtrade/freqtrade-strategies
Sample strategy implementing Informative Pairs - compares stake_currency with USDT. Not performing very well - but should serve as an example how to use a referential pair against USDT. author@: xmatthias github@: https://github.com/freqtrade/freqtrade-strategies
freqtrade/freqtrade-strategies
# pragma pylint: disable=missing-docstring, invalid-name, pointless-string-statement
# flake8: noqa: F401
# isort: skip_file
# --- Do not remove these imports ---
import numpy as np
import pandas as pd
from datetime import datetime, timedelta, timezone
from pandas import DataFrame
from typing import Dict, Optional, Union, Tuple, List
import logging
from freqtrade.strategy import (
IStrategy,
Trade,
Order,
PairLocks,
informative,
BooleanParameter,
CategoricalParameter,
DecimalParameter,
IntParameter,
RealParameter,
timeframe_to_minutes,
timeframe_to_next_date,
timeframe_to_prev_date,
merge_informative_pair,
stoploss_from_absolute,
stoploss_from_open,
AnnotationType,
)
# --------------------------------
# 添加自定义库
import talib.abstract as ta
from technical import qtpylib
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)
class StatisticalArbitrageStrategyDeep(IStrategy):
"""
ETH/BTC统计套利策略 - 支持同时多头和空头
基于Z-score进行均值回归交易
"""
# 策略接口版本
INTERFACE_VERSION = 3
# 最优时间框架 - 根据原策略设置为15分钟
timeframe = "15m"
# 允许做空
can_short: bool = True
# 最小ROI - 可以根据需要进行调整
minimal_roi = {
"0": 0.10, # 如果达到10%的利润就立即退出
"1440": 0.05, # 24小时后减少到5%
"2880": 0.02, # 48小时后减少到2%
"4320": 0.01, # 72小时后减少到1%
}
# 止损
stoploss = -0.02
# 追踪止损
trailing_stop = False
trailing_only_offset_is_reached = False
trailing_stop_positive = 0.01
trailing_stop_positive_offset = 0.02
# 只处理新蜡烛
process_only_new_candles = True
# 使用退出信号
use_exit_signal = True
exit_profit_only = False
ignore_roi_if_entry_signal = False
# 策略需要的启动蜡烛数(基于回看周期)
startup_candle_count: int = 200
# 策略参数(可以在超参数优化中调整)
lookback_period = IntParameter(50, 200, default=96, space="buy")
entry_threshold = DecimalParameter(1.5, 3.0, default=2.0, decimals=1, space="buy")
exit_threshold = DecimalParameter(0.2, 1.0, default=0.5, decimals=1, space="sell")
use_ema = BooleanParameter(default=False, space="buy")
# 允许同时持有多头和空头的参数
allow_simultaneous_positions = BooleanParameter(default=True, space="protection")
max_simultaneous_long = IntParameter(1, 5, default=5, space="protection")
max_simultaneous_short = IntParameter(1, 5, default=5, space="protection")
# 风险控制参数
position_adjustment_enable = True
max_entry_position_adjustment = 10
# 订单类型
order_types = {
"entry": "limit",
"exit": "limit",
"stoploss": "market",
"stoploss_on_exchange": False
}
# 订单有效时间
order_time_in_force = {
"entry": "GTC",
"exit": "GTC"
}
def __init__(self, config: dict) -> None:
"""
初始化策略
"""
super().__init__(config)
# 用于跟踪同时持仓的状态
self.active_long_trades = []
self.active_short_trades = []
# 确保max_open_trades足够大以支持同时持仓
if config.get('max_open_trades', 1) < 2:
logger.warning("max_open_trades should be >= 2 for simultaneous long/short positions")
@property
def plot_config(self):
"""
配置策略可视化
"""
return {
"main_plot": {
"close": {"color": "blue", "fill_to": "lower_band"},
"mean": {"color": "orange"},
"upper_band": {"color": "red"},
"lower_band": {"color": "green"},
},
"subplots": {
"Z-Score": {
"zscore": {"color": "purple"},
"entry_threshold_upper": {"color": "red"},
"entry_threshold_lower": {"color": "green"},
"exit_threshold_upper": {"color": "blue"},
"exit_threshold_lower": {"color": "yellow"},
},
"Position": {
"position": {"color": "blue", "type": "bar"}
}
}
}
def informative_pairs(self):
"""
如果需要额外的信息性交易对,可以在这里定义
"""
return []
def calculate_zscore(self, dataframe, price_col='close'):
"""
计算Z-score(价格标准化)
"""
lookback = self.lookback_period.value
use_ema = self.use_ema.value
if use_ema:
# 使用EMA(对近期数据给予更高权重)
half_life = lookback / 2
alpha = 1 - np.exp(np.log(0.5) / half_life)
mean = dataframe[price_col].ewm(alpha=alpha).mean()
std = dataframe[price_col].ewm(alpha=alpha).std()
else:
# 使用简单移动平均
mean = dataframe[price_col].rolling(window=lookback, min_periods=1).mean()
std = dataframe[price_col].rolling(window=lookback, min_periods=1).std()
# 避免除以零
std = std.replace(0, np.nan).fillna(method='ffill').fillna(0.0001)
zscore = (dataframe[price_col] - mean) / std
return zscore, mean, std
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
添加指标到DataFrame
"""
# 计算Z-score和相关指标
zscore, mean, std = self.calculate_zscore(dataframe)
# 将计算结果添加到dataframe
dataframe['zscore'] = zscore
dataframe['mean'] = mean
dataframe['std'] = std
# 计算布林带(基于Z-score阈值)
entry_threshold = self.entry_threshold.value
exit_threshold = self.exit_threshold.value
dataframe['entry_threshold_upper'] = entry_threshold
dataframe['entry_threshold_lower'] = -entry_threshold
dataframe['exit_threshold_upper'] = exit_threshold
dataframe['exit_threshold_lower'] = -exit_threshold
dataframe['upper_band'] = dataframe['mean'] + entry_threshold * dataframe['std']
dataframe['lower_band'] = dataframe['mean'] - entry_threshold * dataframe['std']
dataframe['upper_exit'] = dataframe['mean'] + exit_threshold * dataframe['std']
dataframe['lower_exit'] = dataframe['mean'] - exit_threshold * dataframe['std']
# 添加用于决策的指标
dataframe['zscore_signal'] = 0
# 计算信号强度(用于仓位大小决策)
dataframe['signal_strength'] = abs(dataframe['zscore']) / entry_threshold
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
生成入场信号 - 允许同时持有多头和空头
"""
entry_threshold = self.entry_threshold.value
# 初始化信号列
dataframe.loc[:, 'enter_long'] = 0
dataframe.loc[:, 'enter_short'] = 0
# 计算Z-score的差值用于判断交叉
dataframe['zscore_prev'] = dataframe['zscore'].shift(1)
# 多头入场条件:Z-score从上方穿过负阈值
long_condition = (
(dataframe['zscore_prev'] >= -entry_threshold) &
(dataframe['zscore'] < -entry_threshold) &
(dataframe['volume'] > 0) &
(dataframe['signal_strength'] > 1.0) # 确保信号强度足够
)
# 空头入场条件:Z-score从下方穿过正阈值
short_condition = (
(dataframe['zscore_prev'] <= entry_threshold) &
(dataframe['zscore'] > entry_threshold) &
(dataframe['volume'] > 0) &
(dataframe['signal_strength'] > 1.0) # 确保信号强度足够
)
# 应用入场信号
dataframe.loc[long_condition, 'enter_long'] = 1
dataframe.loc[short_condition, 'enter_short'] = 1
# 添加入场原因和信号强度
dataframe.loc[long_condition, 'enter_tag'] = 'zscore_buy'
dataframe.loc[short_condition, 'enter_tag'] = 'zscore_sell'
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
生成退出信号 - 分别处理多头和空头退出
"""
exit_threshold = self.exit_threshold.value
# 初始化退出列
dataframe.loc[:, 'exit_long'] = 0
dataframe.loc[:, 'exit_short'] = 0
# 多头退出条件:Z-score回归到退出阈值内(从负值变为大于-exit_threshold)
long_exit_condition = (
(dataframe['zscore'] > -exit_threshold) &
(dataframe['zscore'].shift(1) <= -exit_threshold) &
(dataframe['volume'] > 0)
)
# 空头退出条件:Z-score回归到退出阈值内(从正值变为小于exit_threshold)
short_exit_condition = (
(dataframe['zscore'] < exit_threshold) &
(dataframe['zscore'].shift(1) >= exit_threshold) &
(dataframe['volume'] > 0)
)
# 应用退出信号
dataframe.loc[long_exit_condition, 'exit_long'] = 1
dataframe.loc[short_exit_condition, 'exit_short'] = 1
return dataframe
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: Optional[float], max_stake: float,
leverage: float, entry_tag: Optional[str], side: str,
**kwargs) -> float:
"""
自定义仓位大小
可以根据Z-score的偏离程度调整仓位大小
"""
# 获取最新的Z-score
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(dataframe) > 0:
latest_zscore = dataframe['zscore'].iloc[-1]
# Z-score偏离越大,仓位可以相对更大(但不要超过最大仓位限制)
zscore_abs = abs(latest_zscore)
entry_threshold = self.entry_threshold.value
# 根据Z-score偏离程度调整仓位
if zscore_abs > entry_threshold * 1.5:
# Z-score偏离很大,可以使用较大仓位
stake_multiplier = min(1.5, 0.8 + (zscore_abs / entry_threshold) * 0.2)
elif zscore_abs > entry_threshold:
# Z-score偏离适中
stake_multiplier = 1.0
else:
# Z-score偏离较小,减少仓位
stake_multiplier = 0.7
adjusted_stake = proposed_stake * stake_multiplier
# 确保在最小和最大仓位限制内
if min_stake is not None:
adjusted_stake = max(adjusted_stake, min_stake)
adjusted_stake = min(adjusted_stake, max_stake)
return adjusted_stake
return proposed_stake
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
time_in_force: str, current_time: datetime, entry_tag: Optional[str],
side: str, **kwargs) -> bool:
"""
在交易入场前进行确认
"""
# 获取最新的Z-score
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(dataframe) > 0:
latest_zscore = dataframe['zscore'].iloc[-1]
entry_threshold = self.entry_threshold.value
# 检查当前Z-score是否仍然符合入场条件
if side == 'long' and latest_zscore >= -entry_threshold:
# Z-score已经回归,取消买入
logger.info(f"Cancelling long entry: Z-score ({latest_zscore:.2f}) above -{entry_threshold}")
return False
elif side == 'short' and latest_zscore <= entry_threshold:
# Z-score已经回归,取消卖出
logger.info(f"Cancelling short entry: Z-score ({latest_zscore:.2f}) below {entry_threshold}")
return False
# 检查是否允许同时持仓
allow_simultaneous = self.allow_simultaneous_positions.value
if not allow_simultaneous:
# 如果不允许同时持仓,检查是否有相反方向的持仓
trades = Trade.get_trades_proxy(pair=pair, is_open=True)
for trade in trades:
if side == 'long' and trade.is_short:
logger.info("Already have short position, skipping long entry")
return False
elif side == 'short' and not trade.is_short:
logger.info("Already have long position, skipping short entry")
return False
# 检查同时持仓数量限制
if side == 'long':
active_longs = [t for t in Trade.get_trades_proxy(pair=pair, is_open=True) if not t.is_short]
if len(active_longs) >= self.max_simultaneous_long.value:
logger.info(f"Maximum long positions ({self.max_simultaneous_long.value}) reached")
return False
else: # short
active_shorts = [t for t in Trade.get_trades_proxy(pair=pair, is_open=True) if t.is_short]
if len(active_shorts) >= self.max_simultaneous_short.value:
logger.info(f"Maximum short positions ({self.max_simultaneous_short.value}) reached")
return False
return True
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
rate: float, time_in_force: str, exit_reason: str,
current_time: datetime, **kwargs) -> bool:
"""
在交易退出前进行确认
"""
# 获取最新的Z-score
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(dataframe) > 0:
latest_zscore = dataframe['zscore'].iloc[-1]
exit_threshold = self.exit_threshold.value
# 检查退出条件是否仍然有效
if not trade.is_short and latest_zscore <= -exit_threshold:
# Z-score仍然低于退出阈值,可能应该保持持仓
logger.info(f"Z-score ({latest_zscore:.2f}) still below -{exit_threshold}, reconsidering exit")
# 这里可以选择返回False以取消退出,但为了安全还是允许退出
elif trade.is_short and latest_zscore >= exit_threshold:
# Z-score仍然高于退出阈值,可能应该保持持仓
logger.info(f"Z-score ({latest_zscore:.2f}) still above {exit_threshold}, reconsidering exit")
return True
def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float,
current_profit: float, **kwargs) -> Optional[Union[str, bool]]:
"""
自定义退出逻辑
可以基于Z-score或利润目标提前退出
"""
# 获取最新的Z-score
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(dataframe) > 0:
latest_zscore = dataframe['zscore'].iloc[-1]
exit_threshold = self.exit_threshold.value
# 根据持仓方向检查退出条件
if not trade.is_short:
# 多头退出条件
if latest_zscore > -exit_threshold:
return "zscore_converged_long"
# 如果利润足够大且Z-score开始回升,可以考虑退出
if current_profit > 0.03 and len(dataframe) > 1:
prev_zscore = dataframe['zscore'].iloc[-2]
if latest_zscore > prev_zscore: # Z-score开始上升
return "profit_and_zscore_rising_long"
elif trade.is_short:
# 空头退出条件
if latest_zscore < exit_threshold:
return "zscore_converged_short"
# 如果利润足够大且Z-score开始下降,可以考虑退出
if current_profit > 0.03 and len(dataframe) > 1:
prev_zscore = dataframe['zscore'].iloc[-2]
if latest_zscore < prev_zscore: # Z-score开始下降
return "profit_and_zscore_falling_short"
# 紧急止损:如果Z-score偏离过大
entry_threshold = self.entry_threshold.value
if abs(latest_zscore) > entry_threshold * 2.5:
return "emergency_stop_extreme_zscore"
# 时间止损:持仓时间过长
if (current_time - trade.open_date_utc).days > 7:
return "time_stop_7_days"
return None
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: float, max_stake: float,
**kwargs) -> Optional[float]:
"""
调整交易仓位(加仓/减仓)
允许对同时持仓的多头和空头分别加仓
"""
dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
if len(dataframe) > 0:
latest_zscore = dataframe['zscore'].iloc[-1]
entry_threshold = self.entry_threshold.value
# 确保当前仓位不超过最大调整次数
if trade.nr_of_successful_entries < self.max_entry_position_adjustment:
# 如果是多头且Z-score继续向下偏离
if not trade.is_short and latest_zscore < -entry_threshold * 1.3:
# 计算加仓金额(基于偏离程度)
deviation_ratio = abs(latest_zscore) / entry_threshold
add_size = min(0.5, 0.3 * deviation_ratio) # 最多加仓50%
# 计算加仓金额
add_amount = trade.stake_amount * add_size / current_rate
logger.info(f"Adding to long position: Z-score={latest_zscore:.2f}, adding {add_size*100:.1f}%")
return add_amount
# 如果是空头且Z-score继续向上偏离
elif trade.is_short and latest_zscore > entry_threshold * 1.3:
# 计算加仓金额(基于偏离程度)
deviation_ratio = abs(latest_zscore) / entry_threshold
add_size = min(0.5, 0.3 * deviation_ratio) # 最多加仓50%
# 计算加仓金额
add_amount = trade.stake_amount * add_size / current_rate
logger.info(f"Adding to short position: Z-score={latest_zscore:.2f}, adding {add_size*100:.1f}%")
return add_amount
# 减仓逻辑:如果Z-score开始回归且利润为正
if current_profit > 0.02:
if trade.is_long and latest_zscore > -entry_threshold * 0.8:
# 多头开始回归,减仓25%
reduce_amount = trade.amount * 0.25
logger.info(f"Reducing long position: profit={current_profit*100:.1f}%, Z-score={latest_zscore:.2f}")
return -reduce_amount
elif trade.is_short and latest_zscore < entry_threshold * 0.8:
# 空头开始回归,减仓25%
reduce_amount = trade.amount * 0.25
logger.info(f"Reducing short position: profit={current_profit*100:.1f}%, Z-score={latest_zscore:.2f}")
return -reduce_amount
return None
def leverage(self, pair: str, current_time: datetime, current_rate: float,
proposed_leverage: float, max_leverage: float, entry_tag: Optional[str],
side: str, **kwargs) -> float:
"""
设置杠杆
对于同时持仓的多头和空头,使用保守的杠杆
"""
# 获取持仓数量
open_trades = Trade.get_trades_proxy(pair=pair, is_open=True)
if open_trades:
# 如果已经有持仓,使用更低的杠杆
base_leverage = min(proposed_leverage, max_leverage)
# 根据持仓数量调整杠杆
position_count = len(open_trades)
leverage_multiplier = 1.0 / (1 + 0.2 * position_count) # 每多一个持仓,杠杆降低20%
adjusted_leverage = base_leverage * leverage_multiplier
logger.info(f"Adjusting leverage: {base_leverage:.1f}x -> {adjusted_leverage:.1f}x "
f"(positions: {position_count})")
return adjusted_leverage
return min(proposed_leverage, max_leverage)