Timeframe
5m
Direction
Long & Short
Stoploss
-4.0%
Trailing Stop
No
ROI
0m: 10000.0%
Interface Version
3
Startup Candles
200
Indicators
11
freqtrade/freqtrade-strategies
Strategy 003 author@: Gerald Lonlas github@: https://github.com/freqtrade/freqtrade-strategies
"""
OsirisGoldV3 — Confluence Scoring + Time-Profit Edge (BTC/USDT 5m)
====================================================================
COMPILES BEST ELEMENTS FROM:
Research — Soft confluence scoring (not binary AND/OR)
V9 — Fixed SL + time-profit edge (trailing kills profits)
XRSI — Breakeven guard + loss-streak cooldown
Omega — Validated patterns (RSI+MFI+BB+MACD multi-TF)
TARGET: 5-8 trades/day | 55-65% WR | 1.5-2.0 R:R | Positive EV
EV = 0.60 × (1.5 × SL) - 0.40 × SL = +0.50 × SL per trade
ARCHITECTURE:
ENTRY: Score 0-20 points across 5 categories
Trigger when score >= threshold (default 10)
EXIT: Fixed ATR-based SL (no trailing)
Full take-profit at rr_target × SL (default 1.5R)
Time-profit edge: winners held longer
Breakeven at 0.5R
RISK: Loss-streak guard (pause after N consecutive losses)
Max 1 open trade
"""
import logging
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from pandas import DataFrame
from freqtrade.persistence import Trade
from freqtrade.strategy import (
IStrategy,
DecimalParameter,
IntParameter,
merge_informative_pair,
)
import talib.abstract as ta
try:
    from freqtrade.strategy import stoploss_from_open
except ImportError:
    def stoploss_from_open(open_relative_stop, current_profit, is_short=False,
                           leverage=1.0):
        """Fallback used only when freqtrade does not export the helper.

        Converts a stop expressed relative to the trade's open price into a
        stop expressed relative to the current price.

        Args:
            open_relative_stop: Desired stop as a ratio of the open price
                (negative = stop at a loss, positive = stop in profit).
            current_profit: Current profit ratio of the trade.
            is_short: True for short trades.
            leverage: Leverage of the trade; both ratios are divided by it
                before the conversion and the result is scaled back.

        Returns:
            A non-negative ratio relative to the current price. ``1`` is
            returned where the formula is undefined, and ``0.0`` when the
            requested stop lies on the wrong side of the current price.
        """
        profit = current_profit / leverage
        # The formula divides by (1 + profit) for longs and (1 - profit) for
        # shorts, so these are the only two singular inputs.
        if (not is_short and profit == -1) or (is_short and profit == 1):
            return 1
        if is_short:
            stop = -1 + ((1 - open_relative_stop / leverage) / (1 - profit))
        else:
            stop = 1 - ((1 + open_relative_stop / leverage) / (1 + profit))
        # A negative value means the stop would sit beyond the current price.
        return max(stop * leverage, 0.0)
logger = logging.getLogger(__name__)
class OsirisGoldV3(IStrategy):
    # Strategy-level settings read by the framework.
    INTERFACE_VERSION = 3
    can_short = True
    timeframe = "5m"
    stoploss = -0.04
    minimal_roi = {"0": 100}
    trailing_stop = False
    use_custom_stoploss = True
    startup_candle_count = 200
    process_only_new_candles = True

    # Risk / exit parameters ("sell" space).
    # atr_sl_mult: ATR multiple that sets the stop distance.
    atr_sl_mult = DecimalParameter(1.5, 3.5, default=2.5, decimals=1, space="sell", optimize=True)
    # rr_target: take-profit distance as a multiple of the stop distance.
    rr_target = DecimalParameter(1.2, 2.5, default=1.5, decimals=1, space="sell", optimize=True)
    # be_trigger: fraction of the stop distance at which the stop moves to breakeven.
    be_trigger = DecimalParameter(0.3, 0.8, default=0.5, decimals=1, space="sell", optimize=True)
    # max_hold / loser_max_hold: compared against elapsed 300-second units in custom_exit.
    max_hold = IntParameter(12, 48, default=24, space="sell", optimize=True)
    loser_max_hold = IntParameter(6, 24, default=12, space="sell", optimize=True)

    # Entry scoring parameters ("buy" space).
    score_threshold = IntParameter(7, 14, default=10, space="buy", optimize=True)
    score_threshold_short = IntParameter(7, 14, default=10, space="buy", optimize=True)
    loss_streak_limit = IntParameter(3, 7, default=5, space="buy", optimize=True)
    # cooldown_min is multiplied by 300 seconds in confirm_trade_entry.
    cooldown_min = IntParameter(3, 12, default=6, space="buy", optimize=True)

    # Per-pair bookkeeping, keyed by pair name. These are class-level dicts,
    # so they are shared by every instance of the class.
    _last_entry_time: dict = {}
    _consecutive_losses: dict = {}
    _last_loss_time: dict = {}
