4h single-factor long/short hedging strategy.
Timeframe
4h
Direction
Long & Short
Stoploss
-50.0%
Trailing Stop
No
ROI
0m: 50.0%
Interface Version
3
Startup Candles
200
Indicators
2
# pragma pylint: disable=missing-docstring, invalid-name
# flake8: noqa
from __future__ import annotations
import logging
from pathlib import Path
import sys
from typing import Dict, Set
import pandas as pd
from freqtrade.strategy import IStrategy
# Resolve the directory two levels above this file's folder and put it at the
# front of ``sys.path`` (only once), so the project-local ``alpha101`` package
# imported right below can be found regardless of the working directory.
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))
from alpha101.world_quant.Alpha101_code_1 import Alphas
from alpha101.world_quant.fastengine import FastExpressionEngine
from alpha101.data_helper.get_cap import get_pair_market_caps_last_and_update
import numpy as np
logger = logging.getLogger(__name__)
import time
def _setup_strategy_file_logger() -> None:
    """Attach a UTF-8 file handler for ``SmallCapStrategy.log`` to the module logger.

    The log file lives under ``<project root>/user_data/logs``; the directory is
    created when missing. If the module logger already carries a
    ``FileHandler`` for that exact path, the function returns without touching
    the logger, so calling it repeatedly does not duplicate log lines.
    """
    target = _PROJECT_ROOT / "user_data" / "logs" / "SmallCapStrategy.log"
    target.parent.mkdir(parents=True, exist_ok=True)

    already_attached = any(
        isinstance(existing, logging.FileHandler) and Path(existing.baseFilename) == target
        for existing in logger.handlers
    )
    if already_attached:
        return

    sink = logging.FileHandler(target, encoding="utf-8")
    sink.setLevel(logging.INFO)
    sink.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )

    logger.addHandler(sink)
    logger.setLevel(logging.INFO)
    # Records still travel up to the parent loggers' handlers as well.
    logger.propagate = True
