4h single-factor long/short hedging strategy.
Timeframe
4h
Direction
Long & Short
Stoploss
-50.0%
Trailing Stop
No
ROI
0m: 50.0%
Interface Version
3
Startup Candles
200
Indicators
1
freqtrade/freqtrade-strategies
# pragma pylint: disable=missing-docstring, invalid-name
# flake8: noqa
from __future__ import annotations
import logging
from pathlib import Path
import sys
from typing import Dict, Set
import pandas as pd
from freqtrade.strategy import IStrategy
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
from alpha101.world_quant.Alpha101_code_1 import Alphas
from alpha101.world_quant.fastengine import FastExpressionEngine
from alpha101.data_helper.get_cap import get_pair_market_caps_last_and_update
import numpy as np
logger = logging.getLogger(__name__)
import time
def _setup_strategy_file_logger() -> None:
    """Attach a UTF-8 file handler for this strategy to the module logger.

    The log file is ``<project root>/user_data/logs/SmallCapStrategyV2.log``;
    its parent directory is created when missing. If a ``FileHandler`` that
    already points at that file is attached, the function returns without
    touching the logger, so calling it repeatedly never duplicates handlers.
    """
    target = _PROJECT_ROOT / "user_data" / "logs" / "SmallCapStrategyV2.log"
    target.parent.mkdir(parents=True, exist_ok=True)

    already_attached = any(
        isinstance(existing, logging.FileHandler) and Path(existing.baseFilename) == target
        for existing in logger.handlers
    )
    if already_attached:
        return

    handler = logging.FileHandler(target, encoding="utf-8")
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    handler.setLevel(logging.INFO)

    logger.addHandler(handler)
    # Records still propagate to the parent loggers in addition to the file.
    logger.propagate = True
    logger.setLevel(logging.INFO)
