4h single-factor long/short hedging strategy.
Timeframe
4h
Direction
Long & Short
Stoploss
-99.0%
Trailing Stop
No
ROI
0m: 99.0%
Interface Version
3
Startup Candles
200
Indicators
1
freqtrade/freqtrade-strategies
# pragma pylint: disable=missing-docstring, invalid-name
# flake8: noqa
from __future__ import annotations

import logging
import sys
import time
from pathlib import Path
from typing import Dict, Set

import numpy as np
import pandas as pd
from freqtrade.strategy import IStrategy

# Project root = two levels above the directory containing this file. It is put
# on sys.path so the project-local `alpha101` package below can be imported.
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# These imports must stay after the sys.path bootstrap above.
from alpha101.world_quant.Alpha101_code_1 import Alphas
from alpha101.world_quant.fastengine import FastExpressionEngine
from alpha101.data_helper.get_cap import get_pair_market_caps_last_and_update

logger = logging.getLogger(__name__)
def _setup_strategy_file_logger() -> None:
    """Attach a UTF-8 file handler to the module ``logger`` (idempotent).

    The log file is ``<project root>/user_data/logs/SmallCapStrategyV2.log``;
    its directory is created when missing. If ``logger`` already owns a
    ``FileHandler`` writing to that path, the function returns without
    changing anything. Otherwise the new handler logs at INFO with a
    timestamp/name/level/message format, and the logger level is set to INFO.
    """
    # NOTE(review): the file name looks inherited from another strategy
    # (SmallCapStrategyV2) -- confirm it is intended for FundingSimple.
    target = _PROJECT_ROOT / "user_data" / "logs" / "SmallCapStrategyV2.log"
    target.parent.mkdir(parents=True, exist_ok=True)

    already_attached = any(
        isinstance(existing, logging.FileHandler)
        and Path(existing.baseFilename) == target
        for existing in logger.handlers
    )
    if already_attached:
        return

    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    handler = logging.FileHandler(target, encoding="utf-8")
    handler.setLevel(logging.INFO)
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    # Keep propagating so records also reach handlers on ancestor loggers
    # (e.g. the root logger).
    logger.propagate = True
