OSIRIS OMEGA — Walk-Forward Validated Ensemble Strategy
Timeframe
5m
Direction
Long & Short
Stoploss
-10.0%
Trailing Stop
No
ROI
0m: 50.0%, 480m: 2.0%
Interface Version
3
Startup Candles
550
Indicators
1
freqtrade/freqtrade-strategies
author: lenik
"""
═══════════════════════════════════════════════════════════════════════════════
OSIRIS OMEGA — Walk-Forward Validated Ensemble Strategy
═══════════════════════════════════════════════════════════════════════════════
28/29 signals passed walk-forward validation across 7 splits.
Top signals: 80-100% WR out-of-sample with PF 5-999.
ARCHITECTURE:
1) Composite scoring engine: 10 weight configs × percentiled metrics
2) 25 walk-forward validated signals with confirmation + time filters
3) Per-signal ATR-based stops, targets, and R-multiple trailing
4) Supports long + short (can_short = True for futures)
═══════════════════════════════════════════════════════════════════════════════
"""
import logging
import numpy as np
import pandas as pd
from pandas import DataFrame
from typing import Optional, Dict, List, Tuple
from freqtrade.strategy import IStrategy, merge_informative_pair
from freqtrade.persistence import Trade
import talib.abstract as ta
# freqtrade ships this helper; keep a local fallback so the module still
# imports in stripped-down environments without freqtrade installed.
try:
    from freqtrade.strategy import stoploss_from_open
