Daily top/bottom basket strategy driven by precomputed XGBoost predictions produced by alpha101/futures_ml/run_pipeline.py.
Timeframe
1m
Direction
Long & Short
Stoploss
-16.0%
Trailing Stop
No
ROI
0m: 16.0%
Interface Version
3
Startup Candles
48
Indicators
0
freqtrade/freqtrade-strategies
freqtrade/freqtrade-strategies
This strategy is based around the idea of generating a lot of potential buys and making tiny profits on each trade.
freqtrade/freqtrade-strategies
This strategy is based around the idea of generating a lot of potential buys and making tiny profits on each trade.
# pragma pylint: disable=missing-docstring, invalid-name
# flake8: noqa
from __future__ import annotations
from pathlib import Path
import sys
from typing import Dict, Set, Tuple
import pandas as pd
from freqtrade.strategy import IStrategy
from freqtrade.persistence import Trade
import logging
from typing import Optional
from datetime import datetime
import math
# Take the directory two levels above this file's own folder and put it at the
# front of sys.path (once); presumably this is what lets the project-local
# `alpha101` imports below resolve - confirm against the repository layout.
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))
from alpha101.futures_ml.data import build_pair_dataset, clean_nan_targets
from alpha101.futures_ml.training import get_feature_columns, walk_forward_predict
logger = logging.getLogger(__name__)
class ML1hBasketStrategyCopy2(IStrategy):
    """
    Long/short basket strategy driven by model predictions refreshed in-process.

    On the first base-timeframe candle of every ``prediction_timeframe`` session
    the strategy rebuilds a dataset from the whitelist candles, runs
    ``walk_forward_predict`` and keeps the ``top_n_long`` symbols ranked by
    ``predicted_long`` and the ``top_n_short`` symbols ranked by
    ``predicted_short``. Entries are signalled on session-open candles; exits
    come from the per-trade ROI/stoploss, the basket-level TP/SL, or the
    session-end force close in ``custom_exit``.

    NOTE(review): earlier wording described a daily schedule anchored at UTC
    midnight and precomputed predictions; the values below configure 5-minute
    sessions computed at runtime - confirm which description is intended.
    """
    INTERFACE_VERSION = 3
    # Base candle timeframe the strategy is evaluated on.
    timeframe = "1m"
    # Timeframe of the prediction sessions: used for the informative candles,
    # for session keys and for the session length.
    prediction_timeframe = "5m"
    process_only_new_candles = True
    can_short = True
    startup_candle_count = 48
    # Per-position guards (kept but basket TP/SL will dominate)
    minimal_roi = {"0": 0.16}
    stoploss = -0.16
    trailing_stop = False
    # Basket-level TP/SL: thresholds compared against _basket_pnl() in custom_exit.
    basket_tp = 0.03
    basket_sl = -0.03
    # Switches the basket-level check in custom_exit on or off.
    use_basket_pnl = True
    # Session length in prediction bars; also passed to build_pair_dataset.
    trade_horizon_bars = 1
    # Passed to clean_nan_targets and walk_forward_predict; also sizes the test window.
    retrain_every_bars = 1
    # Passed to build_pair_dataset and counted in the history requested for training.
    lookback_bars = 7
    # Minimum number of distinct asof_date values required before predicting.
    min_train_bars = 90
    # Passed to walk_forward_predict and counted in the history requested for training.
    max_train_bars = 365
    # Forwarded unchanged to walk_forward_predict.
    train_split = 0.9
    use_dual_model = True
    # Basket sizes on the long and the short side.
    top_n_long = 5
    top_n_short = 5
    # adjust_trade_position is only invoked when this flag is True.
    position_adjustment_enable = False
    max_entry_position_adjustment = 6
def __init__(self, config: dict) -> None:
    """Initialise the base strategy and the empty per-session bookkeeping."""
    super().__init__(config)
    # Session of the most recent prediction refresh; None until the first one.
    self._last_pred_session: pd.Timestamp | None = None
    # session timestamp -> {pair_key: (direction, open_price)}
    self.session_positions: Dict[pd.Timestamp, Dict[str, Tuple[str, float]]] = {}
    # trade timestamp -> symbols selected for the long / short side
    self.daily_longs: Dict[pd.Timestamp, Set[str]] = {}
    self.daily_shorts: Dict[pd.Timestamp, Set[str]] = {}