# ── Informative pairs ────────────────────────────────────────────────────
def informative_pairs(self):
    """Return (pair, timeframe) tuples for the 15m and 1h informative data.

    Every pair of the current whitelist is listed first with "15m" and then
    with "1h". An empty list is returned when no data provider is set.
    """
    if not self.dp:
        return []
    whitelist = self.dp.current_whitelist()
    return [(name, tf) for tf in ("15m", "1h") for name in whitelist]
# ── Indicators ───────────────────────────────────────────────────────────
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Add every indicator column consumed by the scoring functions.

    Columns are written onto ``dataframe`` in place and the same object is
    returned. Three groups are produced: 5m indicators and candle patterns,
    15m / 1h informative values (with neutral defaults when the data
    provider returns an empty frame), and 0/1 flags derived from them.

    Args:
        dataframe: Candle data for the strategy timeframe.
        metadata: Must contain the key ``"pair"``.
    """
    df = dataframe
    pair = metadata["pair"]

    # ── 5m momentum ──────────────────────────────────────────────────────
    for length in (14, 7):
        df[f"rsi{length}"] = ta.RSI(df, timeperiod=length)
    df["mfi"] = ta.MFI(df, timeperiod=14)
    df["macd_hist"] = ta.MACD(
        df, fastperiod=12, slowperiod=26, signalperiod=9
    )["macdhist"]
    stochastic = ta.STOCH(df, fastk_period=14, slowk_period=3, slowd_period=3)
    df["stoch_k"] = stochastic["slowk"]
    df["stoch_d"] = stochastic["slowd"]
    df["cci"] = ta.CCI(df, timeperiod=20)

    # ── 5m trend / volatility ────────────────────────────────────────────
    for span in (9, 21, 50):
        df[f"ema{span}"] = ta.EMA(df, timeperiod=span)
    df["adx"] = ta.ADX(df, timeperiod=14)
    df["atr"] = ta.ATR(df, timeperiod=14)

    # ── Bollinger bands ──────────────────────────────────────────────────
    bands = ta.BBANDS(df, timeperiod=20, nbdevup=2.0, nbdevdn=2.0)
    band_hi, band_lo, band_mid = (
        bands["upperband"], bands["lowerband"], bands["middleband"]
    )
    df["bb_upper"] = band_hi
    df["bb_lower"] = band_lo
    df["bb_mid"] = band_mid
    df["bb_width"] = (band_hi - band_lo) / band_mid

    # ── Volume ───────────────────────────────────────────────────────────
    df["volume_sma"] = ta.SMA(df["volume"], timeperiod=20)
    df["rvol"] = df["volume"] / df["volume_sma"]
    df["obv"] = ta.OBV(df)
    df["obv_sma"] = ta.SMA(df["obv"], timeperiod=20)

    # ── Candle anatomy ───────────────────────────────────────────────────
    opens, closes = df["open"], df["close"]
    highs, lows = df["high"], df["low"]
    df["is_green"] = (closes > opens).astype(int)
    df["is_red"] = (closes < opens).astype(int)
    # A zero high-low range becomes NaN so the ratios below never divide by 0.
    hl_range = (highs - lows).replace(0, np.nan)
    df["body_pct"] = abs(closes - opens) / hl_range
    df["lower_wick"] = (np.minimum(closes, opens) - lows) / hl_range
    df["upper_wick"] = (highs - np.maximum(closes, opens)) / hl_range

    # Number of red / green candles within the last five rows.
    df["red_count"] = df["is_red"].rolling(5, min_periods=1).sum()
    df["green_count"] = df["is_green"].rolling(5, min_periods=1).sum()

    # ── Candle patterns ──────────────────────────────────────────────────
    prev_open = opens.shift(1)
    prev_close = closes.shift(1)
    has_body = df["body_pct"] > 0.1

    df["bullish_engulf"] = (
        (df["is_green"] == 1)
        & (df["is_red"].shift(1) == 1)
        & (closes > prev_open)
        & (opens < prev_close)
    ).astype(int)
    df["hammer"] = (
        (df["lower_wick"] > 0.6) & has_body & (df["upper_wick"] < 0.15)
    ).astype(int)
    df["bearish_engulf"] = (
        (df["is_red"] == 1)
        & (df["is_green"].shift(1) == 1)
        & (closes < prev_open)
        & (opens > prev_close)
    ).astype(int)
    df["shooting_star"] = (
        (df["upper_wick"] > 0.6) & has_body & (df["lower_wick"] < 0.15)
    ).astype(int)

    # ── 15m informative values ───────────────────────────────────────────
    frame_15m = self.dp.get_pair_dataframe(pair=pair, timeframe="15m")
    if frame_15m.empty:
        # Neutral placeholders when no 15m data is available.
        df["rsi_15m"] = 50
        df["ema50_15m"] = df["close"]
        df["macd_15m"] = 0
    else:
        frame_15m["rsi_15m"] = ta.RSI(frame_15m, timeperiod=14)
        frame_15m["ema50_15m"] = ta.EMA(frame_15m, timeperiod=50)
        frame_15m["macd_hist_15m"] = ta.MACD(
            frame_15m, fastperiod=12, slowperiod=26, signalperiod=9
        )["macdhist"]
        merged_15m = merge_informative_pair(
            df,
            frame_15m[["date", "rsi_15m", "ema50_15m", "macd_hist_15m"]],
            self.timeframe, "15m", ffill=True,
        )
        # Merged columns are read back under their "_15m"-suffixed names.
        for target, merged_name in (
            ("rsi_15m", "rsi_15m_15m"),
            ("ema50_15m", "ema50_15m_15m"),
            ("macd_15m", "macd_hist_15m_15m"),
        ):
            df[target] = merged_15m[merged_name].values

    # ── 1h informative values ────────────────────────────────────────────
    frame_1h = self.dp.get_pair_dataframe(pair=pair, timeframe="1h")
    if frame_1h.empty:
        # Neutral placeholders when no 1h data is available.
        df["ema9_1h"] = df["close"]
        df["ema21_1h"] = df["close"]
        df["adx_1h"] = 25
        df["rsi_1h"] = 50
    else:
        frame_1h["ema9_1h"] = ta.EMA(frame_1h, timeperiod=9)
        frame_1h["ema21_1h"] = ta.EMA(frame_1h, timeperiod=21)
        frame_1h["adx_1h"] = ta.ADX(frame_1h, timeperiod=14)
        frame_1h["rsi_1h"] = ta.RSI(frame_1h, timeperiod=14)
        merged_1h = merge_informative_pair(
            df,
            frame_1h[["date", "ema9_1h", "ema21_1h", "adx_1h", "rsi_1h"]],
            self.timeframe, "1h", ffill=True,
        )
        for name in ("ema9_1h", "ema21_1h", "adx_1h", "rsi_1h"):
            df[name] = merged_1h[f"{name}_1h"].values

    # ── Derived 0/1 flags ────────────────────────────────────────────────
    df["trend_1h_up"] = (df["ema9_1h"] > df["ema21_1h"]).astype(int)
    df["trend_1h_dn"] = (df["ema9_1h"] < df["ema21_1h"]).astype(int)
    df["m15_pos"] = (df["macd_15m"] > 0).astype(int)
    df["m15_neg"] = (df["macd_15m"] < 0).astype(int)
    df["ema_ribbon_up"] = (
        (df["ema9"] > df["ema21"]) & (df["ema21"] > df["ema50"])
    ).astype(int)
    df["ema_ribbon_dn"] = (
        (df["ema9"] < df["ema21"]) & (df["ema21"] < df["ema50"])
    ).astype(int)
    return df
# ── Scoring functions ────────────────────────────────────────────────────
def _score_long(self, row) -> int:
"""Score a row for LONG entry (0-20 points)"""
s = 0
# ── Category 1: Trend alignment (0-4 pts) ───────────────────────────
if row.get("trend_1h_up", 0) == 1:
s += 2
if row.get("ema_ribbon_up", 0) == 1:
s += 1
if row.get("adx_1h", 0) > 20:
s += 1
# ── Category 2: Momentum oversold (0-5 pts) ─────────────────────────
rsi = row.get("rsi14", 50)
if rsi < 35:
s += 2
elif rsi < 45:
s += 1
mfi = row.get("mfi", 50)
if mfi < 30:
s += 1
stoch = row.get("stoch_k", 50)
if stoch < 25:
s += 1
if row.get("cci", 0) < -100:
s += 1
# ── Category 3: Volume confirmation (0-3 pts) ───────────────────────
rvol = row.get("rvol", 1)
if rvol > 1.5:
s += 2
elif rvol > 1.0:
s += 1
if row.get("obv", 0) > row.get("obv_sma", 0):
s += 1
# ── Category 4: Price action (0-4 pts) ──────────────────────────────
if row.get("is_green", 0) == 1:
s += 1
close = row.get("close", 0)
bb_lower = row.get("bb_lower", 0)
bb_mid = row.get("bb_mid", 0)
if close > 0 and bb_lower > 0:
if close <= bb_mid and row.get("low", close) <= bb_lower * 1.005:
s += 1
if row.get("bullish_engulf", 0) == 1:
s += 1
if row.get("hammer", 0) == 1:
s += 1
# ── Category 5: Multi-TF confluence (0-4 pts) ───────────────────────
if row.get("m15_pos", 0) == 1:
s += 1
rsi_15 = row.get("rsi_15m", 50)
if rsi_15 > 40:
s += 1
if close > row.get("ema50_15m", close):
s += 1
rsi_1h = row.get("rsi_1h", 50)
if 35 < rsi_1h < 65:
s += 1
return s
def _score_short(self, row) -> int:
"""Score a row for SHORT entry (0-20 points)"""
s = 0
# ── Category 1: Trend alignment (0-4 pts) ───────────────────────────
if row.get("trend_1h_dn", 0) == 1:
s += 2
if row.get("ema_ribbon_dn", 0) == 1:
s += 1
if row.get("adx_1h", 0) > 20:
s += 1
# ── Category 2: Momentum overbought (0-5 pts) ───────────────────────
rsi = row.get("rsi14", 50)
if rsi > 65:
s += 2
elif rsi > 55:
s += 1
mfi = row.get("mfi", 50)
if mfi > 70:
s += 1
stoch = row.get("stoch_k", 50)
if stoch > 75:
s += 1
if row.get("cci", 0) > 100:
s += 1
# ── Category 3: Volume confirmation (0-3 pts) ───────────────────────
rvol = row.get("rvol", 1)
if rvol > 1.5:
s += 2
elif rvol > 1.0:
s += 1
if row.get("obv", 0) < row.get("obv_sma", 0):
s += 1
# ── Category 4: Price action (0-4 pts) ──────────────────────────────
if row.get("is_red", 0) == 1:
s += 1
close = row.get("close", 0)
bb_upper = row.get("bb_upper", 0)
bb_mid = row.get("bb_mid", 0)
if close > 0 and bb_upper > 0:
if close >= bb_mid and row.get("high", close) >= bb_upper * 0.995:
s += 1
if row.get("bearish_engulf", 0) == 1:
s += 1
if row.get("shooting_star", 0) == 1:
s += 1
# ── Category 5: Multi-TF confluence (0-4 pts) ───────────────────────
if row.get("m15_neg", 0) == 1:
s += 1
rsi_15 = row.get("rsi_15m", 50)
if rsi_15 < 60:
s += 1
if close < row.get("ema50_15m", close):
s += 1
rsi_1h = row.get("rsi_1h", 50)
if 35 < rsi_1h < 65:
s += 1
return s
# ── Entry signals ────────────────────────────────────────────────────────
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Score every candle and flag entries whose score reaches the threshold.

    Adds ``score_long`` / ``score_short`` to ``dataframe`` and sets
    ``enter_long`` / ``enter_short`` to 1 (with an ``enter_tag`` of
    ``"score_<n>"``) on rows that have a non-NaN ``rsi14``, positive volume
    and a score at or above the matching threshold. If a row qualifies for
    both directions, the short tag overwrites the long one.

    No cooldown is applied here; entry spacing is handled in
    ``confirm_trade_entry``.

    Args:
        dataframe: Frame carrying the columns written by populate_indicators.
        metadata: Unused.

    Returns:
        The same ``dataframe`` object, modified in place.
    """
    thr_long = int(self.score_threshold.value)
    thr_short = int(self.score_threshold_short.value)
    has_data = dataframe["rsi14"].notna() & (dataframe["volume"] > 0)

    cols_needed = [
        "trend_1h_up", "trend_1h_dn", "ema_ribbon_up", "ema_ribbon_dn",
        "adx_1h", "rsi14", "mfi", "stoch_k", "cci", "rvol",
        "obv", "obv_sma", "is_green", "is_red", "close", "low", "high",
        "bb_lower", "bb_upper", "bb_mid", "bullish_engulf", "hammer",
        "bearish_engulf", "shooting_star", "m15_pos", "m15_neg",
        "rsi_15m", "ema50_15m", "rsi_1h",
    ]

    # Scoring is row-by-row. The rows are extracted once as plain dicts (the
    # scorers only call ``row.get``) and shared by both directions, instead
    # of building a Series per row in two separate DataFrame.apply passes.
    rows = dataframe[cols_needed].to_dict("records")
    score_long = pd.Series(
        [self._score_long(r) for r in rows], index=dataframe.index, dtype="int64"
    )
    score_short = pd.Series(
        [self._score_short(r) for r in rows], index=dataframe.index, dtype="int64"
    )
    dataframe["score_long"] = score_long
    dataframe["score_short"] = score_short

    long_signal = has_data & (score_long >= thr_long)
    short_signal = has_data & (score_short >= thr_short)

    # Tag each entry with its score for later analysis.
    dataframe.loc[long_signal, "enter_long"] = 1
    dataframe.loc[long_signal, "enter_tag"] = (
        "score_" + score_long[long_signal].astype(int).astype(str)
    )
    dataframe.loc[short_signal, "enter_short"] = 1
    dataframe.loc[short_signal, "enter_tag"] = (
        "score_" + score_short[short_signal].astype(int).astype(str)
    )
    return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Return ``dataframe`` untouched; this method adds no exit signals."""
    unchanged = dataframe
    return unchanged
# ── Trade management ─────────────────────────────────────────────────────
def confirm_trade_entry(
    self, pair, order_type, amount, rate, time_in_force,
    current_time, entry_tag, side, **kwargs,
) -> bool:
    """Veto an entry that violates the cooldown or the loss-streak pause.

    Returns False when the previous confirmed entry on ``pair`` is less
    than ``cooldown_min * 300`` seconds old, or when the pair has at least
    ``loss_streak_limit`` consecutive losses and the last loss is less
    than ``5 * losses`` minutes old. Otherwise records ``current_time`` as
    the pair's last entry time and returns True. Once the pause has
    elapsed (or no loss time is recorded) the loss counter is reset to 0.
    """
    cooldown_seconds = int(self.cooldown_min.value) * 300  # 300 s per unit
    previous_entry = self._last_entry_time.get(pair)
    if previous_entry is not None:
        since_entry = (current_time - previous_entry).total_seconds()
        if since_entry < cooldown_seconds:
            return False

    streak = self._consecutive_losses.get(pair, 0)
    if streak >= int(self.loss_streak_limit.value):
        last_loss_at = self._last_loss_time.get(pair)
        if last_loss_at is not None:
            # Five minutes of pause for every consecutive loss.
            pause_seconds = (streak * 5) * 60
            if (current_time - last_loss_at).total_seconds() < pause_seconds:
                return False
        self._consecutive_losses[pair] = 0

    self._last_entry_time[pair] = current_time
    return True
def custom_stoploss(
    self, pair, trade: Trade, current_time,
    current_rate: float, current_profit: float, **kwargs,
) -> float:
    """ATR-sized stop anchored to the open price, with a breakeven step.

    The stop distance is ``atr * atr_sl_mult`` as a fraction of the open
    rate, using the ATR of the latest analyzed candle. Once profit reaches
    ``be_trigger`` times that distance, the stop moves to 0.1% above the
    open (below it for shorts).

    The framework interprets the return value relative to the *current*
    rate and only ever moves a stop in the trade's favour. Returning a
    constant ``-sl_pct`` would therefore trail the price. Both branches
    convert an open-relative stop with ``stoploss_from_open`` so the
    initial stop stays fixed.
    NOTE(review): the ATR is re-read on every call, so a shrinking ATR can
    still tighten the stop; confirm whether the entry-candle ATR should be
    used instead.

    Returns:
        ``-0.03`` when no analyzed data, ATR or open rate is available;
        otherwise the stop distance relative to the current rate.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    if dataframe.empty:
        return -0.03
    atr = dataframe.iloc[-1].get("atr", 0)
    if atr <= 0 or trade.open_rate <= 0:
        return -0.03

    # ATR-based stop distance as a fraction of the open price.
    sl_pct = atr * float(self.atr_sl_mult.value) / trade.open_rate
    is_short = getattr(trade, "is_short", False)

    # Breakeven once profit reaches be_trigger × stop distance.
    be_profit = sl_pct * float(self.be_trigger.value)
    if current_profit >= be_profit:
        return stoploss_from_open(0.001, current_profit, is_short=is_short)

    # A negative open-relative stop places the stop sl_pct on the losing
    # side of the open price for both longs and shorts.
    return stoploss_from_open(-sl_pct, current_profit, is_short=is_short)
