Timeframe
5m
Direction
Long & Short
Stoploss
-3.0%
Trailing Stop
No
ROI
0m: 10000.0%
Interface Version
3
Startup Candles
200
Indicators
11
freqtrade/freqtrade-strategies
Strategy 003 author@: Gerald Lonlas github@: https://github.com/freqtrade/freqtrade-strategies
"""
OsirisGoldV9 — Time-Profit + 1h Trend Filter (BTC 5m)
========================================================
FINAL DIAGNOSIS FROM V4→V8:
• V4: 5993 trades, WR=48.6%, -67% → entries have NO edge at any score
• V7: time_profit=100% WR but timeout=1169 losses at 9% WR → 50% trades wrong direction
• V8: Same pattern — tp1+tp2=+4.5%, timeout+early_loss=-6.9%
Win avg=+0.39%, Loss avg=-0.54% → need WR>58% to be EV+
Currently WR=46.5% → 12% gap to close
THE FIX: Filter entries by 1h trend direction.
When 1h EMA9>EMA21 + ADX>20 → only take longs (trend up)
When 1h EMA9<EMA21 + ADX>20 → only take shorts (trend down)
When ADX<20 → no trade (ranging = coin flip)
This should cut ~40-50% of false entries (wrong direction trades)
and push WR from 46% → 55-60%
Combined with time-profit exit + shorter timeout = should be EV+
"""
import logging
import numpy as np
from pandas import DataFrame
from freqtrade.strategy import (
IStrategy, DecimalParameter, IntParameter,
merge_informative_pair,
)
from freqtrade.persistence import Trade
import talib.abstract as ta
logger = logging.getLogger(__name__)
class OsirisGoldV9(IStrategy):
INTERFACE_VERSION = 3
can_short = True
timeframe = "5m"
stoploss = -0.03
minimal_roi = {"0": 100}
trailing_stop = False
use_custom_stoploss = False
startup_candle_count = 200
process_only_new_candles = True
# ── Entry scoring ────────────────────────────────────────────────────
score_min = IntParameter(9, 14, default=11, space="buy", optimize=True)
buy_pressure = DecimalParameter(0.45, 0.60, default=0.50, decimals=2, space="buy", optimize=True)
buy_rvol = DecimalParameter(0.7, 1.5, default=0.9, decimals=1, space="buy", optimize=True)
buy_cvd_bars = IntParameter(2, 5, default=3, space="buy", optimize=True)
buy_mfi = IntParameter(15, 35, default=22, space="buy", optimize=True)
buy_rsi_min = IntParameter(25, 40, default=29, space="buy", optimize=True)
buy_rsi_max = IntParameter(60, 75, default=69, space="buy", optimize=True)
buy_stochrsi = IntParameter(25, 55, default=44, space="buy", optimize=True)
buy_cci = IntParameter(-150, -40, default=-75, space="buy", optimize=True)
buy_adx = IntParameter(12, 30, default=15, space="buy", optimize=True)
buy_ribbon = IntParameter(1, 4, default=1, space="buy", optimize=True)
buy_efficiency = DecimalParameter(0.05, 0.25, default=0.11, decimals=2, space="buy", optimize=True)
buy_vol_ratio = DecimalParameter(1.0, 2.0, default=1.6, decimals=1, space="buy", optimize=True)
buy_rsi_15m = IntParameter(30, 50, default=36, space="buy", optimize=True)
# ── 1h trend filter ──────────────────────────────────────────────────
trend_adx_min = IntParameter(15, 30, default=20, space="buy", optimize=True)
# ── Exit ─────────────────────────────────────────────────────────────
tp1_candles = IntParameter(4, 12, default=8, space="sell", optimize=True)
tp1_pct = DecimalParameter(0.1, 0.5, default=0.3, decimals=1, space="sell", optimize=True)
tp2_candles = IntParameter(12, 24, default=16, space="sell", optimize=True)
tp2_pct = DecimalParameter(0.05, 0.25, default=0.15, decimals=2, space="sell", optimize=True)
timeout_candles = IntParameter(12, 36, default=20, space="sell", optimize=True)
_last_entry: dict = {}
_consec_losses: dict = {}
def informative_pairs(self):
pairs = self.dp.current_whitelist() if self.dp else []
return [(p, "15m") for p in pairs] + [(p, "1h") for p in pairs]
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata["pair"]
# Standard 5m
dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
macd = ta.MACD(dataframe, fastperiod=12, slowperiod=26, signalperiod=9)
dataframe["macd_hist"] = macd["macdhist"]
for p in [8, 13, 21, 50, 200]:
dataframe[f"ema{p}"] = ta.EMA(dataframe, timeperiod=p)
dataframe["ema_ribbon"] = (
(dataframe["ema8"] > dataframe["ema13"]).astype(int)
+ (dataframe["ema13"] > dataframe["ema21"]).astype(int)
+ (dataframe["ema21"] > dataframe["ema50"]).astype(int)
+ (dataframe["ema50"] > dataframe["ema200"]).astype(int)
)
dataframe["ema_ribbon_bear"] = (
(dataframe["ema8"] < dataframe["ema13"]).astype(int)
+ (dataframe["ema13"] < dataframe["ema21"]).astype(int)
+ (dataframe["ema21"] < dataframe["ema50"]).astype(int)
+ (dataframe["ema50"] < dataframe["ema200"]).astype(int)
)
dataframe["adx"] = ta.ADX(dataframe, timeperiod=14)
stoch = ta.STOCHRSI(dataframe, timeperiod=14, fastk_period=3, fastd_period=3)
dataframe["stochrsi_k"] = stoch["fastk"]
dataframe["cci"] = ta.CCI(dataframe, timeperiod=14)
dataframe["atr"] = ta.ATR(dataframe, timeperiod=14)
# VWAP
tp = (dataframe["high"] + dataframe["low"] + dataframe["close"]) / 3
vv = dataframe["volume"].rolling(50).sum().replace(0, 1)
dataframe["vwap"] = (tp * dataframe["volume"]).rolling(50).sum() / vv
# Volume Flow
hl = (dataframe["high"] - dataframe["low"]).replace(0, np.nan)
bv = (dataframe["volume"] * (dataframe["close"] - dataframe["low"]) / hl).fillna(dataframe["volume"] * 0.5)
sv = (dataframe["volume"] * (dataframe["high"] - dataframe["close"]) / hl).fillna(dataframe["volume"] * 0.5)
total = (bv + sv).replace(0, 1)
dataframe["pressure_ratio"] = (bv / total).fillna(0.5)
dataframe["cvd"] = (bv - sv).cumsum()
dataframe["cvd_rising"] = (dataframe["cvd"] > dataframe["cvd"].shift(1)).astype(int)
dataframe["rvol"] = (dataframe["volume"] / dataframe["volume"].rolling(20).mean().replace(0, 1)).fillna(1)
dataframe["mfi"] = ta.MFI(dataframe, timeperiod=14)
# Microstructure
hl_safe = (dataframe["high"] - dataframe["low"]).replace(0, np.nan).fillna(0.0001)
body = (dataframe["close"] - dataframe["open"]).abs()
dataframe["body_ratio"] = body / hl_safe
dataframe["is_absorption"] = ((dataframe["body_ratio"] < 0.30) & (dataframe["rvol"] > 1.5)).astype(int)
lwr = (dataframe[["open", "close"]].min(axis=1) - dataframe["low"]) / hl_safe
uwr = (dataframe["high"] - dataframe[["open", "close"]].max(axis=1)) / hl_safe
dataframe["is_pin_bar_bull"] = ((lwr > 0.60) & (dataframe["body_ratio"] < 0.25) & (uwr < 0.15)).astype(int)
dataframe["is_pin_bar_bear"] = ((uwr > 0.60) & (dataframe["body_ratio"] < 0.25) & (lwr < 0.15)).astype(int)
rh = dataframe["high"].rolling(20).max().shift(1)
rl = dataframe["low"].rolling(20).min().shift(1)
dataframe["liq_sweep_low"] = ((dataframe["low"] < rl) & (dataframe["close"] > rl)).astype(int)
dataframe["liq_sweep_high"] = ((dataframe["high"] > rh) & (dataframe["close"] < rh)).astype(int)
# Innovation
d20 = (dataframe["close"] - dataframe["close"].shift(20)).abs()
v20 = dataframe["close"].diff().abs().rolling(20).sum()
dataframe["efficiency_ratio"] = (d20 / v20.replace(0, 1)).fillna(0)
atr50 = ta.ATR(dataframe, timeperiod=50)
dataframe["volatility_ratio"] = (dataframe["atr"] / atr50.replace(0, 1)).fillna(1)
# Squeeze
ke = ta.EMA(dataframe, timeperiod=20)
ka = ta.ATR(dataframe, timeperiod=10)
bb = ta.BBANDS(dataframe, timeperiod=20, nbdevup=2.0, nbdevdn=2.0)
sq = ((bb["lowerband"] > ke - ka * 1.5) & (bb["upperband"] < ke + ka * 1.5)).astype(int)
dataframe["squeeze_release_bull"] = ((sq.shift(1) == 1) & (sq == 0) & (dataframe["close"] > dataframe["close"].shift(1))).astype(int)
dataframe["squeeze_release_bear"] = ((sq.shift(1) == 1) & (sq == 0) & (dataframe["close"] < dataframe["close"].shift(1))).astype(int)
# 15m
inf_15m = self.dp.get_pair_dataframe(pair=pair, timeframe="15m") if self.dp else DataFrame()
if not inf_15m.empty:
inf_15m["rsi_15m"] = ta.RSI(inf_15m, timeperiod=14)
m = merge_informative_pair(dataframe, inf_15m[["date", "rsi_15m"]], self.timeframe, "15m", ffill=True)
dataframe["rsi_15m"] = m["rsi_15m_15m"].values
else:
dataframe["rsi_15m"] = 50
# 1h trend
inf_1h = self.dp.get_pair_dataframe(pair=pair, timeframe="1h") if self.dp else DataFrame()
if not inf_1h.empty:
inf_1h["ema9_1h"] = ta.EMA(inf_1h, timeperiod=9)
inf_1h["ema21_1h"] = ta.EMA(inf_1h, timeperiod=21)
inf_1h["adx_1h"] = ta.ADX(inf_1h, timeperiod=14)
m = merge_informative_pair(
dataframe, inf_1h[["date", "ema9_1h", "ema21_1h", "adx_1h"]],
self.timeframe, "1h", ffill=True,
)
dataframe["ema9_1h"] = m["ema9_1h_1h"].values
dataframe["ema21_1h"] = m["ema21_1h_1h"].values
dataframe["adx_1h"] = m["adx_1h_1h"].values
else:
dataframe["ema9_1h"] = dataframe["close"]
dataframe["ema21_1h"] = dataframe["close"]
dataframe["adx_1h"] = 25
return dataframe
# =====================================================================
# SCORING
# =====================================================================
def _score_long(self, df):
s = np.zeros(len(df))
s += (df["pressure_ratio"] > self.buy_pressure.value).astype(int)
s += (df["rvol"] > self.buy_rvol.value).astype(int)
cc = df["cvd_rising"].rolling(int(self.buy_cvd_bars.value)).sum()
s += (cc >= self.buy_cvd_bars.value).astype(int)
s += (df["mfi"] > self.buy_mfi.value).astype(int)
s += ((df["rsi"] > self.buy_rsi_min.value) & (df["rsi"] < self.buy_rsi_max.value)).astype(int)
s += ((df["macd_hist"] > 0) | (df["macd_hist"] > df["macd_hist"].shift(1))).astype(int)
s += (df["stochrsi_k"] < self.buy_stochrsi.value).astype(int)
s += ((df["cci"] > self.buy_cci.value) & (df["cci"].shift(1) <= self.buy_cci.value)).astype(int)
s += (df["adx"] > self.buy_adx.value).astype(int)
s += (df["ema_ribbon"] >= self.buy_ribbon.value).astype(int)
s += (df["close"] > df["vwap"]).astype(int)
s += (df["is_absorption"].rolling(3).sum() > 0).astype(int)
s += ((df["is_pin_bar_bull"] == 1) | (df["liq_sweep_low"] == 1)).astype(int)
s += (df["squeeze_release_bull"] == 1).astype(int)
s += (df["efficiency_ratio"] > self.buy_efficiency.value).astype(int)
s += (df["volatility_ratio"] < self.buy_vol_ratio.value).astype(int)
s += (df["rsi_15m"] > self.buy_rsi_15m.value).astype(int)
return s
def _score_short(self, df):
s = np.zeros(len(df))
s += (df["pressure_ratio"] < (1 - self.buy_pressure.value)).astype(int)
s += (df["rvol"] > self.buy_rvol.value).astype(int)
cc = (1 - df["cvd_rising"]).rolling(int(self.buy_cvd_bars.value)).sum()
s += (cc >= self.buy_cvd_bars.value).astype(int)
s += (df["mfi"] < (100 - self.buy_mfi.value)).astype(int)
rl = 100 - self.buy_rsi_max.value
rh = 100 - self.buy_rsi_min.value
s += ((df["rsi"] > rl) & (df["rsi"] < rh)).astype(int)
s += ((df["macd_hist"] < 0) | (df["macd_hist"] < df["macd_hist"].shift(1))).astype(int)
s += (df["stochrsi_k"] > (100 - self.buy_stochrsi.value)).astype(int)
ci = -self.buy_cci.value
s += ((df["cci"] < ci) & (df["cci"].shift(1) >= ci)).astype(int)
s += (df["adx"] > self.buy_adx.value).astype(int)
s += (df["ema_ribbon_bear"] >= self.buy_ribbon.value).astype(int)
s += (df["close"] < df["vwap"]).astype(int)
s += (df["is_absorption"].rolling(3).sum() > 0).astype(int)
s += ((df["is_pin_bar_bear"] == 1) | (df["liq_sweep_high"] == 1)).astype(int)
s += (df["squeeze_release_bear"] == 1).astype(int)
s += (df["efficiency_ratio"] > self.buy_efficiency.value).astype(int)
s += (df["volatility_ratio"] < self.buy_vol_ratio.value).astype(int)
s += (df["rsi_15m"] < (100 - self.buy_rsi_15m.value)).astype(int)
return s
# =====================================================================
# ENTRY — with 1h trend filter
# =====================================================================
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
has_data = dataframe["rsi"].notna() & (dataframe["volume"] > 0)
sl = self._score_long(dataframe)
ss = self._score_short(dataframe)
thr = int(self.score_min.value)
adx_min = int(self.trend_adx_min.value)
# 1H TREND FILTER — the key innovation
trend_up = (dataframe["ema9_1h"] > dataframe["ema21_1h"]) & (dataframe["adx_1h"] > adx_min)
trend_down = (dataframe["ema9_1h"] < dataframe["ema21_1h"]) & (dataframe["adx_1h"] > adx_min)
# LONG only when 1h trend is UP
long_sig = has_data & (sl >= thr) & trend_up
# SHORT only when 1h trend is DOWN
short_sig = has_data & (ss >= thr) & trend_down
dataframe.loc[long_sig, "enter_long"] = 1
dataframe.loc[long_sig, "enter_tag"] = "L" + sl[long_sig].astype(int).astype(str)
dataframe.loc[short_sig, "enter_short"] = 1
dataframe.loc[short_sig, "enter_tag"] = "S" + ss[short_sig].astype(int).astype(str)
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
return dataframe
def confirm_trade_entry(
self, pair, order_type, amount, rate, time_in_force,
current_time, entry_tag, side, **kwargs,
) -> bool:
if self._consec_losses.get(pair, 0) >= 5:
self._consec_losses[pair] = 0
return False
last = self._last_entry.get(pair)
if last is not None and (current_time - last).total_seconds() < 600:
return False
self._last_entry[pair] = current_time
return True
# =====================================================================
# EXIT — PURE TIME-PROFIT (no early_loss_cut, no momentum_exit)
# =====================================================================
def custom_exit(
self, pair, trade, current_time, current_rate, current_profit, **kwargs,
):
if not trade.open_date_utc:
return None
elapsed = (current_time - trade.open_date_utc).total_seconds() / 300
# TP1: time_profit
tp1_c = int(self.tp1_candles.value)
tp1_p = float(self.tp1_pct.value) / 100
if elapsed >= tp1_c and current_profit >= tp1_p:
self._consec_losses[pair] = 0
return "tp1"
# TP2: late_profit
tp2_c = int(self.tp2_candles.value)
tp2_p = float(self.tp2_pct.value) / 100
if elapsed >= tp2_c and current_profit >= tp2_p:
self._consec_losses[pair] = 0
return "tp2"
# Timeout — shorter, just cut the trade
tc = int(self.timeout_candles.value)
if elapsed >= tc:
if current_profit <= 0:
self._consec_losses[pair] = self._consec_losses.get(pair, 0) + 1
else:
self._consec_losses[pair] = 0
return "timeout"
return None
def leverage(self, pair, current_time, current_rate, proposed_leverage,
max_leverage, entry_tag, side, **kwargs) -> float:
return 1.0