Daily top/bottom basket strategy driven by precomputed XGBoost predictions produced by alpha101/futures_ml/run_pipeline.py.
Timeframe
3m
Direction
Long & Short
Stoploss
-3.0%
Trailing Stop
No
ROI
0m: 3.0%
Interface Version
3
Startup Candles
48
Indicators
0
davidzr/freqtrade-strategies
BB_RPB_TSL @author jilv220 Simple bollinger brand strategy inspired by this blog ( https://hacks-for-life.blogspot.com/2020/12/freqtrade-notes.html ) RPB, which stands for Real Pull Back, taken from ( https://github.com/GeorgeMurAlkh/freqtrade-stuff/blob/main/user_data/strategies/TheRealPullbackV2.py ) The trailing custom stoploss taken from BigZ04_TSL from Perkmeister ( modded by ilya ) I modified it to better suit my taste and added Hyperopt for this strategy.
TheoBrigitte/freqtrade
This strategy is based on research work by Sergey Malchevskiy and uses Renko bricks to identify trends. The brick size is optimized using ATR to find out boundaries from an informative period. For more information on the brick size optimization, see the following article: https://towardsdatascience.com/renko-brick-size-optimization-34d64400f60e
keithorange/HUGE_FreqTrade_Strategy_Collection
BB_RPB_TSL @author jilv220 Simple bollinger brand strategy inspired by this blog ( https://hacks-for-life.blogspot.com/2020/12/freqtrade-notes.html ) RPB, which stands for Real Pull Back, taken from ( https://github.com/GeorgeMurAlkh/freqtrade-stuff/blob/main/user_data/strategies/TheRealPullbackV2.py ) The trailing custom stoploss taken from BigZ04_TSL from Perkmeister ( modded by ilya ) I modified it to better suit my taste and added Hyperopt for this strategy.
# pragma pylint: disable=missing-docstring, invalid-name
# flake8: noqa
from __future__ import annotations
from pathlib import Path
import sys
from typing import Dict, Set, Tuple
import pandas as pd
from freqtrade.strategy import IStrategy
from freqtrade.persistence import Trade
import logging
from typing import Optional
from datetime import datetime
import math
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
from alpha101.futures_ml.data import build_pair_dataset, clean_nan_targets
from alpha101.futures_ml.training import get_feature_columns, walk_forward_predict
logger = logging.getLogger(__name__)
class ML1hBasketStrategy(IStrategy):
"""
Daily top/bottom basket strategy driven by precomputed XGBoost predictions
produced by alpha101/futures_ml/run_pipeline.py.
At each UTC midnight (first candle of the day on timeframe), go long the
top N pairs and short the bottom N pairs ranked by predicted_return.
Positions share symmetric TP/SL and are force-closed at the end of the day
if no stop is hit.
"""
INTERFACE_VERSION = 3
timeframe = "3m"
# Prediction configuration (1h model)
prediction_timeframe = "4h"
process_only_new_candles = True
can_short = True
startup_candle_count = 48
leverage_value = 5.0
# Per-position guards (kept but basket TP/SL will dominate)
minimal_roi = {"0": 0.03*leverage_value}
stoploss = -0.03*leverage_value
trailing_stop = False
# Basket-level TP/SL
basket_tp = 0.01*leverage_value
basket_sl = -0.01*leverage_value
use_basket_pnl = True
trade_horizon_bars = 1
retrain_every_bars = 1
lookback_bars = 8
min_train_bars = 90
max_train_bars = 365
train_split = 0.9
use_dual_model = True
top_n_long = 5
top_n_short = 5
# position_adjustment_enable = False
# max_entry_position_adjustment = 6
unfilledtimeout = {
'entry': 1,
'exit': 1,
'unit': 'minutes'
}
def __init__(self, config: dict) -> None:
super().__init__(config)
self.daily_longs: Dict[pd.Timestamp, Set[str]] = {}
self.daily_shorts: Dict[pd.Timestamp, Set[str]] = {}
# session_positions: session_date -> {pair_key: (direction, open_price)}
self.session_positions: Dict[pd.Timestamp, Dict[str, Tuple[str, float]]] = {}
self._last_pred_session: pd.Timestamp | None = None
# print(self.daily_longs)
# print(self.daily_shorts)
@staticmethod
def _pair_key(pair: str) -> str:
# Align freqtrade pair (e.g., BTC/USDT:USDT) with prediction symbols (BTC_USDT_USDT)
return pair.replace("/", "_").replace(":", "_")
def informative_pairs(self):
pairs = self.dp.current_whitelist()
return [(pair, self.prediction_timeframe) for pair in pairs]
def _tf_to_timedelta(self, timeframe: str) -> pd.Timedelta:
tf = timeframe.strip().lower()
if tf.endswith("m"):
return pd.Timedelta(minutes=int(tf[:-1]))
if tf.endswith("h"):
return pd.Timedelta(hours=int(tf[:-1]))
if tf.endswith("d"):
return pd.Timedelta(days=int(tf[:-1]))
raise ValueError(f"Unsupported timeframe: {timeframe}")
def _to_utc(self, ts: pd.Timestamp) -> pd.Timestamp:
base = pd.Timestamp(ts)
return base.tz_convert("UTC") if base.tzinfo else base.tz_localize("UTC")
def _floor_ts(self, ts: pd.Timestamp, timeframe: str) -> pd.Timestamp:
delta = self._tf_to_timedelta(timeframe)
base = self._to_utc(ts)
floored = (base.value // delta.value) * delta.value
return pd.to_datetime(floored, utc=True)
def leverage(self, pair, current_time, current_rate, proposed_leverage, max_leverage, entry_tag, side, **kwargs):
return self.leverage_value
def _floor_series(self, series: pd.Series, timeframe: str) -> pd.Series:
delta = self._tf_to_timedelta(timeframe)
dt = pd.to_datetime(series, utc=True)
floored = (dt.astype("int64") // delta.value) * delta.value
return pd.to_datetime(floored, utc=True)
def _session_key(self, ts: pd.Timestamp) -> pd.Timestamp:
return self._floor_ts(ts, self.prediction_timeframe)
def _build_full_dataset_from_dp(self, train_start_date) -> pd.DataFrame:
pairs = self.dp.current_whitelist()
frames = []
for pair in pairs:
df = self.dp.get_pair_dataframe(pair, self.prediction_timeframe)
if df is None or df.empty:
continue
daily_df = df.copy()
if "date" not in daily_df.columns:
continue
daily_df["date"] = pd.to_datetime(daily_df["date"], utc=True)
daily_df = daily_df.sort_values("date").reset_index(drop=True)
daily_df = daily_df[daily_df["date"] >= train_start_date]
symbol = self._pair_key(pair)
frames.append(
build_pair_dataset(
daily_df,
symbol,
self.lookback_bars,
self.prediction_timeframe,
self.trade_horizon_bars,
)
)
if not frames:
return pd.DataFrame()
full = pd.concat(frames, ignore_index=True)
full = full.sort_values(["asof_date", "symbol"]).reset_index(drop=True)
alpha_cols = [c for c in full.columns if c.startswith("alpha")]
if alpha_cols:
full[alpha_cols] = full.groupby("asof_date")[alpha_cols].transform(
lambda g: (g - g.mean()) / (g.std() + 1e-12)
)
full, _ = clean_nan_targets(full, self.retrain_every_bars)
# if self.max_train_bars is not None:
# dates = sorted(full["asof_date"].unique())
# keep = self.max_train_bars + self.retrain_every_bars
# if len(dates) > keep:
# keep_dates = set(dates[-keep:])
# full = full[full["asof_date"].isin(keep_dates)].reset_index(drop=True)
return full
def _refresh_predictions(self, now: pd.Timestamp) -> None:
logger.info("Refreshing ML predictions at %s", now)
train_start_date = now - self._tf_to_timedelta(self.prediction_timeframe) * (self.lookback_bars + self.max_train_bars + 200)
dataset = self._build_full_dataset_from_dp(train_start_date)
if dataset.empty:
logger.warning("No dataset available for prediction")
return
feature_cols = get_feature_columns(dataset)
if not feature_cols:
logger.warning("No feature columns available for prediction")
return
test_end_dt = self._to_utc(now) + self._tf_to_timedelta(self.prediction_timeframe) * self.retrain_every_bars * 2
test_start_dt = test_end_dt - self._tf_to_timedelta(self.prediction_timeframe) * self.retrain_every_bars * 6
if len(dataset["asof_date"].unique()) < self.min_train_bars:
logger.warning("Not enough data for walk-forward prediction. Need at least %d bars. But only have %d bars.", self.min_train_bars, len(dataset["asof_date"].unique()))
return
try:
logger.info("Refreshing predictions from %s to %s", test_start_dt, test_end_dt)
logger.info(f"Dataset rows: {len(dataset)}, feature_cols: {feature_cols}")
preds = walk_forward_predict(
dataset,
feature_cols,
retrain_every_days=self.retrain_every_bars,
min_train_days=self.min_train_bars,
max_train_days=self.max_train_bars,
train_split=self.train_split,
use_dual_model=self.use_dual_model,
test_start_date=test_start_dt,
test_end_date=test_end_dt,
top_n_long=self.top_n_long,
)
except ValueError as exc:
logger.warning("Walk-forward failed: %s", exc)
return
future_pred = preds[preds["target"].isna()]
if future_pred.empty:
logger.warning("No future predictions available")
return
latest_trade_date = self._to_utc(future_pred["trade_date"].max())
latest_pred = future_pred[future_pred["trade_date"] == latest_trade_date]
logger.info(f"{latest_pred[['asof_date', 'trade_date', 'symbol', 'predicted_long', 'predicted_short']]}")
if latest_pred.empty:
return
long_candidates = latest_pred.nlargest(self.top_n_long, "predicted_long")
short_candidates = latest_pred.nlargest(self.top_n_short, "predicted_short")
duplicate_pairs = set(long_candidates["symbol"].tolist()) & set(short_candidates["symbol"].tolist())
self.daily_longs[latest_trade_date] = set(long_candidates["symbol"].tolist()) - duplicate_pairs
self.daily_shorts[latest_trade_date] = set(short_candidates["symbol"].tolist()) - duplicate_pairs
logger.info(
"Refreshed predictions at %s: longs=%s shorts=%s duplicates=%s",
latest_trade_date,
",".join(long_candidates["symbol"].tolist()),
",".join(short_candidates["symbol"].tolist()),
",".join(duplicate_pairs),
)
def _maybe_refresh_predictions(self, dataframe: pd.DataFrame) -> None:
# logger.info("Maybe refresh predictions called at dataframe with last date %s", dataframe["date"].max() if not dataframe.empty else "N/A")
if dataframe.empty:
return
# now = self._to_utc(dataframe.iloc[-1]["date"])
now = pd.Timestamp.now(tz="UTC")
session = self._session_key(now)
# logger.info("Current session: %s, last session: %s", session, self._last_pred_session)
if self._last_pred_session is not None and self._last_pred_session >= session:
return
# if not self._is_session_open_bar(now):
# return
self._refresh_predictions(now)
self._last_pred_session = session
def _safe_pair_name(symbol: str) -> str:
return symbol.replace("/", "_").replace(":", "_")
def _is_long_signal(self, pair: str, date: pd.Timestamp) -> bool:
session_key = self._session_key(date)
time_condition = date >= session_key and date <= session_key + self._tf_to_timedelta("6m")
return time_condition and session_key in self.daily_longs and self._pair_key(pair) in self.daily_longs[session_key]
def _is_short_signal(self, pair: str, date: pd.Timestamp) -> bool:
session_key = self._session_key(date)
time_condition = date >= session_key and date <= session_key + self._tf_to_timedelta("15m")
return time_condition and session_key in self.daily_shorts and self._pair_key(pair) in self.daily_shorts[session_key]
def populate_indicators(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
self._maybe_refresh_predictions(dataframe)
if dataframe.empty:
return dataframe
pair_key = self._pair_key(metadata["pair"])
dataframe["signal_long"] = dataframe["date"].apply(lambda d: 1 if self._is_long_signal(pair_key, d) else 0)
dataframe["signal_short"] = dataframe["date"].apply(lambda d: 1 if self._is_short_signal(pair_key, d) else 0)
return dataframe
def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
self._maybe_refresh_predictions(dataframe)
pair_key = self._pair_key(metadata["pair"])
# logger.info(f"last session keys: {dataframe['date'].tail(1)}")
dataframe["signal_long"] = dataframe["date"].apply(lambda d: 1 if self._is_long_signal(pair_key, d) else 0)
dataframe["signal_short"] = dataframe["date"].apply(lambda d: 1 if self._is_short_signal(pair_key, d) else 0)
# logger.info(f"tail 2: \n{dataframe[['date', 'signal_long', 'signal_short']].tail(2)}")
# logger.info(f"long pairs: {self.daily_longs}")
entry_long = (dataframe["signal_long"] == 1)
entry_short = (dataframe["signal_short"] == 1)
dataframe.loc[entry_long, "enter_long"] = 1
if self.can_short:
dataframe.loc[entry_short, "enter_short"] = 1
# Record session entries for basket PnL estimation (works in backtesting too)
# self._record_session_entries(dataframe, pair_key, entry_long, entry_short)
return dataframe
def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
# Exits are handled by ROI/SL/custom_exit
dataframe["exit_long"] = 0
dataframe["exit_short"] = 0
return dataframe
def custom_exit(self, pair: str, trade, current_time, current_rate, current_profit, **kwargs):
# Basket-level TP/SL: evaluate all open trades from the same session (UTC date)
# if self.use_basket_pnl:
# try:
# basket_pnl = self._basket_pnl(current_time)
# except Exception as exc: # pragma: no cover - defensive
# logger.warning("basket_pnl_calc_failed: %s", exc)
# basket_pnl = None
# if basket_pnl is not None:
# if basket_pnl >= self.basket_tp:
# return "basket_tp"
# if basket_pnl <= self.basket_sl:
# return "basket_sl"
# 提前1分钟平仓,避免因时间对齐问题错过平仓时机
filled_entries = trade.select_filled_orders(trade.entry_side)
if len(filled_entries) == 0:
return None
# 找到当前交易的开仓时间
session_open_time = self._session_key(trade.open_date_utc)
session_end_time = session_open_time + self._tf_to_timedelta(self.prediction_timeframe) * self.trade_horizon_bars
exit_sign = self._to_utc(current_time) > session_end_time
# 如果下一个session继续买入,则不强制平仓
if trade.entry_side == "long":
exit_sign = exit_sign and not self._is_long_signal(pair, current_time)
elif trade.entry_side == "short":
exit_sign = exit_sign and not self._is_short_signal(pair, current_time)
# logger.info(f"for pair: {trade.pair}, open_date_utc: {trade.open_date_utc} session_end: {session_end}, exit_sign: {exit_sign}")
if exit_sign:
logger.info(f"Force flat for {trade.pair} at {current_time} (session_end={session_end_time})")
return "session_end"
return None
# -----------------------------
# Helpers
# -----------------------------
def _latest_close(self, pair: str) -> float:
df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if df.empty:
return None
return float(df.iloc[-1]["close"])
def _basket_pnl(self, current_time: pd.Timestamp) -> float:
session = self._session_key(current_time)
runmode = 'backtesting'
# Prefer ORM trades in live/dry-run
if runmode not in {"backtesting", "hyperopt", "edge"}:
trades = Trade.get_trades([Trade.is_open.is_(True)])
rets = []
for t in trades:
if self._session_key(t.open_date_utc) != session:
continue
last_close = self._latest_close(t.pair)
if last_close is None:
continue
rets.append(t.calc_profit_ratio(last_close))
if rets:
return float(pd.Series(rets).mean())
# Fallback for backtesting/hyperopt using recorded session entries
positions = self.session_positions.get(session)
if not positions:
return None
rets = []
for pair_key, (direction, open_price) in positions.items():
pair_ft = pair_key.replace("_", "/", 1).replace("_", ":", 1).replace("_", "/")
last_close = self._latest_close(pair_ft)
if last_close is None or open_price <= 0:
continue
r = (last_close - open_price) / open_price
if direction == "short":
r *= -1
rets.append(r)
if not rets:
return None
return float(pd.Series(rets).mean())
# def _record_session_entries(self, dataframe: pd.DataFrame, pair_key: str, entry_long: pd.Series, entry_short: pd.Series) -> None:
# # Find first session-open bar where we enter
# for direction, mask in (("long", entry_long), ("short", entry_short)):
# if not mask.any():
# continue
# first_idx = mask.idxmax()
# open_price = float(dataframe.loc[first_idx, "open"])
# session = dataframe.loc[first_idx, "session_key"]
# session_key = pd.Timestamp(session).tz_localize("UTC") if pd.Timestamp(session).tzinfo is None else pd.Timestamp(session).tz_convert("UTC")
# self.session_positions.setdefault(session_key, {})[pair_key] = (direction, open_price)