RTAI Strategy for precise 1-minute microstructure mean-reversion.
Timeframe
1m
Direction
Long & Short
Stoploss
-99.0%
Trailing Stop
No
ROI
0m: 1000.0%
Interface Version
3
Startup Candles
200
Indicators
1
freqtrade/freqtrade-strategies
This strategy uses custom_exit() to enforce an adaptive ATR-based stoploss (ATR × 2, tightened to ATR × 1 when Kyle lambda spikes signal elevated market-impact risk)
"""
Enhanced RTAI Strategy - Microstructure Mean-Reversion
=====================================================
Advanced 1-minute mean-reversion strategy implementing the precise logic
from the context file:
ENTRY LOGIC:
- |OFI_z| > 2.25 AND sign(OFI_z) != sign(MPD_z) AND |MPD_z| > 1.5
- Risk Filters: TOBI ∈ [0.25,0.75], WallRatio < 0.25, VPIN < p98_threshold
EXIT LOGIC:
- MPD_z crosses 0 OR Kyle λ spikes 5× OR LPI > 1.5
POSITION SIZING:
- S = tanh(0.6 × |OFI_z| × |MPD_z|) × sign(-OFI_z)
This strategy specifically fades short-term imbalances when microstructure
shows price divergence from fair value.
"""
from freqtrade.strategy import IStrategy, DecimalParameter, IntParameter
from freqtrade.persistence import Trade
from typing import Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
import talib.abstract as ta
import logging
# Import our enhanced indicators
try:
from lib.rtai_indicators import add_all_rtai_indicators, joint_signal_score
except ImportError:
try:
from strategies.lib.rtai_indicators import add_all_rtai_indicators, joint_signal_score
except ImportError:
# Fallback to old location for backward compatibility
from user_data.strategies.lib.rtai_indicators import add_all_rtai_indicators, joint_signal_score
logger = logging.getLogger(__name__)
class RTAIStrategy(IStrategy):
    """
    RTAI Strategy for precise 1-minute microstructure mean-reversion.

    Implements the exact logic specified for optimal mean-reversion trading
    based on order flow imbalance (OFI) and micro-price divergence (MPD):
    fade strong one-sided flow when the micro-price diverges the opposite way.

    Entry:  |OFI_z| > threshold AND sign(OFI_z) != sign(MPD_z) AND |MPD_z| > threshold,
            gated by TOBI / wall-ratio / VPIN risk filters.
    Exit:   MPD_z zero-cross, Kyle-lambda spike, liquidation-cascade (LPI_z),
            or toxic flow (VPIN), plus an adaptive ATR stop in custom_exit().
    Sizing: tanh(conviction_multiplier × |OFI_z| × |MPD_z|), volatility-adjusted.
    """

    # === STRATEGY METADATA ===
    INTERFACE_VERSION = 3
    can_short: bool = True  # Enable short selling for mean-reversion

    # === TIMEFRAME & SETUP ===
    timeframe = '1m'                 # Critical: Strategy designed for 1-minute bars
    startup_candle_count: int = 200  # Need history for robust Z-scores

    # === POSITION MANAGEMENT ===
    max_open_trades = 2              # Conservative for mean-reversion
    process_only_new_candles = True
    use_exit_signal = True
    exit_profit_only = False
    ignore_roi_if_entry_signal = True

    # === DISABLE BUILT-IN RISK MANAGEMENT ===
    # We use custom logic instead (see custom_exit / custom_stake_amount below).
    minimal_roi = {"0": 10}  # High value to effectively disable
    stoploss = -0.99         # Very low to disable (custom logic used)
    trailing_stop = False    # Custom adaptive stops used

    # === STRATEGY PARAMETERS (OPTIMIZABLE) ===
    # Entry thresholds (from context file)
    ofi_entry_threshold = DecimalParameter(2.0, 2.5, default=2.25, space='buy', optimize=True)
    mpd_entry_threshold = DecimalParameter(1.2, 1.8, default=1.5, space='buy', optimize=True)

    # Risk filter thresholds
    tobi_min = DecimalParameter(0.20, 0.30, default=0.25, space='buy', optimize=True)
    tobi_max = DecimalParameter(0.70, 0.80, default=0.75, space='buy', optimize=True)
    wall_ratio_max = DecimalParameter(0.20, 0.30, default=0.25, space='buy', optimize=True)
    # NOTE: this is a percentile *rank* (0.95-0.99) fed to rolling().quantile(),
    # not an absolute VPIN level — see populate_indicators() / confirm_trade_entry().
    vpin_percentile = DecimalParameter(0.95, 0.99, default=0.98, space='buy', optimize=True)

    # Exit thresholds
    kyle_spike_multiplier = DecimalParameter(3.0, 7.0, default=5.0, space='sell', optimize=True)
    lpi_exit_threshold = DecimalParameter(1.2, 1.8, default=1.5, space='sell', optimize=True)

    # Position sizing
    max_position_percent = DecimalParameter(5.0, 10.0, default=8.0, space='buy', optimize=True)
    conviction_multiplier = DecimalParameter(0.5, 0.8, default=0.6, space='buy', optimize=True)

    # === PERFORMANCE TRACKING ===
    # Class-level counters; the first `+=` on an instance shadows them with
    # per-instance attributes, so separate bot instances do not share state.
    total_signals_generated = 0
    successful_exits = 0

    def populate_indicators(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """
        Populate all enhanced RTAI indicators for microstructure analysis.

        This implements the enhanced indicator suite from rtai_indicators_enhanced.py
        with all the optimizations specified in the context file.
        """
        pair = metadata.get('pair', 'UNKNOWN')
        logger.info(f"🔬 Calculating enhanced RTAI indicators for {pair} ({len(dataframe)} candles)")
        # === 1. MICROSTRUCTURE DATA INTEGRATION ===
        # Get tick-level data from RTAIDataProvider if available
        if hasattr(self, 'dp') and self.dp:
            try:
                # Enhanced data from our custom DataProvider
                enhanced_df = self.dp.get_pair_dataframe(pair=pair, timeframe=self.timeframe)
                if enhanced_df is not None and not enhanced_df.empty:
                    # Merge microstructure columns
                    microstructure_cols = [
                        'bid', 'ask', 'bid_size', 'ask_size',
                        'buy_volume', 'sell_volume', 'volume_imbalance',
                        'long_liquidations', 'short_liquidations', 'liquidation_imbalance',
                        'open_interest', 'funding_rate', 'index_price', 'mark_price'
                    ]
                    for col in microstructure_cols:
                        if col in enhanced_df.columns:
                            dataframe[col] = enhanced_df[col]
                    logger.info(f"✅ Merged {len([c for c in microstructure_cols if c in enhanced_df.columns])} microstructure columns")
                else:
                    logger.warning(f"⚠️ No enhanced microstructure data for {pair}")
            except Exception as e:
                # Best-effort merge: missing microstructure data must not abort
                # indicator calculation (indicators fall back to OHLCV inputs).
                logger.error(f"❌ Error getting microstructure data: {e}")
        # === 2. ENHANCED RTAI INDICATORS ===
        # This adds all the enhanced indicators from the context file
        dataframe = add_all_rtai_indicators(dataframe)
        # === 3. DYNAMIC THRESHOLDS ===
        # VPIN dynamic threshold (rolling percentile over the last 15 candles)
        vpin_window = 15  # minutes
        dataframe['vpin_dynamic_threshold'] = dataframe['vpin'].rolling(
            window=vpin_window, min_periods=5
        ).quantile(self.vpin_percentile.value)
        # Kyle Lambda baseline for spike detection (5× median rule)
        kyle_window = 90  # 90 minutes for stable baseline
        dataframe['kyle_baseline'] = dataframe['kyle_lambda'].rolling(
            window=kyle_window, min_periods=10
        ).median()
        # === 4. MARKET REGIME DETECTION ===
        # ATR for volatility regime (percentile rank over last 100 candles)
        dataframe['atr'] = ta.ATR(dataframe, timeperiod=14)
        dataframe['atr_regime'] = dataframe['atr'].rolling(100).rank(pct=True)
        # Volume regime (for filtering low-volume periods)
        dataframe['volume_regime'] = (
            dataframe['volume'].rolling(30).mean() /
            dataframe['volume'].rolling(120).mean()
        )
        # === 5. SIGNAL PREPARATION ===
        # Joint conviction score for position sizing (from context file)
        dataframe['conviction_score'] = joint_signal_score(
            dataframe['ofi_z'], dataframe['mpd_z']
        )
        # Risk-adjusted signal strength (penalize high-volatility regimes)
        volatility_penalty = np.clip(dataframe['atr_regime'], 0.1, 1.0)
        dataframe['risk_adjusted_conviction'] = (
            dataframe['conviction_score'] / volatility_penalty
        )
        # === 6. SIGNAL VALIDATION ===
        # Check data quality for critical indicators
        critical_indicators = ['ofi_z', 'mpd_z', 'tobi', 'wall_ratio', 'vpin']
        missing_indicators = [ind for ind in critical_indicators if ind not in dataframe.columns]
        if missing_indicators:
            logger.warning(f"⚠️ Missing critical indicators: {missing_indicators}")
        else:
            logger.info(f"✅ All critical indicators present for {pair}")
        return dataframe

    def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """
        Implement the precise mean-reversion entry logic from the context file.

        ENTRY CONDITION (ALL must be true):
        1. |OFI_z| > 2.25 (strong flow imbalance)
        2. sign(OFI_z) != sign(MPD_z) (divergence = mean-reversion opportunity)
        3. |MPD_z| > 1.5 (significant micro-price divergence)
        4. TOBI ∈ [0.25, 0.75] (balanced book, no extreme skew)
        5. WallRatio < 0.25 (no large walls blocking)
        6. VPIN < dynamic_p98 (flow not toxic)
        7. Minimum volume and volatility filters
        """
        # Initialise signal columns up-front: the reads below
        # (dataframe['enter_long'] == 1) would raise KeyError on candles with
        # zero signals if the conditional .loc assignment never created them.
        dataframe['enter_long'] = 0
        dataframe['enter_short'] = 0
        # === CORE DIVERGENCE SIGNALS ===
        strong_ofi = np.abs(dataframe['ofi_z']) > self.ofi_entry_threshold.value
        strong_mpd = np.abs(dataframe['mpd_z']) > self.mpd_entry_threshold.value
        divergence = np.sign(dataframe['ofi_z']) != np.sign(dataframe['mpd_z'])
        # === RISK FILTERS ===
        balanced_book = (
            (dataframe['tobi'] >= self.tobi_min.value) &
            (dataframe['tobi'] <= self.tobi_max.value)
        )
        no_walls = dataframe['wall_ratio'] < self.wall_ratio_max.value
        flow_not_toxic = dataframe['vpin'] < dataframe['vpin_dynamic_threshold']
        # === MARKET CONDITION FILTERS ===
        sufficient_volume = (
            dataframe['volume'] > dataframe['volume'].rolling(20).mean() * 0.5
        )
        normal_volatility = dataframe['atr_regime'] < 0.9  # Not in extreme vol regime
        # === COMBINED CONDITIONS ===
        base_conditions = (
            strong_ofi & strong_mpd & divergence &
            balanced_book & no_walls & flow_not_toxic &
            sufficient_volume & normal_volatility
        )
        # === LONG ENTRIES: Fade sell pressure ===
        # OFI_z < -2.25 (sell pressure) AND MPD_z > 1.5 (price too low) → BUY
        long_specific = (
            (dataframe['ofi_z'] < -self.ofi_entry_threshold.value) &
            (dataframe['mpd_z'] > self.mpd_entry_threshold.value)
        )
        dataframe.loc[base_conditions & long_specific, 'enter_long'] = 1
        # === SHORT ENTRIES: Fade buy pressure ===
        # OFI_z > 2.25 (buy pressure) AND MPD_z < -1.5 (price too high) → SELL
        short_specific = (
            (dataframe['ofi_z'] > self.ofi_entry_threshold.value) &
            (dataframe['mpd_z'] < -self.mpd_entry_threshold.value)
        )
        dataframe.loc[base_conditions & short_specific, 'enter_short'] = 1
        # === ENTRY TAGGING FOR ANALYSIS ===
        long_entries = dataframe['enter_long'] == 1
        short_entries = dataframe['enter_short'] == 1
        if long_entries.any():
            dataframe.loc[long_entries, 'enter_tag'] = (
                'FADE_SELL|OFI:' + dataframe.loc[long_entries, 'ofi_z'].round(2).astype(str) +
                '|MPD:' + dataframe.loc[long_entries, 'mpd_z'].round(2).astype(str) +
                '|CONV:' + dataframe.loc[long_entries, 'conviction_score'].round(3).astype(str)
            )
        if short_entries.any():
            dataframe.loc[short_entries, 'enter_tag'] = (
                'FADE_BUY|OFI:' + dataframe.loc[short_entries, 'ofi_z'].round(2).astype(str) +
                '|MPD:' + dataframe.loc[short_entries, 'mpd_z'].round(2).astype(str) +
                '|CONV:' + dataframe.loc[short_entries, 'conviction_score'].round(3).astype(str)
            )
        # === LOGGING ===
        total_long = int(long_entries.sum())
        total_short = int(short_entries.sum())
        if total_long > 0 or total_short > 0:
            self.total_signals_generated += total_long + total_short
            logger.info(f"🎯 Entry signals: {total_long} LONG, {total_short} SHORT (Total: {self.total_signals_generated})")
        return dataframe

    def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """
        Implement mean-reversion exit logic from the context file.

        EXIT CONDITIONS (ANY can trigger):
        1. MPD_z crosses zero (mean-reversion complete)
        2. Kyle λ spikes 5× median (high impact risk)
        3. LPI_z > 1.5 (liquidation cascade risk)
        4. Extreme VPIN (>0.99, toxic flow)
        """
        # Initialise signal columns so the reads below never KeyError when
        # no exit condition fires on this dataframe.
        dataframe['exit_long'] = 0
        dataframe['exit_short'] = 0
        # === PRIMARY EXIT: Mean-reversion complete ===
        # For LONG: MPD was positive (price too low), now crosses to negative/zero
        mpd_long_reversion = (
            (dataframe['mpd_z'].shift(1) > 0) &
            (dataframe['mpd_z'] <= 0)
        )
        # For SHORT: MPD was negative (price too high), now crosses to positive/zero
        mpd_short_reversion = (
            (dataframe['mpd_z'].shift(1) < 0) &
            (dataframe['mpd_z'] >= 0)
        )
        # === RISK MANAGEMENT EXITS ===
        # Kyle Lambda spike (market impact risk)
        kyle_spike = (
            dataframe['kyle_lambda'] >
            dataframe['kyle_baseline'] * self.kyle_spike_multiplier.value
        )
        # Liquidation cascade risk
        liquidation_cascade = dataframe['lpi_z'] > self.lpi_exit_threshold.value
        # Extreme toxic flow
        toxic_flow = dataframe['vpin'] > 0.99
        # === COMBINED EXIT CONDITIONS ===
        risk_exits = kyle_spike | liquidation_cascade | toxic_flow
        # === APPLY EXITS ===
        dataframe.loc[mpd_long_reversion | risk_exits, 'exit_long'] = 1
        dataframe.loc[mpd_short_reversion | risk_exits, 'exit_short'] = 1
        # === EXIT TAGGING FOR ANALYSIS ===
        long_exits = dataframe['exit_long'] == 1
        short_exits = dataframe['exit_short'] == 1

        # Determine exit reason (priority: toxic flow > cascade > spike > reversion)
        def get_exit_reason(row):
            if row['vpin'] > 0.99:
                return 'TOXIC_FLOW'
            elif row['lpi_z'] > self.lpi_exit_threshold.value:
                return 'LIQ_CASCADE'
            elif row['kyle_lambda'] > row.get('kyle_baseline', 0) * self.kyle_spike_multiplier.value:
                return 'KYLE_SPIKE'
            else:
                return 'MPD_REVERSION'

        if long_exits.any():
            exit_reasons = dataframe.loc[long_exits].apply(get_exit_reason, axis=1)
            dataframe.loc[long_exits, 'exit_tag'] = exit_reasons
        if short_exits.any():
            exit_reasons = dataframe.loc[short_exits].apply(get_exit_reason, axis=1)
            dataframe.loc[short_exits, 'exit_tag'] = exit_reasons
        # === LOGGING ===
        total_long_exits = int(long_exits.sum())
        total_short_exits = int(short_exits.sum())
        if total_long_exits > 0 or total_short_exits > 0:
            self.successful_exits += total_long_exits + total_short_exits
            logger.info(f"🚪 Exit signals: {total_long_exits} LONG, {total_short_exits} SHORT")
        return dataframe

    def custom_stake_amount(self, pair: str, current_time, current_rate: float,
                            proposed_stake: float, min_stake: Optional[float], max_stake: float,
                            leverage: float, entry_tag: str, side: str, **kwargs) -> float:
        """
        Dynamic position sizing based on signal conviction from context file.

        Formula: pos_size = S × min(max_pos_pct, account_nav / ADRV)
        Where: S = tanh(0.6 × |OFI_z| × |MPD_z|) × sign(-OFI_z)
        """
        try:
            # Get current market data
            dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
            if dataframe.empty:
                return proposed_stake
            latest_row = dataframe.iloc[-1]
            # === CONVICTION CALCULATION ===
            ofi_z = latest_row.get('ofi_z', 0)
            mpd_z = latest_row.get('mpd_z', 0)
            # Signal strength from context file formula
            conviction_raw = self.conviction_multiplier.value * abs(ofi_z) * abs(mpd_z)
            signal_strength = np.tanh(conviction_raw)  # Normalized 0-1
            # === BASE POSITION SIZE ===
            total_capital = self.wallets.get_total_stake_amount()
            base_position_pct = self.max_position_percent.value / 100.0
            base_stake = total_capital * base_position_pct
            # === RISK ADJUSTMENTS ===
            # Reduce size in high volatility
            atr_regime = latest_row.get('atr_regime', 0.5)
            volatility_adjustment = 1.0 / (1.0 + atr_regime)
            # Reduce size if VPIN is elevated (risky regime)
            vpin = latest_row.get('vpin', 0.5)
            vpin_adjustment = 1.0 if vpin < 0.7 else 0.7
            # === FINAL CALCULATION ===
            adjusted_stake = (
                base_stake *
                signal_strength *
                volatility_adjustment *
                vpin_adjustment
            )
            # Apply exchange limits. Freqtrade may pass min_stake=None for
            # some exchanges/pairs, so guard before max() to avoid TypeError.
            final_stake = min(adjusted_stake, max_stake)
            if min_stake is not None:
                final_stake = max(min_stake, final_stake)
            logger.info(f"💰 Position sizing for {pair}: "
                        f"conviction={signal_strength:.3f}, "
                        f"vol_adj={volatility_adjustment:.3f}, "
                        f"stake=${final_stake:.2f}")
            return final_stake
        except Exception as e:
            # Fall back to freqtrade's proposed stake on any sizing failure.
            logger.error(f"❌ Error in position sizing: {e}")
            return proposed_stake

    def custom_exit(self, pair: str, trade: Trade, current_time, current_rate: float,
                    current_profit: float, **kwargs) -> Optional[str]:
        """
        Custom exit logic with adaptive stop-loss from context file.

        Adaptive Stop: ATR × 2, tightened to ATR × 1 when Kyle λ ratio is high.
        Returns an exit-reason string to close the trade, or None to hold.
        """
        try:
            # Get current market data
            dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
            if dataframe.empty:
                return None
            latest_row = dataframe.iloc[-1]
            # === ADAPTIVE STOP LOSS ===
            atr = latest_row.get('atr', 0.001)
            kyle_lambda = latest_row.get('kyle_lambda', 0)
            kyle_baseline = latest_row.get('kyle_baseline', 1e-6)
            # Adaptive stop multiplier based on Kyle Lambda (epsilon guards /0)
            kyle_ratio = abs(kyle_lambda) / (abs(kyle_baseline) + 1e-9)
            if kyle_ratio > 3.0:
                stop_multiplier = 1.0  # Tight stop when impact risk high
            else:
                stop_multiplier = 2.0  # Normal stop
            # Calculate stop distance as a fraction of current price
            stop_distance_pct = (atr * stop_multiplier) / current_rate
            # Apply stop based on trade direction
            if trade.is_short:
                stop_price = trade.open_rate * (1 + stop_distance_pct)
                if current_rate >= stop_price:
                    return "adaptive_stop_loss"
            else:
                stop_price = trade.open_rate * (1 - stop_distance_pct)
                if current_rate <= stop_price:
                    return "adaptive_stop_loss"
            # === RISK MANAGEMENT EXITS ===
            vpin = latest_row.get('vpin', 0.5)
            lpi_z = latest_row.get('lpi_z', 0)
            # Exit on toxic flow
            if vpin > 0.99:
                return "toxic_flow_exit"
            # Exit on liquidation cascade
            if abs(lpi_z) > self.lpi_exit_threshold.value:
                return "liquidation_cascade_exit"
            # === PROFIT TAKING ===
            # Take profits when mean-reversion is significant
            mpd_z = latest_row.get('mpd_z', 0)
            min_profit_threshold = 0.003  # 0.3% minimum profit
            if current_profit > min_profit_threshold:
                if trade.is_short and mpd_z > 1.0:
                    return "mean_reversion_profit"
                elif not trade.is_short and mpd_z < -1.0:
                    return "mean_reversion_profit"
            return None
        except Exception as e:
            # Never let an exit-logic failure crash the bot loop; hold instead.
            logger.error(f"❌ Error in custom exit: {e}")
            return None

    def leverage(self, pair: str, current_time, current_rate: float,
                 proposed_leverage: float, max_leverage: float, entry_tag: str,
                 side: str, **kwargs) -> float:
        """
        Conservative leverage for mean-reversion strategy.

        Higher leverage only for very high conviction signals (parsed back
        out of the 'CONV:' segment written into entry_tag at entry time).
        """
        base_leverage = 1.5  # Conservative base
        try:
            # Get signal strength from entry tag
            if entry_tag and 'CONV:' in entry_tag:
                conv_str = entry_tag.split('CONV:')[1].split('|')[0]
                conviction = float(conv_str)
                # Higher leverage for higher conviction
                if conviction > 0.8:
                    leverage_multiplier = 1.5  # Up to 2.25x
                elif conviction > 0.6:
                    leverage_multiplier = 1.2  # Up to 1.8x
                else:
                    leverage_multiplier = 1.0  # Base 1.5x
                final_leverage = base_leverage * leverage_multiplier
                return min(final_leverage, max_leverage)
        except Exception:
            # Malformed tag → fall through to base leverage.
            pass
        return min(base_leverage, max_leverage)

    def confirm_trade_entry(self, pair: str, order_type: str, amount: float,
                            rate: float, time_in_force: str, current_time,
                            entry_tag: str, side: str, **kwargs) -> bool:
        """
        Final confirmation before entering trade.

        Last-minute validation of all conditions against the freshest candle;
        rejects the entry if the signal decayed between candle close and order.
        """
        try:
            # Get fresh market data
            dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
            if dataframe.empty:
                return False
            latest_row = dataframe.iloc[-1]
            # === DOUBLE-CHECK CORE CONDITIONS ===
            ofi_z = latest_row.get('ofi_z', 0)
            mpd_z = latest_row.get('mpd_z', 0)
            # Ensure signals are still strong
            if (abs(ofi_z) < self.ofi_entry_threshold.value or
                    abs(mpd_z) < self.mpd_entry_threshold.value):
                logger.warning(f"⚠️ Signal weakened for {pair}, skipping entry")
                return False
            # Ensure divergence still exists
            if np.sign(ofi_z) == np.sign(mpd_z):
                logger.warning(f"⚠️ No divergence for {pair}, skipping entry")
                return False
            # === RISK CHECKS ===
            vpin = latest_row.get('vpin', 1.0)
            wall_ratio = latest_row.get('wall_ratio', 1.0)
            tobi = latest_row.get('tobi', 0.5)
            # Compare VPIN against the rolling percentile threshold computed in
            # populate_indicators(). self.vpin_percentile is a percentile rank,
            # not an absolute VPIN level, so it is only used as a conservative
            # fallback when the threshold column is missing or NaN.
            vpin_threshold = latest_row.get('vpin_dynamic_threshold', self.vpin_percentile.value)
            if pd.isna(vpin_threshold):
                vpin_threshold = self.vpin_percentile.value
            if vpin > vpin_threshold:
                logger.warning(f"⚠️ Toxic flow for {pair} (VPIN: {vpin:.3f}), skipping")
                return False
            if wall_ratio > self.wall_ratio_max.value:
                logger.warning(f"⚠️ Large walls detected for {pair}, skipping")
                return False
            if not (self.tobi_min.value <= tobi <= self.tobi_max.value):
                logger.warning(f"⚠️ Unbalanced book for {pair} (TOBI: {tobi:.3f}), skipping")
                return False
            # === FINAL CONFIRMATION ===
            logger.info(f"✅ Trade entry confirmed for {pair}: "
                        f"OFI_z={ofi_z:.2f}, MPD_z={mpd_z:.2f}, "
                        f"VPIN={vpin:.3f}, TOBI={tobi:.3f}")
            return True
        except Exception as e:
            # Fail closed: an error during validation rejects the entry.
            logger.error(f"❌ Error in trade confirmation: {e}")
            return False

    def informative_pairs(self) -> List[Tuple[str, str]]:
        """
        Define informative pairs for additional market context.
        """
        return []  # Mean-reversion strategy focuses on single pair

    def bot_loop_start(self, **kwargs) -> None:
        """
        Called at the start of each bot loop.

        Use for logging and monitoring of signal/exit counters.
        """
        if self.total_signals_generated > 0:
            success_rate = (self.successful_exits / self.total_signals_generated) * 100
            logger.info(f"📊 RTAI Strategy Stats: "
                        f"Signals: {self.total_signals_generated}, "
                        f"Exits: {self.successful_exits}, "
                        f"Success Rate: {success_rate:.1f}%")
# Export the enhanced strategy
# Export only the main strategy class for Freqtrade discovery
# Removed __all__ to prevent duplicate detection issues