class SmallCapStrategy(IStrategy):
    """
    4h single-factor long/short hedging strategy.

    Workflow per 4h session:
    1. Calculate one factor value for each whitelist pair.
    2. Rank cross-section by the factor.
    3. Long the top ``top_n_long`` pairs and short the bottom ``top_n_short`` pairs.
    4. Keep a position while its pair stays in the session's long/short set;
       the exit columns are set for every candle whose session does not list
       the pair (see ``_build_signal_masks``).
    """

    INTERFACE_VERSION = 3

    timeframe = "4h"
    process_only_new_candles = True
    can_short = True
    use_exit_signal = True
    startup_candle_count = 200

    # Constant leverage returned by ``leverage()`` for every trade.
    leverage_value = 2.0

    minimal_roi = {"0": 0.50}
    stoploss = -0.50
    trailing_stop = False

    # How many pairs are taken from the top / bottom of the factor ranking.
    top_n_long = 3
    top_n_short = 5

    # Retry policy of ``_refresh_rankings``: when the share of pairs whose last
    # candle is older than the newest factor timestamp exceeds the threshold,
    # wait and rebuild, at most ``ranking_max_retries`` extra times.
    ranking_retry_wait_secs = 3
    ranking_max_retries = 3
    ranking_missing_retry_threshold = 0.9

    # ------------------------------------------------------------------
    # Single factor expression entry (edit this only)
    # FAST expression examples:
    #   "ts_rank(close, 10) - ts_rank(volume, 10)"
    #   "rank((close - open) / (open + 1e-12))"
    # ------------------------------------------------------------------
    factor_expression = '''
fbeta= zscore(ts_sum(max(returns, 0) * min(market_return, 0), 24) / ts_sum(market_return* market_return, 24));
alpha_cap = zscore(-rank(cap) * rank(sma(volume*close, 20) / cap) + rank(-cap) * rank(-sma(volume*close, 20) / cap));
alpha_m2 = -zscore(close / ts_mean(close, 20)* sma(log(log(volume)+1), 20)) ;
alpha_liquidity = -zscore(ts_sum(volume*vwap,6)/log(cap));
alpha_adv = zscore(log(ts_mean(close * volume, 6)));
combined = fbeta+alpha_cap +alpha_m2 + alpha_liquidity + alpha_adv;
combined
'''

    unfilledtimeout = {
        "entry": 15,
        "exit": 15,
        "exit_timeout_count": 5,
        # "unit": "seconds"
    }

    http_proxy = None
    https_proxy = None

    def __init__(self, config: dict) -> None:
        """Initialise the base strategy, the file logger and the ranking caches."""
        super().__init__(config)
        _setup_strategy_file_logger()
        # Session (floored bar time) -> symbol keys selected long / short.
        self.session_longs: Dict[pd.Timestamp, Set[str]] = {}
        self.session_shorts: Dict[pd.Timestamp, Set[str]] = {}
        # Last session for which rankings were stored.
        self._last_rank_bar: pd.Timestamp | None = None
        # Last session for which a refresh was attempted (successful or not).
        self._last_refresh_attempt_bar: pd.Timestamp | None = None
        # Last session covered by a full-history (backtest) ranking build.
        self._historical_rankings_until: pd.Timestamp | None = None
        # Pair -> timestamp of the newest candle seen while building wide data.
        self._last_pair_candle_times: dict[str, pd.Timestamp] = {}

    @staticmethod
    def _pair_key(pair: str) -> str:
        """Return ``pair`` with ``/`` and ``:`` replaced by ``_``."""
        return pair.replace("/", "_").replace(":", "_")

    def leverage(
        self,
        pair,
        current_time,
        current_rate,
        proposed_leverage,
        max_leverage,
        entry_tag,
        side,
        **kwargs,
    ):
        """Return the fixed ``leverage_value``; all arguments are ignored."""
        return self.leverage_value

    @staticmethod
    def _pandas_freq(timeframe: str) -> str:
        """Translate a timeframe string into a pandas frequency string.

        A trailing ``m`` becomes ``min``, trailing ``h`` / ``d`` are kept;
        anything else is returned stripped and lower-cased but otherwise as is.
        """
        tf = timeframe.strip().lower()
        if tf.endswith("m"):
            return f"{tf[:-1]}min"
        if tf.endswith("h"):
            return f"{tf[:-1]}h"
        if tf.endswith("d"):
            return f"{tf[:-1]}d"
        return tf

    @staticmethod
    def _bar_time(ts: pd.Timestamp) -> pd.Timestamp:
        """Convert ``ts`` to UTC and floor it to the class-level ``timeframe``."""
        return pd.to_datetime(ts, utc=True).floor(SmallCapStrategy._pandas_freq(SmallCapStrategy.timeframe))

    def _is_backtest_mode(self) -> bool:
        """Return True when the data provider reports backtest, hyperopt or plot mode."""
        if self.dp is None:
            return False
        return self.dp.runmode.value in {"backtest", "hyperopt", "plot"}

    def _build_engine_wide_data(
        self,
        pairs: list[str],
        end_time: pd.Timestamp,
        *,
        full_history: bool = False,
    ) -> pd.DataFrame:
        """Assemble the wide OHLCV/vwap/cap panel fed to the factor engine.

        Args:
            pairs: Pairs to include. Pairs whose key is missing from the
                circulating-supply table are skipped.
            end_time: Candles dated after this timestamp are dropped.
            full_history: When False only the last ``lookback`` candles of each
                pair are kept; when True the whole cached history is used.

        Returns:
            A DataFrame indexed by date whose columns are the unstacked
            ``symbol`` level for each of open/high/low/close/volume/vwap/cap,
            or an empty DataFrame when no pair produced usable data.

        Side effects:
            Updates ``self._last_pair_candle_times`` with the newest candle
            time of every pair that had candles up to ``end_time``.
        """
        logger.info("Building wide data for factor calculation at time %s", end_time)
        required_columns = ["date", "open", "high", "low", "close", "volume"]
        lookback = max(self.startup_candle_count + 20, 250)

        market_caps = get_pair_market_caps_last_and_update(pairs)
        supply_dict = market_caps[["pair", "circulating_supply"]].set_index("pair")[
            "circulating_supply"
        ].to_dict()

        # Forget pairs that are no longer requested. Otherwise a pair removed
        # from the whitelist keeps its stale timestamp forever and is counted
        # as "lagging" by ``_refresh_rankings`` on every later bar.
        requested = set(pairs)
        self._last_pair_candle_times = {
            pair: candle_time
            for pair, candle_time in self._last_pair_candle_times.items()
            if pair in requested
        }

        frames = []
        pair_last_dates: list[tuple[str, pd.Timestamp]] = []
        eligible_pairs = [pair for pair in pairs if self._pair_key(pair) in supply_dict]
        skipped_pairs = len(pairs) - len(eligible_pairs)
        if skipped_pairs:
            logger.info("Skipping %d pairs without circulating supply data", skipped_pairs)

        for pair in eligible_pairs:
            df = self.dp.get_pair_dataframe(pair, self.timeframe)
            if df.empty:
                logger.warning("No dataframe cached for pair %s at time %s", pair, end_time)
                continue
            if not all(col in df.columns for col in required_columns):
                logger.warning("Missing base columns in data for pair %s: %s", pair, required_columns)
                continue

            df = df[required_columns].copy()
            df["date"] = pd.to_datetime(df["date"], utc=True)
            df = df[df["date"] <= end_time]
            if not full_history:
                df = df.tail(lookback)
            if df.empty:
                logger.warning("No candles available for pair %s up to %s", pair, end_time)
                continue

            # Recorded before the NaN checks below, so a pair with candles but
            # unusable close/supply still takes part in the coverage statistics.
            last_candle_time = pd.to_datetime(df["date"].iloc[-1], utc=True)
            pair_last_dates.append((pair, last_candle_time))
            self._last_pair_candle_times[pair] = last_candle_time

            if pd.isna(df["close"].iloc[-1]):
                logger.warning("No close price data for pair %s at time %s", pair, end_time)
                continue

            symbol = self._pair_key(pair)
            supply = supply_dict.get(symbol)
            if pd.isna(supply):
                logger.warning("No circulating supply data for pair %s, skipping", symbol)
                continue

            close_values = df["close"].to_numpy()
            high_values = df["high"].to_numpy()
            low_values = df["low"].to_numpy()
            frame = df.assign(
                symbol=symbol,
                # One supply figure is applied to every candle of the pair.
                cap=close_values * float(supply),
                # (high + low + close) / 3 stands in for the volume-weighted price.
                vwap=(high_values + low_values + close_values) / 3.0,
            )
            frames.append(frame[["date", "symbol", "open", "high", "low", "close", "volume", "vwap", "cap"]])

        if pair_last_dates:
            latest_pairs = sorted(pair_last_dates, key=lambda item: item[1], reverse=True)[:5]
            earliest_pairs = sorted(pair_last_dates, key=lambda item: item[1])[:5]
            logger.info("Cached pair last candles latest=%s", latest_pairs)
            logger.info("Cached pair last candles earliest=%s", earliest_pairs)
        logger.info(
            "Whitelist pairs=%d eligible_pairs=%d frames_built=%d lookback=%d",
            len(pairs),
            len(eligible_pairs),
            len(frames),
            lookback,
        )

        if not frames:
            return pd.DataFrame()

        panel = pd.concat(frames, ignore_index=True)
        panel = panel.set_index(["date", "symbol"]).sort_index()
        raw_panel_last_ts = pd.to_datetime(panel.index.get_level_values("date"), utc=True).max()
        logger.info("Raw panel latest timestamp=%s", raw_panel_last_ts)
        panel.index = panel.index.set_levels(
            panel.index.levels[1].astype("category"), level=1
        )
        panel = panel.unstack(level='symbol')  # convert to a wide table
        logger.info("Wide panel latest timestamp=%s", pd.to_datetime(panel.index, utc=True).max())
        logger.info("Wide panel shape=%s", panel.shape)
        return panel

    def _record_session_rankings(self, factor_wide: pd.DataFrame, *, latest_only: bool) -> None:
        """Store the long/short symbol sets for the sessions in ``factor_wide``.

        Args:
            factor_wide: Factor values, one row per timestamp and one column
                per symbol.
            latest_only: When True only the last row (after sorting by index)
                is processed; otherwise every row is.

        For each processed row the ``top_n_long`` largest and ``top_n_short``
        smallest non-NaN values are selected; symbols present in both
        selections are removed from both. Rows without any numeric value are
        skipped with a warning. ``_last_rank_bar`` follows the last stored
        session and ``_historical_rankings_until`` is advanced only for
        full (``latest_only=False``) builds.
        """
        factor_wide = factor_wide.sort_index()
        if latest_only:
            snapshots = factor_wide.tail(1)
        else:
            snapshots = factor_wide

        for session_ts, snapshot_row in snapshots.iterrows():
            session = self._bar_time(pd.to_datetime(session_ts, utc=True))
            latest_values = pd.to_numeric(snapshot_row, errors="coerce").dropna()
            if latest_values.empty:
                logger.warning("No valid factor values for session %s", session)
                continue

            long_symbols = set(latest_values.nlargest(self.top_n_long).index.tolist())
            short_symbols = set(latest_values.nsmallest(self.top_n_short).index.tolist())
            # Disabled alternative: skip the ``n_pad`` most extreme names on each side.
            # n_pad = 3
            # long_symbols = set(latest_values.nlargest(self.top_n_long+n_pad).nsmallest(self.top_n_long).index.tolist())
            # short_symbols = set(latest_values.nsmallest(self.top_n_short+n_pad).nlargest(self.top_n_short).index.tolist())

            # With few valid symbols the two selections can intersect; such
            # symbols are traded on neither side.
            overlap = long_symbols & short_symbols
            self.session_longs[session] = long_symbols - overlap
            self.session_shorts[session] = short_symbols - overlap
            self._last_rank_bar = session
            logger.info(
                "4h rank session=%s long=%s short=%s overlap=%s",
                session,
                ",".join(sorted(self.session_longs[session])),
                ",".join(sorted(self.session_shorts[session])),
                ",".join(sorted(overlap)),
            )

        if not snapshots.empty and not latest_only:
            self._historical_rankings_until = self._bar_time(pd.to_datetime(snapshots.index[-1], utc=True))

    def _refresh_historical_rankings(self, bar_time: pd.Timestamp) -> None:
        """Build rankings for every session up to ``bar_time`` in one pass.

        Does nothing when a previous full build already covers the session.
        Any failure (no data, engine error, empty result) is logged as a
        warning and leaves the stored rankings untouched.
        """
        session = self._bar_time(bar_time)
        if self._historical_rankings_until is not None and self._historical_rankings_until >= session:
            return

        start_build_data_time = time.perf_counter()
        pairs = self.dp.current_whitelist()
        wide_data = self._build_engine_wide_data(pairs, session, full_history=True)
        if wide_data.empty:
            logger.warning("No valid wide data for historical ranking build at session %s", session)
            return

        try:
            alpha_data = Alphas(wide_data)
            engine = FastExpressionEngine(alpha_data)
            factor_wide = engine.evaluate(self.factor_expression)
        except Exception as exc:
            # Best effort: a failing factor evaluation must not abort the run.
            logger.warning("FastEngine historical factor evaluation failed: %s", exc)
            return

        if not isinstance(factor_wide, pd.DataFrame) or factor_wide.empty:
            logger.warning("Historical factor expression did not return a valid DataFrame at %s", session)
            return

        factor_wide = factor_wide[factor_wide.index <= session]
        if factor_wide.empty:
            logger.warning("No historical factor snapshots available up to %s", session)
            return

        self._record_session_rankings(factor_wide, latest_only=False)
        end_build_data_time = time.perf_counter()
        logger.info(
            "Historical rankings built for %d sessions in %.2f seconds up to %s",
            len(factor_wide),
            end_build_data_time - start_build_data_time,
            session,
        )

    def _refresh_rankings(self, bar_time: pd.Timestamp) -> None:
        """Compute and store the ranking for the newest available candle.

        Returns immediately when rankings already exist for the session. The
        wide data and the factor are rebuilt up to ``ranking_max_retries + 1``
        times; a rebuild is triggered only while the share of pairs whose last
        candle is older than the newest factor timestamp exceeds
        ``ranking_missing_retry_threshold``, with a blocking sleep of
        ``ranking_retry_wait_secs`` between attempts. On any failure
        ``_last_rank_bar`` is reset to ``None`` and nothing is stored.
        """
        start_build_data_time = time.perf_counter()
        session = self._bar_time(bar_time)
        if self._last_rank_bar is not None and self._last_rank_bar >= session:
            return

        pairs = self.dp.current_whitelist()
        factor_wide: pd.DataFrame | None = None
        factor_bar_time: pd.Timestamp | None = None
        lagging_pairs: list[tuple[str, pd.Timestamp]] = []
        latest_pair_count = 0

        for attempt in range(self.ranking_max_retries + 1):
            wide_data = self._build_engine_wide_data(pairs, session)
            if wide_data.empty:
                logger.warning("No valid wide data for session %s", session)
                self._last_rank_bar = None
                return

            try:
                alpha_data = Alphas(wide_data)
                engine = FastExpressionEngine(alpha_data)
                factor_wide = engine.evaluate(self.factor_expression)
            except Exception as exc:
                # Best effort: a failing factor evaluation must not stop the bot.
                logger.warning("FastEngine factor evaluation failed: %s", exc)
                self._last_rank_bar = None
                return

            if not isinstance(factor_wide, pd.DataFrame) or factor_wide.empty:
                logger.warning("Factor expression did not return a valid DataFrame at %s", session)
                self._last_rank_bar = None
                return

            factor_wide = factor_wide.sort_index()
            factor_bar_time = pd.to_datetime(factor_wide.index, utc=True).max()

            # Coverage statistics: how many pairs already have the newest candle.
            latest_pair_count = sum(
                1 for candle_time in self._last_pair_candle_times.values() if candle_time == factor_bar_time
            )
            lagging_pairs = sorted(
                [
                    (pair, candle_time)
                    for pair, candle_time in self._last_pair_candle_times.items()
                    if candle_time < factor_bar_time
                ],
                key=lambda item: item[1],
            )
            total_pairs = len(self._last_pair_candle_times)
            missing_ratio = (len(lagging_pairs) / total_pairs) if total_pairs else 0.0

            logger.info("Factor wide latest timestamp=%s", factor_bar_time)
            logger.info(
                "Latest candle coverage target=%s up_to_date=%d lagging=%d total=%d missing_ratio=%.2f attempt=%d/%d",
                factor_bar_time,
                latest_pair_count,
                len(lagging_pairs),
                total_pairs,
                missing_ratio,
                attempt + 1,
                self.ranking_max_retries + 1,
            )
            if lagging_pairs:
                logger.info("Lagging pairs sample=%s", lagging_pairs[:10])

            # Accept the result when coverage is good enough or retries are used up.
            if missing_ratio <= self.ranking_missing_retry_threshold or attempt == self.ranking_max_retries:
                break

            logger.warning(
                "Lagging pair ratio %.2f exceeds threshold %.2f for session %s. Waiting %ss before retry %d.",
                missing_ratio,
                self.ranking_missing_retry_threshold,
                session,
                self.ranking_retry_wait_secs,
                attempt + 1,
            )
            time.sleep(self.ranking_retry_wait_secs)

        if factor_wide is None or factor_bar_time is None:
            self._last_rank_bar = None
            return

        snapshot = factor_wide[factor_wide.index <= factor_bar_time].tail(1)
        if snapshot.empty:
            logger.warning("No factor snapshot at %s", factor_bar_time)
            self._last_rank_bar = None
            return

        logger.info(
            "Factor snapshot target=%s actual=%s",
            factor_bar_time,
            pd.to_datetime(snapshot.index[-1], utc=True),
        )
        self._record_session_rankings(snapshot, latest_only=True)
        end_build_data_time = time.perf_counter()
        logger.info("Factor wide data built in %.2f seconds", end_build_data_time - start_build_data_time)

    def _ensure_rankings_once_per_bar(self, bar_time: pd.Timestamp) -> None:
        """Trigger at most one ranking refresh per session.

        The attempt is recorded before the refresh runs, so a failed refresh
        is not retried until a later session. Backtest-like run modes use the
        full-history build, every other mode the latest-candle build.
        """
        session = self._bar_time(bar_time)
        if self._last_refresh_attempt_bar is not None and self._last_refresh_attempt_bar >= session:
            return

        self._last_refresh_attempt_bar = session
        if self._is_backtest_mode():
            self._refresh_historical_rankings(session)
        else:
            self._refresh_rankings(session)

    def _is_long_signal(self, pair: str, ts: pd.Timestamp) -> bool:
        """Return True when ``pair`` is in the long set of the session containing ``ts``."""
        session = self._bar_time(ts)
        return self._pair_key(pair) in self.session_longs.get(session, set())

    def _is_short_signal(self, pair: str, ts: pd.Timestamp) -> bool:
        """Return True when ``pair`` is in the short set of the session containing ``ts``."""
        session = self._bar_time(ts)
        return self._pair_key(pair) in self.session_shorts.get(session, set())

    def _build_signal_masks(
        self, pair: str, dates: pd.Series
    ) -> tuple[pd.Series, pd.Series, pd.Series]:
        """Return per-candle ``(long, short, should_exit)`` int8 masks for ``pair``.

        Each date is floored to its session and looked up in the stored
        long/short sets. ``should_exit`` is 1 wherever the pair is in neither
        set.
        """
        pair_key = self._pair_key(pair)
        sessions = pd.to_datetime(dates, utc=True).dt.floor(self._pandas_freq(self.timeframe))
        long_mask = sessions.map(lambda session: pair_key in self.session_longs.get(session, set()))
        short_mask = sessions.map(lambda session: pair_key in self.session_shorts.get(session, set()))
        # NOTE(review): a session with no stored ranking at all (e.g. a failed
        # refresh) also yields should_exit == 1 - confirm this is intended.
        should_exit = ~(long_mask | short_mask)
        return long_mask.astype("int8"), short_mask.astype("int8"), should_exit.astype("int8")

    def populate_indicators(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Refresh the rankings for the newest candle and add the signal columns.

        Adds ``signal_long``, ``signal_short`` and ``should_exit`` to
        ``dataframe``. An empty dataframe is returned unchanged.
        """
        # Guard first: ``.iloc[-1]`` below raises IndexError on an empty frame.
        if dataframe.empty:
            return dataframe

        pair = metadata["pair"]
        logger.debug("Populating indicators for pair %s at time %s", pair, dataframe["date"].iloc[-1])
        logger.debug("Processing dataframe with %d rows", len(dataframe))

        current_bar = pd.to_datetime(dataframe["date"].iloc[-1], utc=True)
        self._ensure_rankings_once_per_bar(current_bar)

        signal_long, signal_short, should_exit = self._build_signal_masks(pair, dataframe["date"])
        dataframe["signal_long"] = signal_long
        dataframe["signal_short"] = signal_short
        dataframe["should_exit"] = should_exit

        logger.debug("Current session longs: %s", self.session_longs.get(self._bar_time(current_bar), set()))
        logger.debug("Current session shorts: %s", self.session_shorts.get(self._bar_time(current_bar), set()))
        return dataframe

    def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Set ``enter_long`` / ``enter_short`` to 1 where the matching signal column is 1.

        ``enter_short`` is only written when ``can_short`` is true. An empty
        dataframe is returned unchanged.
        """
        if dataframe.empty:
            return dataframe

        current_bar = pd.to_datetime(dataframe["date"].iloc[-1], utc=True)
        logger.debug("Populating entry trend for pair %s at time %s", metadata["pair"], current_bar)
        logger.debug("Session longs for current bar: %s", self.session_longs.get(self._bar_time(current_bar), set()))
        logger.debug("Session shorts for current bar: %s", self.session_shorts.get(self._bar_time(current_bar), set()))

        dataframe.loc[dataframe["signal_long"] == 1, "enter_long"] = 1
        if self.can_short:
            dataframe.loc[dataframe["signal_short"] == 1, "enter_short"] = 1

        logger.debug(
            "Entry signals populated for pair %s: long=%d, short=%d",
            metadata["pair"],
            int(dataframe["signal_long"].sum()),
            int(dataframe["signal_short"].sum()),
        )
        return dataframe

    def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Set ``exit_long`` and ``exit_short`` to 1 wherever ``should_exit`` is 1."""
        if dataframe.empty:
            return dataframe

        dataframe.loc[dataframe["should_exit"] == 1, "exit_long"] = 1
        dataframe.loc[dataframe["should_exit"] == 1, "exit_short"] = 1
        return dataframe

    def custom_exit(self, pair: str, trade, current_time, current_rate, current_profit, **kwargs):
        """Return ``None`` always; no custom exit reason is produced here.

        Exits are expressed through the columns set in ``populate_exit_trend``.
        The ranking-based variant below is kept disabled.
        """
        # now = pd.to_datetime(current_time, utc=True)
        # self._ensure_rankings_once_per_bar(now)
        # if trade.entry_side == "long" and not self._is_long_signal(pair, now):
        #     return "no_long_signal"
        # if trade.entry_side == "short" and not self._is_short_signal(pair, now):
        #     return "no_short_signal"
        return None