class SmallCapStrategyV2(IStrategy):
    """
    4h single-factor long/short hedging strategy.

    Workflow per 4h session:
    1. Evaluate one factor expression for every whitelist pair that has
       circulating-supply data.
    2. Rank the cross-section by the factor value.
    3. Long the top ``top_n_long`` pairs and short the bottom ``top_n_short``.
    4. Flag an exit on every bar where the pair is in neither set.
    """

    INTERFACE_VERSION = 3
    timeframe = "4h"
    process_only_new_candles = True
    can_short = True
    can_long = True
    use_exit_signal = True
    startup_candle_count = 200
    leverage_value = 1.0
    minimal_roi = {"0": 0.5}
    stoploss = -0.5
    trailing_stop = False
    top_n_long = 20
    top_n_short = 20
    # Retry policy used by ``_refresh_rankings`` when too many pairs lag
    # behind the newest candle of the factor panel.
    ranking_retry_wait_secs = 3
    ranking_max_retries = 3
    ranking_missing_retry_threshold = 0.9
    # ------------------------------------------------------------------
    # Single factor expression entry (edit this only)
    # FAST expression examples:
    # "ts_rank(close, 10) - ts_rank(volume, 10)"
    # "rank((close - open) / (open + 1e-12))"
    # ------------------------------------------------------------------
    factor_expression = '''
# 定义参数
short_window = 5; # 短期窗口,用于衡量最近的强势
long_window = 20; # 长期窗口,用于衡量整体趋势
lookback_window = 10; # 回看窗口,用于寻找近期高点
# 1. 计算动量比率,衡量短期相对于长期的强势程度
# (close / delay(close, short_window)) / (close / delay(close, long_window))
# 这等价于 delay(close, long_window) / delay(close, short_window)
momentum_ratio = ts_delay(close, long_window) / ts_delay(close, short_window);
# 2. 计算从近期高点的回撤幅度
# (ts_max(high, lookback_window) - close) / ts_max(high, lookback_window)
drawdown_from_high = (ts_max(high, lookback_window) - close) / ts_max(high, lookback_window);
# 3. 结合两个条件:强势拉升(momentum_ratio大)且已有一定回撤(drawdown_from_high为正)
# 我们希望在强势股中寻找已经出现健康回调的机会。
raw_factor = momentum_ratio * drawdown_from_high;
# 4. WorldQuant风格的关键一步:横截面标准化(z-score)或排名(rank)
# 这确保了因子值在不同股票间具有可比性,并去除了市场整体的影响。
# 在WorldQuant Brain平台中,通常使用rank或scale(即z-score)。
-ts_mean(rank(raw_factor),12)*zscore(ts_sum(volume*vwap,12)/log(cap));
'''
    unfilledtimeout = {
        "entry": 10,
        "exit": 10,
        "exit_timeout_count": 5,
        "unit": "seconds"
    }
    http_proxy = None
    https_proxy = None

    # Pair universe used by ``_refresh_rankings`` when
    # ``self.dp.current_whitelist()`` raises.
    _FALLBACK_PAIRS = (
        "BCH/USDT:USDT", "SUI/USDT:USDT", "AVAX/USDT:USDT", "ENSO/USDT:USDT", "ADA/USDT:USDT", "ASTER/USDT:USDT", "LTC/USDT:USDT", "ENA/USDT:USDT", "LINK/USDT:USDT", "ZRO/USDT:USDT", "AGLD/USDT:USDT", "XLM/USDT:USDT", "CYBER/USDT:USDT", "AAVE/USDT:USDT", "ALLO/USDT:USDT", "SNX/USDT:USDT", "NEAR/USDT:USDT", "UNI/USDT:USDT", "OP/USDT:USDT", "TAO/USDT:USDT", "SAPIEN/USDT:USDT", "\u5e01\u5b89\u4eba\u751f/USDT:USDT", "ICP/USDT:USDT", "YGG/USDT:USDT", "ARB/USDT:USDT", "FIL/USDT:USDT", "TRUMP/USDT:USDT", "BERA/USDT:USDT", "COW/USDT:USDT", "BIO/USDT:USDT", "POL/USDT:USDT", "BEL/USDT:USDT", "INJ/USDT:USDT", "DASH/USDT:USDT", "AXS/USDT:USDT", "PUMP/USDT:USDT", "DOT/USDT:USDT", "XPL/USDT:USDT", "VTHO/USDT:USDT", "ETC/USDT:USDT", "VIRTUAL/USDT:USDT", "HBAR/USDT:USDT", "APT/USDT:USDT", "CHZ/USDT:USDT", "SOMI/USDT:USDT", "TON/USDT:USDT", "PENGU/USDT:USDT", "WIF/USDT:USDT", "IO/USDT:USDT", "MORPHO/USDT:USDT", "WLD/USDT:USDT", "JTO/USDT:USDT", "AT/USDT:USDT", "RENDER/USDT:USDT", "FET/USDT:USDT", "SEI/USDT:USDT", "ZIL/USDT:USDT", "ARPA/USDT:USDT", "0G/USDT:USDT", "HMSTR/USDT:USDT", "LDO/USDT:USDT", "ONDO/USDT:USDT", "CAKE/USDT:USDT", "VANA/USDT:USDT", "RONIN/USDT:USDT", "ROSE/USDT:USDT", "AR/USDT:USDT", "ATOM/USDT:USDT", "HUMA/USDT:USDT", "CRV/USDT:USDT", "EUL/USDT:USDT", "NXPC/USDT:USDT", "BARD/USDT:USDT", "PENDLE/USDT:USDT", "EIGEN/USDT:USDT", "ETHFI/USDT:USDT", "ZK/USDT:USDT", "INIT/USDT:USDT", "VET/USDT:USDT", "DUSK/USDT:USDT", "ORCA/USDT:USDT", "GIGGLE/USDT:USDT", "TIA/USDT:USDT", "MUBARAK/USDT:USDT", "PROVE/USDT:USDT", "LINEA/USDT:USDT", "GALA/USDT:USDT", "S/USDT:USDT", "ZEN/USDT:USDT", "SYRUP/USDT:USDT", "NEIRO/USDT:USDT", "STRK/USDT:USDT", "PARTI/USDT:USDT", "OG/USDT:USDT", "RESOLV/USDT:USDT", "APE/USDT:USDT", "HOME/USDT:USDT", "SIGN/USDT:USDT", "KAIA/USDT:USDT", "ORDI/USDT:USDT", "IOTA/USDT:USDT", "W/USDT:USDT", "GUN/USDT:USDT", "JUP/USDT:USDT", "ALGO/USDT:USDT", "CETUS/USDT:USDT", "COMP/USDT:USDT", "RUNE/USDT:USDT", "AWE/USDT:USDT", "GPS/USDT:USDT", "DOLO/USDT:USDT",
        "SAND/USDT:USDT", "SLP/USDT:USDT", "ENS/USDT:USDT", "SPK/USDT:USDT", "FF/USDT:USDT", "SOLV/USDT:USDT", "OPEN/USDT:USDT", "D/USDT:USDT", "LIT/USDT:USDT", "WCT/USDT:USDT", "FIDA/USDT:USDT", "TWT/USDT:USDT", "STG/USDT:USDT", "NOM/USDT:USDT", "KSM/USDT:USDT", "BROCCOLI714/USDT:USDT", "IOTX/USDT:USDT", "EDU/USDT:USDT", "ARKM/USDT:USDT", "PNUT/USDT:USDT", "GRT/USDT:USDT", "ALT/USDT:USDT", "AUCTION/USDT:USDT", "SANTOS/USDT:USDT", "QNT/USDT:USDT", "PYTH/USDT:USDT", "STX/USDT:USDT", "MIRA/USDT:USDT", "PLUME/USDT:USDT", "JST/USDT:USDT", "BIGTIME/USDT:USDT", "ZBT/USDT:USDT", "HYPER/USDT:USDT", "STO/USDT:USDT", "CTSI/USDT:USDT", "C98/USDT:USDT", "COTI/USDT:USDT", "WOO/USDT:USDT", "BOME/USDT:USDT", "CFX/USDT:USDT", "BEAMX/USDT:USDT", "FORM/USDT:USDT", "ZKC/USDT:USDT", "XMR/USDT:USDT", "SKY/USDT:USDT", "FLOW/USDT:USDT", "SUPER/USDT:USDT", "ACH/USDT:USDT", "THETA/USDT:USDT", "XVG/USDT:USDT", "TNSR/USDT:USDT", "AXL/USDT:USDT", "MEME/USDT:USDT", "BANANAS31/USDT:USDT", "RPL/USDT:USDT", "TREE/USDT:USDT", "1000CHEEMS/USDT:USDT", "KAITO/USDT:USDT", "TRB/USDT:USDT", "RIF/USDT:USDT", "JASMY/USDT:USDT", "MMT/USDT:USDT", "LA/USDT:USDT", "HEMI/USDT:USDT", "TUT/USDT:USDT", "PROM/USDT:USDT", "DYDX/USDT:USDT", "ASR/USDT:USDT", "A/USDT:USDT", "ACT/USDT:USDT", "IMX/USDT:USDT", "NEO/USDT:USDT", "AIXBT/USDT:USDT", "ME/USDT:USDT", "STEEM/USDT:USDT", "SCR/USDT:USDT", "CHR/USDT:USDT", "MITO/USDT:USDT", "VELODROME/USDT:USDT", "NIL/USDT:USDT", "GLM/USDT:USDT", "RARE/USDT:USDT", "TLM/USDT:USDT", "BB/USDT:USDT", "MOVE/USDT:USDT", "HOLO/USDT:USDT", "1MBABYDOGE/USDT:USDT", "EDEN/USDT:USDT", "LAYER/USDT:USDT", "MANA/USDT:USDT", "MANTA/USDT:USDT", "1INCH/USDT:USDT", "USUAL/USDT:USDT", "ANIME/USDT:USDT", "CKB/USDT:USDT", "SUSHI/USDT:USDT", "GMT/USDT:USDT", "MET/USDT:USDT", "DYM/USDT:USDT", "ALPINE/USDT:USDT", "XTZ/USDT:USDT", "CVX/USDT:USDT", "TOWNS/USDT:USDT", "MAGIC/USDT:USDT", "MINA/USDT:USDT", "SCRT/USDT:USDT", "AEVO/USDT:USDT", "SUN/USDT:USDT", "METIS/USDT:USDT", "TST/USDT:USDT", "XVS/USDT:USDT",
        "PHB/USDT:USDT", "CELO/USDT:USDT", "ALICE/USDT:USDT", "ASTR/USDT:USDT", "KERNEL/USDT:USDT", "SSV/USDT:USDT", "PEOPLE/USDT:USDT", "CELR/USDT:USDT", "TURBO/USDT:USDT", "NOT/USDT:USDT", "SHELL/USDT:USDT", "SAGA/USDT:USDT", "YFI/USDT:USDT", "RSR/USDT:USDT", "REZ/USDT:USDT", "XAI/USDT:USDT", "VANRY/USDT:USDT", "GMX/USDT:USDT", "LPT/USDT:USDT", "RVN/USDT:USDT", "ID/USDT:USDT", "HAEDAL/USDT:USDT", "1000SATS/USDT:USDT", "SXT/USDT:USDT", "SAHARA/USDT:USDT", "HFT/USDT:USDT", "LUMIA/USDT:USDT", "TURTLE/USDT:USDT", "ZRX/USDT:USDT", "PIXEL/USDT:USDT", "BAND/USDT:USDT", "STORJ/USDT:USDT", "HEI/USDT:USDT", "THE/USDT:USDT", "NTRN/USDT:USDT", "BANK/USDT:USDT", "OGN/USDT:USDT", "SKL/USDT:USDT", "BAT/USDT:USDT", "PORTAL/USDT:USDT", "API3/USDT:USDT", "EPIC/USDT:USDT", "MASK/USDT:USDT", "ACE/USDT:USDT", "VIC/USDT:USDT", "BANANA/USDT:USDT", "DOGS/USDT:USDT", "NMR/USDT:USDT", "DEXE/USDT:USDT", "LQTY/USDT:USDT", "ILV/USDT:USDT", "ERA/USDT:USDT", "MAV/USDT:USDT", "G/USDT:USDT", "EGLD/USDT:USDT", "BICO/USDT:USDT", "POLYX/USDT:USDT", "NEWT/USDT:USDT", "UMA/USDT:USDT", "2Z/USDT:USDT", "ONT/USDT:USDT", "MBOX/USDT:USDT", "LISTA/USDT:USDT", "HIGH/USDT:USDT", "YB/USDT:USDT", "HIVE/USDT:USDT", "ANKR/USDT:USDT", "BABY/USDT:USDT", "LSK/USDT:USDT", "FLUX/USDT:USDT", "CTK/USDT:USDT", "SYN/USDT:USDT", "WAL/USDT:USDT", "AI/USDT:USDT", "DEGO/USDT:USDT", "PHA/USDT:USDT", "1000CAT/USDT:USDT", "ONE/USDT:USDT", "KNC/USDT:USDT", "KAVA/USDT:USDT", "FIO/USDT:USDT",
    )

    def __init__(self, config: dict) -> None:
        """Initialise the base strategy, the file logger and the ranking caches."""
        super().__init__(config)
        _setup_strategy_file_logger()
        # Bar open time -> pair keys selected long / short for that bar.
        self.session_longs: Dict[pd.Timestamp, Set[str]] = {}
        self.session_shorts: Dict[pd.Timestamp, Set[str]] = {}
        # Newest bar for which rankings were recorded (None after a failure).
        self._last_rank_bar: pd.Timestamp | None = None
        # Newest bar for which a refresh was attempted, successful or not.
        self._last_refresh_attempt_bar: pd.Timestamp | None = None
        # Last bar covered by a full-history ranking pass.
        self._historical_rankings_until: pd.Timestamp | None = None
        # Pair -> timestamp of the last candle seen while building the panel.
        self._last_pair_candle_times: dict[str, pd.Timestamp] = {}

    @staticmethod
    def _pair_key(pair: str) -> str:
        """Return ``pair`` with ``/`` and ``:`` replaced by ``_``."""
        return pair.replace("/", "_").replace(":", "_")

    def leverage(
        self,
        pair,
        current_time,
        current_rate,
        proposed_leverage,
        max_leverage,
        entry_tag,
        side,
        **kwargs,
    ):
        """Return the fixed ``leverage_value`` regardless of the arguments."""
        return self.leverage_value

    @staticmethod
    def _pandas_freq(timeframe: str) -> str:
        """Translate a timeframe such as ``"4h"`` into a pandas frequency string.

        Minutes map to ``min``, hours to ``h`` and days to ``D``; any other
        suffix is returned lower-cased and otherwise unchanged.
        """
        # NOTE: the input is lower-cased first, so a monthly "1M" timeframe
        # would be read as one minute.
        tf = timeframe.strip().lower()
        if tf.endswith("m"):
            return f"{tf[:-1]}min"
        if tf.endswith("h"):
            return f"{tf[:-1]}h"
        if tf.endswith("d"):
            # "D" is the canonical pandas day alias.
            return f"{tf[:-1]}D"
        return tf

    @staticmethod
    def _bar_time(ts: pd.Timestamp) -> pd.Timestamp:
        """Convert ``ts`` to UTC and floor it to the strategy timeframe."""
        return pd.to_datetime(ts, utc=True).floor(
            SmallCapStrategyV2._pandas_freq(SmallCapStrategyV2.timeframe)
        )

    def _is_backtest_mode(self) -> bool:
        """Return True when the data provider reports an offline run mode.

        Offline modes are ``backtest``, ``hyperopt``, ``plot`` and
        ``webserver``. Returns False when no data provider is attached.
        """
        if self.dp is None:
            return False
        runmode = self.dp.runmode.value
        logger.info("Running in %s mode", runmode)
        return runmode in {"backtest", "hyperopt", "plot", "webserver"}

    def _build_engine_wide_data(
        self,
        pairs: list[str],
        end_time: pd.Timestamp,
        *,
        full_history: bool = False,
    ) -> pd.DataFrame:
        """Assemble the wide OHLCV/vwap/cap panel consumed by the factor engine.

        Args:
            pairs: Candidate pairs; those without a circulating-supply entry
                are skipped.
            end_time: Candles dated after this timestamp are dropped.
            full_history: When False, only the trailing ``lookback`` candles
                of each pair are kept.

        Returns:
            A frame indexed by ``date`` whose columns are the per-symbol
            ``open``/``high``/``low``/``close``/``volume``/``vwap``/``cap``
            values, or an empty frame when no pair produced usable data.

        Side effects:
            Updates ``self._last_pair_candle_times`` for every pair that has
            at least one candle up to ``end_time``.
        """
        logger.info("Building wide data for factor calculation at time %s", end_time)
        required_columns = ["date", "open", "high", "low", "close", "volume"]
        lookback = max(self.startup_candle_count + 20, 250)
        market_caps = get_pair_market_caps_last_and_update(pairs)
        supply_dict = market_caps[["pair", "circulating_supply"]].set_index("pair")[
            "circulating_supply"
        ].to_dict()
        frames = []
        pair_last_dates: list[tuple[str, pd.Timestamp]] = []
        eligible_pairs = [pair for pair in pairs if self._pair_key(pair) in supply_dict]
        skipped_pairs = len(pairs) - len(eligible_pairs)
        if skipped_pairs:
            logger.info("Skipping %d pairs without circulating supply data", skipped_pairs)
        for pair in eligible_pairs:
            df = self.dp.get_pair_dataframe(pair, self.timeframe)
            if df.empty:
                logger.warning("No dataframe cached for pair %s at time %s", pair, end_time)
                continue
            if not all(col in df.columns for col in required_columns):
                logger.warning("Missing base columns in data for pair %s: %s", pair, required_columns)
                continue
            df = df[required_columns].copy()
            df["date"] = pd.to_datetime(df["date"], utc=True)
            df = df[df["date"] <= end_time]
            if not full_history:
                df = df.tail(lookback)
            if df.empty:
                logger.warning("No candles available for pair %s up to %s", pair, end_time)
                continue
            last_candle_time = pd.to_datetime(df["date"].iloc[-1], utc=True)
            pair_last_dates.append((pair, last_candle_time))
            self._last_pair_candle_times[pair] = last_candle_time
            if pd.isna(df["close"].iloc[-1]):
                logger.warning("No close price data for pair %s at time %s", pair, end_time)
                continue
            symbol = self._pair_key(pair)
            supply = supply_dict.get(symbol)
            if pd.isna(supply):
                logger.warning("No circulating supply data for pair %s, skipping", symbol)
                continue
            close_values = df["close"].to_numpy()
            high_values = df["high"].to_numpy()
            low_values = df["low"].to_numpy()
            frame = df.assign(
                symbol=symbol,
                # Market cap proxy: close price times circulating supply.
                cap=close_values * float(supply),
                # vwap is approximated by the typical price of the candle.
                vwap=(high_values + low_values + close_values) / 3.0,
            )
            frames.append(frame[["date", "symbol", "open", "high", "low", "close", "volume", "vwap", "cap"]])
        if pair_last_dates:
            latest_pairs = sorted(pair_last_dates, key=lambda item: item[1], reverse=True)[:5]
            earliest_pairs = sorted(pair_last_dates, key=lambda item: item[1])[:5]
            logger.info("Cached pair last candles latest=%s", latest_pairs)
            logger.info("Cached pair last candles earliest=%s", earliest_pairs)
        logger.info(
            "Whitelist pairs=%d eligible_pairs=%d frames_built=%d lookback=%d",
            len(pairs),
            len(eligible_pairs),
            len(frames),
            lookback,
        )
        if not frames:
            return pd.DataFrame()
        panel = pd.concat(frames, ignore_index=True)
        panel = panel.set_index(["date", "symbol"]).sort_index()
        raw_panel_last_ts = pd.to_datetime(panel.index.get_level_values("date"), utc=True).max()
        logger.info("Raw panel latest timestamp=%s", raw_panel_last_ts)
        panel.index = panel.index.set_levels(
            panel.index.levels[1].astype("category"), level=1
        )
        # Pivot the long (date, symbol) panel into wide format.
        panel = panel.unstack(level='symbol')
        logger.info("Wide panel latest timestamp=%s", pd.to_datetime(panel.index, utc=True).max())
        logger.info("Wide panel shape=%s", panel.shape)
        return panel

    def _record_session_rankings(self, factor_wide: pd.DataFrame, latest_only: bool) -> None:
        """Store the long/short selections derived from ``factor_wide``.

        Args:
            factor_wide: Factor values, one row per bar and one column per
                symbol.
            latest_only: When True only the last row is ranked; otherwise
                every row is ranked and ``_historical_rankings_until`` is
                advanced to the last row's bar.

        For each ranked row the ``top_n_long`` largest values become the long
        set and the ``top_n_short`` smallest the short set; symbols that land
        in both are removed from both.
        """
        factor_wide = factor_wide.sort_index()
        if latest_only:
            snapshots = factor_wide.tail(1)
            logger.debug('use lastest bar')
        else:
            snapshots = factor_wide
            logger.debug("use full bars, len(%d)", len(factor_wide))
        for session_ts, snapshot_row in snapshots.iterrows():
            session = self._bar_time(pd.to_datetime(session_ts, utc=True))
            latest_values = pd.to_numeric(snapshot_row, errors="coerce").dropna()
            if latest_values.empty:
                logger.warning("No valid factor values for session %s", session)
                continue
            long_symbols = set(latest_values.nlargest(self.top_n_long).index.tolist())
            short_symbols = set(latest_values.nsmallest(self.top_n_short).index.tolist())
            overlap = long_symbols & short_symbols
            self.session_longs[session] = long_symbols - overlap
            self.session_shorts[session] = short_symbols - overlap
            self._last_rank_bar = session
            logger.info(
                "4h rank session=%s long=%s short=%s overlap=%s",
                session,
                ",".join(sorted(self.session_longs[session])),
                ",".join(sorted(self.session_shorts[session])),
                ",".join(sorted(overlap)),
            )
        if not snapshots.empty and not latest_only:
            self._historical_rankings_until = self._bar_time(pd.to_datetime(snapshots.index[-1], utc=True))

    def _refresh_rankings(self, bar_time: pd.Timestamp) -> None:
        """Recompute the factor and record rankings for ``bar_time``'s session.

        Does nothing when rankings for this session (or a later one) already
        exist. The panel build and factor evaluation are retried up to
        ``ranking_max_retries`` times, sleeping ``ranking_retry_wait_secs``
        between attempts, while the share of pairs whose last candle is older
        than the panel's newest timestamp exceeds
        ``ranking_missing_retry_threshold``. On any failure
        ``_last_rank_bar`` is reset to None and nothing is recorded.

        In offline run modes the full history is built and every bar is
        ranked; otherwise only the trailing window is built and only the
        latest bar is ranked.
        """
        start_build_data_time = time.perf_counter()
        session = self._bar_time(bar_time)
        if self._last_rank_bar is not None and self._last_rank_bar >= session:
            return
        try:
            pairs = self.dp.current_whitelist()
        except Exception as exc:
            # Best-effort fallback: keep running on the built-in universe, but
            # make the failure visible instead of swallowing it.
            logger.warning(
                "Could not read the current whitelist (%s); using the built-in fallback pair list",
                exc,
            )
            pairs = list(self._FALLBACK_PAIRS)
        # The method must be *called*: the bare bound method is always truthy
        # and would make every run behave like a backtest.
        backtest_mode = self._is_backtest_mode()
        factor_wide: pd.DataFrame | None = None
        factor_bar_time: pd.Timestamp | None = None
        lagging_pairs: list[tuple[str, pd.Timestamp]] = []
        latest_pair_count = 0
        for attempt in range(self.ranking_max_retries + 1):
            wide_data = self._build_engine_wide_data(pairs, session, full_history=backtest_mode)
            if wide_data.empty:
                logger.warning("No valid wide data for session %s", session)
                self._last_rank_bar = None
                return
            try:
                alpha_data = Alphas(wide_data)
                engine = FastExpressionEngine(alpha_data)
                factor_wide = engine.evaluate(self.factor_expression)
            except Exception as exc:
                logger.warning("FastEngine factor evaluation failed: %s", exc)
                self._last_rank_bar = None
                return
            if not isinstance(factor_wide, pd.DataFrame) or factor_wide.empty:
                logger.warning("Factor expression did not return a valid DataFrame at %s", session)
                self._last_rank_bar = None
                return
            factor_wide = factor_wide.sort_index()
            factor_bar_time = pd.to_datetime(factor_wide.index, utc=True).max()
            latest_pair_count = sum(
                1 for candle_time in self._last_pair_candle_times.values() if candle_time == factor_bar_time
            )
            lagging_pairs = sorted(
                [
                    (pair, candle_time)
                    for pair, candle_time in self._last_pair_candle_times.items()
                    if candle_time < factor_bar_time
                ],
                key=lambda item: item[1],
            )
            total_pairs = len(self._last_pair_candle_times)
            missing_ratio = (len(lagging_pairs) / total_pairs) if total_pairs else 0.0
            logger.info("Factor wide latest timestamp=%s", factor_bar_time)
            logger.info(
                "Latest candle coverage target=%s up_to_date=%d lagging=%d total=%d missing_ratio=%.2f attempt=%d/%d",
                factor_bar_time,
                latest_pair_count,
                len(lagging_pairs),
                total_pairs,
                missing_ratio,
                attempt + 1,
                self.ranking_max_retries + 1,
            )
            if lagging_pairs:
                logger.info("Lagging pairs sample=%s", lagging_pairs[:10])
            if missing_ratio <= self.ranking_missing_retry_threshold or attempt == self.ranking_max_retries:
                break
            logger.warning(
                "Lagging pair ratio %.2f exceeds threshold %.2f for session %s. Waiting %ss before retry %d.",
                missing_ratio,
                self.ranking_missing_retry_threshold,
                session,
                self.ranking_retry_wait_secs,
                attempt + 1,
            )
            time.sleep(self.ranking_retry_wait_secs)
        if factor_wide is None or factor_bar_time is None:
            self._last_rank_bar = None
            return
        if backtest_mode:
            snapshot = factor_wide
        else:
            snapshot = factor_wide[factor_wide.index <= factor_bar_time].tail(1)
        if snapshot.empty:
            logger.warning("No factor snapshot at %s", factor_bar_time)
            self._last_rank_bar = None
            return
        logger.info(
            "Factor snapshot target=%s actual=%s",
            factor_bar_time,
            pd.to_datetime(snapshot.index[-1], utc=True),
        )
        self._record_session_rankings(snapshot, not backtest_mode)
        end_build_data_time = time.perf_counter()
        logger.info(
            "Factor wide data with len %d built in %.2f seconds",
            len(snapshot),
            end_build_data_time - start_build_data_time,
        )

    def _ensure_rankings_once_per_bar(self, bar_time: pd.Timestamp) -> None:
        """Trigger at most one ranking refresh attempt per bar.

        The attempt is recorded before the refresh runs, so a failed refresh
        is not retried for the same bar. Nothing is recomputed when both the
        long and the short set for the bar are already stored.
        """
        session = self._bar_time(bar_time)
        if self._last_refresh_attempt_bar is not None and self._last_refresh_attempt_bar >= session:
            return
        self._last_refresh_attempt_bar = session
        if pd.to_datetime(session) in self.session_longs and pd.to_datetime(session) in self.session_shorts:
            return
        self._refresh_rankings(session)

    def _is_long_signal(self, pair: str, ts: pd.Timestamp) -> bool:
        """Return True when ``pair`` is in the long set of ``ts``'s bar."""
        session = self._bar_time(ts)
        return self._pair_key(pair) in self.session_longs.get(session, set())

    def _is_short_signal(self, pair: str, ts: pd.Timestamp) -> bool:
        """Return True when ``pair`` is in the short set of ``ts``'s bar."""
        session = self._bar_time(ts)
        return self._pair_key(pair) in self.session_shorts.get(session, set())

    def _build_signal_masks(
        self, pair: str, dates: pd.Series
    ) -> tuple[pd.Series, pd.Series, pd.Series]:
        """Build per-row long, short and exit flags for ``pair``.

        Each date is floored to its bar and looked up in the stored session
        sets. The exit flag is set on rows where the pair is in neither set.
        All three series are returned as ``int8``.
        """
        pair_key = self._pair_key(pair)
        sessions = pd.to_datetime(dates, utc=True).dt.floor(self._pandas_freq(self.timeframe))
        long_mask = sessions.map(lambda session: pair_key in self.session_longs.get(session, set()))
        short_mask = sessions.map(lambda session: pair_key in self.session_shorts.get(session, set()))
        should_exit = ~(long_mask | short_mask)
        return long_mask.astype("int8"), short_mask.astype("int8"), should_exit.astype("int8")

    def populate_indicators(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Refresh rankings if needed and add the signal columns.

        Adds ``signal_long``, ``signal_short`` and ``should_exit`` to
        ``dataframe``. An empty frame is returned untouched.
        """
        # Guard first: reading ``iloc[-1]`` on an empty frame raises IndexError.
        if dataframe.empty:
            return dataframe
        pair = metadata["pair"]
        logger.debug("Populating indicators for pair %s at time %s", pair, dataframe["date"].iloc[-1])
        logger.debug("Processing dataframe with %d rows", len(dataframe))
        current_bar = pd.to_datetime(dataframe["date"].iloc[-1], utc=True)
        self._ensure_rankings_once_per_bar(current_bar)
        signal_long, signal_short, should_exit = self._build_signal_masks(pair, dataframe["date"])
        dataframe["signal_long"] = signal_long
        dataframe["signal_short"] = signal_short
        dataframe["should_exit"] = should_exit
        logger.debug("Current session longs: %s", self.session_longs.get(self._bar_time(current_bar), set()))
        logger.debug("Current session shorts: %s", self.session_shorts.get(self._bar_time(current_bar), set()))
        return dataframe

    def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Set ``enter_long`` / ``enter_short`` from the signal columns.

        Each side is only populated when ``can_long`` / ``can_short`` allows it.
        """
        if dataframe.empty:
            return dataframe
        current_bar = pd.to_datetime(dataframe["date"].iloc[-1], utc=True)
        logger.debug("Populating entry trend for pair %s at time %s", metadata["pair"], current_bar)
        logger.debug("Session longs for current bar: %s", self.session_longs.get(self._bar_time(current_bar), set()))
        logger.debug("Session shorts for current bar: %s", self.session_shorts.get(self._bar_time(current_bar), set()))
        if self.can_long:
            dataframe.loc[dataframe["signal_long"] == 1, "enter_long"] = 1
        if self.can_short:
            dataframe.loc[dataframe["signal_short"] == 1, "enter_short"] = 1
        logger.debug(
            "Entry signals populated for pair %s: long=%d, short=%d",
            metadata["pair"],
            int(dataframe["signal_long"].sum()),
            int(dataframe["signal_short"].sum()),
        )
        return dataframe

    def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Set ``exit_long`` and ``exit_short`` on rows flagged ``should_exit``."""
        if dataframe.empty:
            return dataframe
        dataframe.loc[dataframe["should_exit"] == 1, "exit_long"] = 1
        dataframe.loc[dataframe["should_exit"] == 1, "exit_short"] = 1
        return dataframe

    def custom_exit(self, pair: str, trade, current_time, current_rate, current_profit, **kwargs):
        """Never request a custom exit; always returns None.

        Exit flags are produced by ``populate_exit_trend`` instead.
        """
        return None