def custom_exit(
    self, pair, trade, current_time, current_rate, current_profit, **kwargs,
):
    """Return an exit tag for take-profit or time-based exits, else None.

    Tags:
        "tp_hit"        profit reached ``rr_target`` times the ATR stop distance.
        "loser_timeout" trade is negative after ``loser_max_hold`` 300-second units.
        "timeout"       trade is older than ``max_hold`` 300-second units.

    None is returned when no analyzed data is available, when the latest
    ATR or the open rate is not positive, or when nothing above applies.
    """
    analyzed, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    if analyzed.empty:
        return None

    latest_atr = analyzed.iloc[-1].get("atr", 0)
    if latest_atr <= 0 or trade.open_rate <= 0:
        return None

    stop_fraction = latest_atr * float(self.atr_sl_mult.value) / trade.open_rate
    target_fraction = stop_fraction * float(self.rr_target.value)
    if current_profit >= target_fraction:
        return "tp_hit"

    opened_at = trade.open_date_utc
    if not opened_at:
        return None

    # Trade age expressed in 300-second units.
    age = (current_time - opened_at).total_seconds() / 300
    if current_profit < 0 and age >= self.loser_max_hold.value:
        # Losing trades are cut on the shorter limit.
        return "loser_timeout"
    if age >= self.max_hold.value:
        return "timeout"
    return None
def confirm_trade_exit(
    self, pair, trade, order_type, amount, rate,
    time_in_force, exit_reason, current_time, **kwargs,
) -> bool:
    """Update the per-pair loss streak and always allow the exit.

    A negative ``trade.calc_profit_ratio(rate)`` increments the pair's
    consecutive-loss counter and stores ``current_time`` as its last loss
    time. Any other result resets the counter to 0.
    """
    exited_at_loss = trade.calc_profit_ratio(rate) < 0
    if not exited_at_loss:
        self._consecutive_losses[pair] = 0
        return True

    streak_so_far = self._consecutive_losses.get(pair, 0)
    self._consecutive_losses[pair] = streak_so_far + 1
    self._last_loss_time[pair] = current_time
    return True
def leverage(
    self, pair, current_time, current_rate, proposed_leverage,
    max_leverage, entry_tag, side, **kwargs,
) -> float:
    """Return a fixed leverage of 1.0, ignoring every argument."""
    fixed_leverage = 1.0
    return fixed_leverage