"""Daily top/bottom basket strategy driven by precomputed XGBoost predictions
produced by alpha101/futures_ml/run_pipeline.py.

Timeframe:         15m
Direction:         Long & Short
Stoploss:          -3.0%
Trailing Stop:     No
ROI:               0m: 3.0%
Interface Version: 3
Startup Candles:   48
Indicators:        0
"""
# pragma pylint: disable=missing-docstring, invalid-name
# flake8: noqa
from __future__ import annotations
from datetime import timedelta
from pathlib import Path
from typing import Dict, Set, Tuple
import pandas as pd
from freqtrade.strategy import IStrategy
from freqtrade.persistence import Trade
import logging
logger = logging.getLogger(__name__)
class MLBasketStrategy(IStrategy):
    """
    Daily top/bottom basket strategy driven by precomputed XGBoost predictions
    produced by alpha101/futures_ml/run_pipeline.py.

    For every UTC day found in the prediction file, the ``top_n_long`` symbols
    with the highest ``predicted_return`` are flagged long and the
    ``top_n_short`` symbols with the lowest are flagged short. Entry signals
    are raised on candles whose UTC hour is 0.

    Exits come from three places:

    * the per-position ``minimal_roi`` / ``stoploss`` guards,
    * a basket-level take-profit / stop-loss evaluated in ``custom_exit``
      (mean return of the session's positions vs. ``basket_tp``/``basket_sl``),
    * a forced exit once a trade has been open for ``force_flat_after``.
    """

    INTERFACE_VERSION = 3
    timeframe = "15m"
    process_only_new_candles = True
    can_short = True
    startup_candle_count = 48

    # Per-position guards (kept but basket TP/SL will dominate)
    minimal_roi = {"0": 0.03}
    stoploss = -0.03
    trailing_stop = False

    # Basket-level TP/SL, expressed as mean return ratio of the basket
    basket_tp = 0.006
    basket_sl = -0.006
    use_basket_pnl = True

    # Basket configuration
    prediction_path = Path("alpha101/futures_ml/output/predictions.parquet")
    top_n_long = 4
    top_n_short = 4
    force_flat_after = timedelta(hours=23)

    def __init__(self, config: dict) -> None:
        """Initialise per-day baskets and load them from ``prediction_path``."""
        super().__init__(config)
        # trade_date (UTC midnight) -> symbols selected for that day
        self.daily_longs: Dict[pd.Timestamp, Set[str]] = {}
        self.daily_shorts: Dict[pd.Timestamp, Set[str]] = {}
        # session_positions: session_date -> {pair_key: (direction, open_price)}
        self.session_positions: Dict[pd.Timestamp, Dict[str, Tuple[str, float]]] = {}
        self._load_signals()

    # -----------------------------
    # Pair / symbol naming
    # -----------------------------
    @staticmethod
    def _pair_key(pair: str) -> str:
        """Map a freqtrade pair (BTC/USDT:USDT) to a prediction symbol (BTC_USDT_USDT)."""
        return pair.replace("/", "_").replace(":", "_")

    @staticmethod
    def _pair_from_key(pair_key: str) -> str:
        """Best-effort inverse of ``_pair_key``.

        The first ``_`` becomes ``/`` and the second becomes ``:``, so
        ``BTC_USDT_USDT`` -> ``BTC/USDT:USDT`` and ``BTC_USDT`` -> ``BTC/USDT``.
        Any further underscores are turned into ``/`` (unchanged legacy
        behaviour); symbols that themselves contain ``_`` cannot be recovered.
        """
        return pair_key.replace("_", "/", 1).replace("_", ":", 1).replace("_", "/")

    # -----------------------------
    # Signal loading / lookup
    # -----------------------------
    def _load_signals(self) -> None:
        """Populate ``daily_longs`` / ``daily_shorts`` from the prediction parquet.

        The file must provide ``trade_date``, ``symbol`` and
        ``predicted_return`` columns. A missing file or missing columns is
        logged and leaves both baskets empty (the strategy then never enters).
        """
        if not self.prediction_path.exists():
            logger.warning("Prediction file not found: %s", self.prediction_path)
            return

        df = pd.read_parquet(self.prediction_path)
        required = {"trade_date", "symbol", "predicted_return"}
        if not required.issubset(df.columns):
            logger.error("Prediction file missing required columns")
            return

        # Normalise every prediction timestamp to its UTC day so that keys
        # match the floored candle dates built in _add_session_columns.
        df["trade_date"] = pd.to_datetime(df["trade_date"], utc=True).dt.floor("D")
        for tdate, day in df.groupby("trade_date"):
            self.daily_longs[tdate] = set(
                day.nlargest(self.top_n_long, "predicted_return")["symbol"]
            )
            self.daily_shorts[tdate] = set(
                day.nsmallest(self.top_n_short, "predicted_return")["symbol"]
            )

    def _is_long_signal(self, pair: str, tdate: pd.Timestamp) -> bool:
        """Return True when ``pair`` (prediction symbol) is in the long basket of ``tdate``."""
        return pair in self.daily_longs.get(tdate, ())

    def _is_short_signal(self, pair: str, tdate: pd.Timestamp) -> bool:
        """Return True when ``pair`` (prediction symbol) is in the short basket of ``tdate``."""
        return pair in self.daily_shorts.get(tdate, ())

    @staticmethod
    def _signal_flags(
        date_floor: pd.Series, pair_key: str, baskets: Dict[pd.Timestamp, Set[str]]
    ) -> pd.Series:
        """Return a 0/1 series flagging rows whose day has ``pair_key`` in ``baskets``.

        Vectorised replacement for a per-row dictionary lookup: collect the
        days on which the pair was selected once, then test membership.
        """
        selected_days = [day for day, symbols in baskets.items() if pair_key in symbols]
        return date_floor.isin(selected_days).astype(int)

    def _add_session_columns(self, dataframe: pd.DataFrame, pair_key: str) -> pd.DataFrame:
        """Add ``date_floor``, ``is_session_open``, ``signal_long`` and ``signal_short``.

        Shared by ``populate_indicators`` and ``populate_entry_trend`` so both
        always agree on how the columns are derived.
        """
        dataframe["date_floor"] = dataframe["date"].dt.floor("D")
        # NOTE(review): this is True for every candle inside UTC hour 0 (four
        # candles on 15m), not only the very first candle of the day. Kept as
        # is; confirm whether re-entry within the first hour is intended.
        dataframe["is_session_open"] = dataframe["date"].dt.hour == 0
        dataframe["signal_long"] = self._signal_flags(
            dataframe["date_floor"], pair_key, self.daily_longs
        )
        dataframe["signal_short"] = self._signal_flags(
            dataframe["date_floor"], pair_key, self.daily_shorts
        )
        return dataframe

    # -----------------------------
    # freqtrade callbacks
    # -----------------------------
    def populate_indicators(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Attach the session/signal helper columns for ``metadata["pair"]``."""
        if dataframe.empty:
            return dataframe
        return self._add_session_columns(dataframe, self._pair_key(metadata["pair"]))

    def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Set ``enter_long`` / ``enter_short`` on session-open candles of selected days.

        Also records the entries per session so the basket PnL can be
        estimated without access to persisted trades (e.g. in backtesting).
        """
        if dataframe.empty:
            return dataframe

        pair_key = self._pair_key(metadata["pair"])
        # Recomputed here (cheap) so this method does not depend on
        # populate_indicators having been run on the same frame.
        self._add_session_columns(dataframe, pair_key)

        session_open = dataframe["is_session_open"]
        entry_long = (dataframe["signal_long"] == 1) & session_open
        entry_short = (dataframe["signal_short"] == 1) & session_open

        dataframe.loc[entry_long, "enter_long"] = 1
        if self.can_short:
            dataframe.loc[entry_short, "enter_short"] = 1

        # Record session entries for basket PnL estimation (works in backtesting too)
        self._record_session_entries(dataframe, pair_key, entry_long, entry_short)
        return dataframe

    def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Emit no signal-based exits; ROI, stoploss and ``custom_exit`` handle them."""
        dataframe["exit_long"] = 0
        dataframe["exit_short"] = 0
        return dataframe

    def custom_exit(
        self,
        pair: str,
        trade: Trade,
        current_time,
        current_rate: float,
        current_profit: float,
        **kwargs,
    ) -> str | None:
        """Decide whether ``trade`` should be closed by a basket or end-of-day rule.

        Returns ``"basket_tp"`` / ``"basket_sl"`` when the session's basket
        return crosses ``basket_tp`` / ``basket_sl``, ``"eod_flat"`` once the
        trade has been open for at least ``force_flat_after``, otherwise None.
        """
        # Basket-level TP/SL: evaluate all open trades from the same session (UTC date)
        if self.use_basket_pnl:
            try:
                basket_pnl = self._basket_pnl(current_time)
            except Exception as exc:  # pragma: no cover - defensive
                # Deliberately best-effort: a failed estimate must not block
                # the end-of-day exit below.
                logger.warning("basket_pnl_calc_failed: %s", exc)
                basket_pnl = None
            if basket_pnl is not None:
                if basket_pnl >= self.basket_tp:
                    return "basket_tp"
                if basket_pnl <= self.basket_sl:
                    return "basket_sl"

        # Force flat after the daily session ends
        if current_time - trade.open_date_utc >= self.force_flat_after:
            return "eod_flat"
        return None

    # -----------------------------
    # Helpers
    # -----------------------------
    def _session_key(self, ts: pd.Timestamp) -> pd.Timestamp:
        """Return the UTC midnight of ``ts`` as a tz-aware timestamp.

        Naive inputs are treated as UTC so that the result always compares
        equal to the tz-aware keys stored in ``session_positions``.
        """
        stamp = pd.Timestamp(ts)
        stamp = stamp.tz_localize("UTC") if stamp.tzinfo is None else stamp.tz_convert("UTC")
        return stamp.floor("D")

    def _latest_close(self, pair: str) -> float | None:
        """Return the last ``close`` of the analyzed dataframe for ``pair``, or None if empty."""
        df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        if df.empty:
            return None
        return float(df.iloc[-1]["close"])

    def _uses_persisted_trades(self) -> bool:
        """Return True in live / dry-run, where open trades can be queried."""
        runmode = getattr(getattr(self, "dp", None), "runmode", None)
        # runmode is an enum in freqtrade; fall back to the raw value otherwise.
        return getattr(runmode, "value", runmode) in {"live", "dry_run"}

    def _basket_pnl(self, current_time: pd.Timestamp) -> float | None:
        """Return the mean return ratio of the basket for ``current_time``'s session.

        In live / dry-run the open trades opened during the session are valued
        at the latest close. Otherwise (or when no such trade is found) the
        entries recorded by ``_record_session_entries`` are used. Returns None
        when nothing can be valued.
        """
        session = self._session_key(current_time)

        # Prefer ORM trades in live/dry-run
        if self._uses_persisted_trades():
            live_returns = []
            for open_trade in Trade.get_trades([Trade.is_open.is_(True)]):
                if self._session_key(open_trade.open_date_utc) != session:
                    continue
                last_close = self._latest_close(open_trade.pair)
                if last_close is None:
                    continue
                live_returns.append(open_trade.calc_profit_ratio(last_close))
            if live_returns:
                return float(pd.Series(live_returns).mean())

        # Fallback for backtesting/hyperopt using recorded session entries
        positions = self.session_positions.get(session)
        if not positions:
            return None

        returns = []
        for pair_key, (direction, open_price) in positions.items():
            last_close = self._latest_close(self._pair_from_key(pair_key))
            if last_close is None or open_price <= 0:
                continue
            ratio = (last_close - open_price) / open_price
            returns.append(-ratio if direction == "short" else ratio)
        if not returns:
            return None
        return float(pd.Series(returns).mean())

    def _record_session_entries(
        self,
        dataframe: pd.DataFrame,
        pair_key: str,
        entry_long: pd.Series,
        entry_short: pd.Series,
    ) -> None:
        """Store, for every session, the first entry bar of ``pair_key``.

        For each direction the first flagged bar of each ``date_floor`` is
        taken and its ``open`` is stored as the position's reference price in
        ``session_positions``. Every session present in ``dataframe`` is
        recorded, not only the earliest one, so multi-day backtests have a
        basket for each day. If a pair is flagged in both directions on the
        same session the short entry wins (it is written last).
        """
        for direction, mask in (("long", entry_long), ("short", entry_short)):
            if not mask.any():
                continue
            flagged = dataframe.loc[mask, ["date_floor", "open"]]
            # Rows are in candle order, so keep="first" yields the first
            # session-open bar of each day.
            first_bars = flagged.drop_duplicates(subset="date_floor", keep="first")
            for session, open_price in zip(first_bars["date_floor"], first_bars["open"]):
                session_key = self._session_key(session)
                self.session_positions.setdefault(session_key, {})[pair_key] = (
                    direction,
                    float(open_price),
                )