@staticmethod
def _pair_key(pair: str) -> str:
# Align freqtrade pair (e.g., BTC/USDT:USDT) with prediction symbols (BTC_USDT_USDT)
return pair.replace("/", "_").replace(":", "_")
def informative_pairs(self):
pairs = self.dp.current_whitelist()
return [(pair, self.prediction_timeframe) for pair in pairs]
def _tf_to_timedelta(self, timeframe: str) -> pd.Timedelta:
tf = timeframe.strip().lower()
if tf.endswith("m"):
return pd.Timedelta(minutes=int(tf[:-1]))
if tf.endswith("h"):
return pd.Timedelta(hours=int(tf[:-1]))
if tf.endswith("d"):
return pd.Timedelta(days=int(tf[:-1]))
raise ValueError(f"Unsupported timeframe: {timeframe}")
def _to_utc(self, ts: pd.Timestamp) -> pd.Timestamp:
base = pd.Timestamp(ts)
return base.tz_convert("UTC") if base.tzinfo else base.tz_localize("UTC")
def _floor_ts(self, ts: pd.Timestamp, timeframe: str) -> pd.Timestamp:
delta = self._tf_to_timedelta(timeframe)
base = self._to_utc(ts)
floored = (base.value // delta.value) * delta.value
return pd.to_datetime(floored, utc=True)
def leverage(self, pair, current_time, current_rate, proposed_leverage, max_leverage, entry_tag, side, **kwargs):
return 5
def _floor_series(self, series: pd.Series, timeframe: str) -> pd.Series:
delta = self._tf_to_timedelta(timeframe)
dt = pd.to_datetime(series, utc=True)
floored = (dt.astype("int64") // delta.value) * delta.value
return pd.to_datetime(floored, utc=True)
def _ceil_series(self, series: pd.Series, timeframe: str) -> pd.Series:
delta = self._tf_to_timedelta(timeframe)
dt = pd.to_datetime(series, utc=True)
ceiled = (dt.astype("int64") // delta.value + 1) * delta.value
return pd.to_datetime(ceiled, utc=True)
def _session_key(self, ts: pd.Timestamp) -> pd.Timestamp:
return self._floor_ts(ts, self.prediction_timeframe)
def _is_session_open_bar(self, ts: pd.Timestamp) -> bool:
base = self._to_utc(ts)
session_open = self._floor_ts(base, self.prediction_timeframe)
tolerance = pd.Timedelta(minutes=0.5)
return abs(base - session_open) <= tolerance
def _build_full_dataset_from_dp(self, train_start_date) -> pd.DataFrame:
pairs = self.dp.current_whitelist()
frames = []
for pair in pairs:
df = self.dp.get_pair_dataframe(pair, self.prediction_timeframe)
if df is None or df.empty:
continue
daily_df = df.copy()
if "date" not in daily_df.columns:
continue
daily_df["date"] = pd.to_datetime(daily_df["date"], utc=True)
daily_df = daily_df.sort_values("date").reset_index(drop=True)
daily_df = daily_df[daily_df["date"] >= train_start_date]
symbol = self._pair_key(pair)
frames.append(
build_pair_dataset(
daily_df,
symbol,
self.lookback_bars,
self.prediction_timeframe,
self.trade_horizon_bars,
)
)
if not frames:
return pd.DataFrame()
full = pd.concat(frames, ignore_index=True)
full = full.sort_values(["asof_date", "symbol"]).reset_index(drop=True)
alpha_cols = [c for c in full.columns if c.startswith("alpha")]
if alpha_cols:
full[alpha_cols] = full.groupby("asof_date")[alpha_cols].transform(
lambda g: (g - g.mean()) / (g.std() + 1e-12)
)
full, _ = clean_nan_targets(full, self.retrain_every_bars)
# if self.max_train_bars is not None:
# dates = sorted(full["asof_date"].unique())
# keep = self.max_train_bars + self.retrain_every_bars
# if len(dates) > keep:
# keep_dates = set(dates[-keep:])
# full = full[full["asof_date"].isin(keep_dates)].reset_index(drop=True)
return full
def _refresh_predictions(self, now: pd.Timestamp) -> None:
    """Recompute the long/short baskets for the upcoming session.

    Builds the dataset, runs ``walk_forward_predict`` over a test window that
    starts at ``now`` and spans ``2 * retrain_every_bars`` prediction bars, keeps
    the rows whose ``target`` is missing, and stores the top symbols of the
    latest ``trade_date`` in ``daily_longs`` / ``daily_shorts``. Every failure
    path logs a warning and returns without changing the stored baskets.

    :param now: timestamp of the candle that triggered the refresh.
    """
    logger.info("Refreshing ML predictions at %s", now)
    bar = self._tf_to_timedelta(self.prediction_timeframe)
    # 30 extra bars of history are requested on top of lookback + training.
    history_bars = self.lookback_bars + self.max_train_bars + 30
    dataset = self._build_full_dataset_from_dp(now - bar * history_bars)
    if dataset.empty:
        logger.warning("No dataset available for prediction")
        return

    feature_cols = get_feature_columns(dataset)
    if not feature_cols:
        logger.warning("No feature columns available for prediction")
        return

    window = bar * self.retrain_every_bars * 2
    test_end_dt = self._to_utc(now) + window
    test_start_dt = test_end_dt - window

    available_bars = len(dataset["asof_date"].unique())
    if available_bars < self.min_train_bars:
        logger.warning("Not enough data for walk-forward prediction. Need at least %d bars. But only have %d bars.", self.min_train_bars, available_bars)
        return

    try:
        logger.info("Refreshing predictions from %s to %s", test_start_dt, test_end_dt)
        logger.info(f"Dataset rows: {len(dataset)}, feature_cols: {feature_cols}")
        preds = walk_forward_predict(
            dataset,
            feature_cols,
            retrain_every_days=self.retrain_every_bars,
            min_train_days=self.min_train_bars,
            max_train_days=self.max_train_bars,
            train_split=self.train_split,
            use_dual_model=self.use_dual_model,
            test_start_date=test_start_dt,
            test_end_date=test_end_dt,
            top_n_long=self.top_n_long,
        )
    except ValueError as exc:
        logger.warning("Walk-forward failed: %s", exc)
        return

    # Rows without a realised target are the ones that look into the future.
    future_pred = preds[preds["target"].isna()]
    if future_pred.empty:
        logger.warning("No future predictions available")
        return

    latest_trade_date = self._to_utc(future_pred["trade_date"].max())
    # NOTE(review): the match below relies on trade_date comparing equal to its
    # UTC-normalised maximum - confirm the column is timezone-aware.
    latest_pred = future_pred[future_pred["trade_date"] == latest_trade_date]
    logger.info(f"{latest_pred[['asof_date', 'trade_date', 'symbol', 'predicted_long', 'predicted_short']]}")
    if latest_pred.empty:
        return

    long_symbols = latest_pred.nlargest(self.top_n_long, "predicted_long")["symbol"].tolist()
    short_symbols = latest_pred.nlargest(self.top_n_short, "predicted_short")["symbol"].tolist()
    self.daily_longs[latest_trade_date] = set(long_symbols)
    self.daily_shorts[latest_trade_date] = set(short_symbols)
    logger.info(
        "Refreshed predictions at %s: longs=%s shorts=%s",
        latest_trade_date,
        ",".join(long_symbols),
        ",".join(short_symbols),
    )
def _maybe_refresh_predictions(self, dataframe: pd.DataFrame) -> None:
if dataframe.empty:
return
now = self._to_utc(dataframe.iloc[-1]["date"])
# now = pd.Timestamp.now(tz="UTC")
session = self._session_key(now)
if self._last_pred_session is not None and self._last_pred_session >= session:
return
if not self._is_session_open_bar(now):
return
self._refresh_predictions(now)
self._last_pred_session = session
def _safe_pair_name(symbol: str) -> str:
return symbol.replace("/", "_").replace(":", "_")
def _is_long_signal(self, pair: str, tdate: pd.Timestamp) -> bool:
return tdate in self.daily_longs and self._pair_key(pair) in self.daily_longs[tdate]
def _is_short_signal(self, pair: str, tdate: pd.Timestamp) -> bool:
return tdate in self.daily_shorts and self._pair_key(pair) in self.daily_shorts[tdate]
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: Optional[float], max_stake: float,
current_entry_rate: float, current_exit_rate: float,
current_entry_profit: float, current_exit_profit: float,
**kwargs) -> Optional[float]:
"""
Custom trade adjustment logic, returning the stake amount that a trade should be
increased or decreased.
This means extra buy or sell orders with additional fees.
Only called when `position_adjustment_enable` is set to True.
For full documentation please go to https://www.freqtrade.io/en/latest/strategy-advanced/
When not implemented by a strategy, returns None
:param trade: trade object.
:param current_time: datetime object, containing the current datetime
:param current_rate: Current buy rate.
:param current_profit: Current profit (as ratio), calculated based on current_rate.
:param min_stake: Minimal stake size allowed by exchange (for both entries and exits)
:param max_stake: Maximum stake allowed (either through balance, or by exchange limits).
:param current_entry_rate: Current rate using entry pricing.
:param current_exit_rate: Current rate using exit pricing.
:param current_entry_profit: Current profit using entry pricing.
:param current_exit_profit: Current profit using exit pricing.
:param **kwargs: Ensure to keep this here so updates to this won't break your strategy.
:return float: Stake amount to adjust your trade,
Positive values to increase position, Negative values to decrease position.
Return None for no action.
"""
# Force flat after the session ends
# trade.
filled_entries = trade.select_filled_orders(trade.entry_side)
if len(filled_entries) == 0:
logger.info(f"No filled entries for {trade.pair}")
return None
filled_exits = trade.select_filled_orders(trade.exit_side)
first_not_exit_entry = filled_entries[len(filled_exits)]
n_remaining = len(filled_entries) - len(filled_exits)
session = self._session_key(first_not_exit_entry.order_filled_date)
session_end = session + self._tf_to_timedelta(self.prediction_timeframe) * self.trade_horizon_bars
base_tf = self._tf_to_timedelta("1m")
adjusted_amount = first_not_exit_entry.cost
# 提前1分钟平仓,避免因时间对齐问题错过减仓时机,并且避免重复触发多次减仓(彻底的平仓交给custom_exit处理)
if self._to_utc(current_time) >= session_end - base_tf and n_remaining>1:
logger.info(f"Reduce position for {trade.pair} with amount {adjusted_amount *(1+current_profit)}, current_profit {current_profit} at {trade.open_date_utc}, session: {session} (session_end={session_end})")
return -adjusted_amount *(1+current_profit) # flat all with some buffer
# 最后买入的订单的后一条k线才允许加仓
time_cond = self._floor_ts(current_time, self.prediction_timeframe) >= self._floor_ts(trade.date_last_filled_utc, self.prediction_timeframe) + self._tf_to_timedelta(self.prediction_timeframe)
if not time_cond:
# logger.info(f"Skip position adjustment for {trade.pair} due to time condition. Last trade filled at {trade.date_last_filled_utc.isoformat()}")
return None
session_time = self._floor_ts(current_time, self.prediction_timeframe)
# logger.info(f"long pair: {list(self.daily_longs.values())}")
# logger.info(f"short pair: {list(self.daily_shorts.values())}")
can_long = self._is_long_signal(pair=trade.pair, tdate=session_time)
can_short = self._is_short_signal(pair=trade.pair, tdate=session_time)
# logger.info(f"{trade.pair}: short: {trade.is_short},session_time cond: {session_time in self.daily_longs} long cond: {trade.pair in self.daily_longs.get(session_time, set())} short cond: {trade.pair in self.daily_shorts.get(session_time, set())}")
if not can_long and not can_short:
# logger.info(f"Skip position adjustment for {trade.pair} due to no signal.")
# logger.info(f"self.daily_longs: {self.daily_longs}")
# logger.info(f"self.daily_shorts: {self.daily_shorts}")
return None
positive_direction = (can_long and not trade.is_short) or (can_short and trade.is_short)
if positive_direction:
logger.info(f"Positive Position adjustment for {trade.pair} at {current_time} for amount {adjusted_amount}")
return adjusted_amount
else:
logger.info(f"Negative position adjustment for {trade.pair} at {current_time} for amount {adjusted_amount}")
return -adjusted_amount
def populate_indicators(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
self._maybe_refresh_predictions(dataframe)
if dataframe.empty:
return dataframe
pair_key = self._pair_key(metadata["pair"])
dataframe["session_key"] = self._ceil_series(dataframe["date"], self.prediction_timeframe)
dataframe["is_session_open"] = dataframe["date"] == dataframe["session_key"]
dataframe["signal_long"] = dataframe["session_key"].apply(lambda d: 1 if self._is_long_signal(pair_key, d) else 0)
dataframe["signal_short"] = dataframe["session_key"].apply(lambda d: 1 if self._is_short_signal(pair_key, d) else 0)
return dataframe
def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
self._maybe_refresh_predictions(dataframe)
pair_key = self._pair_key(metadata["pair"])
dataframe["session_key"] = self._ceil_series(dataframe["date"], self.prediction_timeframe)
dataframe["is_session_open"] = dataframe["date"] + self._tf_to_timedelta(self.prediction_timeframe) == dataframe["session_key"]
logger.info(f"last session keys: {dataframe['session_key'].tail(1)}")
dataframe["signal_long"] = dataframe["session_key"].apply(lambda d: 1 if self._is_long_signal(pair_key, d) else 0)
dataframe["signal_short"] = dataframe["session_key"].apply(lambda d: 1 if self._is_short_signal(pair_key, d) else 0)
entry_long = (dataframe["signal_long"] == 1) & (dataframe["is_session_open"])
entry_short = (dataframe["signal_short"] == 1) & (dataframe["is_session_open"])
dataframe.loc[entry_long, "enter_long"] = 1
if self.can_short:
dataframe.loc[entry_short, "enter_short"] = 1
# Record session entries for basket PnL estimation (works in backtesting too)
self._record_session_entries(dataframe, pair_key, entry_long, entry_short)
return dataframe
def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
# Exits are handled by ROI/SL/custom_exit
dataframe["exit_long"] = 0
dataframe["exit_short"] = 0
return dataframe
def custom_exit(self, pair: str, trade, current_time, current_rate, current_profit, **kwargs):
# Basket-level TP/SL: evaluate all open trades from the same session (UTC date)
if self.use_basket_pnl:
try:
basket_pnl = self._basket_pnl(current_time)
except Exception as exc: # pragma: no cover - defensive
logger.warning("basket_pnl_calc_failed: %s", exc)
basket_pnl = None
if basket_pnl is not None:
if basket_pnl >= self.basket_tp:
return "basket_tp"
if basket_pnl <= self.basket_sl:
return "basket_sl"
# 提前1分钟平仓,避免因时间对齐问题错过平仓时机
base_tf = self._tf_to_timedelta("1m")
filled_entries = trade.select_filled_orders(trade.entry_side)
if len(filled_entries) == 0:
logger.info(f"No filled entries for {trade.pair}")
return None
filled_exits = trade.select_filled_orders(trade.exit_side)
first_not_exit_entry = filled_entries[len(filled_exits)]
n_remaining = len(filled_entries) - len(filled_exits)
session = self._session_key(first_not_exit_entry.order_filled_date)
session_end = session + self._tf_to_timedelta(self.prediction_timeframe) * self.trade_horizon_bars
if self._to_utc(current_time) >= session_end - base_tf and n_remaining==1:
logger.info(f"Force flat for {trade.pair} at {current_time} (session_end={session_end})")
return "session_end"
return None
# -----------------------------
# Helpers
# -----------------------------
def _latest_close(self, pair: str) -> float:
df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if df.empty:
return None
return float(df.iloc[-1]["close"])
def _basket_pnl(self, current_time: pd.Timestamp) -> float:
session = self._session_key(current_time)
runmode = 'backtesting'
# Prefer ORM trades in live/dry-run
if runmode not in {"backtesting", "hyperopt", "edge"}:
trades = Trade.get_trades([Trade.is_open.is_(True)])
rets = []
for t in trades:
if self._session_key(t.open_date_utc) != session:
continue
last_close = self._latest_close(t.pair)
if last_close is None:
continue
rets.append(t.calc_profit_ratio(last_close))
if rets:
return float(pd.Series(rets).mean())
# Fallback for backtesting/hyperopt using recorded session entries
positions = self.session_positions.get(session)
if not positions:
return None
rets = []
for pair_key, (direction, open_price) in positions.items():
pair_ft = pair_key.replace("_", "/", 1).replace("_", ":", 1).replace("_", "/")
last_close = self._latest_close(pair_ft)
if last_close is None or open_price <= 0:
continue
r = (last_close - open_price) / open_price
if direction == "short":
r *= -1
rets.append(r)
if not rets:
return None
return float(pd.Series(rets).mean())
def _record_session_entries(self, dataframe: pd.DataFrame, pair_key: str, entry_long: pd.Series, entry_short: pd.Series) -> None:
# Find first session-open bar where we enter
for direction, mask in (("long", entry_long), ("short", entry_short)):
if not mask.any():
continue
first_idx = mask.idxmax()
open_price = float(dataframe.loc[first_idx, "open"])
session = dataframe.loc[first_idx, "session_key"]
session_key = pd.Timestamp(session).tz_localize("UTC") if pd.Timestamp(session).tzinfo is None else pd.Timestamp(session).tz_convert("UTC")
self.session_positions.setdefault(session_key, {})[pair_key] = (direction, open_price)