BTC-only short-horizon FreqAI strategy aligned with the bundled Polymarket project.
Focus:
- one pair: BTC/USDT:USDT
- base timeframe: 1m
- informative timeframes: 5m / 15m / 1h
- targets: 2m / 5m / 10m / 15m directional classification with an explicit NO_EDGE class
An illustrative FreqAI config sketch follows the summary table below.
Timeframe: 1m
Direction: Long & Short
Stoploss: -0.75%
Trailing Stop: No
ROI: 0m: 0.7%, 8m: 0.3%, 15m: 0.0%
Interface Version: N/A
Startup Candles: 80
Indicators: 6
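One way to pair this strategy with FreqAI, shown as the "freqai" block of the freqtrade JSON config written as a Python dict. Values are illustrative assumptions, not from the source; only the feature hooks in the code below impose hard requirements:

# Sketch only: identifier, train window, shift count and indicator periods are assumed.
freqai_config_sketch = {
    "enabled": True,
    "identifier": "btc_short_horizon",      # assumed name
    "train_period_days": 14,                # assumed
    "backtest_period_days": 1,              # assumed
    "feature_parameters": {
        "include_timeframes": ["1m", "5m", "15m", "1h"],  # matches the docstring
        "include_corr_pairlist": [],
        "indicator_periods_candles": [10, 20, 50],  # assumed periods for the *-period features
        "include_shifted_candles": 2,               # assumed
        "label_period_candles": 15,                 # longest target horizon
    },
}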
import logging
import sqlite3
from datetime import datetime, timedelta
from functools import lru_cache
from pathlib import Path
import numpy as np
import pandas as pd
import talib.abstract as ta
from pandas import DataFrame
from technical import qtpylib
from freqtrade.persistence import Trade
from freqtrade.strategy import IStrategy
logger = logging.getLogger(__name__)
USER_DATA_DIR = Path("/Users/mac/trade/user_data")
FUTURES_DATA_DIR = USER_DATA_DIR / "data" / "bitget" / "futures"
POLYMARKET_RUNTIME_DB_PATH = Path("/Users/mac/codex_runtime/btc_short_horizon_ml/runtime/decision_collector.sqlite3")
DEFAULT_PRICE_TO_BEAT_PROXY_OFFSET_USD = 40.83
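# Shared helper: division where a zero denominator yields NaN instead of inf.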
def _safe_div(numerator: pd.Series, denominator: pd.Series) -> pd.Series:
return numerator / denominator.replace(0.0, np.nan)
def _pick_first_populated_column(dataframe: DataFrame, candidates: list[str]) -> str | None:
scored: list[tuple[float, str]] = []
for candidate in candidates:
if candidate not in dataframe.columns:
continue
series = pd.to_numeric(dataframe[candidate], errors="coerce").dropna()
if series.empty:
continue
scored.append((float(series.abs().sum()), candidate))
if not scored:
return None
scored.sort(reverse=True)
return scored[0][1]
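# Best-effort feather reader: a missing file or a failed read degrades to an empty frame.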
def _read_local_feather(path: Path, columns: list[str] | None = None) -> DataFrame:
if not path.exists():
return DataFrame()
try:
if columns:
return pd.read_feather(path, columns=columns)
return pd.read_feather(path)
except Exception:
return DataFrame()
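# The sqlite file's mtime is passed in purely as a cache key: any on-disk change to the
# decision-collector db misses the lru_cache entry and forces a fresh read.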
@lru_cache(maxsize=4)
def _cached_polymarket_price_to_beat_lookup(db_mtime: float) -> dict[int, float]:
if not POLYMARKET_RUNTIME_DB_PATH.exists():
return {}
    conn: sqlite3.Connection | None = None
    try:
        conn = sqlite3.connect(str(POLYMARKET_RUNTIME_DB_PATH))
frame = pd.read_sql_query(
"""
SELECT event_slug, price_to_beat, recorded_at_utc
FROM contour_decisions
WHERE contour_id = 'polymarket_v2'
AND decision_phase = 'round_signal'
AND price_to_beat IS NOT NULL
ORDER BY recorded_at_utc ASC
""",
conn,
)
except Exception:
return {}
    finally:
        if conn is not None:
            conn.close()
if frame.empty:
return {}
suffix = frame["event_slug"].astype(str).str.extract(r"(\d+)$")[0]
frame["bucket_start_epoch"] = pd.to_numeric(suffix, errors="coerce").astype("Int64")
frame["price_to_beat"] = pd.to_numeric(frame["price_to_beat"], errors="coerce")
frame = frame.dropna(subset=["bucket_start_epoch", "price_to_beat"])
if frame.empty:
return {}
frame = frame.drop_duplicates(subset=["bucket_start_epoch"], keep="last")
return {
int(row["bucket_start_epoch"]): float(row["price_to_beat"])
for _, row in frame.iterrows()
}
def _polymarket_price_to_beat_lookup() -> dict[int, float]:
if not POLYMARKET_RUNTIME_DB_PATH.exists():
return {}
try:
return _cached_polymarket_price_to_beat_lookup(POLYMARKET_RUNTIME_DB_PATH.stat().st_mtime)
except OSError:
return {}
class BtcShortHorizonFreqaiStrategy(IStrategy):
"""
BTC-only short-horizon FreqAI strategy aligned with the bundled Polymarket project.
Focus:
- one pair: BTC/USDT:USDT
- base timeframe: 1m
- informative timeframes: 5m / 15m / 1h
- targets: 2m / 5m / 10m / 15m directional classification with explicit NO_EDGE class
"""
timeframe = "1m"
can_short = True
process_only_new_candles = True
use_exit_signal = True
startup_candle_count: int = 80
minimal_roi = {
"0": 0.007,
"8": 0.003,
"15": 0.0,
}
stoploss = -0.0075
order_types = {
"entry": "market",
"exit": "market",
"stoploss": "market",
"force_entry": "market",
"force_exit": "market",
"emergency_exit": "market",
"stoploss_on_exchange": False,
}
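    # Register mark-price and funding-rate candle streams with the dataprovider.
    # The 5m/15m informative OHLCV feeds are expected to come from the FreqAI
    # `include_timeframes` config rather than from this hook.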
def informative_pairs(self):
if not self.dp:
return []
pairs = self.dp.current_whitelist() or self.config.get("exchange", {}).get("pair_whitelist", [])
informative: list[tuple[str, str, str]] = []
for pair in pairs:
informative.append((pair, "5m", "mark"))
informative.append((pair, "1h", "mark"))
informative.append((pair, "1h", "funding_rate"))
return informative
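    # FreqAI expands each `%-...-period` feature defined here across every entry of
    # `indicator_periods_candles`, `include_timeframes` and `include_shifted_candles`
    # in the config, so a handful of definitions fans out into a wide feature set.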
def feature_engineering_expand_all(
self, dataframe: DataFrame, period: int, metadata: dict, **kwargs
) -> DataFrame:
close = dataframe["close"]
high = dataframe["high"]
low = dataframe["low"]
volume = dataframe["volume"]
one_bar_return = close.pct_change()
ema = ta.EMA(dataframe, timeperiod=period)
atr = ta.ATR(dataframe, timeperiod=period)
lowest_low = low.rolling(period).min()
highest_high = high.rolling(period).max()
range_span = (highest_high - lowest_low).replace(0.0, np.nan)
dataframe["%-ret-period"] = close.pct_change(period)
dataframe["%-logret-period"] = np.log(_safe_div(close, close.shift(period)))
dataframe["%-volatility-period"] = one_bar_return.rolling(period).std(ddof=0)
dataframe["%-ema_gap-period"] = _safe_div(close, ema) - 1.0
dataframe["%-rsi-period"] = ta.RSI(dataframe, timeperiod=period) / 100.0
dataframe["%-range_pct-period"] = _safe_div(highest_high, lowest_low) - 1.0
dataframe["%-close_pos-period"] = _safe_div(close - lowest_low, range_span)
dataframe["%-volume_ratio-period"] = _safe_div(volume, volume.rolling(period).mean())
dataframe["%-atr_ratio-period"] = _safe_div(atr, close)
return dataframe
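    # Expanded across `include_timeframes` and `include_shifted_candles` only;
    # these features take no indicator period.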
def feature_engineering_expand_basic(
self, dataframe: DataFrame, metadata: dict, **kwargs
) -> DataFrame:
close = dataframe["close"]
open_ = dataframe["open"]
high = dataframe["high"]
low = dataframe["low"]
volume = dataframe["volume"]
logret = np.log(_safe_div(close, close.shift(1)))
rolling_vwap = _safe_div((qtpylib.typical_price(dataframe) * volume).rolling(20).sum(), volume.rolling(20).sum())
obv = ta.OBV(dataframe)
volume_ma_20 = volume.rolling(20).mean()
impulse_up_3 = close.pct_change(3)
recent_high_10 = high.rolling(10).max().shift(1)
recent_high_20 = high.rolling(20).max().shift(1)
recent_low_10 = low.rolling(10).min().shift(1)
recent_low_20 = low.rolling(20).min().shift(1)
dataframe["%-ret_1"] = close.pct_change(1)
dataframe["%-logret_1"] = logret
dataframe["%-body_pct"] = _safe_div(close - open_, close)
dataframe["%-hl_spread"] = _safe_div(high - low, close)
dataframe["%-upper_wick_pct"] = _safe_div(high - np.maximum(open_, close), close)
dataframe["%-lower_wick_pct"] = _safe_div(np.minimum(open_, close) - low, close)
dataframe["%-realized_vol_1"] = logret.abs()
dataframe["%-price_accel_1"] = close.pct_change(1).diff(1)
dataframe["%-obv_delta_1"] = obv.diff(1)
dataframe["%-vwap_distance_20"] = _safe_div(close, rolling_vwap) - 1.0
dataframe["%-volume_thrust_20"] = _safe_div(volume, volume_ma_20)
dataframe["%-impulse_up_3m"] = impulse_up_3
dataframe["%-breakout_up_10"] = (_safe_div(close, recent_high_10) - 1.0).fillna(0.0)
dataframe["%-breakout_up_20"] = (_safe_div(close, recent_high_20) - 1.0).fillna(0.0)
dataframe["%-breakout_down_10"] = (_safe_div(close, recent_low_10) - 1.0).fillna(0.0)
dataframe["%-breakout_down_20"] = (_safe_div(close, recent_low_20) - 1.0).fillna(0.0)
dataframe["%-raw_volume"] = volume
dataframe["%-raw_price"] = close
return dataframe
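    # Called once on the base 1m dataframe with no automatic expansion, hence the
    # explicit `_1m` / `_3m` / `_20` suffixes baked into the column names.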
def feature_engineering_standard(
self, dataframe: DataFrame, metadata: dict, **kwargs
) -> DataFrame:
dataframe = dataframe.copy()
close = dataframe["close"]
open_ = dataframe["open"]
high = dataframe["high"]
low = dataframe["low"]
volume = dataframe["volume"]
ema_5 = ta.EMA(dataframe, timeperiod=5)
ema_10 = ta.EMA(dataframe, timeperiod=10)
ema_20 = ta.EMA(dataframe, timeperiod=20)
ema_50 = ta.EMA(dataframe, timeperiod=50)
bb_middle = close.rolling(20).mean()
bb_std = close.rolling(20).std(ddof=0)
rolling_vwap = _safe_div((qtpylib.typical_price(dataframe) * volume).rolling(20).sum(), volume.rolling(20).sum())
obv = ta.OBV(dataframe)
macd = ta.MACD(dataframe)
donchian_high = high.rolling(3).max().shift(1)
donchian_low = low.rolling(3).min().shift(1)
realized_vol_1 = np.log(_safe_div(close, close.shift(1))).abs()
volume_thrust = _safe_div(volume, volume.rolling(20).mean())
recent_high_10 = high.rolling(10).max().shift(1)
recent_high_20 = high.rolling(20).max().shift(1)
recent_low_10 = low.rolling(10).min().shift(1)
recent_low_20 = low.rolling(20).min().shift(1)
breakout_up_10 = (_safe_div(close, recent_high_10) - 1.0).fillna(0.0)
breakout_up_20 = (_safe_div(close, recent_high_20) - 1.0).fillna(0.0)
breakout_down_10 = (_safe_div(close, recent_low_10) - 1.0).fillna(0.0)
breakout_down_20 = (_safe_div(close, recent_low_20) - 1.0).fillna(0.0)
bullish_body_3 = _safe_div((close - open_).rolling(3).sum(), close.rolling(3).mean())
bearish_body_3 = _safe_div((open_ - close).rolling(3).sum(), close.rolling(3).mean())
dataframe["%-ema_gap_5_20"] = _safe_div(ema_5, ema_20) - 1.0
dataframe["%-ema_gap_10_50"] = _safe_div(ema_10, ema_50) - 1.0
dataframe["%-bollinger_bandwidth_20"] = _safe_div(4.0 * bb_std, bb_middle)
dataframe["%-rsi_1m_standard"] = ta.RSI(dataframe, timeperiod=14) / 100.0
dataframe["%-macd_hist_slope_1m"] = macd["macdhist"].diff(1)
dataframe["%-obv_delta_1m"] = obv.diff(1)
dataframe["%-vwap_distance_1m"] = _safe_div(close, rolling_vwap) - 1.0
# FreqAI can call this hook before expand_basic has materialized %-realized_vol_1.
dataframe["%-realized_vol_3m"] = realized_vol_1.rolling(3).mean()
dataframe["%-vol_of_vol_3m"] = realized_vol_1.rolling(3).std(ddof=0)
dataframe["%-donchian_breakout_3m"] = np.select(
[close > donchian_high, close < donchian_low],
[1.0, -1.0],
default=0.0,
)
dataframe["%-trend_pressure_1m"] = (
(dataframe["%-ema_gap_5_20"] * 0.6) + (dataframe["%-ema_gap_10_50"] * 0.4)
)
dataframe["%-micro_momentum_balance_1m"] = (
(dataframe["%-rsi_1m_standard"] - 0.5) * 1.5
+ np.tanh(dataframe["%-macd_hist_slope_1m"] / 8.0) * 0.8
+ np.tanh(dataframe["%-vwap_distance_1m"] * 800.0) * 0.6
+ (dataframe["%-donchian_breakout_3m"] * 0.35)
)
dataframe["%-squeeze_breakout_bias_1m"] = (
dataframe["%-donchian_breakout_3m"] * dataframe["%-bollinger_bandwidth_20"]
)
dataframe["%-volatility_state_1m"] = (
_safe_div(
dataframe["%-realized_vol_3m"],
dataframe["%-realized_vol_3m"].rolling(20).mean(),
).replace([np.inf, -np.inf], np.nan).fillna(0.0)
)
dataframe["%-breakout_up_10_1m"] = breakout_up_10
dataframe["%-breakout_up_20_1m"] = breakout_up_20
dataframe["%-breakout_down_10_1m"] = breakout_down_10
dataframe["%-breakout_down_20_1m"] = breakout_down_20
dataframe["%-volume_thrust_1m"] = volume_thrust
dataframe["%-bullish_body_pressure_3m"] = bullish_body_3.fillna(0.0)
dataframe["%-bearish_body_pressure_3m"] = bearish_body_3.fillna(0.0)
dataframe["%-up_breakout_impulse_1m"] = (
np.tanh(breakout_up_10 * 900.0) * 0.45
+ np.tanh(breakout_up_20 * 700.0) * 0.25
+ np.tanh(volume_thrust.sub(1.0).fillna(0.0) * 1.6) * 0.15
+ np.tanh(bullish_body_3.fillna(0.0) * 55.0) * 0.15
)
dataframe["%-down_breakout_impulse_1m"] = (
np.tanh((-breakout_down_10) * 900.0) * 0.45
+ np.tanh((-breakout_down_20) * 700.0) * 0.25
+ np.tanh(volume_thrust.sub(1.0).fillna(0.0) * 1.6) * 0.15
+ np.tanh(bearish_body_3.fillna(0.0) * 55.0) * 0.15
)
date_index = pd.to_datetime(dataframe["date"], utc=True, errors="coerce")
dataframe["%-minute_sin"] = np.sin(2.0 * np.pi * date_index.dt.minute / 60.0)
dataframe["%-minute_cos"] = np.cos(2.0 * np.pi * date_index.dt.minute / 60.0)
dataframe["%-hour_sin"] = np.sin(2.0 * np.pi * date_index.dt.hour / 24.0)
dataframe["%-hour_cos"] = np.cos(2.0 * np.pi * date_index.dt.hour / 24.0)
dataframe["%-weekday_sin"] = np.sin(2.0 * np.pi * date_index.dt.dayofweek / 7.0)
dataframe["%-weekday_cos"] = np.cos(2.0 * np.pi * date_index.dt.dayofweek / 7.0)
dataframe["%%rsi_1m"] = dataframe["%-rsi_1m_standard"]
dataframe["%%macd_hist_slope_1m"] = dataframe["%-macd_hist_slope_1m"]
dataframe["%%vwap_distance_1m"] = dataframe["%-vwap_distance_1m"]
dataframe["%%donchian_breakout_3m"] = dataframe["%-donchian_breakout_3m"]
dataframe["%%ema_gap_5_20"] = dataframe["%-ema_gap_5_20"]
dataframe["%%ema_gap_10_50"] = dataframe["%-ema_gap_10_50"]
dataframe["%%bb_width_20"] = dataframe["%-bollinger_bandwidth_20"]
dataframe["%%trend_pressure_1m"] = dataframe["%-trend_pressure_1m"]
dataframe["%%micro_momentum_balance_1m"] = dataframe["%-micro_momentum_balance_1m"]
dataframe["%%volatility_state_1m"] = dataframe["%-volatility_state_1m"]
dataframe["%%up_breakout_impulse_1m"] = dataframe["%-up_breakout_impulse_1m"]
dataframe["%%down_breakout_impulse_1m"] = dataframe["%-down_breakout_impulse_1m"]
dataframe["%%breakout_up_20_1m"] = dataframe["%-breakout_up_20_1m"]
dataframe["%%breakout_down_20_1m"] = dataframe["%-breakout_down_20_1m"]
dataframe["%%volume_thrust_1m"] = dataframe["%-volume_thrust_1m"]
dataframe = self._merge_mark_and_funding_features(dataframe, metadata)
return dataframe
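    # Attach mark-price (5m/1h) and funding-rate (1h) candles to the 1m frame via
    # merge_asof(direction="backward"), i.e. the most recent informative candle whose
    # *open* timestamp is at or before the 1m row. Note that close-type fields merged
    # on open timestamps can expose the still-forming candle's close in backtests;
    # freqtrade's merge_informative_pair shifts dates to candle close for that reason.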
def _merge_mark_and_funding_features(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata["pair"]
merged = dataframe.sort_values("date").copy()
def load_market_frame(timeframe: str, candle_type: str) -> DataFrame:
if self.dp:
try:
frame = self.dp.get_pair_dataframe(pair=pair, timeframe=timeframe, candle_type=candle_type)
except Exception:
frame = DataFrame()
if frame is not None and not frame.empty:
return frame
local_path = FUTURES_DATA_DIR / f"BTC_USDT_USDT-{timeframe}-{candle_type}.feather"
return _read_local_feather(local_path)
mark_5m_frame = load_market_frame("5m", "mark")
if mark_5m_frame is None or mark_5m_frame.empty:
merged["mark_open_5m"] = np.nan
merged["mark_close_5m"] = np.nan
merged["mark_source_date_5m"] = pd.NaT
else:
mark_5m_frame = mark_5m_frame.loc[:, ["date", "open", "close"]].copy()
mark_5m_frame["date"] = pd.to_datetime(mark_5m_frame["date"], utc=True, errors="coerce")
mark_5m_frame = mark_5m_frame.dropna(subset=["date"]).sort_values("date")
mark_5m_frame = mark_5m_frame.rename(
columns={
"open": "mark_open_5m",
"close": "mark_close_5m",
"date": "mark_source_date_5m",
}
)
mark_5m_frame["date"] = mark_5m_frame["mark_source_date_5m"]
merged = pd.merge_asof(merged, mark_5m_frame, on="date", direction="backward")
mark_frame = load_market_frame("1h", "mark")
if mark_frame is None or mark_frame.empty:
merged["mark_close_1h"] = np.nan
merged["mark_source_date_1h"] = pd.NaT
else:
mark_frame = mark_frame.loc[:, ["date", "close"]].copy()
mark_frame["date"] = pd.to_datetime(mark_frame["date"], utc=True, errors="coerce")
mark_frame = mark_frame.dropna(subset=["date"]).sort_values("date")
mark_frame = mark_frame.rename(columns={"close": "mark_close_1h", "date": "mark_source_date_1h"})
mark_frame["date"] = mark_frame["mark_source_date_1h"]
merged = pd.merge_asof(merged, mark_frame, on="date", direction="backward")
funding_frame = load_market_frame("1h", "funding_rate")
if funding_frame is None or funding_frame.empty:
merged["funding_rate_1h"] = np.nan
merged["funding_source_date_1h"] = pd.NaT
else:
funding_frame = funding_frame.copy()
funding_frame["date"] = pd.to_datetime(funding_frame["date"], utc=True, errors="coerce")
funding_frame = funding_frame.dropna(subset=["date"]).sort_values("date")
funding_column = _pick_first_populated_column(
funding_frame,
["fundingRate", "funding_rate", "value", "open", "close", "high", "low"],
)
if funding_column is None:
merged["funding_rate_1h"] = np.nan
merged["funding_source_date_1h"] = pd.NaT
else:
funding_frame = funding_frame.loc[:, ["date", funding_column]].rename(
columns={funding_column: "funding_rate_1h", "date": "funding_source_date_1h"}
)
funding_frame["date"] = funding_frame["funding_source_date_1h"]
merged = pd.merge_asof(merged, funding_frame, on="date", direction="backward")
merged["%-mark_available_1h"] = merged["mark_close_1h"].notna().astype(float)
merged["%-funding_available_1h"] = merged["funding_rate_1h"].notna().astype(float)
merged["mark_close_1h"] = merged["mark_close_1h"].ffill()
merged["funding_rate_1h"] = merged["funding_rate_1h"].ffill()
merged["mark_source_date_1h"] = pd.to_datetime(merged["mark_source_date_1h"], utc=True, errors="coerce").ffill()
merged["funding_source_date_1h"] = pd.to_datetime(
merged["funding_source_date_1h"], utc=True, errors="coerce"
).ffill()
merged["%-basis_vs_mark_1h"] = (_safe_div(merged["close"], merged["mark_close_1h"]) - 1.0).fillna(0.0)
merged["%-mark_return_1h"] = merged["mark_close_1h"].pct_change().fillna(0.0)
merged["%-funding_rate_1h"] = merged["funding_rate_1h"].fillna(0.0)
merged["%-funding_rate_delta_1h"] = merged["funding_rate_1h"].diff(1).fillna(0.0)
merged["%-funding_rate_abs_1h"] = merged["funding_rate_1h"].abs().fillna(0.0)
merged["%-mark_age_hours_1h"] = (
(pd.to_datetime(merged["date"], utc=True, errors="coerce") - merged["mark_source_date_1h"])
.dt.total_seconds()
.div(3600.0)
.clip(lower=0.0, upper=24.0)
.fillna(24.0)
)
merged["%-funding_age_hours_1h"] = (
(pd.to_datetime(merged["date"], utc=True, errors="coerce") - merged["funding_source_date_1h"])
.dt.total_seconds()
.div(3600.0)
.clip(lower=0.0, upper=24.0)
.fillna(24.0)
)
merged["%-mark_available_5m"] = merged["mark_open_5m"].notna().astype(float)
merged["mark_source_date_5m"] = pd.to_datetime(merged["mark_source_date_5m"], utc=True, errors="coerce").ffill()
merged["mark_open_5m"] = pd.to_numeric(merged["mark_open_5m"], errors="coerce").ffill()
merged["mark_close_5m"] = pd.to_numeric(merged["mark_close_5m"], errors="coerce").ffill()
merged["%-mark_age_minutes_5m"] = (
(pd.to_datetime(merged["date"], utc=True, errors="coerce") - merged["mark_source_date_5m"])
.dt.total_seconds()
.div(60.0)
.clip(lower=0.0, upper=30.0)
.fillna(30.0)
)
merged["%-mark_freshness_5m"] = (1.0 - (merged["%-mark_age_minutes_5m"] / 30.0)).clip(lower=0.0, upper=1.0)
merged["%-mark_freshness_1h"] = (1.0 - (merged["%-mark_age_hours_1h"] / 24.0)).clip(lower=0.0, upper=1.0)
merged["%-funding_freshness_1h"] = (
1.0 - (merged["%-funding_age_hours_1h"] / 24.0)
).clip(lower=0.0, upper=1.0)
merged["%-basis_vs_mark_effective_1h"] = (
merged["%-basis_vs_mark_1h"] * merged["%-mark_freshness_1h"]
).fillna(0.0)
merged["%-funding_rate_delta_effective_1h"] = (
merged["%-funding_rate_delta_1h"] * merged["%-funding_freshness_1h"]
).fillna(0.0)
date_index = pd.to_datetime(merged["date"], utc=True, errors="coerce")
minute_mod_5m = (date_index.dt.minute % 5).fillna(0).astype(float)
merged["%-cycle_progress_5m"] = (minute_mod_5m / 5.0).clip(lower=0.0, upper=0.8)
merged["%-minutes_to_close_5m"] = (5.0 - minute_mod_5m).clip(lower=1.0, upper=5.0)
merged["bucket_start_5m"] = date_index.dt.floor("5min")
merged["bucket_start_epoch_5m"] = (
(merged["bucket_start_5m"].astype("int64") // 10**9)
.where(merged["bucket_start_5m"].notna(), pd.NA)
.astype("Int64")
)
price_to_beat_lookup = _polymarket_price_to_beat_lookup()
merged["polymarket_price_to_beat_5m_actual"] = (
merged["bucket_start_epoch_5m"].map(price_to_beat_lookup).astype(float)
if price_to_beat_lookup
else np.nan
)
observed_proxy_offset = (
pd.to_numeric(merged["polymarket_price_to_beat_5m_actual"], errors="coerce")
- pd.to_numeric(merged["mark_open_5m"], errors="coerce")
)
valid_offsets = observed_proxy_offset.replace([np.inf, -np.inf], np.nan).dropna()
global_proxy_offset = (
float(valid_offsets.median())
if not valid_offsets.empty
else DEFAULT_PRICE_TO_BEAT_PROXY_OFFSET_USD
)
rolling_proxy_offset = (
observed_proxy_offset.ffill().rolling(240, min_periods=1).median().fillna(global_proxy_offset)
)
mark_open_anchor = pd.to_numeric(merged["mark_open_5m"], errors="coerce")
merged["polymarket_price_to_beat_proxy_5m"] = (
pd.to_numeric(merged["polymarket_price_to_beat_5m_actual"], errors="coerce")
.fillna(mark_open_anchor + rolling_proxy_offset)
.fillna(pd.to_numeric(merged["close"], errors="coerce") + global_proxy_offset)
)
merged["%-price_to_beat_proxy_delta_1m"] = (
_safe_div(pd.to_numeric(merged["close"], errors="coerce"), merged["polymarket_price_to_beat_proxy_5m"]) - 1.0
).fillna(0.0)
merged["%-price_to_beat_proxy_offset_5m"] = (
_safe_div(
merged["polymarket_price_to_beat_proxy_5m"] - mark_open_anchor,
merged["polymarket_price_to_beat_proxy_5m"],
)
).fillna(0.0)
merged["%-mark_open_gap_5m"] = (
_safe_div(pd.to_numeric(merged["close"], errors="coerce"), mark_open_anchor) - 1.0
).fillna(0.0)
merged["%-price_to_beat_actual_available_5m"] = merged["polymarket_price_to_beat_5m_actual"].notna().astype(float)
merged["%%basis_vs_mark_1h"] = merged["%-basis_vs_mark_1h"]
merged["%%funding_rate_delta_1h"] = merged["%-funding_rate_delta_1h"]
merged["%%basis_vs_mark_effective_1h"] = merged["%-basis_vs_mark_effective_1h"]
merged["%%funding_rate_delta_effective_1h"] = merged["%-funding_rate_delta_effective_1h"]
merged["%%price_to_beat_proxy_delta_1m"] = merged["%-price_to_beat_proxy_delta_1m"]
merged["%%price_to_beat_proxy_offset_5m"] = merged["%-price_to_beat_proxy_offset_5m"]
merged["%%mark_open_gap_5m"] = merged["%-mark_open_gap_5m"]
merged["%%cycle_progress_5m"] = merged["%-cycle_progress_5m"]
merged["%%minutes_to_close_5m"] = merged["%-minutes_to_close_5m"]
return merged
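    # Multiclass targets: one `&-dir_{h}m` label per horizon with classes UP_{h}M /
    # DOWN_{h}M / NO_EDGE_{h}M. Thresholds adapt to realized volatility and to the
    # directional context computed below; e.g. for the 5m horizon with base 0.00050
    # and volatility_context 0.0015:
    #   max(0.00050, 0.0015 * sqrt(5/5) * 0.45) = 0.000675 (~6.8 bps)
    # before the bias/impulse clip to [0.85, 1.35]x and the event scaling apply.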
def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
base_thresholds = {
2: 0.00030,
5: 0.00050,
10: 0.00070,
15: 0.00090,
}
dataframe = dataframe.copy()
close = dataframe["close"]
atr_ratio_14 = _safe_div(ta.ATR(dataframe, timeperiod=14), close).fillna(0.0)
realized_vol_20 = close.pct_change().rolling(20).std(ddof=0).fillna(0.0)
volatility_context = ((realized_vol_20 * 0.7) + (atr_ratio_14 * 0.3)).fillna(0.0)
trend_pressure = pd.to_numeric(
dataframe.get("%-trend_pressure_1m", pd.Series(0.0, index=dataframe.index)),
errors="coerce",
).fillna(0.0)
micro_momentum_balance = pd.to_numeric(
dataframe.get("%-micro_momentum_balance_1m", pd.Series(0.0, index=dataframe.index)),
errors="coerce",
).fillna(0.0)
basis_vs_mark = pd.to_numeric(
dataframe.get("%-basis_vs_mark_effective_1h", dataframe.get("%-basis_vs_mark_1h", pd.Series(0.0, index=dataframe.index))),
errors="coerce",
).fillna(0.0)
up_breakout_impulse = pd.to_numeric(
dataframe.get("%-up_breakout_impulse_1m", pd.Series(0.0, index=dataframe.index)),
errors="coerce",
).fillna(0.0)
down_breakout_impulse = pd.to_numeric(
dataframe.get("%-down_breakout_impulse_1m", pd.Series(0.0, index=dataframe.index)),
errors="coerce",
).fillna(0.0)
breakout_up_20 = pd.to_numeric(
dataframe.get("%-breakout_up_20_1m", pd.Series(0.0, index=dataframe.index)),
errors="coerce",
).fillna(0.0)
breakout_down_20 = pd.to_numeric(
dataframe.get("%-breakout_down_20_1m", pd.Series(0.0, index=dataframe.index)),
errors="coerce",
).fillna(0.0)
volume_thrust = pd.to_numeric(
dataframe.get("%-volume_thrust_1m", pd.Series(1.0, index=dataframe.index)),
errors="coerce",
).fillna(1.0)
price_to_beat_proxy = pd.to_numeric(
dataframe.get("polymarket_price_to_beat_proxy_5m", dataframe["close"]),
errors="coerce",
).replace(0.0, np.nan)
price_to_beat_proxy = price_to_beat_proxy.fillna(pd.to_numeric(dataframe["close"], errors="coerce"))
minutes_to_close_5m = pd.to_numeric(
dataframe.get("%-minutes_to_close_5m", pd.Series(5.0, index=dataframe.index)),
errors="coerce",
).fillna(5.0)
price_to_beat_proxy_delta = pd.to_numeric(
dataframe.get("%-price_to_beat_proxy_delta_1m", pd.Series(0.0, index=dataframe.index)),
errors="coerce",
).fillna(0.0)
# Normalize current-bar context into a bounded directional bias.
trend_bias = (
np.tanh(trend_pressure * 250.0) * 0.50
+ np.tanh(micro_momentum_balance * 1.35) * 0.35
+ np.tanh(basis_vs_mark * 180.0) * 0.15
).clip(-1.0, 1.0)
bullish_impulse = (
np.tanh(up_breakout_impulse * 1.8) * 0.55
+ np.tanh(breakout_up_20 * 900.0) * 0.20
+ np.tanh((volume_thrust - 1.0) * 1.4) * 0.10
+ trend_bias.clip(lower=0.0) * 0.15
).clip(0.0, 1.0)
bearish_impulse = (
np.tanh(down_breakout_impulse * 1.8) * 0.55
+ np.tanh((-breakout_down_20) * 900.0) * 0.20
+ np.tanh((volume_thrust - 1.0) * 1.4) * 0.10
+ (-trend_bias).clip(lower=0.0) * 0.15
).clip(0.0, 1.0)
label_columns: dict[str, pd.Series] = {}
for horizon_minutes, base_threshold in base_thresholds.items():
horizon_scale = float(np.sqrt(horizon_minutes / 5.0))
adaptive_threshold = np.maximum(
base_threshold,
volatility_context * horizon_scale * 0.45,
)
bullish_bias = trend_bias.clip(lower=0.0)
bearish_bias = (-trend_bias).clip(lower=0.0)
up_threshold = adaptive_threshold * (
1.0
- bullish_bias * 0.08
- bullish_impulse * 0.14
+ bearish_bias * 0.20
+ bearish_impulse * 0.10
)
down_threshold = adaptive_threshold * (
1.0
- bearish_bias * 0.08
- bearish_impulse * 0.12
+ bullish_bias * 0.22
+ bullish_impulse * 0.08
)
up_threshold = np.clip(up_threshold, adaptive_threshold * 0.85, adaptive_threshold * 1.35)
down_threshold = np.clip(down_threshold, adaptive_threshold * 0.85, adaptive_threshold * 1.35)
cycle_close_pressure = ((5.0 - minutes_to_close_5m).clip(lower=0.0, upper=4.0) / 4.0).fillna(0.0)
            event_alignment = 1.0 if horizon_minutes <= 5 else 0.55  # full weight when the horizon fits one 5m event
event_threshold_scale = (
1.0
- event_alignment * cycle_close_pressure * 0.08
- event_alignment * price_to_beat_proxy_delta.abs().clip(upper=0.006) * 10.0
)
event_threshold_scale = np.clip(event_threshold_scale, 0.82, 1.10)
up_threshold = up_threshold * event_threshold_scale
down_threshold = down_threshold * event_threshold_scale
future_return = dataframe["close"].shift(-horizon_minutes) / price_to_beat_proxy - 1.0
label_columns[f"&-dir_{horizon_minutes}m"] = pd.Series(
np.select(
[
future_return <= -down_threshold,
future_return >= up_threshold,
],
[
f"DOWN_{horizon_minutes}M",
f"UP_{horizon_minutes}M",
],
default=f"NO_EDGE_{horizon_minutes}M",
),
index=dataframe.index,
)
return pd.concat([dataframe, pd.DataFrame(label_columns, index=dataframe.index)], axis=1)
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe = self.freqai.start(dataframe, metadata, self)
return dataframe
@staticmethod
def _probability(dataframe: DataFrame, column: str) -> pd.Series:
if column in dataframe.columns:
return pd.to_numeric(dataframe[column], errors="coerce").fillna(0.0)
return pd.Series(0.0, index=dataframe.index)
    # Identical implementation to _probability; the separate name keeps call sites
    # readable (class-probability columns vs. %%-mirrored helper features).
    _helper = _probability
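    # A FreqAI classifier appends one probability column per class label (UP_2M,
    # DOWN_2M, NO_EDGE_2M, ...) plus `do_predict`, which is 1 only when the
    # prediction passed FreqAI's outlier and data-quality checks.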
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
up_2m = self._probability(df, "UP_2M")
down_2m = self._probability(df, "DOWN_2M")
no_edge_2m = self._probability(df, "NO_EDGE_2M")
up_5m = self._probability(df, "UP_5M")
down_5m = self._probability(df, "DOWN_5M")
no_edge_5m = self._probability(df, "NO_EDGE_5M")
up_10m = self._probability(df, "UP_10M")
down_10m = self._probability(df, "DOWN_10M")
up_15m = self._probability(df, "UP_15M")
down_15m = self._probability(df, "DOWN_15M")
score_2m = up_2m - down_2m
score_5m = up_5m - down_5m
score_10m = up_10m - down_10m
score_15m = up_15m - down_15m
horizon_weights = [0.30, 0.40, 0.20, 0.10]
composite_score = (score_2m * horizon_weights[0]) + (score_5m * horizon_weights[1]) + (score_10m * horizon_weights[2]) + (score_15m * horizon_weights[3])
directional_probability = pd.concat([up_2m, up_5m, up_10m, up_15m], axis=1).mul(horizon_weights, axis=1).sum(axis=1)
directional_probability = directional_probability.where(composite_score >= 0.0, pd.concat([down_2m, down_5m, down_10m, down_15m], axis=1).mul(horizon_weights, axis=1).sum(axis=1))
rsi_1m = self._helper(df, "%%rsi_1m")
macd_slope = self._helper(df, "%%macd_hist_slope_1m")
vwap_distance = self._helper(df, "%%vwap_distance_1m")
donchian_breakout = self._helper(df, "%%donchian_breakout_3m")
ema_gap_5_20 = self._helper(df, "%%ema_gap_5_20")
ema_gap_10_50 = self._helper(df, "%%ema_gap_10_50")
basis_vs_mark = self._helper(df, "%%basis_vs_mark_effective_1h")
funding_delta = self._helper(df, "%%funding_rate_delta_effective_1h")
no_edge_10m = self._probability(df, "NO_EDGE_10M")
no_edge_15m = self._probability(df, "NO_EDGE_15M")
bullish_momentum_votes = (
(rsi_1m >= 0.52).astype(int)
+ (macd_slope > 0.0).astype(int)
+ (vwap_distance > 0.0).astype(int)
+ (donchian_breakout > 0.0).astype(int)
)
bearish_momentum_votes = (
(rsi_1m <= 0.48).astype(int)
+ (macd_slope < 0.0).astype(int)
+ (vwap_distance < 0.0).astype(int)
+ (donchian_breakout < 0.0).astype(int)
)
bullish_short_guard = (
(bullish_momentum_votes >= 3)
| ((rsi_1m >= 0.54) & (ema_gap_5_20 >= 0.0008) & (ema_gap_10_50 >= 0.0020))
| ((basis_vs_mark >= 0.0025) & (ema_gap_10_50 >= 0.0020))
)
bearish_long_guard = (
(bearish_momentum_votes >= 3)
| ((rsi_1m <= 0.46) & (ema_gap_5_20 <= -0.0008) & (ema_gap_10_50 <= -0.0020))
| ((basis_vs_mark <= -0.0025) & (ema_gap_10_50 <= -0.0020))
)
long_supportive_horizons = (
(up_10m >= down_10m + 0.02).astype(int)
+ (up_15m >= down_15m + 0.02).astype(int)
)
short_supportive_horizons = (
(down_10m >= up_10m + 0.02).astype(int)
+ (down_15m >= up_15m + 0.02).astype(int)
)
weighted_no_edge = (no_edge_2m * horizon_weights[0]) + (no_edge_5m * horizon_weights[1]) + (no_edge_10m * horizon_weights[2]) + (no_edge_15m * horizon_weights[3])
long_conditions = [
df["do_predict"] == 1,
composite_score > 0.18,
directional_probability >= 0.50,
up_5m >= 0.50,
up_2m + 0.08 >= down_2m,
weighted_no_edge <= 0.30,
no_edge_5m <= 0.35,
bullish_momentum_votes >= 2,
bearish_momentum_votes <= 1,
long_supportive_horizons >= 1,
~bearish_long_guard,
ema_gap_5_20 >= -0.0002,
ema_gap_10_50 >= -0.0012,
basis_vs_mark >= -0.0040,
funding_delta >= -0.00035,
no_edge_2m <= 0.45,
df["volume"] > 0,
]
short_conditions = [
df["do_predict"] == 1,
composite_score < -0.18,
directional_probability >= 0.50,
down_5m >= 0.50,
down_2m + 0.08 >= up_2m,
weighted_no_edge <= 0.30,
no_edge_5m <= 0.35,
bearish_momentum_votes >= 2,
bullish_momentum_votes <= 1,
short_supportive_horizons >= 1,
~bullish_short_guard,
ema_gap_5_20 <= 0.0002,
ema_gap_10_50 <= 0.0012,
basis_vs_mark <= 0.0040,
funding_delta <= 0.00035,
no_edge_2m <= 0.45,
df["volume"] > 0,
]
if long_conditions:
df.loc[
np.logical_and.reduce(long_conditions),
["enter_long", "enter_tag"],
] = (1, "freqai_btc_short_horizon_long")
if short_conditions:
df.loc[
np.logical_and.reduce(short_conditions),
["enter_short", "enter_tag"],
] = (1, "freqai_btc_short_horizon_short")
return df
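    # Exits re-blend the same horizon scores but front-load the 2m horizon
    # (0.35 vs 0.30 at entry) so fading short-term edge closes trades sooner.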
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
up_2m = self._probability(df, "UP_2M")
down_2m = self._probability(df, "DOWN_2M")
no_edge_2m = self._probability(df, "NO_EDGE_2M")
up_5m = self._probability(df, "UP_5M")
down_5m = self._probability(df, "DOWN_5M")
no_edge_5m = self._probability(df, "NO_EDGE_5M")
up_10m = self._probability(df, "UP_10M")
down_10m = self._probability(df, "DOWN_10M")
up_15m = self._probability(df, "UP_15M")
down_15m = self._probability(df, "DOWN_15M")
score_2m = up_2m - down_2m
score_5m = up_5m - down_5m
score_10m = up_10m - down_10m
score_15m = up_15m - down_15m
composite_score = (score_2m * 0.35) + (score_5m * 0.40) + (score_10m * 0.15) + (score_15m * 0.10)
rsi_1m = self._helper(df, "%%rsi_1m")
macd_slope = self._helper(df, "%%macd_hist_slope_1m")
vwap_distance = self._helper(df, "%%vwap_distance_1m")
no_edge_pressure = (no_edge_2m * 0.45) + (no_edge_5m * 0.55)
exit_long_conditions = [
(composite_score < -0.14)
| ((score_2m < -0.16) & (score_5m < -0.05))
| ((rsi_1m < 0.47) & (macd_slope < 0.0) & (vwap_distance < 0.0) & (no_edge_pressure > 0.48))
]
exit_short_conditions = [
(composite_score > 0.14)
| ((score_2m > 0.16) & (score_5m > 0.05))
| ((rsi_1m > 0.53) & (macd_slope > 0.0) & (vwap_distance > 0.0) & (no_edge_pressure > 0.48))
]
df.loc[np.logical_and.reduce(exit_long_conditions), ["exit_long", "exit_tag"]] = (
1,
"freqai_btc_short_horizon_exit_long",
)
df.loc[np.logical_and.reduce(exit_short_conditions), ["exit_short", "exit_tag"]] = (
1,
"freqai_btc_short_horizon_exit_short",
)
return df
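    # Time-based backstop: flatten any trade older than the longest label horizon
    # (15m), with an extra take-profit trigger at +0.9%.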
def custom_exit(
self,
pair: str,
trade: Trade,
current_time: datetime,
current_rate: float,
current_profit: float,
**kwargs,
):
if current_time - trade.open_date_utc >= timedelta(minutes=15):
return "horizon_timeout_15m"
if current_profit >= 0.009:
return "fast_take_profit"
return None
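The strategy defines four `&-` target columns, so it needs a multi-target-capable model; FreqAI's bundled multi-target classifiers fit one estimator per label column. One plausible invocation (model name is freqtrade's bundled LightGBMClassifierMultiTarget; config path and timerange are placeholders):

freqtrade backtesting --strategy BtcShortHorizonFreqaiStrategy --freqaimodel LightGBMClassifierMultiTarget --config config_btc_freqai.json --timerange 20240101-20240201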