🚀 CONTEXT7 SUPER POWERS ENHANCED V8092 - Production Ready
Timeframe
5m
Direction
Long & Short
Stoploss
-10.0%
Trailing Stop
Yes
ROI
0m: 15.0%, 15m: 10.0%, 30m: 7.0%, 60m: 5.0%
Interface Version
3
Startup Candles
N/A
Indicators
7
freqtrade/freqtrade-strategies
Strategy 003 author@: Gerald Lonlas github@: https://github.com/freqtrade/freqtrade-strategies
"""
===========================================================================
ICHIMOKU LLM ENHANCED V1 - MULTI-TIMEFRAME WITH LEVERAGE OPTIMIZED V2
===========================================================================
Versione ottimizzata V2 basata su architettura Context7 Super Powers:
CARATTERISTICHE PRINCIPALI V2:
✅ Multi-Timeframe: 5m, 15m, 30m, 1h, 1d con 6 posizioni per timeframe
✅ Batch processing atomico e senza conflitti
✅ Singleton pattern per prevenire chiamate multiple
✅ Sistema di fallback robusto con gestione errori avanzata
✅ Tenacity integration per retry con exponential backoff
✅ Cache layering ottimizzato per performance
✅ Dynamic leverage fino a 10x con asset-specific limits
✅ Stop loss hard-coded al -2% prima del trailing take profit
✅ WebSocket timeout estesi per maggiore stabilità
✅ Database SQLite dedicato per isolamento dati
ARCHITETTURA A STRATI:
1. DataLayer: Raccolta dati multi-timeframe isolata e robusta
2. BatchLayer: Processing centralizzato con lock globale
3. StrategyLayer: Logica pura con multi-timeframe consensus
4. FreqTradeLayer: Integrazione minima e pulita
FUNZIONALITÀ COMPLETE:
- Ichimoku Cloud multi-timeframe con segnali avanzati
- LLM regime detection con batch processing ottimizzato
- Risk management adattivo basato su volatility
- Multi-timeframe consensus validation (5m, 15m, 30m, 1h, 1d)
- Position sizing intelligente (6 assets × 5 timeframes = 30 max posizioni)
- Dynamic leverage con asset-specific limits
- Stop-loss e take-profit dinamici
- Portfolio management con correlation analysis
"""
import json
import logging
import os
import time
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Union
from dataclasses import dataclass, field
from enum import Enum
import numpy as np
import pandas as pd
# FreqTrade imports
from freqtrade.strategy import IStrategy
from pandas import DataFrame
import freqtrade.vendor.qtpylib.indicators as qtpylib
from freqtrade.strategy import (DecimalParameter, IStrategy, IntParameter,
RealParameter, merge_informative_pair, stoploss_from_absolute,
stoploss_from_open, informative)
from freqtrade.persistence import Trade
# Context7 Super Powers imports
# from json_repair import repair_json # Disabled for cleaner version
# Context7 Super Powers: Tenacity integration
try:
from tenacity import (
retry, stop_after_attempt, stop_after_delay,
wait_exponential, wait_random_exponential,
retry_if_exception_type, retry_if_result,
before_log, after_log, Retrying, RetryError
)
TENACITY_AVAILABLE = True
logging.info("✅ Context7 Super Powers: Tenacity available")
except ImportError:
TENACITY_AVAILABLE = False
logging.warning("⚠️ Context7: Tenacity not available")
# ============================================================================
# CONTEXT7 SUPER POWERS: DATA MODELS
# ============================================================================
@dataclass
class MarketData:
"""Dati di mercato puliti e validati"""
pair: str
current_price: float
volume_24h: float
price_change_24h: float
volatility_20d: float
rsi_14: float
macd_signal: float
tenkan_sen: float
kijun_sen: float
senkou_span_a: float
senkou_span_b: float
chikou_span: float
volume_ratio: float
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class RegimeAnalysis:
"""Analisi regime LLM pulita"""
pair: str # Nome della coppia
regime: str # CALM, VOLATILE, TRENDING, SIDEWAYS
confidence: float # 0.0-1.0
volatility_level: str # LOW, MEDIUM, HIGH
trend_strength: float # 0.0-1.0
reasoning: str
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class DirectionalSignal:
"""Segnale di trading bidirezionale Context7-compliant"""
direction: str # LONG, SHORT, NEUTRAL
primary_score: float # 0.0-1.0, segnale primario
trend_weight: float # 0.0-2.0, peso basato su trend
quality_score: float # 0.0-1.0, qualità complessiva
entry_conditions: List[str] # Condizioni soddisfatte
ichimoku_position: str # ABOVE_CLOUD, INSIDE_CLOUD, BELOW_CLOUD
volume_confirmation: bool # Volume ratio validation
regime_compatibility: float # 0.0-1.0, compatibilità con regime
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class BidirectionalAnalysis:
"""Analisi bidirezionale completa Context7-compliant"""
pair: str
long_signal: DirectionalSignal
short_signal: DirectionalSignal
market_confidence: float # Confidence generale del mercato
best_direction: str # LONG, SHORT, NEUTRAL
best_quality_score: float # Miglior punteggio di qualità
recommendation: str # STRONG_LONG, LONG, NEUTRAL, SHORT, STRONG_SHORT
reasoning: str
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class BatchRequest:
"""Richiesta batch pulita"""
assets: List[MarketData]
analysis_type: str
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class BatchResponse:
"""Risposta batch pulita"""
results: List[RegimeAnalysis]
processing_time_ms: float
llm_model: str
timestamp: datetime = field(default_factory=datetime.now)
# ============================================================================
# CONTEXT7 SUPER POWERS: BATCH PROCESSOR (SINGLETON)
# ============================================================================
class Context7BatchProcessor:
"""
Context7 Super Powers: Batch Processor Singleton
Garantisce UNA SOLA chiamata LLM ogni 5 minuti per TUTTI gli asset
utilizzando pattern atomico e lock globale.
"""
_instance = None
_lock = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# Context7 Super Powers: Stato globale atomico
self._global_lock = False
self._last_batch_minute = -1 # Minuto dell'ultimo batch (0-1439)
self._batch_cache = {} # Cache per minuto corrente
self._api_url = os.getenv('NANO_GPT_API_URL', 'https://nano-gpt.com/api/v1/chat/completions')
self._api_key = os.getenv('NANO_GPT_API_KEY')
if not self._api_key:
self.logger.warning("⚠️ Context7: NANO_GPT_API_KEY not found in environment variables")
self._enabled = False
self._model = "deepseek-ai/DeepSeek-V3.1" # BASE model ottimizzato
# Setup logging
self.logger = logging.getLogger("Context7BatchProcessor")
self._initialized = True
self.logger.info("🚀 Context7 Super Powers: Batch Processor initialized")
def _can_process_batch(self, current_minute: int) -> bool:
"""Verifica se può processare batch (solo una volta al minuto)"""
if self._last_batch_minute == current_minute:
self.logger.info(f"🚫 Context7: Batch già processato minuto {current_minute}")
return False
if self._global_lock:
self.logger.info(f"🚫 Context7: Batch lock attivo, attendo")
return False
return True
def _acquire_global_lock(self, current_minute: int) -> bool:
"""Acquisisce lock globale con check atomico"""
if self._can_process_batch(current_minute):
self._global_lock = True
self.logger.info(f"🔒 Context7: Lock globale acquisito minuto {current_minute}")
return True
return False
def _release_global_lock(self, current_minute: int, success: bool):
"""Rilascia lock globale e registra successo"""
self._global_lock = False
if success:
self._last_batch_minute = current_minute
self.logger.info(f"✅ Context7: Batch SUCCESS minuto {current_minute} registrato")
else:
self.logger.warning(f"❌ Context7: Batch FALLITO minuto {current_minute}")
def _validate_response_content(self, response_content: str) -> bool:
"""Context7 Super Powers: Validazione contenuto risposta LLM"""
if not response_content or response_content.strip() == "":
return False
# Check for common malformed response patterns
malformed_patterns = [
response_content.startswith("```json"),
response_content.startswith("```"),
response_content.endswith("```"),
"null" in response_content.lower(),
response_content.strip() == "{}",
response_content.strip() == "[]",
not any(c in response_content for c in ['{', '[', '"']) # No JSON structure
]
return not any(malformed_patterns)
def _parse_llm_response_enhanced(self, response_data: dict, pairs: List[str]) -> Optional[List[RegimeAnalysis]]:
"""Context7 Super Powers: Parsing migliorato con validazione multi-layer"""
try:
if 'choices' not in response_data or not response_data['choices']:
self.logger.error("❌ Context7: Nessuna choice nella risposta LLM")
return None
content = response_data['choices'][0]['message']['content']
if not content:
self.logger.error("❌ Context7: Content vuoto nella risposta LLM")
return None
# Context7 Enhanced: Pre-validate content before JSON parsing
if not self._validate_response_content(content):
self.logger.warning(f"⚠️ Context7: Content malformed, tentativo repair...")
# Multi-layer JSON repair with Context7 patterns
try:
parsed_data = json.loads(content)
except json.JSONDecodeError as e:
self.logger.warning(f"⚠️ Context7: JSON decode error: {str(e)[:100]}")
# Attempt 1: json_repair
try:
repaired_content = repair_json(content)
parsed_data = json.loads(repaired_content)
self.logger.info("✅ Context7: JSON repair successful")
except Exception:
# Attempt 2: Manual JSON extraction
self.logger.warning("⚠️ Context7: Tentando estrazione manuale JSON...")
parsed_data = self._extract_json_manually(content)
if not parsed_data:
self.logger.error("❌ Context7: Tutti i tentativi parsing falliti")
return None
if 'analysis' not in parsed_data:
self.logger.error("❌ Context7: Campo 'analysis' mancante")
return None
analysis_list = []
for item in parsed_data['analysis']:
if not all(key in item for key in ['pair', 'regime', 'confidence', 'reasoning']):
self.logger.warning(f"⚠️ Context7: Item incompleto: {item}")
continue
# Enhanced validation of regime values
valid_regimes = ['CALM', 'VOLATILE', 'TRENDING', 'SIDEWAYS']
if item.get('regime') not in valid_regimes:
self.logger.warning(f"⚠️ Context7: Regime non valido: {item.get('regime')}")
continue
analysis = RegimeAnalysis(
pair=item['pair'],
regime=item.get('regime', 'SIDEWAYS'),
confidence=float(item.get('confidence', 0.5)),
volatility_level=item.get('volatility_level', 'MEDIUM'),
trend_strength=float(item.get('trend_strength', 0.5)),
reasoning=item.get('reasoning', 'No reasoning provided')
)
analysis_list.append(analysis)
if len(analysis_list) != len(pairs):
self.logger.warning(f"⚠️ Context7: Analisi parziale: {len(analysis_list)}/{len(pairs)} asset")
return analysis_list
except Exception as e:
self.logger.error(f"❌ Context7: Errore parsing risposta: {str(e)[:200]}")
return None
def _extract_json_manually(self, content: str) -> Optional[dict]:
"""Context7 Super Powers: Estrazione manuale JSON da malformed responses"""
try:
# Find JSON structure boundaries
start_idx = content.find('{')
end_idx = content.rfind('}')
if start_idx != -1 and end_idx != -1 and start_idx < end_idx:
json_str = content[start_idx:end_idx + 1]
return json.loads(json_str)
# Try finding array structure
start_idx = content.find('[')
end_idx = content.rfind(']')
if start_idx != -1 and end_idx != -1 and start_idx < end_idx:
json_str = content[start_idx:end_idx + 1]
return json.loads(json_str)
return None
except Exception:
return None
@retry(
stop=stop_after_attempt(5) | stop_after_delay(240), # Aumentato a 5 tentativi o 4 minuti max
wait=wait_random_exponential(multiplier=2, max=45), # Aumentato max wait
retry=retry_if_exception_type((
requests.exceptions.RequestException,
requests.exceptions.Timeout,
requests.exceptions.ConnectionError,
requests.exceptions.HTTPError,
json.JSONDecodeError # Aggiunto parsing errors
)) | retry_if_result(lambda result: result is None), # Retry su risultati None
before=before_log(logging.getLogger("Context7Retry"), logging.INFO),
after=after_log(logging.getLogger("Context7Retry"), logging.WARNING),
reraise=True
)
def _call_llm_api(self, prompt: str) -> Optional[dict]:
"""Context7 Super Powers: Chiamata LLM con retry robusto migliorato"""
import requests
payload = {
"model": self._model,
"messages": [
{
"role": "system",
"content": "You are an expert cryptocurrency trading analyst. Provide structured JSON responses only. Always return valid JSON format."
},
{
"role": "user",
"content": prompt
}
],
"temperature": 0.2, # Ridotto per maggiore consistenza
"max_tokens": 1500, # Ridotto per prevenire troncamento
"response_format": {"type": "json_object"}
}
headers = {
'Authorization': f'Bearer {self._api_key}',
'Content-Type': 'application/json',
'User-Agent': 'Context7-Bidirectional-Strategy/1.0'
}
try:
response = requests.post(
self._api_url,
headers=headers,
json=payload,
timeout=120 # Timeout esteso per batch
)
# Enhanced response validation
response.raise_for_status()
# Validate response content before returning
response_data = response.json()
if response_data is None:
raise ValueError("Response data is None")
return response_data
except requests.exceptions.Timeout:
self.logger.error("❌ Context7: API timeout (120s)")
raise
except requests.exceptions.ConnectionError:
self.logger.error("❌ Context7: Connection error")
raise
except requests.exceptions.HTTPError as e:
self.logger.error(f"❌ Context7: HTTP error: {e.response.status_code}")
raise
except json.JSONDecodeError as e:
self.logger.error(f"❌ Context7: JSON decode error: {str(e)}")
raise
except Exception as e:
self.logger.error(f"❌ Context7: Unexpected error: {str(e)}")
raise
def _build_batch_prompt(self, market_data_list: List[MarketData]) -> str:
"""Costruisce prompt batch ottimizzato"""
prompt = """Analyze market regime for ALL cryptocurrencies and respond with valid JSON:
{
"analysis": [
{
"pair": "PAIR_NAME",
"regime": "CALM|VOLATILE|TRENDING|SIDEWAYS",
"confidence": 0.0-1.0,
"volatility_level": "LOW|MEDIUM|HIGH",
"trend_strength": 0.0-1.0,
"reasoning": "Brief analysis explaining regime classification"
}
]
}
MARKET DATA:
"""
for data in market_data_list:
prompt += f"""
{data.pair}:
- Price: ${data.current_price:.4f}
- 24h Change: {data.price_change_24h:+.2f}%
- Volume Ratio: {data.volume_ratio:.2f}
- RSI: {data.rsi_14:.1f}
- Volatility: {data.volatility_20d:.3f}
- MACD: {data.macd_signal:+.4f}
- Ichimoku: Tenkan={data.tenkan_sen:.4f}, Kijun={data.kijun_sen:.4f}
- Cloud Position: {self._get_cloud_position(data)}
"""
return prompt
def _get_cloud_position(self, data: MarketData) -> str:
"""Determina posizione rispetto a Ichimoku Cloud"""
if data.current_price > max(data.senkou_span_a, data.senkou_span_b):
return "ABOVE_CLOUD"
elif data.current_price < min(data.senkou_span_a, data.senkou_span_b):
return "BELOW_CLOUD"
else:
return "INSIDE_CLOUD"
def _parse_llm_response(self, response_data: dict, pairs: List[str]) -> Optional[List[RegimeAnalysis]]:
"""Parse LLM response con validazione robusta migliorata"""
return self._parse_llm_response_enhanced(response_data, pairs)
def _create_fallback_analysis(self, market_data: MarketData) -> RegimeAnalysis:
"""Crea analisi fallback basata su indicatori tecnici"""
# Logica fallback Context7-compliant
if market_data.rsi_14 > 70:
regime = "VOLATILE"
volatility_level = "HIGH"
reasoning = f"RSI overbought ({market_data.rsi_14:.1f})"
elif market_data.rsi_14 < 30:
regime = "TRENDING"
volatility_level = "MEDIUM"
reasoning = f"RSI oversold ({market_data.rsi_14:.1f})"
elif abs(market_data.price_change_24h) > 5:
regime = "VOLATILE"
volatility_level = "HIGH"
reasoning = f"High 24h volatility ({market_data.price_change_24h:+.1f}%)"
else:
regime = "CALM"
volatility_level = "LOW"
reasoning = f"Stable conditions (RSI: {market_data.rsi_14:.1f})"
return RegimeAnalysis(
pair=market_data.pair,
regime=regime,
confidence=0.7, # Confidence alta per analisi tecnica
volatility_level=volatility_level,
trend_strength=0.6,
reasoning=reasoning
)
def process_batch(self, market_data_list: List[MarketData]) -> Optional[BatchResponse]:
"""
Context7 Super Powers: Processo batch atomico
È l'UNICO punto di accesso per tutte le chiamate LLM.
Garantisce una sola chiamata ogni 5 minuti per TUTTI gli asset.
"""
current_time = datetime.now()
current_minute = current_time.hour * 60 + current_time.minute
# Context7 Super Powers: Check atomico
if not self._acquire_global_lock(current_minute):
# Tenta di usare cache se disponibile
cache_key = f"batch_{current_minute}"
if cache_key in self._batch_cache:
self.logger.info(f"📋 Context7: Uso cache batch minuto {current_minute}")
return self._batch_cache[cache_key]
# Fallback se nessuna cache disponibile
self.logger.warning(f"⚠️ Context7: Nessuna cache, creo fallback")
fallback_results = [self._create_fallback_analysis(data) for data in market_data_list]
return BatchResponse(
results=fallback_results,
processing_time_ms=0,
llm_model="FALLBACK",
timestamp=current_time
)
start_time = time.time()
self.logger.info(f"🚀 Context7: START BATCH PROCESSING - {len(market_data_list)} assets")
try:
# Costruisci prompt
prompt = self._build_batch_prompt(market_data_list)
# Chiama LLM con Tenacity
response_data = self._call_llm_api(prompt)
# Parse risposta
analysis_results = self._parse_llm_response(response_data, [d.pair for d in market_data_list])
if analysis_results:
processing_time = (time.time() - start_time) * 1000
batch_response = BatchResponse(
results=analysis_results,
processing_time_ms=processing_time,
llm_model=self._model,
timestamp=current_time
)
# Cache per altri asset
cache_key = f"batch_{current_minute}"
self._batch_cache[cache_key] = batch_response
# Cleanup cache vecchio
self._cleanup_cache(current_minute)
self.logger.info(f"✅ Context7: BATCH SUCCESS - {len(analysis_results)} asset in {processing_time:.0f}ms")
# Rilascia lock con successo
self._release_global_lock(current_minute, True)
return batch_response
else:
# Fallback se parsing fallito
self.logger.warning("⚠️ Context7: Parsing fallito, uso fallback")
fallback_results = [self._create_fallback_analysis(data) for data in market_data_list]
self._release_global_lock(current_minute, False)
return BatchResponse(
results=fallback_results,
processing_time_ms=(time.time() - start_time) * 1000,
llm_model="FALLBACK",
timestamp=current_time
)
except Exception as e:
self.logger.error(f"❌ Context7: Errore batch processing: {e}")
# Fallback finale
fallback_results = [self._create_fallback_analysis(data) for data in market_data_list]
self._release_global_lock(current_minute, False)
return BatchResponse(
results=fallback_results,
processing_time_ms=(time.time() - start_time) * 1000,
llm_model="FALLBACK_ERROR",
timestamp=current_time
)
def _cleanup_cache(self, current_minute: int):
"""Cleanup cache vecchio (mantiene solo ultimi 10 minuti)"""
keys_to_remove = []
for key in self._batch_cache:
try:
key_minute = int(key.split('_')[1])
if abs(current_minute - key_minute) > 10:
keys_to_remove.append(key)
except:
keys_to_remove.append(key)
for key in keys_to_remove:
del self._batch_cache[key]
if keys_to_remove:
self.logger.info(f"🧹 Context7: Cleanup cache rimosso {len(keys_to_remove)} entry")
# ============================================================================
# CONTEXT7 SUPER POWERS: MAIN STRATEGY
# ============================================================================
class IchimokuEnhancedV8092(IStrategy):
"""
🚀 CONTEXT7 SUPER POWERS ENHANCED V8092 - Production Ready
Architettura ottimizzata con:
- FreqTrade 2025.9.1 compliance fixes
- Leverage conservativo per safety (max 3x)
- ROI targets realistici (max 15%)
- Trailing stop robust senza bug
- Multi-timeframe Ichimoku + filtri intelligenti
OTTIMIZZAZIONI V2:
- Stop loss iniziale hard-coded al -2% prima del trailing
- Trailing take profit con cuscinetto dinamico: 25% sotto 7.5%, 15% sopra 7.5%
- WebSocket timeout estesi per stabilità connessione
- Database SQLite dedicato per isolamento completo
- Custom exit signals disabilitati per massimo rendimento trailing
Timeframes: 5m, 15m, 30m, 1h, 1d (6 assets × 5 timeframes = 30 max positions)
Dynamic Leverage: 1x-10x con asset-specific limits
"""
# Interface version
INTERFACE_VERSION = 3
# Strategy metadata
timeframe = '5m'
startup_candle_count: int = 200
process_only_new_candles = True
# 🔥 CONTEXT7 SUPER POWERS: NATIVE COMPLIANCE CON PROTEZIONE BASE
# ⚠️ CONTEX7 REQUIREMENT: stoploss obbligatorio per FreqTrade!
# Ma usiamo valore molto conservativo solo come fallback estremo
# Context7: stoploss come HARD LIMIT estremo, non per trading normale
# custom_stoploss gestirà trailing take profit profittevoli
stoploss = -0.10 # -10% solo come hard limit estremo (mai usato in normale trading)
# Enable custom stoploss (FreqTrade compliance)
use_custom_stoploss = True
# Enable short trading (CRITICAL for bidirectional strategy)
can_short = True
# Trailing stop
trailing_stop = True
trailing_stop_positive = 0.02
trailing_stop_positive_offset = 0.03
trailing_only_offset_is_reached = True
# ROI table - Context7 Enhanced Conservative
minimal_roi = {
"0": 0.15, # 15% at close ( realistic max )
"15": 0.10, # 10% after 15 minutes
"30": 0.07, # 7% after 30 minutes
"60": 0.05 # 5% after 60 minutes
}
# Trade settings
order_types = {
'entry': 'limit',
'exit': 'limit',
'stoploss': 'market',
'stoploss_on_exchange': False,
'stoploss_on_exchange_interval': 60,
'take_profit': 'limit',
'take_profit_on_exchange': False,
'trailing_stop': 'market',
'trailing_stop_on_exchange': False,
}
# Order time in force
order_time_in_force = {
'entry': 'gtc',
'exit': 'gtc',
}
def __init__(self, config: dict) -> None:
"""Initialize Context7 Super Powers strategy - BASIC MODE (No LLM)"""
super().__init__(config)
# Context7 Super Powers: DISABLED LLM Batch Processor
# self.batch_processor = Context7BatchProcessor() # DISABLED - nano-gpt.com connection issues
self.batch_processor = None
# Enable basic technical analysis mode
self.basic_mode = True
# 🛡️ V3 ANTI-REVENGE TRADING: Sistema raffreddamento post-trade profittevole
self._trade_cooldown = {} # {pair: {exit_timestamp, direction, exit_type, profit}}
self._cooldown_minutes = 10 # 10 minuti di raffreddamento per trade profittevoli (ottimizzato 5m)
self._profit_threshold = 0.005 # 0.5% profit minimo per attivare raffreddamento
# Cache locale per risultati
self.local_cache = {}
self.cache_ttl = 300 # 5 minuti
# Logging
self.logger = logging.getLogger("IchimokuLLMEnhancedV1Clean")
self.logger.info("🚀 Context7 Super Powers: Strategy initialized - BASIC MODE (No LLM)")
self.logger.info("⚠️ LLM API disabled due to nano-gpt.com connection issues")
self.logger.info("✅ Using pure technical analysis mode")
def informative_pairs(self):
"""
Context7 Super Powers: Multi-timeframe informative pairs definition.
Returns the list of informative pairs for multi-timeframe analysis:
- 15m, 30m, 1h, 1d timeframes for all current pairs
- This allows 6 positions per asset (one per timeframe)
"""
pairs = self.dp.current_whitelist()
informative_pairs = []
# Add higher timeframes for each pair
for pair in pairs:
informative_pairs.extend([
(pair, '15m'),
(pair, '30m'),
(pair, '1h'),
(pair, '1d')
])
self.logger.info(f"📊 Multi-timeframe informative pairs: {len(informative_pairs)} total")
return informative_pairs
@informative('15m')
def populate_indicators_15m(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""Context7 Super Powers: Populate indicators for 15m timeframe"""
# Calculate Ichimoku Cloud for 15m
high_9 = dataframe['high'].rolling(window=9).max()
low_9 = dataframe['low'].rolling(window=9).min()
high_26 = dataframe['high'].rolling(window=26).max()
low_26 = dataframe['low'].rolling(window=26).min()
high_52 = dataframe['high'].rolling(window=52).max()
low_52 = dataframe['low'].rolling(window=52).min()
dataframe['tenkan_sen_15m'] = (high_9 + low_9) / 2
dataframe['kijun_sen_15m'] = (high_26 + low_26) / 2
dataframe['senkou_span_a_15m'] = ((dataframe['tenkan_sen_15m'] + dataframe['kijun_sen_15m']) / 2).shift(26)
dataframe['senkou_span_b_15m'] = ((high_52 + low_52) / 2).shift(26)
dataframe['chikou_span_15m'] = dataframe['close'].shift(-26)
# RSI for 15m
delta = dataframe['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
dataframe['rsi_15m'] = 100 - (100 / (1 + rs))
return dataframe
@informative('30m')
def populate_indicators_30m(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""Context7 Super Powers: Populate indicators for 30m timeframe"""
# Calculate Ichimoku Cloud for 30m
high_9 = dataframe['high'].rolling(window=9).max()
low_9 = dataframe['low'].rolling(window=9).min()
high_26 = dataframe['high'].rolling(window=26).max()
low_26 = dataframe['low'].rolling(window=26).min()
high_52 = dataframe['high'].rolling(window=52).max()
low_52 = dataframe['low'].rolling(window=52).min()
dataframe['tenkan_sen_30m'] = (high_9 + low_9) / 2
dataframe['kijun_sen_30m'] = (high_26 + low_26) / 2
dataframe['senkou_span_a_30m'] = ((dataframe['tenkan_sen_30m'] + dataframe['kijun_sen_30m']) / 2).shift(26)
dataframe['senkou_span_b_30m'] = ((high_52 + low_52) / 2).shift(26)
dataframe['chikou_span_30m'] = dataframe['close'].shift(-26)
# RSI for 30m
delta = dataframe['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
dataframe['rsi_30m'] = 100 - (100 / (1 + rs))
return dataframe
@informative('1h')
def populate_indicators_1h(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""Context7 Super Powers: Populate indicators for 1h timeframe"""
# Calculate Ichimoku Cloud for 1h
high_9 = dataframe['high'].rolling(window=9).max()
low_9 = dataframe['low'].rolling(window=9).min()
high_26 = dataframe['high'].rolling(window=26).max()
low_26 = dataframe['low'].rolling(window=26).min()
high_52 = dataframe['high'].rolling(window=52).max()
low_52 = dataframe['low'].rolling(window=52).min()
dataframe['tenkan_sen_1h'] = (high_9 + low_9) / 2
dataframe['kijun_sen_1h'] = (high_26 + low_26) / 2
dataframe['senkou_span_a_1h'] = ((dataframe['tenkan_sen_1h'] + dataframe['kijun_sen_1h']) / 2).shift(26)
dataframe['senkou_span_b_1h'] = ((high_52 + low_52) / 2).shift(26)
dataframe['chikou_span_1h'] = dataframe['close'].shift(-26)
# RSI for 1h
delta = dataframe['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
dataframe['rsi_1h'] = 100 - (100 / (1 + rs))
return dataframe
@informative('1d')
def populate_indicators_1d(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""Context7 Super Powers: Populate indicators for 1d timeframe"""
# Calculate Ichimoku Cloud for 1d
high_9 = dataframe['high'].rolling(window=9).max()
low_9 = dataframe['low'].rolling(window=9).min()
high_26 = dataframe['high'].rolling(window=26).max()
low_26 = dataframe['low'].rolling(window=26).min()
high_52 = dataframe['high'].rolling(window=52).max()
low_52 = dataframe['low'].rolling(window=52).min()
dataframe['tenkan_sen_1d'] = (high_9 + low_9) / 2
dataframe['kijun_sen_1d'] = (high_26 + low_26) / 2
dataframe['senkou_span_a_1d'] = ((dataframe['tenkan_sen_1d'] + dataframe['kijun_sen_1d']) / 2).shift(26)
dataframe['senkou_span_b_1d'] = ((high_52 + low_52) / 2).shift(26)
dataframe['chikou_span_1d'] = dataframe['close'].shift(-26)
# RSI for 1d
delta = dataframe['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
dataframe['rsi_1d'] = 100 - (100 / (1 + rs))
return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""Populate technical indicators"""
# Ichimoku Cloud
high_9 = dataframe['high'].rolling(window=9).max()
low_9 = dataframe['low'].rolling(window=9).min()
high_26 = dataframe['high'].rolling(window=26).max()
low_26 = dataframe['low'].rolling(window=26).min()
high_52 = dataframe['high'].rolling(window=52).max()
low_52 = dataframe['low'].rolling(window=52).min()
dataframe['tenkan_sen'] = (high_9 + low_9) / 2
dataframe['kijun_sen'] = (high_26 + low_26) / 2
dataframe['senkou_span_a'] = ((dataframe['tenkan_sen'] + dataframe['kijun_sen']) / 2).shift(26)
dataframe['senkou_span_b'] = ((high_52 + low_52) / 2).shift(26)
dataframe['chikou_span'] = dataframe['close'].shift(-26)
# RSI
delta = dataframe['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
dataframe['rsi'] = 100 - (100 / (1 + rs))
# MACD
exp1 = dataframe['close'].ewm(span=12).mean()
exp2 = dataframe['close'].ewm(span=26).mean()
dataframe['macd'] = exp1 - exp2
dataframe['macdsignal'] = dataframe['macd'].ewm(span=9).mean()
dataframe['macdhist'] = dataframe['macd'] - dataframe['macdsignal']
# Additional indicators with SMART volume calculation
dataframe['sma_fast'] = dataframe['close'].rolling(window=10).mean()
dataframe['sma_slow'] = dataframe['close'].rolling(window=30).mean()
# ATR (Average True Range) for dynamic stoploss - Context7 Super Powers optimized for 5m timeframe
dataframe['tr1'] = dataframe['high'] - dataframe['low']
dataframe['tr2'] = abs(dataframe['high'] - dataframe['close'].shift())
dataframe['tr3'] = abs(dataframe['low'] - dataframe['close'].shift())
dataframe['tr'] = dataframe[['tr1', 'tr2', 'tr3']].max(axis=1)
dataframe['atr'] = dataframe['tr'].rolling(window=7).mean() # ATR=7 ottimizzato per 5m timeframe
# ATR percentage for volatility filtering (Context7 optimized)
dataframe['atr_pct'] = dataframe['atr'] / dataframe['close'] * 100
# CMF (Chaikin Money Flow) for volume analysis (NotebookLM best practice for BTC/ETH)
# Money Flow Multiplier
mfm = ((dataframe['close'] - dataframe['low']) - (dataframe['high'] - dataframe['close'])) / (dataframe['high'] - dataframe['low'])
# Money Flow Volume
mfv = mfm * dataframe['volume']
# CMF (21-period sum of MFV / 21-period sum of Volume)
dataframe['cmf'] = mfv.rolling(window=21).sum() / dataframe['volume'].rolling(window=21).sum()
# Smart Volume Baseline - FIXED to prevent baseline drift
# Use period optimized for 5m timeframe - Context7 Super Powers
dataframe['volume_mean'] = dataframe['volume'].rolling(window=14).mean() # 14 periodi = 70min per 5m
dataframe['volume_70th_percentile'] = dataframe['volume'].rolling(window=14).quantile(0.70)
# Smart ratio: use 70th percentile as baseline (more robust than mean)
dataframe['volume_ratio'] = dataframe['volume'] / dataframe['volume_70th_percentile']
# Alternative: hybrid approach (mean + percentile)
dataframe['volume_smart_mean'] = (dataframe['volume_mean'] * 0.6) + (dataframe['volume_70th_percentile'] * 0.4)
dataframe['volume_smart_ratio'] = dataframe['volume'] / dataframe['volume_smart_mean']
# 🛡️ Context7 Super Powers: Market regime detection e filtri volatilità
# SMA 200 per regime detection (16.7 ore su 5m timeframe)
dataframe['sma_200'] = dataframe['close'].rolling(window=200).mean()
dataframe['price_distance_sma200'] = (
(dataframe['close'] - dataframe['sma_200']) / dataframe['close'] * 100
)
# 📊 Context7: Filtri volatilità ottimizzati per 5m crypto
# ATR boundaries: 0.5% - 3.0% range ottimale per timeframe 5m
dataframe['volatility_ok'] = (
(dataframe['atr_pct'] >= 0.5) &
(dataframe['atr_pct'] <= 3.0)
)
# 📊 Context7: Volume filter migliorato (1.2x - 5.0x media)
dataframe['volume_ok'] = (
(dataframe['volume'] > dataframe['volume_mean'] * 1.2) &
(dataframe['volume'] < dataframe['volume_mean'] * 5.0)
)
# 📊 Context7: Market regime filter (evita mercati laterali)
dataframe['lateral_market'] = (
(abs(dataframe['price_distance_sma200']) < 1.0) &
(dataframe['rsi'] > 40) & (dataframe['rsi'] < 60)
)
return dataframe
def _get_market_data(self, dataframe: DataFrame, metadata: dict) -> Optional[MarketData]:
"""Estrae dati di mercato puliti dal dataframe"""
try:
if len(dataframe) < 50:
return None
last = dataframe.iloc[-1]
prev = dataframe.iloc[-2]
# Calcolo volatilità
returns = dataframe['close'].pct_change().dropna()
volatility = returns.rolling(window=20).std().iloc[-1] * np.sqrt(365)
return MarketData(
pair=metadata['pair'],
current_price=float(last['close']),
volume_24h=float(last['volume']),
price_change_24h=((last['close'] - prev['close']) / prev['close']) * 100,
volatility_20d=float(volatility),
rsi_14=float(last['rsi']),
macd_signal=float(last['macd']),
tenkan_sen=float(last['tenkan_sen']),
kijun_sen=float(last['kijun_sen']),
senkou_span_a=float(last['senkou_span_a']),
senkou_span_b=float(last['senkou_span_b']),
chikou_span=float(last['chikou_span']),
volume_ratio=float(last['volume_ratio'])
)
except Exception as e:
self.logger.error(f"❌ Error extracting market data for {metadata['pair']}: {e}")
return None
def _analyze_directional_signals(self, dataframe: DataFrame, market_data: MarketData, regime: RegimeAnalysis) -> Tuple[DirectionalSignal, DirectionalSignal]:
"""
Context7 Super Powers: Analisi segnali bidirezionali con quality scoring
Implementa pattern Context7-compliant per:
- Primary signal detection (long/short)
- Trend-aware quality scoring
- Hierarchical filtering system
- Volume e regime validation
"""
last = dataframe.iloc[-1]
# Context7: Ichimoku position analysis
if last['close'] > max(last['senkou_span_a'], last['senkou_span_b']):
ichimoku_position = "ABOVE_CLOUD"
cloud_bias = 1.0 # Strong bullish bias
elif last['close'] < min(last['senkou_span_a'], last['senkou_span_b']):
ichimoku_position = "BELOW_CLOUD"
cloud_bias = -1.0 # Strong bearish bias
else:
ichimoku_position = "INSIDE_CLOUD"
cloud_bias = 0.0 # Neutral bias
# Context7 Super Powers: Dynamic volume confirmation based on volatility
atr_value = last.get('atr', 0.01)
price = last['close']
volatility_pct = (atr_value / price) * 100
# Dynamic threshold based on market volatility (Context7 Super Powers: less strict thresholds)
if volatility_pct > 3.0: # High volatility
dynamic_threshold = 0.50
elif volatility_pct > 1.5: # Medium volatility
dynamic_threshold = 0.55
else: # Low volatility
dynamic_threshold = 0.60
volume_confirmation = last['volume_smart_ratio'] > dynamic_threshold
# Context7 Super Powers: Debug logging for volume analysis
# Note: pair info will be added in calling method
volume_weight = min(last['volume_smart_ratio'], 2.0) # Cap at 2.0
# Analyze LONG signal
long_signal = self._analyze_single_direction(
direction="LONG",
dataframe=dataframe,
market_data=market_data,
regime=regime,
ichimoku_position=ichimoku_position,
cloud_bias=cloud_bias,
volume_confirmation=volume_confirmation,
volume_weight=volume_weight,
last=last
)
# Analyze SHORT signal
short_signal = self._analyze_single_direction(
direction="SHORT",
dataframe=dataframe,
market_data=market_data,
regime=regime,
ichimoku_position=ichimoku_position,
cloud_bias=cloud_bias,
volume_confirmation=volume_confirmation,
volume_weight=volume_weight,
last=last
)
return long_signal, short_signal
def _analyze_single_direction(self, direction: str, dataframe: DataFrame, market_data: MarketData,
regime: RegimeAnalysis, ichimoku_position: str, cloud_bias: float,
volume_confirmation: bool, volume_weight: float, last) -> DirectionalSignal:
"""
Context7 Super Powers: Analisi segnale singolo con quality scoring gerarchico
Implementa best practice da Machine Learning for Trading:
- Signal strength evaluation
- Multiple factor confirmation
- Trend-aware weighting
- Hierarchical quality thresholds
"""
entry_conditions = []
primary_score = 0.0
if direction == "LONG":
# Primary signal: Ichimoku bullish conditions
if last['tenkan_sen'] > last['kijun_sen']:
primary_score += 0.3
entry_conditions.append("Tenkan above Kijun")
if ichimoku_position == "ABOVE_CLOUD":
primary_score += 0.4
entry_conditions.append("Price above cloud")
elif ichimoku_position == "INSIDE_CLOUD" and cloud_bias > 0:
primary_score += 0.2
entry_conditions.append("Price inside cloud with bullish bias")
# Context7: Momentum confirmation
if last['macd'] > last['macdsignal']:
primary_score += 0.2
entry_conditions.append("MACD bullish")
# Context7: RSI conditions (regime-aware)
if regime.regime == 'CALM' and last['rsi'] < 40:
primary_score += 0.15
entry_conditions.append("RSI oversold in calm market")
elif regime.regime == 'TRENDING' and 40 < last['rsi'] < 70:
primary_score += 0.1
entry_conditions.append("RSI in trend range")
elif regime.regime == 'VOLATILE' and last['rsi'] < 35:
primary_score += 0.1
entry_conditions.append("RSI oversold in volatile market")
# Context7: Moving average confirmation
if last['close'] > last['sma_fast'] and last['sma_fast'] > last['sma_slow']:
primary_score += 0.15
entry_conditions.append("Price above MAs")
# Context7: Volume confirmation
if volume_confirmation:
primary_score += 0.1 * volume_weight
entry_conditions.append(f"Volume confirmed (ratio: {last['volume_smart_ratio']:.2f})")
else: # SHORT
# Primary signal: Ichimoku bearish conditions
if last['tenkan_sen'] < last['kijun_sen']:
primary_score += 0.3
entry_conditions.append("Tenkan below Kijun")
if ichimoku_position == "BELOW_CLOUD":
primary_score += 0.4
entry_conditions.append("Price below cloud")
elif ichimoku_position == "INSIDE_CLOUD" and cloud_bias < 0:
primary_score += 0.2
entry_conditions.append("Price inside cloud with bearish bias")
# Context7: Momentum confirmation
if last['macd'] < last['macdsignal']:
primary_score += 0.2
entry_conditions.append("MACD bearish")
# Context7: RSI conditions (regime-aware)
if regime.regime == 'CALM' and last['rsi'] > 60:
primary_score += 0.15
entry_conditions.append("RSI overbought in calm market")
elif regime.regime == 'TRENDING' and 30 < last['rsi'] < 60:
primary_score += 0.1
entry_conditions.append("RSI in trend range")
elif regime.regime == 'VOLATILE' and last['rsi'] > 65:
primary_score += 0.1
entry_conditions.append("RSI overbought in volatile market")
# Context7: Moving average confirmation
if last['close'] < last['sma_fast'] and last['sma_fast'] < last['sma_slow']:
primary_score += 0.15
entry_conditions.append("Price below MAs")
# Context7: Volume confirmation
if volume_confirmation:
primary_score += 0.1 * volume_weight
entry_conditions.append(f"Volume confirmed (ratio: {last['volume_smart_ratio']:.2f})")
# Context7: Calculate trend weight (regime-aware)
trend_weight = 1.0 # Base weight
if regime.regime == 'TRENDING':
trend_weight = 2.0 if direction == "LONG" and regime.trend_strength > 0.6 else 1.5
elif regime.regime == 'VOLATILE':
trend_weight = 0.8 # Reduce weight in volatile markets
elif regime.regime == 'CALM':
trend_weight = 1.2 # Slight increase in calm markets
# Context7: Calculate regime compatibility
regime_compatibility = self._calculate_regime_compatibility(direction, regime, last)
# Context7: Calculate final quality score (hierarchical)
# Pattern from Machine Learning for Trading: multi-factor scoring
quality_score = (
primary_score * 0.4 + # Primary signals (40%)
trend_weight * 0.2 + # Trend awareness (20%)
regime_compatibility * 0.2 + # Regime compatibility (20%)
(volume_weight / 2.0) * 0.2 # Volume confirmation (20%)
)
# Cap quality score at 1.0
quality_score = min(quality_score, 1.0)
return DirectionalSignal(
direction=direction,
primary_score=min(primary_score, 1.0),
trend_weight=trend_weight,
quality_score=quality_score,
entry_conditions=entry_conditions,
ichimoku_position=ichimoku_position,
volume_confirmation=volume_confirmation,
regime_compatibility=regime_compatibility
)
def _calculate_regime_compatibility(self, direction: str, regime: RegimeAnalysis, last) -> float:
"""
Context7 Super Powers: Calcolo compatibilità regime basato su best practice
Implementa pattern Context7-compliant per regime-based signal filtering
"""
base_compatibility = 0.5 # Base compatibility
if regime.regime == 'TRENDING':
# Trending markets favor the dominant direction
if direction == "LONG" and last['close'] > last['sma_slow']:
base_compatibility = 0.9
elif direction == "SHORT" and last['close'] < last['sma_slow']:
base_compatibility = 0.9
else:
base_compatibility = 0.3 # Against trend in trending market
elif regime.regime == 'CALM':
# Calm markets good for both directions with proper setup
base_compatibility = 0.8
# RSI positioning in calm markets
if direction == "LONG" and last['rsi'] < 45:
base_compatibility = 0.9
elif direction == "SHORT" and last['rsi'] > 55:
base_compatibility = 0.9
elif regime.regime == 'VOLATILE':
# Volatile markets require extra confirmation
base_compatibility = 0.6
# Strong momentum required in volatile markets
macd_strength = abs(last['macd'] - last['macdsignal'])
if macd_strength > 0.001: # Strong MACD divergence
base_compatibility = 0.8
elif regime.regime == 'SIDEWAYS':
# Sideways markets favor mean reversion
if direction == "LONG" and last['rsi'] < 35:
base_compatibility = 0.8
elif direction == "SHORT" and last['rsi'] > 65:
base_compatibility = 0.8
else:
base_compatibility = 0.4
# Adjust for regime confidence
adjusted_compatibility = base_compatibility * regime.confidence
return min(adjusted_compatibility, 1.0)
def _get_bidirectional_analysis(self, dataframe: DataFrame, market_data: MarketData, regime: RegimeAnalysis) -> BidirectionalAnalysis:
"""
Context7 Super Powers: Analisi bidirezionale completa con raccomandazione
Implementa pattern NautilusTrader per position management decision
"""
# Get directional signals
long_signal, short_signal = self._analyze_directional_signals(dataframe, market_data, regime)
# Market confidence based on regime and signal quality
market_confidence = regime.confidence * max(long_signal.quality_score, short_signal.quality_score)
# Determine best direction and recommendation
if long_signal.quality_score > short_signal.quality_score + 0.1: # Significant margin
best_direction = "LONG"
best_quality_score = long_signal.quality_score
if long_signal.quality_score > 0.8:
recommendation = "STRONG_LONG"
elif long_signal.quality_score > 0.6:
recommendation = "LONG"
else:
recommendation = "NEUTRAL"
elif short_signal.quality_score > long_signal.quality_score + 0.1: # Significant margin
best_direction = "SHORT"
best_quality_score = short_signal.quality_score
if short_signal.quality_score > 0.8:
recommendation = "STRONG_SHORT"
elif short_signal.quality_score > 0.6:
recommendation = "SHORT"
else:
recommendation = "NEUTRAL"
else:
best_direction = "NEUTRAL"
best_quality_score = max(long_signal.quality_score, short_signal.quality_score)
recommendation = "NEUTRAL"
# Generate reasoning
if recommendation in ["STRONG_LONG", "LONG"]:
reasoning = f"Bullish setup: {', '.join(long_signal.entry_conditions[:3])} | Quality: {long_signal.quality_score:.2f}"
elif recommendation in ["STRONG_SHORT", "SHORT"]:
reasoning = f"Bearish setup: {', '.join(short_signal.entry_conditions[:3])} | Quality: {short_signal.quality_score:.2f}"
else:
reasoning = f"Neutral/unclear: Long({long_signal.quality_score:.2f}) vs Short({short_signal.quality_score:.2f})"
return BidirectionalAnalysis(
pair=market_data.pair,
long_signal=long_signal,
short_signal=short_signal,
market_confidence=market_confidence,
best_direction=best_direction,
best_quality_score=best_quality_score,
recommendation=recommendation,
reasoning=reasoning
)
def _get_regime_analysis(self, market_data: MarketData) -> RegimeAnalysis:
"""Basic regime analysis without LLM API"""
# Check cache locale
cache_key = f"{market_data.pair}_{market_data.timestamp.minute}"
if cache_key in self.local_cache:
cached_time = self.local_cache[cache_key]['timestamp']
if (datetime.now() - cached_time).total_seconds() < self.cache_ttl:
return self.local_cache[cache_key]['analysis']
# Basic Technical Analysis for Regime Detection
# Determine volatility level
if market_data.volatility_20d < 0.015:
volatility_level = "LOW"
elif market_data.volatility_20d < 0.03:
volatility_level = "MEDIUM"
else:
volatility_level = "HIGH"
# Determine regime based on multiple factors
regime_score = 0.0
# Price movement factor
if abs(market_data.price_change_24h) < 1:
regime_score += 0.3 # Calm
elif abs(market_data.price_change_24h) > 3:
regime_score += 0.1 # Too volatile
else:
regime_score += 0.2 # Moderate
# RSI factor
if 40 < market_data.rsi_14 < 60:
regime_score += 0.3 # Neutral/calm
elif market_data.rsi_14 > 70 or market_data.rsi_14 < 30:
regime_score += 0.0 # Extreme conditions
else:
regime_score += 0.2 # Moderate
# Volume factor
if 0.8 < market_data.volume_ratio < 1.5:
regime_score += 0.2 # Normal volume
elif market_data.volume_ratio > 2:
regime_score += 0.0 # Too high volume
else:
regime_score += 0.1 # Low volume
# Determine regime
if regime_score >= 0.7:
regime = "CALM"
confidence = 0.8
elif regime_score >= 0.5:
regime = "TRENDING"
confidence = 0.6
elif regime_score >= 0.3:
regime = "SIDEWAYS"
confidence = 0.5
else:
regime = "VOLATILE"
confidence = 0.4
# Calculate trend strength based on price consistency
trend_strength = min(0.9, abs(market_data.price_change_24h) / 5)
# Generate reasoning
reasoning = f"Basic regime analysis: {regime} (volatility: {volatility_level}, price_change: {market_data.price_change_24h:.1f}%, RSI: {market_data.rsi_14:.0f})"
# Create regime analysis
analysis = RegimeAnalysis(
pair=market_data.pair,
regime=regime,
confidence=confidence,
volatility_level=volatility_level,
trend_strength=trend_strength,
reasoning=reasoning
)
# Cache locale
self.local_cache[cache_key] = {
'analysis': analysis,
'timestamp': datetime.now()
}
return analysis
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
Context7 Super Powers: Popola segnali LONG usando sistema bidirezionale
Implementa pattern Context7-compliant con hierarchical filtering e quality scoring
"""
dataframe.loc[:, 'enter_long'] = 0
dataframe.loc[:, 'enter_short'] = 0
dataframe.loc[:, 'enter_tag'] = ''
if len(dataframe) < 50:
return dataframe
# Get market data and regime analysis
market_data = self._get_market_data(dataframe, metadata)
if not market_data:
return dataframe
regime = self._get_regime_analysis(market_data)
# Context7 Super Powers: Get bidirectional analysis
bidirectional = self._get_bidirectional_analysis(dataframe, market_data, regime)
# Context7: Hierarchical filtering system
# Solo segnali LONG con alta qualità passano il filtro
if bidirectional.recommendation in ["STRONG_LONG", "LONG"]:
# Context7: Multi-layer validation
quality_threshold = 0.6 # Base threshold
# Threshold più alto per mercati volatili
if regime.regime == 'VOLATILE':
quality_threshold = 0.7
# Threshold più basso per mercati trending con forte trend
elif regime.regime == 'TRENDING' and regime.trend_strength > 0.7:
quality_threshold = 0.5
if bidirectional.long_signal.quality_score >= quality_threshold:
# Context7 Super Powers: Enhanced validation with ATR, volume e regime filters
if (bidirectional.long_signal.volume_confirmation and
bidirectional.long_signal.regime_compatibility > 0.3 and
dataframe['volatility_ok'].iloc[-1] and # ATR 0.5%-3.0%
dataframe['volume_ok'].iloc[-1] and # Volume 1.2x-5.0x media
not dataframe['lateral_market'].iloc[-1]): # No lateral markets
dataframe.iloc[-1, dataframe.columns.get_loc('enter_long')] = 1
dataframe.iloc[-1, dataframe.columns.get_loc('enter_tag')] = 'context7_long_optimized'
self.logger.info(f"🟢 BUY {metadata['pair']} - CONTEXT7 SUPER POWERS OPTIMIZED:")
self.logger.info(f" Recommendation: {bidirectional.recommendation}")
self.logger.info(f" Quality Score: {bidirectional.long_signal.quality_score:.3f} (threshold: {quality_threshold})")
self.logger.info(f" Primary Score: {bidirectional.long_signal.primary_score:.3f}")
self.logger.info(f" Trend Weight: {bidirectional.long_signal.trend_weight:.2f}")
self.logger.info(f" Regime Compatibility: {bidirectional.long_signal.regime_compatibility:.2f}")
self.logger.info(f" 🛡️ ATR Filter: {dataframe['atr_pct'].iloc[-1]:.2f}% (✅ 0.5%-3.0%)")
self.logger.info(f" 📊 Volume Filter: {dataframe['volume_smart_ratio'].iloc[-1]:.2f}x (✅ 1.2x-5.0x)")
self.logger.info(f" 🚫 Lateral Market: {dataframe['lateral_market'].iloc[-1]} (✅ False)")
self.logger.info(f" Entry Conditions: {', '.join(bidirectional.long_signal.entry_conditions)}")
self.logger.info(f" Ichimoku Position: {bidirectional.long_signal.ichimoku_position}")
self.logger.info(f" Market Regime: {regime.regime} (confidence: {regime.confidence:.2f})")
self.logger.info(f" Reasoning: {bidirectional.reasoning}")
else:
self.logger.info(f"🚫 BUY {metadata['pair']} - CONTEXT7 SUPER POWERS FILTERS BLOCKED:")
self.logger.info(f" Volume OK: {bidirectional.long_signal.volume_confirmation}")
self.logger.info(f" Regime Compatibility: {bidirectional.long_signal.regime_compatibility:.2f}")
self.logger.info(f" 🛡️ ATR Filter: {dataframe['atr_pct'].iloc[-1]:.2f}% (❌ not in 0.5%-3.0%)")
self.logger.info(f" 📊 Volume Filter: {dataframe['volume_smart_ratio'].iloc[-1]:.2f}x (❌ not in 1.2x-5.0x)")
self.logger.info(f" 🚫 Lateral Market: {dataframe['lateral_market'].iloc[-1]} (❌ lateral market detected)")
else:
self.logger.info(f"🚫 BUY {metadata['pair']} - Context7: Quality score below threshold")
self.logger.info(f" Quality: {bidirectional.long_signal.quality_score:.3f} < {quality_threshold}")
else:
self.logger.info(f"🚫 BUY {metadata['pair']} - Context7: No LONG signal")
self.logger.info(f" Recommendation: {bidirectional.recommendation}")
self.logger.info(f" Best Direction: {bidirectional.best_direction}")
self.logger.info(f" Long Quality: {bidirectional.long_signal.quality_score:.3f}")
self.logger.info(f" Short Quality: {bidirectional.short_signal.quality_score:.3f}")
return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
Context7 Super Powers: Popola segnali SHORT usando sistema bidirezionale
Implementa pattern Context7-compliant con hierarchical filtering e quality scoring
"""
dataframe.loc[:, 'sell'] = 0
if len(dataframe) < 50:
return dataframe
# Get market data and regime analysis
market_data = self._get_market_data(dataframe, metadata)
if not market_data:
return dataframe
regime = self._get_regime_analysis(market_data)
# Context7 Super Powers: Get bidirectional analysis
bidirectional = self._get_bidirectional_analysis(dataframe, market_data, regime)
# Context7: Hierarchical filtering system
# Solo segnali SHORT con alta qualità passano il filtro
if bidirectional.recommendation in ["STRONG_SHORT", "SHORT"]:
# Context7: Multi-layer validation
quality_threshold = 0.6 # Base threshold
# Threshold più alto per mercati volatili
if regime.regime == 'VOLATILE':
quality_threshold = 0.7
# Threshold più basso per mercati trending con forte trend
elif regime.regime == 'TRENDING' and regime.trend_strength > 0.7:
quality_threshold = 0.5
if bidirectional.short_signal.quality_score >= quality_threshold:
# Context7: Additional volume and regime validation
if (bidirectional.short_signal.volume_confirmation and
bidirectional.short_signal.regime_compatibility > 0.3):
dataframe.iloc[-1, dataframe.columns.get_loc('enter_short')] = 1
dataframe.iloc[-1, dataframe.columns.get_loc('enter_tag')] = 'context7_short'
self.logger.info(f"🔴 SELL {metadata['pair']} - CONTEXT7 BIDIRECTIONAL:")
self.logger.info(f" Recommendation: {bidirectional.recommendation}")
self.logger.info(f" Quality Score: {bidirectional.short_signal.quality_score:.3f} (threshold: {quality_threshold})")
self.logger.info(f" Primary Score: {bidirectional.short_signal.primary_score:.3f}")
self.logger.info(f" Trend Weight: {bidirectional.short_signal.trend_weight:.2f}")
self.logger.info(f" Regime Compatibility: {bidirectional.short_signal.regime_compatibility:.2f}")
self.logger.info(f" Entry Conditions: {', '.join(bidirectional.short_signal.entry_conditions)}")
self.logger.info(f" Ichimoku Position: {bidirectional.short_signal.ichimoku_position}")
self.logger.info(f" Market Regime: {regime.regime} (confidence: {regime.confidence:.2f})")
self.logger.info(f" Reasoning: {bidirectional.reasoning}")
else:
self.logger.info(f"🚫 SELL {metadata['pair']} - Context7: Volume or regime validation failed")
self.logger.info(f" Volume OK: {bidirectional.short_signal.volume_confirmation}")
self.logger.info(f" Regime Compatibility: {bidirectional.short_signal.regime_compatibility:.2f}")
else:
self.logger.info(f"🚫 SELL {metadata['pair']} - Context7: Quality score below threshold")
self.logger.info(f" Quality: {bidirectional.short_signal.quality_score:.3f} < {quality_threshold}")
else:
self.logger.info(f"🚫 SELL {metadata['pair']} - Context7: No SHORT signal")
self.logger.info(f" Recommendation: {bidirectional.recommendation}")
self.logger.info(f" Best Direction: {bidirectional.best_direction}")
self.logger.info(f" Long Quality: {bidirectional.long_signal.quality_score:.3f}")
self.logger.info(f" Short Quality: {bidirectional.short_signal.quality_score:.3f}")
return dataframe
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str,
amount: float, rate: float, time_in_force: str,
exit_reason: str, current_time: datetime, **kwargs) -> bool:
"""
🔥 CONTEXT7 SUPER POWERS: TRADE EXIT CONFIRMATION E CLEANUP
Gestisce cleanup tracking sicuro quando trade viene effettivamente chiuso.
Implementato secondo best practices professionali.
"""
trade_key = f"{pair}_{trade.id}"
# ✅ CONTEXT7 SUPER POWERS: Cleanup tracking per tutti gli exit reasons
if trade_key in self._max_profit_tracking:
tracking = self._max_profit_tracking[trade_key]
final_profit = trade.calc_profit_ratio(rate) if rate else 0
self.logger.info(f"✅ CONTEXT7 TRADE CLOSED {pair}:")
self.logger.info(f" Exit Reason: {exit_reason}")
# 🔥 CONTEXT7 SUPER POWERS: SAFE LOGGING PATTERN - Pre-calc values
final_profit_display = f"{final_profit:.3%}"
max_tracked_display = f"{tracking['max_profit']:.3%}"
target_display = (f"{tracking['min_profit_target']:.3%}"
if tracking['min_profit_target'] is not None
else "N/A")
self.logger.info(f" Final Profit: {final_profit_display}")
self.logger.info(f" Max Tracked: {max_tracked_display}")
self.logger.info(f" Target Was: {target_display}")
del self._max_profit_tracking[trade_key]
self.logger.info(f"🗑️ CONTEXT7 Cleaned tracking for {trade_key}")
# 🔥 CONTEXT7 SUPER POWERS: DATABASE CONSISTENCY VERIFICATION
# Log solo dopo che FreqTrade ha aggiornato il database
self.logger.info(f"✅ CONTEXT7 DATABASE SYNC: Trade {trade.id} successfully closed and synced")
# Conferma sempre l'exit
return True
def custom_stoploss(self, pair: str, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float, after_fill: bool, **kwargs) -> float | None:
"""
🔥 CONTEXT7 SUPER POWERS DEFINITIVE SOLUTION - BASATA SU DOCUMENTAZIONE UFFICIALE
🚨 SOLUZIONE ROBUSTA E INATTACCABILE:
- Basata su examples FreqTrade ufficiali Context7
- Usa stoploss_from_open per precisione assolute
- Restituisce distance negative corrette
- Trigger take profit solo quando profit tocca soglia
✅ LOGICA CONTEX7 COMPLIANT BASATA SU DOCUMENTAZIONE:
1. Tracking profitto massimo con cuscinetto dinamico
2. Calcolo stop assoluto usando stoploss_from_open
3. Restituzione distance negativa relativa a current_rate
4. Trigger take profit automatico quando soglia raggiunta
"""
# ✅ CONTEXT7 SUPER POWERS: custom_stoploss chiamato correttamente da FreqTrade
try:
# 🛡️ Inizializzazione tracking robusta
if not hasattr(self, '_max_profit_tracking'):
self._max_profit_tracking = {}
trade_key = f"{pair}_{trade.id}"
# 🔥 CONTEXT7 SUPER POWERS: CONFIGURAZIONE FUTURES 3X-OPTIMIZED
# Parametri ottimizzati per crypto futures 5m con leverage 3x
activation_threshold = 0.0025 # 0.25% (più basso per futures)
CONSERVATIVE_CUSHION = 0.20 # 20% (più aggressivo del 25% precedente)
AGGRESSIVE_CUSHION = 0.12 # 12% (più aggressivo del 15% precedente)
PROFIT_BREAKPOINT = 0.05 # 5% (switch point più basso del 7.5% precedente)
# 🔥 CONTEXT7 SUPER POWERS: NATIVE COMPLIANCE - NO INTERFERENZA
if current_profit < activation_threshold:
# ✅ SUPER POWERS NATIVE FIX: return None per native FreqTrade behavior
# Sotto soglia: lascia FreqTrade native gestire stop loss (no hardcoded!)
if trade_key in self._max_profit_tracking:
del self._max_profit_tracking[trade_key]
# Context7 Native Pattern: return None = usa FreqTrade native stoploss
# Nessuna interferenza hardcoded che causa perdite!
self.logger.info(f"🛡️ CONTEXT7 NATIVE MODE {pair}: Profit {current_profit:.3%} → Using FreqTrade native stoploss")
return None
# 🚀 Prima attivazione - inizializza tracking
if trade_key not in self._max_profit_tracking:
self._max_profit_tracking[trade_key] = {
'max_profit': current_profit,
'min_profit_target': None,
'triggered': False
}
self.logger.info(f"🎯 CONTEXT7 TRAILING ACTIVATED {pair}: {current_profit:.2%}")
tracking = self._max_profit_tracking[trade_key]
# 📈 Aggiornamento massimo - PERMETTI sempre nuovi massimi anche dopo trigger
if current_profit > tracking['max_profit']:
# Se era già triggered e troviamo nuovo massimo, ripristiniamo tracking
was_triggered = tracking.get('triggered', False)
tracking['max_profit'] = current_profit
tracking['triggered'] = False # Reset per permettere nuovo trailing
if was_triggered:
self.logger.info(f"🔄 CONTEXT7 NEW HIGH AFTER TRIGGER {pair}: {current_profit:.2%} - Resetting trailing")
# 🔥 CONTEXT7 SUPER POWERS: CALCOLO CUSHION DINAMICO FUTURES-OPTIMIZED
if tracking['max_profit'] <= PROFIT_BREAKPOINT: # Sotto 5%
cushion_pct = CONSERVATIVE_CUSHION # 20% conservativo futures
cushion_type = "FUTURES CONSERVATIVE (20%)"
else: # Sopra 5%
cushion_pct = AGGRESSIVE_CUSHION # 12% aggressivo futures
cushion_type = "FUTURES AGGRESSIVE (12%)"
# Calcola profitto minimo target (soglia take profit)
min_profit_target = tracking['max_profit'] - (tracking['max_profit'] * cushion_pct)
min_profit_target = max(min_profit_target, 0.002) # Minimo 0.2%
tracking['min_profit_target'] = min_profit_target
self.logger.info(f"📈 CONTEXT7 NEW MAX {pair}: {tracking['max_profit']:.2%} → TARGET: {min_profit_target:.2%} ({cushion_type})")
# 🔒 Bloccaggio quando profitto diminuisce dal massimo
elif current_profit < tracking['max_profit'] and not tracking['triggered']:
tracking['triggered'] = True
# Non ricalcolare min_profit_target - mantieni quello dal massimo
target_pct = tracking['min_profit_target'] if tracking['min_profit_target'] is not None else 0.0
self.logger.info(f"🔒 CONTEXT7 LOCKED {pair}: Max {tracking['max_profit']:.2%} → Target {target_pct:.2%}")
# 💥 TAKE PROFIT TRIGGER - SOLO quando profitto scende sotto target
if (tracking['min_profit_target'] is not None and
current_profit <= tracking['min_profit_target'] and tracking['triggered']):
# 💥 TAKE PROFIT TRIGGER - CALCOLO DISTANZA RELATIVA (Context7 compliant)
# ✅ SUPER POWERS FIX: Distance relativa, non assoluta!
# Se current_profit = 1.69% e target = 1.27%
# Distance da current_rate = target - current = 1.27% - 1.69% = -0.42%
relative_distance = tracking['min_profit_target'] - current_profit
self.logger.info(f"💥 CONTEXT7 TAKE PROFIT {pair}: {current_profit:.3%} ≤ Target {tracking['min_profit_target']:.3%}")
self.logger.info(f"🎯 CONTEXT7 RELATIVE DISTANCE {pair}: {relative_distance:.4f} (Current: {current_profit:.3%} → Target: {tracking['min_profit_target']:.3%})")
# ⚠️ CONTEXT7 SUPER POWERS: NON cancellare tracking qui
# FreqTrade potrebbe richiamare più volte prima dell'esecuzione
# Cleanup avverrà in confirm_trade_exit
# 🔥 SUPER POWERS RETURN: Distance relativa per esecuzione corretta
return relative_distance
# 🛡️ Fallback: Nessuna azione necessaria - return None
return None
except Exception as e:
self.logger.error(f"❌ CONTEXT7 CRITICAL ERROR {pair}: {e}")
# Fallback sicuro a None per evitare ulteriori danni
return None
def custom_exit(self, pair: str, trade: 'Trade', current_time: datetime,
current_rate: float, current_profit: float, **kwargs) -> str:
"""
🚀 CONTEXT7 SUPER POWERS V3: REVENGE TRADING CORRETTO
🚨 CORREZIONE FONDAMENTALE: Il cooling-off si attiva SOLO su uscite reali,
non su massimi temporanei intra-trade.
"""
# 🛡️ COOLING-OFF SYSTEM: Attiva pausa anti-revenge trading SOLO su uscite reali
try:
# DEBUG: Log solo quando necessario
self.logger.info(f"🔧 DEBUG EXIT {pair}: Trade {trade.id}, Profitto={current_profit:.3%}, Stato={trade.is_open}")
# ⚠️ IMPORTANTE: Il trailing stop gestisce autonomamente il take profit
# Non è necessario attivare cooling-off qui perché FreqTrade gestisce
# automaticamente le protezioni (CooldownPeriod, StoplossGuard, MaxDrawdown)
# DISABLED: I custom exit signals sono disabilitati
# Il trailing take profit lavora in modo autonomo ed efficace
# Il revenge trading è gestito dalle protezioni FreqTrade native
return None
except Exception as e:
self.logger.error(f"❌ Errore nel cooling-off system per {pair}: {e}")
return None
def confirm_trade_entry(self, pair: str, order_type: str, amount: float,
rate: float, time_in_force: str, current_time: datetime,
entry_tag: Optional[str], side: str, **kwargs) -> bool:
"""
Context7 Super Powers V2: Trade confirmation with cooling-off system anti-revenge trading
Verifica se la pair è in periodo di cooling-off prima di confermare il trade.
"""
try:
# 🛡️ COOLING-OFF SYSTEM: Verifica se pair è in pausa anti-revenge trading
if self._is_in_cooldown(pair):
remaining_time = self._get_remaining_cooldown(pair)
self.logger.warning(f"🛡️ COOLING-OFF BLOCK {pair}: Trade bloccato per {remaining_time:.1f} min (anti-revenge trading)")
return False # Blocca trade durante cooling-off
# ✅ Trade permesso (non in cooling-off)
return True
except Exception as e:
self.logger.error(f"❌ Errore in confirm_trade_entry per {pair}: {e}")
return True # Permissive fallback per evitare blocchi del sistema
def bot_loop_start(self, current_time: datetime) -> None:
"""
Context7 Super Powers V2: Bot loop start con monitoraggio cooling-off system
Chiamato all'inizio di ogni bot loop per monitorare lo stato del cooling-off system.
"""
try:
# Debug cooling-off system ogni 5 minuti
if not hasattr(self, '_last_debug_time'):
self._last_debug_time = current_time
time_since_debug = (current_time - self._last_debug_time).total_seconds()
if time_since_debug >= 300: # 5 minuti
self.debug_cooling_status()
self._last_debug_time = current_time
except Exception as e:
self.logger.error(f"❌ Errore in bot_loop_start: {e}")
def _leverage_first_version(self, pair: str, current_time: datetime, current_rate: float,
proposed_leverage: float, max_leverage: int, entry_tag: Optional[str],
side: str, **kwargs) -> float:
"""
Context7 Super Powers V2: Dynamic leverage with asset-specific optimization
"""
try:
# Asset-specific leverage limits (V2 optimized)
asset_leverage_limits = {
'BTC/USDT:USDT': 8, # High liquidity
'ETH/USDT:USDT': 7, # High liquidity
'SOL/USDT:USDT': 6, # Medium-high liquidity
'BNB/USDT:USDT': 6, # Medium-high liquidity
'AVAX/USDT:USDT': 5, # Medium liquidity
'DOT/USDT:USDT': 5 # Medium liquidity
}
# Get asset-specific limit
asset_limit = asset_leverage_limits.get(pair, 4) # Conservative default
# Apply conservative limit for safety - Context7 Super Powers optimized
final_leverage = min(proposed_leverage, asset_limit, max_leverage, 3.0)
self.logger.info(f"🎯 V2 Leverage {pair}: {final_leverage}x (asset_limit: {asset_limit})")
return final_leverage
except Exception as e:
self.logger.error(f"Error in leverage calculation: {e}")
return 3 # Conservative fallback
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
Populate entry signals using Context7 Super Powers bidirectional analysis
Enhanced with regime-aware signal generation
"""
try:
# Centralized input validation (CRITICAL for security)
if not self._validate_inputs(dataframe, metadata, 'populate_entry_trend'):
return dataframe
# Get basic entry/exit signals from parent methods
dataframe = super().populate_entry_trend(dataframe, metadata)
# Context7: Signal enhancement is handled by parent class methods
# The parent class already generates Context7 Super Powers signals
# No additional processing needed here
return dataframe
except Exception as e:
self.logger.error(f"Error in populate_entry_trend: {e}")
return super().populate_entry_trend(dataframe, metadata)
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
Populate exit signals using parent implementation
Enhanced with Context7 Super Powers logic
"""
try:
# Get basic exit signals from parent methods
dataframe = super().populate_exit_trend(dataframe, metadata)
# Context7: Could add enhanced exit logic here in future versions
# For now, use parent implementation
return dataframe
except Exception as e:
self.logger.error(f"Error in populate_exit_trend: {e}")
return super().populate_exit_trend(dataframe, metadata)
def leverage(self, pair: str, current_time: datetime, current_rate: float,
proposed_leverage: float, max_leverage: float, entry_tag: str | None, side: str,
**kwargs) -> float:
"""
Context7 Super Powers: Dynamic leverage up to 10x with intelligent risk management.
Simple and effective leverage implementation based on Context7 best practices.
"""
# Asset-specific leverage limits - Context7 Safety First (Conservative)
asset_limits = {
"BTC/USDT:USDT": 3.0, # Reduced from 8x for safety
"ETH/USDT:USDT": 3.0, # Reduced from 7x for safety
"SOL/USDT:USDT": 2.5, # Reduced from 6x for safety
"BNB/USDT:USDT": 2.5, # Reduced from 6x for safety
"AVAX/USDT:USDT": 2.0, # Reduced from 5x for safety
"DOT/USDT:USDT": 2.0 # Reduced from 5x for safety
}
# Get asset-specific limit
asset_limit = asset_limits.get(pair, 5.0)
# Base leverage with moderate multiplier
base_leverage = 2.0
# Dynamic adjustment based on market conditions
# Simple and effective - no complex calculations needed
if side == "long":
# Conservative leverage for long positions
dynamic_leverage = min(base_leverage * 1.5, asset_limit, max_leverage, 3.0)
else:
# Slightly higher for short positions (downtrends can be sharper)
dynamic_leverage = min(base_leverage * 2.0, asset_limit, max_leverage, 3.0)
# Log leverage calculation
self.logger.info(f"🎯 LEVERAGE {pair} ({side.upper()}): {dynamic_leverage:.1f}x (max: {asset_limit:.1f}x)")
return dynamic_leverage
def _validate_inputs(self, dataframe: DataFrame, metadata: dict, method_name: str) -> bool:
"""
Centralized input validation for security and stability.
Returns True if validation passes, False otherwise.
"""
try:
# Validate dataframe
if not isinstance(dataframe, pd.DataFrame):
self.logger.error(f"{method_name}: dataframe must be a pandas DataFrame")
return False
if dataframe.empty:
self.logger.error(f"{method_name}: Empty dataframe received")
return False
if len(dataframe) < self.startup_candle_count:
self.logger.warning(f"{method_name}: Insufficient data: {len(dataframe)} < {self.startup_candle_count}")
return False
# Validate metadata
if not isinstance(metadata, dict):
self.logger.error(f"{method_name}: metadata must be a dictionary")
return False
if 'pair' not in metadata:
self.logger.error(f"{method_name}: metadata must contain 'pair' key")
return False
if not isinstance(metadata['pair'], str) or not metadata['pair'].strip():
self.logger.error(f"{method_name}: Invalid pair in metadata: {metadata.get('pair')}")
return False
return True
except Exception as e:
self.logger.error(f"{method_name}: Input validation error: {e}")
return False
# ============================================================================
# 🛡️ COOLING-OFF SYSTEM HELPER METHODS (Context7 Super Powers)
# ============================================================================
def _set_cooldown(self, pair: str, direction: str = "any", exit_type: str = "take_profit", profit: float = 0.0) -> None:
"""
Imposta periodo di cooling-off per una pair dopo trade profittevole.
Args:
pair: Trading pair (es. 'BTC/USDT')
direction: Direzione del trade ('long', 'short', 'any')
exit_type: Tipo di uscita ('take_profit', 'stop_loss', 'timeout')
profit: Profitto percentuale del trade
"""
try:
current_time = datetime.now()
self._trade_cooldown[pair] = {
'exit_timestamp': current_time,
'direction': direction,
'exit_type': exit_type,
'profit': profit,
'cooldown_end': current_time + timedelta(minutes=self._cooldown_minutes)
}
self.logger.info(f"🛡️ COOLING-OFF SET {pair}: {self._cooldown_minutes}min pausa attivata (direzione: {direction}, profit: {profit:.2%})")
except Exception as e:
self.logger.error(f"❌ Errore in _set_cooldown per {pair}: {e}")
def _is_in_cooldown(self, pair: str, direction: str = "any") -> bool:
"""
Verifica se una pair è in periodo di cooling-off.
Args:
pair: Trading pair
direction: Direzione del trade da verificare
Returns:
bool: True se in cooling-off, False altrimenti
"""
try:
if pair not in self._trade_cooldown:
return False
cooldown_data = self._trade_cooldown[pair]
current_time = datetime.now()
# Verifica se il cooling-off è scaduto
if current_time >= cooldown_data['cooldown_end']:
del self._trade_cooldown[pair]
return False
# Verifica se la direzione è bloccata
if cooldown_data['direction'] == "any" or direction == "any" or cooldown_data['direction'] == direction:
return True
return False
except Exception as e:
self.logger.error(f"❌ Errore in _is_in_cooldown per {pair}: {e}")
return False # Fail-safe per evitare blocchi permanenti
def _get_remaining_cooldown(self, pair: str) -> float:
"""
Calcola minuti rimanenti di cooling-off per una pair.
Args:
pair: Trading pair
Returns:
float: Minuti rimanenti (0 se non in cooldown)
"""
try:
if pair not in self._trade_cooldown:
return 0.0
cooldown_data = self._trade_cooldown[pair]
current_time = datetime.now()
if current_time >= cooldown_data['cooldown_end']:
del self._trade_cooldown[pair]
return 0.0
remaining = (cooldown_data['cooldown_end'] - current_time).total_seconds() / 60
return max(0.0, remaining)
except Exception as e:
self.logger.error(f"❌ Errore in _get_remaining_cooldown per {pair}: {e}")
return 0.0
def debug_cooling_status(self) -> None:
"""
Stampa stato completo del cooling-off system per debugging.
"""
try:
current_time = datetime.now()
self.logger.info("🔍 COOLING-OFF SYSTEM STATUS:")
if not self._trade_cooldown:
self.logger.info(" ✅ Nessuna pair in cooling-off")
return
for pair, data in self._trade_cooldown.items():
remaining = (data['cooldown_end'] - current_time).total_seconds() / 60
if remaining > 0:
self.logger.info(f" ⏳ {pair}: {remaining:.1f}min rimanenti (direzione: {data['direction']}, profit: {data['profit']:.2%})")
else:
# Cleanup expired entries
del self._trade_cooldown[pair]
except Exception as e:
self.logger.error(f"❌ Errore in debug_cooling_status: {e}")
@property
def protections(self):
"""
Context7 Super Powers: Enhanced protections for 5m timeframe optimized
1. CooldownPeriod: 10 minutes anti-revenge trading (ottimizzato per 5m)
2. StoplossGuard: Blocca trading dopo troppi stop loss
3. MaxDrawdown: Protezione drawdown esteso
"""
return [
{
"method": "CooldownPeriod",
"stop_duration": 10 # 10 minuti cooling-off (2 candele 5m)
},
{
"method": "StoplossGuard",
"lookback_period_candles": 24, # 2 ore su 5m
"trade_limit": 4, # Massimo 4 stop loss in 2 ore
"stop_duration_candles": 6, # 30 minuti pausa
"required_profit": 0.0,
"only_per_pair": False,
"only_per_side": False
},
{
"method": "MaxDrawdown",
"lookback_period_candles": 48, # 4 ore su 5m
"trade_limit": 20,
"stop_duration_candles": 12, # 1 ora pausa
"max_allowed_drawdown": 0.15 # 15% drawdown massimo
}
]
# ============================================================================
# CONTEXT7 SUPER POWERS: INITIALIZATION
# ============================================================================
# Initialize batch processor at module level - DISABLED
# batch_processor_instance = Context7BatchProcessor() # DISABLED - LLM connection issues
batch_processor_instance = None
# Export strategy
__strategy__ = IchimokuEnhancedV8092