class FundingSimple(IStrategy):
    """
    4h single-factor long/short hedging strategy.

    Workflow per 4h session:
    1. Calculate one factor value (``factor_expression``) for each whitelist pair.
    2. Rank the cross-section by the factor.
    3. Long the top ``top_n_long`` and short the bottom ``top_n_short``
       (sides swapped when ``reverse_long_short`` is True). With
       ``funding_filter_enabled``, short candidates whose latest funding rate
       is not above ``funding_rate_threshold_8h`` are excluded.
    4. Close a position on the first candle whose session no longer selects
       the pair for that side (see ``populate_exit_trend``).
    """

    INTERFACE_VERSION = 3
    timeframe = "4h"
    # Timeframe used to bucket timestamps into ranking sessions (see _bar_time).
    trade_timeframe = '4h'
    process_only_new_candles = True
    can_short = True
    # Not a freqtrade setting: gates enter_long in populate_entry_trend.
    can_long = True
    # When True, long the lowest factor values and short the highest.
    reverse_long_short = False
    use_exit_signal = True
    startup_candle_count = 200
    leverage_value = 1.0
    minimal_roi = {"0": 0.99}
    stoploss = -0.99
    trailing_stop = False
    max_open_trades = 100
    # Only relevant if trailing_stop is enabled.
    trailing_stop_positive = 0.1
    trailing_stop_positive_offset = 0.25
    trailing_only_offset_is_reached = True
    top_n_long = 10
    top_n_short = 10
    # Live-mode retry policy used by _refresh_rankings when too many pairs'
    # last candle lags behind the newest factor bar.
    ranking_retry_wait_secs = 3
    ranking_max_retries = 3
    ranking_missing_retry_threshold = 0.9
    logger.info(f"timeframe: {timeframe}, trade timeframe: {trade_timeframe}")
    # ------------------------------------------------------------------
    # Single factor expression entry (edit this only)
    # FAST expression examples:
    # "ts_rank(close, 10) - ts_rank(volume, 10)"
    # "rank((close - open) / (open + 1e-12))"
    # ------------------------------------------------------------------
    factor_expression = '''
-zscore(rank(funding))
'''
    unfilledtimeout = {
        "entry": 10,
        "exit": 10,
        "exit_timeout_count": 5,
        "unit": "seconds"
    }
    http_proxy = None
    https_proxy = None
    funding_filter_enabled = False
    # -0.005 == -0.5% per 8h when funding rates are stored as fractions
    # (0.0001 == 0.01%); NOTE(review): confirm the intended threshold level.
    funding_rate_threshold_8h = -0.005
    funding_timeframe = "8h"

    # Pairs ranked when the data provider cannot supply the current whitelist.
    _FALLBACK_WHITELIST: tuple[str, ...] = (
        "BCH/USDT:USDT", "SUI/USDT:USDT", "AVAX/USDT:USDT", "ENSO/USDT:USDT", "ADA/USDT:USDT", "ASTER/USDT:USDT", "LTC/USDT:USDT", "ENA/USDT:USDT", "LINK/USDT:USDT", "ZRO/USDT:USDT", "AGLD/USDT:USDT", "XLM/USDT:USDT", "CYBER/USDT:USDT", "AAVE/USDT:USDT", "ALLO/USDT:USDT", "SNX/USDT:USDT", "NEAR/USDT:USDT", "UNI/USDT:USDT", "OP/USDT:USDT", "TAO/USDT:USDT", "SAPIEN/USDT:USDT", "\u5e01\u5b89\u4eba\u751f/USDT:USDT", "ICP/USDT:USDT", "YGG/USDT:USDT", "ARB/USDT:USDT", "FIL/USDT:USDT", "TRUMP/USDT:USDT", "BERA/USDT:USDT", "COW/USDT:USDT", "BIO/USDT:USDT", "POL/USDT:USDT", "BEL/USDT:USDT", "INJ/USDT:USDT", "DASH/USDT:USDT", "AXS/USDT:USDT", "PUMP/USDT:USDT", "DOT/USDT:USDT", "XPL/USDT:USDT", "VTHO/USDT:USDT", "ETC/USDT:USDT", "VIRTUAL/USDT:USDT", "HBAR/USDT:USDT", "APT/USDT:USDT", "CHZ/USDT:USDT", "SOMI/USDT:USDT", "TON/USDT:USDT", "PENGU/USDT:USDT", "WIF/USDT:USDT", "IO/USDT:USDT", "MORPHO/USDT:USDT", "WLD/USDT:USDT", "JTO/USDT:USDT", "AT/USDT:USDT", "RENDER/USDT:USDT", "FET/USDT:USDT", "SEI/USDT:USDT", "ZIL/USDT:USDT", "ARPA/USDT:USDT", "0G/USDT:USDT", "HMSTR/USDT:USDT", "LDO/USDT:USDT", "ONDO/USDT:USDT", "CAKE/USDT:USDT", "VANA/USDT:USDT", "RONIN/USDT:USDT", "ROSE/USDT:USDT", "AR/USDT:USDT", "ATOM/USDT:USDT", "HUMA/USDT:USDT", "CRV/USDT:USDT", "EUL/USDT:USDT", "NXPC/USDT:USDT", "BARD/USDT:USDT", "PENDLE/USDT:USDT", "EIGEN/USDT:USDT", "ETHFI/USDT:USDT", "ZK/USDT:USDT", "INIT/USDT:USDT", "VET/USDT:USDT", "DUSK/USDT:USDT", "ORCA/USDT:USDT", "GIGGLE/USDT:USDT", "TIA/USDT:USDT", "MUBARAK/USDT:USDT", "PROVE/USDT:USDT", "LINEA/USDT:USDT", "GALA/USDT:USDT", "S/USDT:USDT", "ZEN/USDT:USDT", "SYRUP/USDT:USDT", "NEIRO/USDT:USDT", "STRK/USDT:USDT", "PARTI/USDT:USDT", "OG/USDT:USDT", "RESOLV/USDT:USDT", "APE/USDT:USDT", "HOME/USDT:USDT", "SIGN/USDT:USDT", "KAIA/USDT:USDT", "ORDI/USDT:USDT", "IOTA/USDT:USDT", "W/USDT:USDT", "GUN/USDT:USDT", "JUP/USDT:USDT", "ALGO/USDT:USDT", "CETUS/USDT:USDT", "COMP/USDT:USDT", "RUNE/USDT:USDT", "AWE/USDT:USDT", "GPS/USDT:USDT", "DOLO/USDT:USDT",
        "SAND/USDT:USDT", "SLP/USDT:USDT", "ENS/USDT:USDT", "SPK/USDT:USDT", "FF/USDT:USDT", "SOLV/USDT:USDT", "OPEN/USDT:USDT", "D/USDT:USDT", "LIT/USDT:USDT", "WCT/USDT:USDT", "FIDA/USDT:USDT", "TWT/USDT:USDT", "STG/USDT:USDT", "NOM/USDT:USDT", "KSM/USDT:USDT", "BROCCOLI714/USDT:USDT", "IOTX/USDT:USDT", "EDU/USDT:USDT", "ARKM/USDT:USDT", "PNUT/USDT:USDT", "GRT/USDT:USDT", "ALT/USDT:USDT", "AUCTION/USDT:USDT", "SANTOS/USDT:USDT", "QNT/USDT:USDT", "PYTH/USDT:USDT", "STX/USDT:USDT", "MIRA/USDT:USDT", "PLUME/USDT:USDT", "JST/USDT:USDT", "BIGTIME/USDT:USDT", "ZBT/USDT:USDT", "HYPER/USDT:USDT", "STO/USDT:USDT", "CTSI/USDT:USDT", "C98/USDT:USDT", "COTI/USDT:USDT", "WOO/USDT:USDT", "BOME/USDT:USDT", "CFX/USDT:USDT", "BEAMX/USDT:USDT", "FORM/USDT:USDT", "ZKC/USDT:USDT", "XMR/USDT:USDT", "SKY/USDT:USDT", "FLOW/USDT:USDT", "SUPER/USDT:USDT", "ACH/USDT:USDT", "THETA/USDT:USDT", "XVG/USDT:USDT", "TNSR/USDT:USDT", "AXL/USDT:USDT", "MEME/USDT:USDT", "BANANAS31/USDT:USDT", "RPL/USDT:USDT", "TREE/USDT:USDT", "1000CHEEMS/USDT:USDT", "KAITO/USDT:USDT", "TRB/USDT:USDT", "RIF/USDT:USDT", "JASMY/USDT:USDT", "MMT/USDT:USDT", "LA/USDT:USDT", "HEMI/USDT:USDT", "TUT/USDT:USDT", "PROM/USDT:USDT", "DYDX/USDT:USDT", "ASR/USDT:USDT", "A/USDT:USDT", "ACT/USDT:USDT", "IMX/USDT:USDT", "NEO/USDT:USDT", "AIXBT/USDT:USDT", "ME/USDT:USDT", "STEEM/USDT:USDT", "SCR/USDT:USDT", "CHR/USDT:USDT", "MITO/USDT:USDT", "VELODROME/USDT:USDT", "NIL/USDT:USDT", "GLM/USDT:USDT", "RARE/USDT:USDT", "TLM/USDT:USDT", "BB/USDT:USDT", "MOVE/USDT:USDT", "HOLO/USDT:USDT", "1MBABYDOGE/USDT:USDT", "EDEN/USDT:USDT", "LAYER/USDT:USDT", "MANA/USDT:USDT", "MANTA/USDT:USDT", "1INCH/USDT:USDT", "USUAL/USDT:USDT", "ANIME/USDT:USDT", "CKB/USDT:USDT", "SUSHI/USDT:USDT", "GMT/USDT:USDT", "MET/USDT:USDT", "DYM/USDT:USDT", "ALPINE/USDT:USDT", "XTZ/USDT:USDT", "CVX/USDT:USDT", "TOWNS/USDT:USDT", "MAGIC/USDT:USDT", "MINA/USDT:USDT", "SCRT/USDT:USDT", "AEVO/USDT:USDT", "SUN/USDT:USDT", "METIS/USDT:USDT", "TST/USDT:USDT", "XVS/USDT:USDT",
        "PHB/USDT:USDT", "CELO/USDT:USDT", "ALICE/USDT:USDT", "ASTR/USDT:USDT", "KERNEL/USDT:USDT", "SSV/USDT:USDT", "PEOPLE/USDT:USDT", "CELR/USDT:USDT", "TURBO/USDT:USDT", "NOT/USDT:USDT", "SHELL/USDT:USDT", "SAGA/USDT:USDT", "YFI/USDT:USDT", "RSR/USDT:USDT", "REZ/USDT:USDT", "XAI/USDT:USDT", "VANRY/USDT:USDT", "GMX/USDT:USDT", "LPT/USDT:USDT", "RVN/USDT:USDT", "ID/USDT:USDT", "HAEDAL/USDT:USDT", "1000SATS/USDT:USDT", "SXT/USDT:USDT", "SAHARA/USDT:USDT", "HFT/USDT:USDT", "LUMIA/USDT:USDT", "TURTLE/USDT:USDT", "ZRX/USDT:USDT", "PIXEL/USDT:USDT", "BAND/USDT:USDT", "STORJ/USDT:USDT", "HEI/USDT:USDT", "THE/USDT:USDT", "NTRN/USDT:USDT", "BANK/USDT:USDT", "OGN/USDT:USDT", "SKL/USDT:USDT", "BAT/USDT:USDT", "PORTAL/USDT:USDT", "API3/USDT:USDT", "EPIC/USDT:USDT", "MASK/USDT:USDT", "ACE/USDT:USDT", "VIC/USDT:USDT", "BANANA/USDT:USDT", "DOGS/USDT:USDT", "NMR/USDT:USDT", "DEXE/USDT:USDT", "LQTY/USDT:USDT", "ILV/USDT:USDT", "ERA/USDT:USDT", "MAV/USDT:USDT", "G/USDT:USDT", "EGLD/USDT:USDT", "BICO/USDT:USDT", "POLYX/USDT:USDT", "NEWT/USDT:USDT", "UMA/USDT:USDT", "2Z/USDT:USDT", "ONT/USDT:USDT", "MBOX/USDT:USDT", "LISTA/USDT:USDT", "HIGH/USDT:USDT", "YB/USDT:USDT", "HIVE/USDT:USDT", "ANKR/USDT:USDT", "BABY/USDT:USDT", "LSK/USDT:USDT", "FLUX/USDT:USDT", "CTK/USDT:USDT", "SYN/USDT:USDT", "WAL/USDT:USDT", "AI/USDT:USDT", "DEGO/USDT:USDT", "PHA/USDT:USDT", "1000CAT/USDT:USDT", "ONE/USDT:USDT", "KNC/USDT:USDT", "KAVA/USDT:USDT", "FIO/USDT:USDT",
    )

    def __init__(self, config: dict) -> None:
        super().__init__(config)
        _setup_strategy_file_logger()
        # Per-session selections: session start -> symbols (in _pair_key form).
        self.session_longs: Dict[pd.Timestamp, Set[str]] = {}
        self.session_shorts: Dict[pd.Timestamp, Set[str]] = {}
        # Last session recorded by _record_session_rankings (None after a failure).
        self._last_rank_bar: pd.Timestamp | None = None
        # Last session for which a refresh was attempted (one attempt per session).
        self._last_refresh_attempt_bar: pd.Timestamp | None = None
        # Last session covered by a full-history ranking pass.
        self._historical_rankings_until: pd.Timestamp | None = None
        # pair -> last candle time seen in the most recent wide-data build.
        self._last_pair_candle_times: dict[str, pd.Timestamp] = {}

    @staticmethod
    def _pair_key(pair: str) -> str:
        """Return the symbol key for a pair, e.g. 'BTC/USDT:USDT' -> 'BTC_USDT_USDT'."""
        return pair.replace("/", "_").replace(":", "_")

    def leverage(
        self,
        pair,
        current_time,
        current_rate,
        proposed_leverage,
        max_leverage,
        entry_tag,
        side,
        **kwargs,
    ):
        """Return the fixed ``leverage_value`` for every trade (proposed/max leverage are ignored)."""
        return self.leverage_value

    @staticmethod
    def _pandas_freq(timeframe: str) -> str:
        """
        Convert a freqtrade-style timeframe into a pandas frequency string.

        '5m' -> '5min', '4h' -> '4h', '1d' -> '1d'; any other input is returned
        stripped and lowercased.
        NOTE(review): '1M' (month) is lowercased to '1m' and maps to '1min'.
        """
        tf = timeframe.strip().lower()
        if tf.endswith("m"):
            return f"{tf[:-1]}min"
        if tf.endswith("h"):
            return f"{tf[:-1]}h"
        if tf.endswith("d"):
            return f"{tf[:-1]}d"
        return tf

    @staticmethod
    def _bar_time(ts: pd.Timestamp) -> pd.Timestamp:
        """Floor ``ts`` (converted to UTC) to the start of its ``trade_timeframe`` session."""
        return pd.to_datetime(ts, utc=True).floor(FundingSimple._pandas_freq(FundingSimple.trade_timeframe))

    def _is_backtest_mode(self) -> bool:
        """
        Return True for run modes that evaluate history offline.

        These are backtest, hyperopt, plot and webserver. Returns False when no
        data provider is attached.
        """
        if self.dp is None:
            return False
        logger.info("Running in %s mode", self.dp.runmode.value)
        return self.dp.runmode.value in {"backtest", "hyperopt", "plot", "webserver"}

    def _build_funding_wide_data(
        self,
        pairs: list[str],
        end_time: pd.Timestamp,
        *,
        full_history: bool = False,
    ) -> pd.DataFrame:
        """
        Build the funding-rate panel for ``pairs`` up to and including ``end_time``.

        Return shape:
            index   = date (UTC)
            columns = symbol (``_pair_key`` form)
            values  = funding rate
        Pairs whose funding data fails to load, is empty, or lacks a ``date`` or
        rate column are skipped. Without ``full_history`` only the last
        ``max(startup_candle_count + 20, 250)`` rows per pair are kept.
        Returns an empty DataFrame when no pair produced data.

        The result is later merged into the main engine wide_data as
        ``wide_data["funding"]``.
        """
        frames = []
        lookback = max(self.startup_candle_count + 20, 250)
        for pair in pairs:
            symbol = self._pair_key(pair)
            try:
                funding = self.dp.get_pair_dataframe(
                    pair=pair,
                    timeframe=self.funding_timeframe,
                    candle_type="funding_rate",
                )
            except Exception as exc:
                # Best effort: one broken pair must not abort the whole panel.
                logger.warning("Failed to load funding data for %s: %s", pair, exc)
                continue
            if funding is None or funding.empty:
                logger.debug("Empty funding data for pair %s", pair)
                continue
            funding = funding.copy()
            if "date" not in funding.columns:
                logger.warning("No date column found in funding data for %s", pair)
                continue
            funding["date"] = pd.to_datetime(funding["date"], utc=True)
            funding = funding[funding["date"] <= end_time]
            if not full_history:
                funding = funding.tail(lookback)
            if funding.empty:
                continue
            # The funding_rate candle is expected to carry the rate in open/close:
            # prefer "open", fall back to "close".
            if "open" in funding.columns:
                rate_col = "open"
            elif "close" in funding.columns:
                rate_col = "close"
            else:
                logger.warning(
                    "No open/close column found in funding data for %s. columns=%s",
                    pair,
                    list(funding.columns),
                )
                continue
            frames.append(
                funding[["date", rate_col]]
                .rename(columns={rate_col: "funding"})
                .assign(symbol=symbol)
            )
        if not frames:
            logger.warning("No funding frames built for %d pairs up to %s", len(pairs), end_time)
            return pd.DataFrame()
        panel = pd.concat(frames, ignore_index=True)
        panel = panel.set_index(["date", "symbol"]).sort_index()
        funding_wide = panel["funding"].unstack("symbol").sort_index()
        logger.info(
            "Built funding wide data shape=%s latest=%s nan_ratio=%.4f",
            funding_wide.shape,
            pd.to_datetime(funding_wide.index, utc=True).max() if not funding_wide.empty else None,
            float(funding_wide.isna().mean().mean()) if not funding_wide.empty else float("nan"),
        )
        return funding_wide

    def _build_engine_wide_data(
        self,
        pairs: list[str],
        end_time: pd.Timestamp,
        *,
        full_history: bool = False,
    ) -> pd.DataFrame:
        """
        Build the main wide_data panel for factor calculation.

        Only pairs whose ``_pair_key`` appears in the ``pair`` column returned by
        ``get_pair_market_caps_last_and_update`` are considered. A pair is then
        skipped when it has no cached dataframe, lacks OHLCV columns, has no
        candles up to ``end_time``, has a NaN last close or a NaN supply.

        Returned columns are a MultiIndex:
            level 0 = field: open/high/low/close/volume/vwap/cap, plus funding
                      when funding data is available
            level 1 = symbol, e.g. BTC_USDT_USDT
        ``vwap`` is approximated as (high + low + close) / 3 and ``cap`` as
        close * circulating supply. Expressions can reference ``funding``.

        Side effect: ``_last_pair_candle_times`` is reset and refilled with the
        last candle time of every pair loaded in this call.
        Returns an empty DataFrame when no pair produced usable data.
        """
        logger.info("Building wide data for factor calculation at time %s", end_time)
        # Reset so pairs that left the whitelist (or were skipped this time) do
        # not keep stale candle times and count as "lagging" in _refresh_rankings.
        self._last_pair_candle_times.clear()
        required_columns = ["date", "open", "high", "low", "close", "volume"]
        lookback = max(self.startup_candle_count + 20, 250)
        market_caps = get_pair_market_caps_last_and_update(pairs)
        supply_dict = (
            market_caps[["pair", "circulating_supply"]]
            .set_index("pair")["circulating_supply"]
            .to_dict()
        )
        frames = []
        pair_last_dates: list[tuple[str, pd.Timestamp]] = []
        eligible_pairs = [
            pair for pair in pairs
            if self._pair_key(pair) in supply_dict
        ]
        skipped_pairs = len(pairs) - len(eligible_pairs)
        if skipped_pairs:
            logger.info("Skipping %d pairs without circulating supply data", skipped_pairs)
        for pair in eligible_pairs:
            df = self.dp.get_pair_dataframe(pair, self.timeframe)
            if df is None or df.empty:
                logger.warning("No dataframe cached for pair %s at time %s", pair, end_time)
                continue
            if not all(col in df.columns for col in required_columns):
                logger.warning(
                    "Missing base columns in data for pair %s: required=%s actual=%s",
                    pair,
                    required_columns,
                    list(df.columns),
                )
                continue
            df = df[required_columns].copy()
            df["date"] = pd.to_datetime(df["date"], utc=True)
            df = df[df["date"] <= end_time]
            if not full_history:
                df = df.tail(lookback)
            if df.empty:
                logger.warning("No candles available for pair %s up to %s", pair, end_time)
                continue
            last_candle_time = pd.to_datetime(df["date"].iloc[-1], utc=True)
            pair_last_dates.append((pair, last_candle_time))
            self._last_pair_candle_times[pair] = last_candle_time
            if pd.isna(df["close"].iloc[-1]):
                logger.warning("No close price data for pair %s at time %s", pair, end_time)
                continue
            symbol = self._pair_key(pair)
            supply = supply_dict.get(symbol)
            if pd.isna(supply):
                logger.warning("No circulating supply data for pair %s, skipping", symbol)
                continue
            close_values = df["close"].to_numpy()
            high_values = df["high"].to_numpy()
            low_values = df["low"].to_numpy()
            frame = df.assign(
                symbol=symbol,
                cap=close_values * float(supply),
                vwap=(high_values + low_values + close_values) / 3.0,
            )
            frames.append(
                frame[
                    [
                        "date",
                        "symbol",
                        "open",
                        "high",
                        "low",
                        "close",
                        "volume",
                        "vwap",
                        "cap",
                    ]
                ]
            )
        if pair_last_dates:
            latest_pairs = sorted(
                pair_last_dates,
                key=lambda item: item[1],
                reverse=True,
            )[:5]
            earliest_pairs = sorted(
                pair_last_dates,
                key=lambda item: item[1],
            )[:5]
            logger.info("Cached pair last candles latest=%s", latest_pairs)
            logger.info("Cached pair last candles earliest=%s", earliest_pairs)
        logger.info(
            "Whitelist pairs=%d eligible_pairs=%d frames_built=%d lookback=%d",
            len(pairs),
            len(eligible_pairs),
            len(frames),
            lookback,
        )
        if not frames:
            return pd.DataFrame()
        panel = pd.concat(frames, ignore_index=True)
        panel = panel.set_index(["date", "symbol"]).sort_index()
        raw_panel_last_ts = pd.to_datetime(
            panel.index.get_level_values("date"),
            utc=True,
        ).max()
        logger.info("Raw panel latest timestamp=%s", raw_panel_last_ts)
        panel.index = panel.index.set_levels(
            panel.index.levels[1].astype("category"),
            level=1,
        )
        # Convert to wide format:
        # columns level 0 = field
        # columns level 1 = symbol
        panel = panel.unstack(level="symbol")
        # ------------------------------------------------------------
        # Add funding field into wide panel
        # ------------------------------------------------------------
        funding_wide = self._build_funding_wide_data(
            eligible_pairs,
            end_time,
            full_history=full_history,
        )
        if not funding_wide.empty:
            funding_wide.index = pd.to_datetime(funding_wide.index, utc=True)
            funding_wide = funding_wide.sort_index()
            # Align funding to the main candle timeline: every candle takes the
            # most recent funding value at or before its timestamp (e.g. 8h
            # funding -> 4h candles), even when funding timestamps do not fall
            # exactly on candle timestamps.
            aligned_index = funding_wide.index.union(panel.index)
            funding_wide = funding_wide.reindex(aligned_index).ffill().reindex(panel.index)
            # Keep only symbols that exist in the main panel.
            panel_symbols = panel.columns.get_level_values(1).unique()
            funding_wide = funding_wide.reindex(columns=panel_symbols)
            funding_known_ratio = float(funding_wide.notna().mean().mean())
            funding_nan_ratio = float(funding_wide.isna().mean().mean())
            funding_wide.columns = pd.MultiIndex.from_product(
                [["funding"], funding_wide.columns],
                names=panel.columns.names,
            )
            panel = pd.concat([panel, funding_wide], axis=1).sort_index(axis=1)
            logger.info(
                (
                    "Added funding field to wide panel: funding_shape=%s "
                    "known_ratio=%.4f nan_ratio=%.4f"
                ),
                funding_wide.shape,
                funding_known_ratio,
                funding_nan_ratio,
            )
        else:
            logger.warning("No funding data available to add into wide panel")
        logger.info(
            "Wide panel latest timestamp=%s",
            pd.to_datetime(panel.index, utc=True).max(),
        )
        logger.info("Wide panel shape=%s", panel.shape)
        return panel

    def _record_session_rankings(
        self,
        factor_wide: pd.DataFrame,
        latest_only: bool,
        funding_wide: pd.DataFrame | None = None,
    ) -> None:
        """
        Select and store the long/short symbols for each ranked session.

        Args:
            factor_wide: factor values, index = bar timestamps, columns = symbols.
            latest_only: rank only the last row instead of every row.
            funding_wide: funding panel with the same layout; used only when
                ``funding_filter_enabled`` is True. It drops short candidates
                whose latest funding rate is at or below
                ``funding_rate_threshold_8h`` (missing rates count as 0).

        Rows not aligned to a ``trade_timeframe`` session start are skipped.
        Selections go into ``session_longs`` / ``session_shorts``,
        ``_last_rank_bar`` ends at the last recorded session, and
        ``_historical_rankings_until`` is updated after a full-history pass.
        """
        factor_wide = factor_wide.sort_index()
        if latest_only:
            snapshots = factor_wide.tail(1)
            logger.debug('use lastest bar')
        else:
            snapshots = factor_wide
            logger.debug('use full bars, len(%d)', len(factor_wide))
        use_funding_filter = (
            self.funding_filter_enabled
            and funding_wide is not None
            and not funding_wide.empty
        )
        long_funding_stats: list[float] = []
        short_funding_stats: list[float] = []
        for session_ts, snapshot_row in snapshots.iterrows():
            bar_ts = pd.to_datetime(session_ts, utc=True)
            session = self._bar_time(bar_ts)
            if session != bar_ts:
                # Rankings are keyed by session start; skip off-boundary rows.
                continue
            latest_values = pd.to_numeric(snapshot_row, errors="coerce").dropna()
            if latest_values.empty:
                logger.warning("No valid factor values for session %s", session)
                continue

            # By default every symbol with a factor value may be shorted.
            short_candidates = latest_values.copy()
            # Funding rates for THIS session only; stays None when no snapshot
            # exists so the stats below never use another session's row.
            funding_row: pd.Series | None = None
            if use_funding_filter:
                funding_hist = funding_wide[funding_wide.index <= session]
                if not funding_hist.empty:
                    # Keep only symbols that have a factor value; a missing
                    # funding rate is treated as 0.
                    funding_row = (
                        pd.to_numeric(funding_hist.iloc[-1], errors="coerce")
                        .reindex(latest_values.index)
                        .fillna(0)
                    )
                    eligible_short_symbols = funding_row[
                        funding_row > self.funding_rate_threshold_8h
                    ].index
                    short_candidates = latest_values.loc[
                        latest_values.index.intersection(eligible_short_symbols)
                    ]
                else:
                    logger.warning("No funding snapshot available before %s", session)

            if self.reverse_long_short:
                # Reverse mode: short the largest factor values.
                short_symbols = short_candidates.nlargest(self.top_n_short).index.tolist()
                long_symbols = latest_values.nsmallest(self.top_n_long).index.tolist()
            else:
                # Normal mode: short the smallest factor values.
                long_symbols = latest_values.nlargest(self.top_n_long).index.tolist()
                short_symbols = short_candidates.nsmallest(self.top_n_short).index.tolist()

            self.session_longs[session] = set(long_symbols)
            self.session_shorts[session] = set(short_symbols)
            self._last_rank_bar = session

            if funding_row is not None:
                long_funding_stats.append(funding_row.reindex(long_symbols).mean())
                short_funding_stats.append(funding_row.reindex(short_symbols).mean())
            if len(short_symbols) < self.top_n_short:
                logger.warning(
                    "Short count below target after funding filter session=%s selected=%d target=%d",
                    session,
                    len(short_symbols),
                    self.top_n_short,
                )
        if not snapshots.empty and not latest_only:
            self._historical_rankings_until = self._bar_time(pd.to_datetime(snapshots.index[-1], utc=True))
        long_funding_array = np.array(long_funding_stats, dtype=float)
        short_funding_array = np.array(short_funding_stats, dtype=float)
        if len(long_funding_array) and len(short_funding_array):
            logger.info(
                "Mean funding spread long-short=%.8f long_mean=%.8f short_mean=%.8f sessions=%d",
                float(np.nanmean(long_funding_array - short_funding_array)),
                float(np.nanmean(long_funding_array)),
                float(np.nanmean(short_funding_array)),
                len(short_funding_array),
            )
        else:
            logger.info("No selected long/short funding stats collected")

    def _refresh_rankings(self, bar_time: pd.Timestamp) -> None:
        """
        Recompute factor rankings for the session containing ``bar_time``.

        Does nothing if that session (or a later one) is already ranked.
        Otherwise it builds the wide panel, evaluates ``factor_expression``
        with FastExpressionEngine and records rankings. Backtest-like run modes
        rank every bar of the full history; other modes rank only the newest bar.

        Outside backtest-like modes, if the share of pairs whose last candle
        lags the newest factor bar exceeds ``ranking_missing_retry_threshold``,
        it sleeps ``ranking_retry_wait_secs`` (blocking) and rebuilds, up to
        ``ranking_max_retries`` times. Every failure path resets
        ``_last_rank_bar`` to None.
        """
        start_build_data_time = time.perf_counter()
        session = self._bar_time(bar_time)
        if self._last_rank_bar is not None and self._last_rank_bar >= session:
            return
        try:
            pairs = self.dp.current_whitelist()
        except Exception as exc:
            logger.warning(
                "Could not read current whitelist (%s); using built-in fallback pair list",
                exc,
            )
            pairs = list(self._FALLBACK_WHITELIST)
        # The run mode cannot change between retries, so resolve it once.
        is_backtest = self._is_backtest_mode()
        factor_wide: pd.DataFrame | None = None
        factor_bar_time: pd.Timestamp | None = None
        funding_wide: pd.DataFrame = pd.DataFrame()
        for attempt in range(self.ranking_max_retries + 1):
            wide_data = self._build_engine_wide_data(
                pairs,
                session,
                full_history=is_backtest,
            )
            if wide_data.empty:
                logger.warning("No valid wide data for session %s", session)
                self._last_rank_bar = None
                return
            if (
                isinstance(wide_data.columns, pd.MultiIndex)
                and "funding" in wide_data.columns.get_level_values(0)
            ):
                funding_wide = wide_data["funding"]
            else:
                funding_wide = pd.DataFrame()
                logger.warning("Funding field not found in wide_data for session %s", session)
            try:
                alpha_data = Alphas(wide_data)
                engine = FastExpressionEngine(alpha_data)
                factor_wide = engine.evaluate(self.factor_expression)
            except Exception as exc:
                logger.warning("FastEngine factor evaluation failed: %s", exc)
                self._last_rank_bar = None
                return
            if not isinstance(factor_wide, pd.DataFrame) or factor_wide.empty:
                logger.warning("Factor expression did not return a valid DataFrame at %s", session)
                self._last_rank_bar = None
                return
            factor_wide = factor_wide.sort_index()
            factor_bar_time = pd.to_datetime(factor_wide.index, utc=True).max()
            latest_pair_count = sum(
                1 for candle_time in self._last_pair_candle_times.values() if candle_time == factor_bar_time
            )
            lagging_pairs = sorted(
                [
                    (pair, candle_time)
                    for pair, candle_time in self._last_pair_candle_times.items()
                    if candle_time < factor_bar_time
                ],
                key=lambda item: item[1],
            )
            total_pairs = len(self._last_pair_candle_times)
            missing_ratio = (len(lagging_pairs) / total_pairs) if total_pairs else 0.0
            logger.info("Factor wide latest timestamp=%s", factor_bar_time)
            logger.info(
                "Latest candle coverage target=%s up_to_date=%d lagging=%d total=%d missing_ratio=%.2f attempt=%d/%d",
                factor_bar_time,
                latest_pair_count,
                len(lagging_pairs),
                total_pairs,
                missing_ratio,
                attempt + 1,
                self.ranking_max_retries + 1,
            )
            if lagging_pairs:
                logger.info("Top 10 Lagging pairs sample=%s", lagging_pairs[:10])
            if missing_ratio <= self.ranking_missing_retry_threshold or attempt == self.ranking_max_retries or is_backtest:
                break
            logger.warning(
                "Lagging pair ratio %.2f exceeds threshold %.2f for session %s. Waiting %ss before retry %d.",
                missing_ratio,
                self.ranking_missing_retry_threshold,
                session,
                self.ranking_retry_wait_secs,
                attempt + 1,
            )
            time.sleep(self.ranking_retry_wait_secs)
        if factor_wide is None or factor_bar_time is None:
            self._last_rank_bar = None
            return
        if is_backtest:
            # Backtest-like modes rank the whole history in a single pass.
            snapshot = factor_wide
        else:
            # Live-like modes rank only the newest factor bar.
            snapshot = factor_wide[factor_wide.index <= factor_bar_time].tail(1)
        if snapshot.empty:
            logger.warning("No factor snapshot at %s", factor_bar_time)
            self._last_rank_bar = None
            return
        logger.info(
            "Factor snapshot target=%s actual=%s",
            factor_bar_time,
            pd.to_datetime(snapshot.index[-1], utc=True),
        )
        self._record_session_rankings(snapshot, not is_backtest, funding_wide=funding_wide)
        end_build_data_time = time.perf_counter()
        logger.info(
            "Factor wide data with len %d built in %.2f seconds",
            len(snapshot),
            end_build_data_time - start_build_data_time,
        )

    def _ensure_rankings_once_per_bar(self, bar_time: pd.Timestamp) -> None:
        """
        Attempt at most one ranking refresh per session.

        The attempt is recorded before refreshing. If both long and short
        selections already exist for the session, no refresh is run.
        """
        session = self._bar_time(bar_time)
        if self._last_refresh_attempt_bar is not None and self._last_refresh_attempt_bar >= session:
            return
        self._last_refresh_attempt_bar = session
        if pd.to_datetime(session) in self.session_longs and pd.to_datetime(session) in self.session_shorts:
            return
        self._refresh_rankings(session)

    def _build_signal_masks(
        self, pair: str, dates: pd.Series
    ) -> tuple[pd.Series, pd.Series, pd.Series]:
        """
        Map each candle date of ``pair`` to its session selection.

        Dates are floored to ``timeframe`` and looked up in ``session_longs`` /
        ``session_shorts``. Returns int8 Series (long, short, should_exit), where
        should_exit is 1 when the pair is in neither selection.
        """
        pair_key = self._pair_key(pair)
        sessions = pd.to_datetime(dates, utc=True).dt.floor(self._pandas_freq(self.timeframe))
        long_mask = sessions.map(lambda session: pair_key in self.session_longs.get(session, set()))
        short_mask = sessions.map(lambda session: pair_key in self.session_shorts.get(session, set()))
        should_exit = ~(long_mask | short_mask)
        return long_mask.astype("int8"), short_mask.astype("int8"), should_exit.astype("int8")

    def populate_indicators(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """
        Refresh rankings if needed, then add the per-candle selection columns.

        Adds ``signal_long``, ``signal_short`` and ``should_exit`` (int8) to
        ``dataframe``. An empty dataframe is returned unchanged.
        """
        # Check emptiness first: the debug log below reads the last date.
        if dataframe.empty:
            return dataframe
        pair = metadata["pair"]
        logger.debug("Populating indicators for pair %s at time %s", pair, dataframe["date"].iloc[-1])
        logger.debug("Processing dataframe with %d rows", len(dataframe))
        current_bar = pd.to_datetime(dataframe["date"].iloc[-1], utc=True)
        self._ensure_rankings_once_per_bar(current_bar)
        signal_long, signal_short, should_exit = self._build_signal_masks(pair, dataframe["date"])
        dataframe["signal_long"] = signal_long
        dataframe["signal_short"] = signal_short
        dataframe["should_exit"] = should_exit
        logger.debug("Current session longs: %s", self.session_longs.get(self._bar_time(current_bar), set()))
        logger.debug("Current session shorts: %s", self.session_shorts.get(self._bar_time(current_bar), set()))
        return dataframe

    def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Set enter_long / enter_short where the pair is selected (gated by can_long / can_short)."""
        if dataframe.empty:
            return dataframe
        current_bar = pd.to_datetime(dataframe["date"].iloc[-1], utc=True)
        logger.debug("Populating entry trend for pair %s at time %s", metadata["pair"], current_bar)
        logger.debug("Session longs for current bar: %s", self.session_longs.get(self._bar_time(current_bar), set()))
        logger.debug("Session shorts for current bar: %s", self.session_shorts.get(self._bar_time(current_bar), set()))
        if self.can_long:
            dataframe.loc[dataframe["signal_long"] == 1, "enter_long"] = 1
        if self.can_short:
            dataframe.loc[dataframe["signal_short"] == 1, "enter_short"] = 1
        logger.debug(
            "Entry signals populated for pair %s: long=%d, short=%d",
            metadata["pair"],
            int(dataframe["signal_long"].sum()),
            int(dataframe["signal_short"].sum()),
        )
        return dataframe

    def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """
        Exit each side once the pair is no longer selected for that side.

        exit_long is set where signal_long == 0 and exit_short where
        signal_short == 0. A pair that moves from the long basket into the
        short basket (or vice versa) therefore closes its existing position.
        """
        if dataframe.empty:
            return dataframe
        dataframe.loc[dataframe["signal_long"] == 0, "exit_long"] = 1
        dataframe.loc[dataframe["signal_short"] == 0, "exit_short"] = 1
        return dataframe

    def custom_exit(self, pair: str, trade, current_time, current_rate, current_profit, **kwargs):
        """No custom exit reasons; exits are driven by populate_exit_trend signals."""
        return None