except ImportError:
    def stoploss_from_open(open_relative_stop: float, current_profit: float,
                           is_short: bool = False) -> float:
        """Convert a stop defined relative to the OPEN price into a stop
        relative to the CURRENT price (freqtrade helper semantics).

        Mirrors ``freqtrade.strategy.stoploss_from_open``:
        * the degenerate guard sits where the denominator is zero
          (-100% profit for longs, +100% for shorts), not at 0% profit —
          the original fallback returned 1 at ``current_profit == 0``,
          which is wrong (the formula is perfectly defined there);
        * the result is clamped at 0.0 — a negative value would place the
          requested stop on the wrong side of the current price.
        """
        if (not is_short and current_profit == -1) or (is_short and current_profit == 1):
            return 1
        if is_short:
            stoploss = -1 + ((1 - open_relative_stop) / (1 - current_profit))
        else:
            stoploss = 1 - ((1 + open_relative_stop) / (1 + current_profit))
        return max(stoploss, 0.0)
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════════════════
# SIGNAL TABLE — Walk-Forward Validated
# ═══════════════════════════════════════════════════════════════════════════
# Tuple layout:
#   (name, weight_key, dir, threshold, confirm_bars, confirm_method,
#    time_filter, stop_mult, target_mult, trail_mult, max_bars)
#
#   name            entry tag, also written per-bar to the "omega_sig" column
#   weight_key      row of WEIGHT_CONFIGS used to build the score column
#   dir             "long" | "short"
#   threshold       minimum composite score (0-100) on the trigger bar
#   confirm_bars    bars after the trigger that must confirm (0 = none)
#   confirm_method  "price" | "score_hold" | "both" | "none"
#   time_filter     key into TIME_FILTERS (UTC-hour session whitelist)
#   stop_mult       initial stop distance in ATR(14) multiples
#   target_mult     take-profit in R multiples (0 = no fixed target)
#   trail_mult      trailing multiplier — NOTE(review): snapshotted for the
#                   exit system but not currently applied in custom_stoploss
#   max_bars        maximum holding time, in 5m bars
VALIDATED_SIGNALS: List[Tuple] = [
    # ★★★ TIER 1: OOS >= 90%
    ("S_W6_T82_C2p_asia", "W6", "short", 82, 2, "price", "asia", 2.0, 6.0, 1.5, 96),
    ("L_W4_T85_C2b_us", "W4", "long", 85, 2, "both", "us", 2.0, 0, 1.5, 96),
    ("L_W4_T85_C2p_us", "W4", "long", 85, 2, "price", "us", 2.0, 0, 1.5, 96),
    ("L_W0_T80_C2b_us", "W0", "long", 80, 2, "both", "us", 2.0, 0, 1.5, 96),
    ("L_W4_T88_C0n_all", "W4", "long", 88, 0, "none", "all", 1.5, 6.0, 1.0, 96),
    ("S_W6_T80_C2b_asia", "W6", "short", 80, 2, "both", "asia", 2.0, 0, 1.5, 96),
    ("S_W5_T80_C1p_asia", "W5", "short", 80, 1, "price", "asia", 1.5, 4.5, 1.0, 72),
    ("S_W7_T80_C1p_all", "W7", "short", 80, 1, "price", "all", 2.0, 6.0, 1.5, 96),
    ("S_W5_T82_C1p_all", "W5", "short", 82, 1, "price", "all", 2.0, 0, 1.5, 96),
    # ★★★ TIER 2: OOS 75-89%
    ("S_W6_T80_C1p_overlap", "W6", "short", 80, 1, "price", "overlap", 2.0, 6.0, 1.5, 96),
    ("S_W8_T75_ovlp_trl", "W8", "short", 75, 0, "none", "overlap", 2.0, 0, 1.5, 96),
    ("L_W4_T85_C2b_all", "W4", "long", 85, 2, "both", "all", 1.5, 0, 1.0, 72),
    ("S_W3_T78_C1p_asia", "W3", "short", 78, 1, "price", "asia", 1.5, 4.5, 1.0, 72),
    ("S_W8_T78_ovlp_fix", "W8", "short", 78, 0, "none", "overlap", 2.0, 8.0, 1.5, 120),
    ("S_W8_T78_ovlp_trl", "W8", "short", 78, 0, "none", "overlap", 2.0, 0, 1.5, 96),
    ("S_W6_T82_C1s_asia", "W6", "short", 82, 1, "score_hold", "asia", 2.0, 6.0, 1.5, 96),
    ("S_W8_T78_C1s_peak", "W8", "short", 78, 1, "score_hold", "peak", 2.0, 6.0, 1.5, 96),
    ("S_W8_T78_C2p_all", "W8", "short", 78, 2, "price", "all", 2.0, 6.0, 1.5, 96),
    ("S_W2_T75_overlap", "W2", "short", 75, 0, "none", "overlap", 2.0, 8.0, 1.5, 120),
    ("L_W9_T95_C2b_us", "W9", "long", 95, 2, "both", "us", 2.0, 0, 1.5, 96),
    ("S_W6_T75_C1p_overlap", "W6", "short", 75, 1, "price", "overlap", 2.0, 6.0, 1.5, 96),
    ("L_W9_T95_C2p_london", "W9", "long", 95, 2, "price", "london", 2.0, 6.0, 1.5, 96),
    ("S_W8_T78_C1p_asia", "W8", "short", 78, 1, "price", "asia", 2.0, 6.0, 1.5, 96),
    # ★★★ TIER 3: OOS 70-74%
    ("S_W4_T75_ovlp_trl", "W4", "short", 75, 0, "none", "overlap", 2.0, 0, 1.5, 96),
    ("S_W4_T78_ovlp_trl", "W4", "short", 78, 0, "none", "overlap", 2.0, 0, 1.5, 96),
]
# Per-config weights for the 10 score components, in the exact order used
# by populate_indicators when assembling the composite score:
#   (mii6, mii12, conv3, 100-per6, 100-fci12, vol_z,
#    htf_1h_mii, htf_4h_mii, absorption, mom_acc)
WEIGHT_CONFIGS: Dict[str, Tuple[int, ...]] = {
    "W0": (3, 2, 2, 1, 1, 2, 3, 2, 2, 1),
    "W1": (2, 1, 3, 1, 1, 3, 2, 2, 2, 1),
    "W2": (2, 2, 2, 2, 2, 2, 2, 2, 2, 2),
    "W3": (3, 2, 3, 0, 2, 3, 3, 2, 3, 0),
    "W4": (4, 2, 2, 1, 1, 2, 4, 3, 1, 1),
    "W5": (2, 1, 4, 1, 1, 4, 2, 1, 3, 1),
    "W6": (3, 3, 1, 1, 1, 1, 3, 3, 1, 1),
    "W7": (1, 1, 3, 1, 2, 3, 2, 2, 4, 1),
    "W8": (3, 1, 2, 2, 3, 2, 2, 2, 1, 2),
    "W9": (4, 3, 3, 0, 0, 2, 3, 3, 0, 0),
}
# Allowed UTC hours per session filter; None = trade around the clock.
# (The "hour" column is built with utc=True in populate_indicators.)
TIME_FILTERS: Dict[str, Optional[List[int]]] = {
    "all": None,
    "london": [7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
    "us": [13, 14, 15, 16, 17, 18, 19, 20, 21],
    "asia": [0, 1, 2, 3, 4, 5, 6, 7, 8],
    "overlap": [13, 14, 15, 16],
    "peak": [8, 9, 10, 14, 15, 16],
}
class OsirisOmegaStrategy(IStrategy):
    """OSIRIS OMEGA — Walk-Forward Validated Ensemble Strategy"""

    INTERFACE_VERSION = 3
    # Shorts are off by default; the table's short signals only fire when
    # this is flipped to True (futures/margin accounts).
    can_short = False  # True for futures
    timeframe = "5m"
    # ROI table: 50% immediately (effectively disabled), 2% after 480 min.
    minimal_roi = {"0": 0.50, "480": 0.02}
    # Hard emergency stop; the working stop comes from custom_stoploss.
    stoploss = -0.10
    use_custom_stoploss = True
    trailing_stop = False
    # Must cover the 500-bar percentile window plus indicator warm-up;
    # populate_entry_trend also skips the first 550 bars to match.
    startup_candle_count = 550
    process_only_new_candles = True
    # Class-level snapshot of entry-time risk params, keyed "<pair>_<time>".
    # Written by confirm_trade_entry, read back by _get_sig_params.
    _trade_signals: Dict[str, dict] = {}
def informative_pairs(self):
pairs = self.dp.current_whitelist()
informative = []
for pair in pairs:
informative.append((pair, "1h"))
informative.append((pair, "4h"))
return informative
# ═══════════════════════════════════════════════════════════════════
# FAST ROLLING PERCENTILE
# ═══════════════════════════════════════════════════════════════════
@staticmethod
def _pct_rank(arr: np.ndarray, window: int = 500) -> np.ndarray:
N = len(arr)
result = np.full(N, 50.0)
for i in range(window, N):
win = arr[i - window : i]
result[i] = np.searchsorted(np.sort(win), arr[i]) / window * 100
return result
    # ═══════════════════════════════════════════════════════════════════
    # POPULATE INDICATORS
    # ═══════════════════════════════════════════════════════════════════
    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Build every input consumed by the entry logic.

        Pipeline:
          1. raw per-bar vectors (OHLCV, bar range, intrabar pressure split);
          2. microstructure metrics: MII, PER, volume z-score, conviction,
             momentum acceleration, FCI, wick-absorption;
          3. rolling percentile ranks of those metrics (computed once);
          4. higher-timeframe (1h/4h) 6-bar MII merged onto the 5m frame;
          5. one composite 0-100 score column per (weight-config, direction)
             pair actually referenced by VALIDATED_SIGNALS.
        Adds: atr14, hour, score_{Wk}_{l|s} columns (plus merged HTF cols).
        """
        pair = metadata["pair"]
        N = len(dataframe)
        dataframe["atr14"] = ta.ATR(dataframe, timeperiod=14)
        # Hour-of-day in UTC drives the TIME_FILTERS session gating.
        if "date" in dataframe.columns:
            dataframe["hour"] = pd.to_datetime(dataframe["date"], utc=True).dt.hour
        else:
            dataframe["hour"] = 0
        # ─── RAW VECTORS ───
        C = dataframe["close"].values.astype(np.float64)
        H = dataframe["high"].values.astype(np.float64)
        L = dataframe["low"].values.astype(np.float64)
        O = dataframe["open"].values.astype(np.float64)
        V = dataframe["volume"].values.astype(np.float64)
        # 1-bar close-to-close return.
        # NOTE(review): ret is computed but never referenced below.
        ret = np.zeros(N)
        ret[1:] = np.where(C[:-1] != 0, (C[1:] - C[:-1]) / C[:-1], 0)
        rng = H - L
        rng_safe = np.where(rng > 0, rng, 1)  # avoid div-by-zero on flat bars
        # Volume split by where the close sits inside the bar's range:
        # bar_bp ~ buy pressure, bar_sp ~ sell pressure.
        bar_bp = np.where(rng > 0, ((C - L) / rng_safe) * V, 0)
        bar_sp = np.where(rng > 0, ((H - C) / rng_safe) * V, 0)
        close_pos = np.where(rng > 0, (C - L) / rng, 0.5)  # 0 = at low, 1 = at high
        # MII: windowed (buy - sell) / (buy + sell) pressure imbalance in [-1, 1]
        def fast_mii(w):
            mii = np.zeros(N)
            if N < w:
                return mii
            # Rolling w-bar sums via convolution ("valid" => length N - w + 1)
            bp_c = np.convolve(bar_bp, np.ones(w), "valid")
            sp_c = np.convolve(bar_sp, np.ones(w), "valid")
            tot = bp_c + sp_c
            off = w - 1  # first index with a complete window
            ln = min(len(bp_c), N - off)
            mii[off:off + ln] = np.where(tot[:ln] > 0, (bp_c[:ln] - sp_c[:ln]) / tot[:ln], 0)
            return mii
        mii6 = fast_mii(6)
        mii12 = fast_mii(12)
        # PER: price-efficiency ratio = |net move| / path length over w bars
        # (1 = straight-line trend, -> 0 = pure chop).
        def fast_per(w):
            per = np.zeros(N)
            if N <= w:
                return per
            net = np.abs(C[w:] - C[:-w])
            step = np.abs(np.diff(C))
            path = np.zeros(max(0, N - w))
            # path[j] accumulates |1-bar moves| across C[j] .. C[j+w]
            for wi in range(w):
                end = N - w + wi
                if end > len(step):
                    break
                path += step[wi:end]
            sz = min(len(net), len(path), N - w)
            per[w:w + sz] = np.where(path[:sz] > 0, net[:sz] / path[:sz], 0)
            return per
        per6 = fast_per(6)
        # Volume z-score vs the previous 20 bars (current bar excluded).
        vol_z = np.zeros(N)
        for i in range(20, N):
            win = V[i - 20:i]
            m, s = np.mean(win), np.std(win)
            vol_z[i] = (V[i] - m) / s if s > 0 else 0
        # Conviction: close position x clipped volume z x bar direction,
        # summed over the last 3 bars (rolling sum via cumsum difference).
        bar_conv = close_pos * np.clip(vol_z, 0, 5) * np.sign(C - O)
        cs = np.cumsum(bar_conv)
        conv3 = np.zeros(N)
        conv3[3:] = cs[3:] - cs[:-3]
        # Momentum acceleration: 3-bar change of the 3-bar momentum (%).
        mom3 = np.zeros(N)
        mom3[3:] = np.where(C[:-3] != 0, (C[3:] - C[:-3]) / C[:-3] * 100, 0)
        mom_acc = np.zeros(N)
        mom_acc[4:] = mom3[4:] - mom3[1:-3]  # mom_acc[i] = mom3[i] - mom3[i-3]
        # FCI: 13-bar total range vs 12x the average single-bar range —
        # low = compressed/overlapping bars, high = directional expansion.
        fci12 = np.ones(N)
        for i in range(12, N):
            h_max = np.max(H[i - 12:i + 1])
            l_min = np.min(L[i - 12:i + 1])
            tr = (h_max - l_min) / C[i] * 100 if C[i] > 0 else 0
            ab = np.mean(H[i - 12:i + 1] - L[i - 12:i + 1]) / C[i] * 100 if C[i] > 0 else 0
            fci12[i] = tr / (ab * 12) if ab > 0 else 1
        # Absorption: dominant wick + volume spike + close confirming the
        # wick side (buyers absorbing at lows / sellers absorbing at highs).
        bull_abs = np.zeros(N)
        bear_abs = np.zeros(N)
        for i in range(20, N):
            if rng[i] == 0:
                continue
            lw = (min(C[i], O[i]) - L[i]) / rng[i]   # lower-wick fraction
            uw = (H[i] - max(C[i], O[i])) / rng[i]   # upper-wick fraction
            va = np.mean(V[max(0, i - 20):i])
            vr = V[i] / va if va > 0 else 1          # volume vs 20-bar average
            if lw > 0.55 and vr > 1.3 and close_pos[i] > 0.55:
                bull_abs[i] = lw * vr
            if uw > 0.55 and vr > 1.3 and close_pos[i] < 0.45:
                bear_abs[i] = uw * vr
        # ─── PERCENTILES (computed ONCE for all scores) ───
        mii6_p = self._pct_rank(mii6)
        mii12_p = self._pct_rank(mii12)
        conv3_p = self._pct_rank(conv3)
        per6_p = self._pct_rank(per6)
        fci12_p = self._pct_rank(fci12)
        vol_z_p = self._pct_rank(vol_z)
        mom_acc_p = self._pct_rank(mom_acc)
        # ─── HTF MII: 6-bar MII on 1h/4h candles, forward-filled onto 5m ───
        htf_1h_mii = np.zeros(N)
        htf_4h_mii = np.zeros(N)
        if self.dp:
            for tf in ["1h", "4h"]:
                inf_df = self.dp.get_pair_dataframe(pair=pair, timeframe=tf)
                if inf_df.empty or len(inf_df) < 7:
                    continue
                ch = inf_df["close"].values.astype(np.float64)
                hh = inf_df["high"].values.astype(np.float64)
                lh = inf_df["low"].values.astype(np.float64)
                vh = inf_df["volume"].values.astype(np.float64)
                nh = len(ch)
                rng_h = hh - lh
                rng_h_safe = np.where(rng_h > 0, rng_h, 1)
                bp_h = ((ch - lh) / rng_h_safe) * vh
                sp_h = ((hh - ch) / rng_h_safe) * vh
                mii_h = np.zeros(nh)
                for ii in range(6, nh):
                    bp = np.sum(bp_h[ii - 5:ii + 1])
                    sp = np.sum(sp_h[ii - 5:ii + 1])
                    t = bp + sp
                    mii_h[ii] = (bp - sp) / t if t > 0 else 0
                # Merge via informative pair system (column gets a _{tf} suffix)
                inf_mii = inf_df[["date"]].copy()
                inf_mii[f"mii_{tf}"] = mii_h
                dataframe = merge_informative_pair(
                    dataframe, inf_mii, self.timeframe, tf, ffill=True
                )
        # Extract mapped HTF columns (merged names carry the timeframe suffix);
        # arrays stay zero when no informative data was merged.
        for col in dataframe.columns:
            if "mii_1h" in col and col != "mii_1h":
                htf_1h_mii = dataframe[col].fillna(0).values
                break
        for col in dataframe.columns:
            if "mii_4h" in col and col != "mii_4h":
                htf_4h_mii = dataframe[col].fillna(0).values
                break
        # ─── BUILD SCORES (using pre-computed percentiles) ───
        # Only the (weight-config, direction) pairs a signal references.
        needed = set()
        for sig in VALIDATED_SIGNALS:
            needed.add((sig[1], sig[2]))
        for wkey, direction in needed:
            w = WEIGHT_CONFIGS[wkey]
            tw = sum(w)
            if tw == 0:
                continue
            s = np.zeros(N)
            if direction == "long":
                s += w[0] * mii6_p + w[1] * mii12_p + w[2] * conv3_p
                # per6/fci12 enter inverted: low efficiency & compression favored
                s += w[3] * (100 - per6_p) + w[4] * (100 - fci12_p) + w[5] * vol_z_p
                # HTF MII in [-1, 1] remapped to ~[0, 100] centered at 50
                s += w[6] * np.clip(htf_1h_mii * 200 + 50, 0, 100)
                s += w[7] * np.clip(htf_4h_mii * 200 + 50, 0, 100)
                s += w[8] * np.clip(bull_abs * 33, 0, 100)
                s += w[9] * mom_acc_p
            else:
                # Short: mirror of the long composition
                s += w[0] * (100 - mii6_p) + w[1] * (100 - mii12_p) + w[2] * (100 - conv3_p)
                s += w[3] * (100 - per6_p) + w[4] * (100 - fci12_p) + w[5] * vol_z_p
                s += w[6] * np.clip(-htf_1h_mii * 200 + 50, 0, 100)
                s += w[7] * np.clip(-htf_4h_mii * 200 + 50, 0, 100)
                s += w[8] * np.clip(bear_abs * 33, 0, 100)
                s += w[9] * (100 - mom_acc_p)
            # Weighted mean -> 0-100 composite; column e.g. "score_W4_l"
            dataframe[f"score_{wkey}_{direction[0]}"] = s / tw
        return dataframe
    # ═══════════════════════════════════════════════════════════════════
    # POPULATE ENTRY TREND
    # ═══════════════════════════════════════════════════════════════════
    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Scan every validated signal against its score column and emit
        enter_long / enter_short flags plus per-bar risk metadata
        (omega_sig/sm/tm/tl/mb) later snapshotted by confirm_trade_entry.

        Per signal: session filter -> score >= threshold on the trigger bar
        -> optional confirmation bars (price direction and/or score hold)
        -> fire on bar i and start a cooldown so entries do not cluster.
        Later signals in VALIDATED_SIGNALS overwrite the metadata of earlier
        ones when both fire on the same bar.
        """
        N = len(dataframe)
        hours = dataframe["hour"].values
        close_v = dataframe["close"].values
        el = np.zeros(N, dtype=int)         # enter_long flags
        es = np.zeros(N, dtype=int)         # enter_short flags
        sig_n = [""] * N                    # triggering signal name
        sig_sm = np.full(N, 2.0)            # stop multiplier (ATR units)
        sig_tm = np.full(N, 0.0)            # target multiplier (0 = no target)
        sig_tl = np.full(N, 1.5)            # trail multiplier
        sig_mb = np.full(N, 96, dtype=int)  # max holding bars
        for sig in VALIDATED_SIGNALS:
            name, wkey, direction, thresh, cb, cm, tf_name, sm, tm, tlm, mb = sig
            if direction == "short" and not self.can_short:
                continue
            score_col = f"score_{wkey}_{direction[0]}"
            if score_col not in dataframe.columns:
                continue
            scores = dataframe[score_col].values
            tf_hours = TIME_FILTERS.get(tf_name)
            cd = 0  # per-signal cooldown counter (bars remaining)
            # Skip the startup window (matches startup_candle_count = 550).
            for i in range(max(550, cb), N):
                if cd > 0:
                    cd -= 1
                    continue
                if tf_hours is not None and hours[i] not in tf_hours:
                    continue
                # Trigger bar sits cb bars back so bars ti+1..i can confirm.
                ti = i - cb if cb > 0 else i
                if ti < 0 or scores[ti] < thresh:
                    continue
                # Confirmation
                if cb > 0 and cm != "none":
                    ok = True
                    for j in range(1, cb + 1):
                        c = ti + j
                        if c >= N:
                            ok = False
                            break
                        if cm in ("price", "both"):
                            # Price must keep moving in the trade direction.
                            if direction == "long" and close_v[c] <= close_v[c - 1]:
                                ok = False
                                break
                            if direction == "short" and close_v[c] >= close_v[c - 1]:
                                ok = False
                                break
                        if cm in ("score_hold", "both"):
                            # Score must hold within 5% of the threshold.
                            if scores[c] < thresh * 0.95:
                                ok = False
                                break
                    if not ok:
                        continue
                if direction == "long":
                    el[i] = 1
                else:
                    es[i] = 1
                sig_n[i] = name
                sig_sm[i] = sm
                sig_tm[i] = tm
                sig_tl[i] = tlm
                sig_mb[i] = mb
                cd = cb + 6  # cooldown before this signal may fire again
        dataframe["enter_long"] = el
        dataframe["enter_short"] = es
        dataframe["omega_sig"] = sig_n
        dataframe["omega_sm"] = sig_sm
        dataframe["omega_tm"] = sig_tm
        dataframe["omega_tl"] = sig_tl
        dataframe["omega_mb"] = sig_mb
        logger.info(f"OMEGA {metadata['pair']}: {el.sum()} long + {es.sum()} short signals")
        return dataframe
# ═══════════════════════════════════════════════════════════════════
# EXIT (handled by custom_exit / custom_stoploss)
# ═══════════════════════════════════════════════════════════════════
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe["exit_long"] = 0
dataframe["exit_short"] = 0
return dataframe
# ═══════════════════════════════════════════════════════════════════
# CUSTOM ENTRY TAG
# ═══════════════════════════════════════════════════════════════════
def custom_entry_tag(self, pair: str, trade: Trade, order_type: str, **kwargs) -> str:
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(dataframe) == 0:
return "omega"
tag = dataframe.iloc[-1].get("omega_sig", "")
return str(tag) if tag else "omega"
# ═══════════════════════════════════════════════════════════════════
# CONFIRM TRADE ENTRY — Store signal params for exit system
# ═══════════════════════════════════════════════════════════════════
def confirm_trade_entry(self, pair, order_type, amount, rate, time_in_force,
current_time, entry_tag, side, **kwargs) -> bool:
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(dataframe) > 0:
last = dataframe.iloc[-1]
sm = last.get("omega_sm", 2.0)
tm = last.get("omega_tm", 0.0)
tl = last.get("omega_tl", 1.5)
mb = last.get("omega_mb", 96)
atr_entry = last.get("atr14", 0)
# Compute fixed stop distance % at entry time
stop_pct = (atr_entry * float(sm if not (isinstance(sm, float) and np.isnan(sm)) else 2.0)) / rate if rate > 0 and atr_entry > 0 else 0.02
stop_pct = max(stop_pct, 0.005)
key = f"{pair}_{current_time}"
self._trade_signals[key] = {
"stop_mult": float(sm) if not (isinstance(sm, float) and np.isnan(sm)) else 2.0,
"target_mult": float(tm) if not (isinstance(tm, float) and np.isnan(tm)) else 0.0,
"trail_mult": float(tl) if not (isinstance(tl, float) and np.isnan(tl)) else 1.5,
"max_bars": int(mb) if mb > 0 else 96,
"stop_pct": stop_pct, # Fixed entry-time stop distance
}
return True
def _get_sig_params(self, trade: Trade) -> dict:
"""Retrieve signal params for this trade."""
for key, params in self._trade_signals.items():
if trade.pair in key:
return params
tag = trade.enter_tag if hasattr(trade, "enter_tag") and trade.enter_tag else ""
for sig in VALIDATED_SIGNALS:
if sig[0] == tag:
return {"stop_mult": sig[7], "target_mult": sig[8],
"trail_mult": sig[9], "max_bars": sig[10]}
return {"stop_mult": 2.0, "target_mult": 6.0, "trail_mult": 1.5, "max_bars": 96, "stop_pct": 0.005}
# ═══════════════════════════════════════════════════════════════════
# CUSTOM STOPLOSS — ATR + R-Multiple Progressive Trail
# ═══════════════════════════════════════════════════════════════════
def custom_stoploss(self, pair, trade, current_time, current_rate,
current_profit, **kwargs) -> float:
p = self._get_sig_params(trade)
is_short = trade.is_short if hasattr(trade, "is_short") else False
# Use FIXED entry-time stop distance (not current ATR)
stop_pct = p.get("stop_pct", 0)
if stop_pct <= 0:
# Fallback: compute from current ATR
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(dataframe) == 0:
return -0.10
atr = dataframe.iloc[-1].get("atr14", 0)
sm = p.get("stop_mult", 2.0)
stop_pct = max((atr * sm) / trade.open_rate, 0.005) if atr > 0 and trade.open_rate > 0 else 0.02
# Trail mult from signal
trail_m = p.get("trail_mult", 1.5)
# Progressive R-multiple trailing (mirrors V3 logic)
if current_profit > 0 and stop_pct > 0:
r = current_profit / stop_pct
if r >= 3.0:
# Lock 2R profit
return stoploss_from_open(2.0 * stop_pct, current_profit, is_short=is_short)
elif r >= 2.0:
# Trail at trail_mult * ATR from best
return stoploss_from_open(1.0 * stop_pct, current_profit, is_short=is_short)
elif r >= 1.0:
# Break-even + small buffer
return stoploss_from_open(0.002, current_profit, is_short=is_short)
# Initial: fixed ATR-based stop from entry
return max(-stop_pct, -0.10)
# ═══════════════════════════════════════════════════════════════════
# CUSTOM EXIT — R:R Target + Time Management
# ═══════════════════════════════════════════════════════════════════
def custom_exit(self, pair, trade, current_time, current_rate,
current_profit, **kwargs) -> Optional[str]:
p = self._get_sig_params(trade)
tm = p.get("target_mult", 0)
mb = p.get("max_bars", 96)
# Use fixed entry stop distance for R:R
stop_pct = p.get("stop_pct", 0.005)
if stop_pct <= 0:
stop_pct = 0.005
# R:R target
if tm > 0:
tp = stop_pct * tm
if current_profit >= tp:
return "omega_tp"
# Early TP if time elapsed > 50% of max and profit > 60% target
if current_profit >= tp * 0.6:
h = (current_time - trade.open_date_utc).total_seconds() / 3600
if h > mb * 5 / 60 * 0.5:
return "omega_tp_early"
# Time exits
h = (current_time - trade.open_date_utc).total_seconds() / 3600
mh = mb * 5 / 60 # max_bars * 5min = hours
if h > mh and current_profit > 0.001:
return "omega_time_profit"
if h > mh * 1.5:
return "omega_time_force"
return None