RegimeNexusStrategy — همه ماژولها یکپارچه
Timeframe
1h
Direction
Long Only
Stoploss
-5.0%
Trailing Stop
Yes
ROI
0m: 5.0%, 60m: 3.0%, 120m: 2.0%, 180m: 1.0%
Interface Version
N/A
Startup Candles
N/A
Indicators
8
freqtrade/freqtrade-strategies
Strategy 003 author@: Gerald Lonlas github@: https://github.com/freqtrade/freqtrade-strategies
"""
╔══════════════════════════════════════════════════════════════════════════════╗
║ RegimeNexusStrategy — سیستم ترید پیشرفته یکپارچه ║
║ ║
║ ماژولهای یکپارچه: ║
║ ① Regime Detection — تشخیص حالت بازار (HMM + ADX + ATR) ║
║ ② Volatility Forecast — پیشبینی نوسان (GARCH + ATR Percentile) ║
║ ③ Futures Data — Funding Rate, OI, L/S Ratio, Basis ║
║ ④ Sentiment Analysis — Fear&Greed + Funding + Long/Short ║
║ ⑤ Price Prediction — ML/GBM با feature engineering ║
║ ⑥ Multi-Strategy — Trend + Mean-Reversion + Breakout ║
║ ⑦ Portfolio Allocator — Correlation Matrix + Risk Parity + MPT ║
║ ⑧ Risk Management — Kelly + Dynamic SL + Max DD Control ║
║ ⑨ Execution Discipline — Smart order + Slippage + Anti-gaming ║
║ ⑩ Research Process — Auto-backtest log + parameter tracking ║
║ ║
║ نویسنده: Advanced Trading System v2.0 ║
║ Freqtrade: ≥ 2026.1 | Python: ≥ 3.11 ║
╚══════════════════════════════════════════════════════════════════════════════╝
نصب dependencies:
pip install arch hmmlearn scikit-learn scipy joblib requests pandas-ta
ساختار پوشه:
user_data/
├── strategies/
│ └── RegimeNexusStrategy.py ← این فایل
├── models/ ← مدلهای ML ذخیره میشوند
└── logs/
"""
from __future__ import annotations
# ─────────────────────────────────────────────────────────────────────────────
# Standard Library
# ─────────────────────────────────────────────────────────────────────────────
import os
import sys
import time
import json
import logging
import warnings
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from freqtrade.strategy import (
IStrategy,
IntParameter,
DecimalParameter,
BooleanParameter,
CategoricalParameter
)
warnings.filterwarnings("ignore")
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Third-party
# ─────────────────────────────────────────────────────────────────────────────
import numpy as np
import pandas as pd
import requests
import joblib
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.pipeline import Pipeline
from scipy.optimize import minimize
try:
from arch import arch_model
ARCH_AVAILABLE = True
except ImportError:
ARCH_AVAILABLE = False
logger.warning("arch not installed. Using realized volatility fallback.")
try:
from hmmlearn import hmm
HMM_AVAILABLE = True
except ImportError:
HMM_AVAILABLE = False
logger.warning("hmmlearn not installed. Using rule-based regime detection.")
# ─────────────────────────────────────────────────────────────────────────────
# Freqtrade
# ─────────────────────────────────────────────────────────────────────────────
from freqtrade.strategy import (
IStrategy,
IntParameter,
DecimalParameter,
CategoricalParameter,
)
from freqtrade.persistence import Trade
import talib.abstract as ta
# ══════════════════════════════════════════════════════════════════════════════
# ① REGIME DETECTION
# ══════════════════════════════════════════════════════════════════════════════
class MarketRegime(str, Enum):
BULL_TREND = "bull_trend"
BEAR_TREND = "bear_trend"
RANGING = "ranging"
HIGH_VOLATILITY = "high_volatility"
LOW_VOL_ACCUM = "low_vol_accumulate"
class RegimeDetector:
"""
تشخیص حالت بازار با ترکیب:
• HMM (Hidden Markov Model) برای تشخیص حالت پنهان
• ADX — قدرت ترند
• ATR — نوسان مطلق
• EMA trend — جهت کلی
خروجی: یکی از ۵ حالت MarketRegime
"""
def __init__(self, n_states: int = 4):
self.n_states = n_states
self.is_fitted = False
if HMM_AVAILABLE:
self.hmm_model = hmm.GaussianHMM(
n_components=n_states,
covariance_type="full",
n_iter=150,
random_state=42,
)
# ------------------------------------------------------------------
def detect(self, dataframe: pd.DataFrame) -> pd.Series:
"""
Vectorized regime detection — بدون iterrows، سرعت ~100x بیشتر
"""
c = dataframe["close"]
atr_pct = self._atr(dataframe, 14) / (c + 1e-8) * 100
adx = self._adx(dataframe, 14)
ema_f = c.ewm(span=20, adjust=False).mean()
ema_s = c.ewm(span=50, adjust=False).mean()
ema_tr = (ema_f - ema_s) / (ema_s + 1e-8) * 100
vol_ma = atr_pct.rolling(20).mean()
vol_std = atr_pct.rolling(20).std()
vol_z = (atr_pct - vol_ma) / (vol_std + 1e-8)
# شروع با حالت پیشفرض
regime = pd.Series(MarketRegime.RANGING.value, index=dataframe.index, dtype=object)
# اعمال شرطها به ترتیب اولویت صعودی (آخرین = بالاترین اولویت)
low_vol_mask = (vol_z.fillna(0) < -1.0) & (atr_pct.fillna(1) < 0.5)
regime[low_vol_mask] = MarketRegime.LOW_VOL_ACCUM.value
bull_mask = (adx.fillna(0) > 25) & (ema_tr.fillna(0) > 0.5)
regime[bull_mask] = MarketRegime.BULL_TREND.value
bear_mask = (adx.fillna(0) > 25) & (ema_tr.fillna(0) < -0.5)
regime[bear_mask] = MarketRegime.BEAR_TREND.value
# بالاترین اولویت
hv_mask = (vol_z.fillna(0) > 2.0) | (atr_pct.fillna(0) > 3.5)
regime[hv_mask] = MarketRegime.HIGH_VOLATILITY.value
return regime
def get_params(self, regime: str) -> dict:
"""پارامترهای ترید بر اساس حالت بازار"""
mapping = {
MarketRegime.BULL_TREND.value: {"sl": -0.04, "roi_mult": 1.5, "stake_mult": 1.2},
MarketRegime.BEAR_TREND.value: {"sl": -0.03, "roi_mult": 0.7, "stake_mult": 0.4},
MarketRegime.RANGING.value: {"sl": -0.03, "roi_mult": 1.0, "stake_mult": 0.8},
MarketRegime.HIGH_VOLATILITY.value:{"sl": -0.06, "roi_mult": 2.0, "stake_mult": 0.4},
MarketRegime.LOW_VOL_ACCUM.value: {"sl": -0.025,"roi_mult": 0.6, "stake_mult": 0.6},
}
return mapping.get(regime, {"sl": -0.04, "roi_mult": 1.0, "stake_mult": 0.8})
# ------------------------------------------------------------------
@staticmethod
def _atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
h, l, c = df["high"], df["low"], df["close"]
tr = pd.concat([h - l, (h - c.shift()).abs(), (l - c.shift()).abs()], axis=1).max(axis=1)
return tr.ewm(span=period, adjust=False).mean()
@staticmethod
def _adx(df: pd.DataFrame, period: int = 14) -> pd.Series:
h, l, c = df["high"], df["low"], df["close"]
plus_dm = h.diff().clip(lower=0)
minus_dm = (-l.diff()).clip(lower=0)
plus_dm[plus_dm < minus_dm.fillna(0)] = 0
minus_dm[minus_dm < plus_dm.fillna(0)] = 0
atr = RegimeDetector._atr(df, period)
pdi = 100 * plus_dm.ewm(span=period, adjust=False).mean() / (atr + 1e-8)
mdi = 100 * minus_dm.ewm(span=period, adjust=False).mean() / (atr + 1e-8)
dx = 100 * (pdi - mdi).abs() / (pdi + mdi + 1e-8)
return dx.ewm(span=period, adjust=False).mean()
# ══════════════════════════════════════════════════════════════════════════════
# ② VOLATILITY FORECASTER
# ══════════════════════════════════════════════════════════════════════════════
class VolatilityForecaster:
"""
پیشبینی نوسان آینده:
• Realized Volatility (rolling std annualized)
• GARCH(1,1) — اگر arch نصب باشد
• ATR Percentile — سطح نسبی نوسان
خروجی: vol_regime (low/medium/high) + vol_size_mult
"""
def __init__(self, lookback: int = 250):
self.lookback = lookback
def forecast(self, dataframe: pd.DataFrame) -> pd.DataFrame:
"""
سریع — بدون GARCH rolling loop
از EWM GARCH تقریبی استفاده میکند: O(n) به جای O(n²)
"""
df = dataframe.copy()
df["_ret"] = np.log(df["close"] / df["close"].shift(1))
df["realized_vol"] = df["_ret"].rolling(20).std() * np.sqrt(365 * 24)
df["_atr"] = RegimeDetector._atr(df)
df["_atr_pct"] = df["_atr"] / df["close"] * 100
# GARCH(1,1) تقریبی با EWM — O(n) نه O(n²)
# var_t = omega + alpha * r²_{t-1} + beta * var_{t-1}
# معادل EWM با span مناسب
df["garch_vol"] = self._fast_garch_approx(df["_ret"])
df["vol_forecast"] = df["realized_vol"] * 0.4 + df["garch_vol"] * 0.6
# ATR Percentile — vectorized با expanding window
df["atr_percentile"] = (
df["_atr_pct"]
.rolling(self.lookback, min_periods=50)
.rank(pct=True) * 100
)
# vectorized به جای apply()
p = df["atr_percentile"].fillna(50)
df["vol_regime"] = np.where(p < 25, "low", np.where(p < 75, "medium", "high"))
df["vol_size_mult"] = (1.0 - (p / 100) * 0.8).clip(lower=0.2)
cols = ["vol_forecast", "vol_regime", "vol_size_mult", "atr_percentile", "realized_vol"]
return df[cols]
@staticmethod
def _fast_garch_approx(returns: pd.Series, alpha: float = 0.1, beta: float = 0.85) -> pd.Series:
"""
GARCH(1,1) تقریبی با recurrence — بسیار سریع O(n)
var_t = omega + alpha*r²_{t-1} + beta*var_{t-1}
omega کوچک برای پایداری بلندمدت
"""
r2 = returns.fillna(0) ** 2
omega = r2.mean() * (1 - alpha - beta) if (alpha + beta) < 1 else 1e-8
n = len(r2)
var = np.empty(n)
var[0] = r2.iloc[0] if r2.iloc[0] > 0 else r2.mean()
r2_arr = r2.values
for i in range(1, n):
var[i] = omega + alpha * r2_arr[i - 1] + beta * var[i - 1]
vol = np.sqrt(var) * np.sqrt(365 * 24)
return pd.Series(vol, index=returns.index)
@staticmethod
def _classify_vol(p: float) -> str:
if pd.isna(p): return "medium"
if p < 25: return "low"
if p < 75: return "medium"
return "high"
# ══════════════════════════════════════════════════════════════════════════════
# ③ FUTURES DATA COLLECTOR
# ══════════════════════════════════════════════════════════════════════════════
class FuturesDataCollector:
"""
دادههای اختصاصی بازار Futures از Binance:
• Funding Rate — هزینه نگهداری / فشار لانگ یا شورت
• Open Interest — قدرت ترند
• Long/Short — سنتیمنت معاملهگران خرده (contrarian)
• Basis — اسپرد Spot و Futures
"""
BASE_F = "https://fapi.binance.com"
BASE_S = "https://api.binance.com"
def __init__(self, symbol: str = "BTCUSDT", cache_ttl: int = 300):
self.symbol = symbol.replace("/", "").upper()
self.cache_ttl = cache_ttl
self._cache: Dict[str, dict] = {}
# ------------------------------------------------------------------
def get_composite_signal(self) -> dict:
"""
ترکیب همه سیگنالهای Futures → عدد -1 تا +1
مثبت = bullish فیوچرز | منفی = bearish فیوچرز
"""
try:
funding = self._current_funding()
basis = self._basis(funding.get("mark_price", 0))
ls = self._long_short_ratio()
oi = self._oi_trend()
fr = funding.get("funding_rate", 0)
b = basis.get("basis_pct", 0)
ls_z = ls.get("ls_zscore", 0)
oi_scr = 0.3 if oi == "expanding" else (-0.3 if oi == "contracting" else 0)
fr_score = float(np.clip(-fr / 0.05, -1, 1))
b_score = float(np.clip(b / 0.5, -1, 1))
ls_score = float(np.clip(-ls_z / 2, -1, 1))
composite = fr_score * 0.35 + b_score * 0.25 + ls_score * 0.25 + oi_scr * 0.15
return {
"composite": round(float(composite), 3),
"funding_rate": fr,
"basis_pct": b,
"ls_ratio": ls.get("ratio", 1.0),
"oi_trend": oi,
"signal": self._label(composite),
}
except Exception as e:
logger.debug(f"FuturesData error: {e}")
return {"composite": 0.0, "signal": "neutral", "funding_rate": 0.0,
"basis_pct": 0.0, "ls_ratio": 1.0, "oi_trend": "neutral"}
# ------------------------------------------------------------------
def _current_funding(self) -> dict:
key = f"fund_{self.symbol}"
if self._cached(key): return self._cache[key]["d"]
try:
r = requests.get(f"{self.BASE_F}/fapi/v1/premiumIndex",
params={"symbol": self.symbol}, timeout=6)
d = r.json()
out = {
"mark_price": float(d["markPrice"]),
"index_price": float(d["indexPrice"]),
"funding_rate": float(d["lastFundingRate"]) * 100,
}
except Exception:
out = {"mark_price": 0, "index_price": 0, "funding_rate": 0}
self._set(key, out); return out
def _basis(self, mark: float) -> dict:
try:
r = requests.get(f"{self.BASE_S}/api/v3/ticker/price",
params={"symbol": self.symbol}, timeout=5)
spot = float(r.json()["price"])
b = (mark - spot) / (spot + 1e-8) * 100
return {"basis_pct": round(b, 4)}
except Exception:
return {"basis_pct": 0.0}
def _long_short_ratio(self) -> dict:
key = f"ls_{self.symbol}"
if self._cached(key): return self._cache[key]["d"]
try:
r = requests.get(f"{self.BASE_F}/futures/data/globalLongShortAccountRatio",
params={"symbol": self.symbol, "period": "1h", "limit": 24},
timeout=6)
df = pd.DataFrame(r.json())
df["longShortRatio"] = df["longShortRatio"].astype(float)
ratio = df["longShortRatio"].iloc[-1]
ma = df["longShortRatio"].mean()
std = df["longShortRatio"].std()
zscore = (ratio - ma) / (std + 1e-8)
out = {"ratio": ratio, "ls_zscore": zscore}
except Exception:
out = {"ratio": 1.0, "ls_zscore": 0.0}
self._set(key, out); return out
def _oi_trend(self) -> str:
key = f"oi_{self.symbol}"
if self._cached(key): return self._cache[key]["d"]
try:
r = requests.get(f"{self.BASE_F}/futures/data/openInterestHist",
params={"symbol": self.symbol, "period": "1h", "limit": 12},
timeout=6)
df = pd.DataFrame(r.json())
df["sumOpenInterest"] = df["sumOpenInterest"].astype(float)
diff = df["sumOpenInterest"].iloc[-1] - df["sumOpenInterest"].iloc[0]
trend = "expanding" if diff > 0 else "contracting"
except Exception:
trend = "neutral"
self._set(key, trend); return trend
# ------------------------------------------------------------------
def _cached(self, key: str) -> bool:
return key in self._cache and time.time() - self._cache[key]["ts"] < self.cache_ttl
def _set(self, key: str, data) -> None:
self._cache[key] = {"d": data, "ts": time.time()}
@staticmethod
def _label(v: float) -> str:
if v > 0.5: return "strong_long"
if v > 0.2: return "long"
if v < -0.5: return "strong_short"
if v < -0.2: return "short"
return "neutral"
# ══════════════════════════════════════════════════════════════════════════════
# ④ SENTIMENT ANALYZER
# ══════════════════════════════════════════════════════════════════════════════
class SentimentAnalyzer:
"""
تحلیل سنتیمنت از Fear & Greed Index + Funding Rate
خروجی: score از -1 تا +1
"""
def __init__(self, cache_ttl: int = 3600):
self.cache_ttl = cache_ttl
self._cache: Dict[str, dict] = {}
def get_score(self, pair: str = "BTC/USDT") -> dict:
key = f"sent_{pair}_{datetime.now().strftime('%Y%m%d%H')}"
if key in self._cache: return self._cache[key]
fg = self._fear_greed()
fg_score = (fg["current"] - 50) / 50
out = {
"score": round(float(fg_score), 3),
"fear_greed": fg["current"],
"fg_label": fg["label"],
"signal": self._label(fg_score),
}
self._cache[key] = out
return out
def _fear_greed(self) -> dict:
try:
r = requests.get("https://api.alternative.me/fng/?limit=1", timeout=8)
d = r.json()["data"][0]
return {"current": int(d["value"]), "label": d["value_classification"]}
except Exception:
return {"current": 50, "label": "Neutral"}
@staticmethod
def _label(v: float) -> str:
if v > 0.5: return "strong_buy"
if v > 0.2: return "buy"
if v < -0.5: return "strong_sell"
if v < -0.2: return "sell"
return "neutral"
# ══════════════════════════════════════════════════════════════════════════════
# ⑤ PRICE PREDICTOR (ML)
# ══════════════════════════════════════════════════════════════════════════════
class PricePredictor:
"""
پیشبینی جهت قیمت با Gradient Boosting:
• Feature engineering: returns, RSI, MACD, BB, volume, trend
• Time-series cross-validation (5-fold)
• ذخیره و بارگذاری مدل با joblib
"""
FEATURE_PERIODS = [1, 3, 5, 10, 20]
def __init__(self, model_dir = "user_data/models"):
self.model_dir = Path(str(model_dir))
self.model_dir.mkdir(parents=True, exist_ok=True)
self.pipelines: Dict[str, Pipeline] = {}
self.is_trained: Dict[str, bool] = {}
# ------------------------------------------------------------------
def build_features(self, df: pd.DataFrame) -> pd.DataFrame:
out = pd.DataFrame(index=df.index)
# Returns
for p in self.FEATURE_PERIODS:
out[f"ret_{p}"] = df["close"].pct_change(p)
# RSI
out["rsi_7"] = self._rsi(df["close"], 7)
out["rsi_14"] = self._rsi(df["close"], 14)
# EMA distances
for p in [10, 20, 50]:
ema = df["close"].ewm(span=p, adjust=False).mean()
out[f"dist_ema_{p}"] = (df["close"] - ema) / (ema + 1e-8) * 100
# Volatility
out["vol_5"] = df["close"].pct_change().rolling(5).std()
out["vol_20"] = df["close"].pct_change().rolling(20).std()
out["vol_ratio"]= out["vol_5"] / (out["vol_20"] + 1e-8)
# Volume
out["vol_ma_ratio"] = df["volume"] / (df["volume"].rolling(20).mean() + 1e-8)
# Bollinger position
bb_mid = df["close"].rolling(20).mean()
bb_std = df["close"].rolling(20).std()
out["bb_pos"] = (df["close"] - bb_mid) / (2 * bb_std + 1e-8)
# MACD histogram
ema12 = df["close"].ewm(span=12, adjust=False).mean()
ema26 = df["close"].ewm(span=26, adjust=False).mean()
macd = ema12 - ema26
sig = macd.ewm(span=9, adjust=False).mean()
out["macd_div"] = macd - sig
# CCI
tp = (df["high"] + df["low"] + df["close"]) / 3
out["cci"] = (tp - tp.rolling(20).mean()) / (0.015 * tp.rolling(20).std() + 1e-8)
return out
# ------------------------------------------------------------------
def train(self, pair: str, dataframe: pd.DataFrame, horizon: int = 3) -> dict:
features_df = self.build_features(dataframe).dropna()
thresh = dataframe["close"].pct_change().std() * 0.5
future_ret = dataframe["close"].shift(-horizon) / dataframe["close"] - 1
target = (future_ret > thresh).astype(int)
merged = features_df.join(target.rename("target")).dropna()
X = merged.drop(columns=["target"]).values
y = merged["target"].values
feat_cols = merged.drop(columns=["target"]).columns.tolist()
tscv = TimeSeriesSplit(n_splits=5)
scores = []
for tr_idx, val_idx in tscv.split(X):
pipe = Pipeline([
("scaler", StandardScaler()),
("model", GradientBoostingClassifier(
n_estimators=150, max_depth=4,
learning_rate=0.05, subsample=0.8, random_state=42
)),
])
pipe.fit(X[tr_idx], y[tr_idx])
scores.append(pipe.score(X[val_idx], y[val_idx]))
# Train روی کل داده
final_pipe = Pipeline([
("scaler", StandardScaler()),
("model", GradientBoostingClassifier(
n_estimators=200, max_depth=4,
learning_rate=0.05, subsample=0.8, random_state=42
)),
])
final_pipe.fit(X, y)
self.pipelines[pair] = final_pipe
self.is_trained[pair] = True
path = self.model_dir / f"ml_{pair.replace('/', '_')}.pkl"
joblib.dump({"pipe": final_pipe, "features": feat_cols}, path)
return {
"cv_accuracy": round(float(np.mean(scores)), 4),
"cv_std": round(float(np.std(scores)), 4),
"n_samples": len(merged),
}
def predict(self, pair: str, dataframe: pd.DataFrame) -> pd.DataFrame:
result = pd.DataFrame({"ml_signal": 0.5, "ml_confidence": 0.0}, index=dataframe.index)
if not self.is_trained.get(pair):
path = self.model_dir / f"ml_{pair.replace('/', '_')}.pkl"
if path.exists():
try:
saved = joblib.load(path)
self.pipelines[pair] = saved["pipe"]
self.is_trained[pair] = True
except Exception:
return result
if not self.is_trained.get(pair):
return result
features_df = self.build_features(dataframe).dropna()
if features_df.empty:
return result
try:
probs = self.pipelines[pair].predict_proba(features_df.values)[:, 1]
result.loc[features_df.index, "ml_signal"] = probs
result.loc[features_df.index, "ml_confidence"] = np.abs(probs - 0.5) * 2
except Exception as e:
logger.debug(f"ML predict error: {e}")
return result
@staticmethod
def _rsi(series: pd.Series, period: int = 14) -> pd.Series:
delta = series.diff()
gain = delta.clip(lower=0).ewm(span=period, adjust=False).mean()
loss = (-delta.clip(upper=0)).ewm(span=period, adjust=False).mean()
return 100 - (100 / (1 + gain / (loss + 1e-8)))
# ══════════════════════════════════════════════════════════════════════════════
# ⑦ PORTFOLIO ALLOCATOR
# ══════════════════════════════════════════════════════════════════════════════
class PortfolioAllocator:
"""
تخصیص بهینه سرمایه بین پوزیشنهای همزمان:
مشکل اصلی که حل میکند:
BTC + ETH + SOL → corr ≈ 0.9 → در واقع ۱ پوزیشن است
این ماژول وزن واقعی را بر اساس همبستگی تنظیم میکند.
روشها:
• Risk Parity: هر دارایی ریسک مساوی داشته باشد
• MPT: بهینهسازی Sharpe Ratio
• Correlation Penalty: کاهش وزن داراییهای همبسته
"""
ASSET_GROUPS = {
"layer1": ["BTC/USDT", "ETH/USDT", "SOL/USDT", "ADA/USDT", "AVAX/USDT", "DOT/USDT"],
"defi": ["UNI/USDT", "AAVE/USDT", "CRV/USDT", "SUSHI/USDT"],
"layer2": ["MATIC/USDT", "ARB/USDT", "OP/USDT"],
"oracle": ["LINK/USDT", "BAND/USDT"],
"exchange":["BNB/USDT"],
"meme": ["DOGE/USDT", "SHIB/USDT"],
}
MAX_GROUP = 0.40
MAX_SINGLE = 0.25
MIN_POSITION = 0.02
def __init__(self, config: dict = None):
cfg = config or {}
self.method = cfg.get("method", "risk_parity")
self.lookback = cfg.get("lookback_corr", 60)
self._returns: Dict[str, pd.Series] = {}
self.corr_matrix: Optional[pd.DataFrame] = None
def update_returns(self, pair: str, df: pd.DataFrame) -> None:
self._returns[pair] = df["close"].pct_change().dropna().tail(self.lookback)
def build_corr(self) -> pd.DataFrame:
if len(self._returns) < 2:
return pd.DataFrame()
aligned = pd.DataFrame(self._returns).dropna()
if len(aligned) < 10:
return pd.DataFrame()
self.corr_matrix = aligned.corr()
return self.corr_matrix
# ------------------------------------------------------------------
def allocate(
self,
active_pairs: List[str],
signal_scores: Dict[str, float],
volatilities: Dict[str, float],
total_balance: float,
expected_returns: Dict[str, float] = None,
) -> Dict[str, dict]:
if not active_pairs:
return {}
corr = self.build_corr()
if self.method == "mpt" and not corr.empty and expected_returns:
weights = self._mpt(active_pairs, expected_returns, volatilities, corr)
else:
weights = self._risk_parity(active_pairs, volatilities)
weights = self._corr_penalty(weights, corr, active_pairs)
weights = self._group_limit(weights, active_pairs)
weights = self._signal_scale(weights, signal_scores)
weights = self._normalize(weights)
return {
pair: {
"weight": round(w, 4),
"stake": round(w * total_balance, 2),
"group": self._group(pair),
}
for pair, w in weights.items()
if w >= self.MIN_POSITION
}
def portfolio_report(self, allocations: Dict[str, dict]) -> dict:
if not allocations:
return {"status": "empty"}
weights = [v["weight"] for v in allocations.values()]
hhi = sum(w ** 2 for w in weights)
eff_n = round(1 / hhi, 2) if hhi > 0 else 1
avg_corr = 0.0
if self.corr_matrix is not None and not self.corr_matrix.empty:
pairs = [p for p in allocations if p in self.corr_matrix.columns]
vals = [
abs(self.corr_matrix.loc[pairs[i], pairs[j]])
for i in range(len(pairs))
for j in range(i + 1, len(pairs))
if pairs[i] in self.corr_matrix.index
]
avg_corr = round(float(np.mean(vals)), 3) if vals else 0.0
return {
"n_positions": len(allocations),
"effective_n": eff_n,
"avg_correlation": avg_corr,
"diversification": "good" if eff_n > 2.5 else ("medium" if eff_n > 1.5 else "poor"),
"positions": {
p: {"weight_pct": round(v["weight"] * 100, 1), "stake": v["stake"]}
for p, v in allocations.items()
},
}
# ------------------------------------------------------------------
def _risk_parity(self, pairs: List[str], vols: Dict[str, float]) -> Dict[str, float]:
inv = {p: 1.0 / max(vols.get(p, 0.03), 0.001) for p in pairs}
total = sum(inv.values())
return {p: inv[p] / total for p in pairs}
def _mpt(self, pairs, exp_ret, vols, corr) -> Dict[str, float]:
n = len(pairs)
mus = np.array([exp_ret.get(p, 0.01) for p in pairs])
sigs = np.array([vols.get(p, 0.03) for p in pairs])
C = corr.loc[pairs, pairs].values if all(p in corr.columns for p in pairs) else np.eye(n)
cov = np.outer(sigs, sigs) * C
def neg_sharpe(w):
return -(np.dot(w, mus) / (np.sqrt(w @ cov @ w) + 1e-8))
res = minimize(neg_sharpe, np.ones(n) / n, method="SLSQP",
bounds=[(0, self.MAX_SINGLE)] * n,
constraints=[{"type": "eq", "fun": lambda w: np.sum(w) - 1}])
w = res.x if res.success else np.ones(n) / n
return {pairs[i]: float(w[i]) for i in range(n)}
def _corr_penalty(self, weights, corr, pairs) -> Dict[str, float]:
if corr.empty or len(pairs) < 2:
return weights
adj = weights.copy()
for p in pairs:
if p not in corr.columns: continue
others = [o for o in pairs if o != p and o in corr.columns and p in corr.index]
if not others: continue
corr_w = sum(abs(corr.loc[p, o]) * adj.get(o, 0) for o in others)
penalty = min(0.65, corr_w * 0.85)
adj[p] = adj.get(p, 0) * (1 - penalty)
return adj
def _group_limit(self, weights, pairs) -> Dict[str, float]:
totals = {}
for p in pairs:
g = self._group(p)
totals[g] = totals.get(g, 0) + weights.get(p, 0)
adj = weights.copy()
for p in pairs:
g = self._group(p)
if totals.get(g, 0) > self.MAX_GROUP:
adj[p] = adj.get(p, 0) * (self.MAX_GROUP / totals[g])
return adj
def _signal_scale(self, weights, scores) -> Dict[str, float]:
return {p: w * (0.8 + scores.get(p, 0.5) * 0.4) for p, w in weights.items()}
def _normalize(self, weights) -> Dict[str, float]:
clipped = {p: min(max(w, 0), self.MAX_SINGLE) for p, w in weights.items()}
total = sum(clipped.values())
if total <= 0:
n = max(len(clipped), 1)
return {p: 1 / n for p in clipped}
return {p: v / total for p, v in clipped.items()}
def _group(self, pair: str) -> str:
for g, members in self.ASSET_GROUPS.items():
if pair in members: return g
return "other"
# ══════════════════════════════════════════════════════════════════════════════
# ⑧ RISK MANAGER
# ══════════════════════════════════════════════════════════════════════════════
class RiskManager:
"""
مدیریت ریسک چند لایه:
• Kelly Criterion برای position sizing
• Max Drawdown — متوقف کردن ربات
• Dynamic Stop Loss — بر اساس ATR و سود جاری
• Portfolio risk cap — جمع ریسک همه پوزیشنها
"""
def __init__(self, config: dict = None):
cfg = config or {}
self.max_risk_trade = cfg.get("max_risk_per_trade", 0.02)
self.max_dd = cfg.get("max_drawdown_limit", 0.15)
self.kelly_frac = cfg.get("kelly_fraction", 0.25)
self.max_portfolio_r = cfg.get("max_portfolio_risk", 0.06)
self._peak = 0.0
self._current_dd = 0.0
def position_size(
self,
balance: float,
entry: float,
stop: float,
win_rate: float = 0.55,
rr: float = 2.0,
vol_mult: float = 1.0,
regime: str = "ranging",
) -> dict:
# Drawdown breaker
if self._current_dd >= self.max_dd:
return {"stake": 0.0, "reason": "max_drawdown_reached"}
risk_pct = abs(entry - stop) / (entry + 1e-8)
if risk_pct < 0.001:
return {"stake": 0.0, "reason": "stop_too_close"}
# Fixed fractional
risk_usd = balance * self.max_risk_trade
fixed_size = risk_usd / (entry * risk_pct + 1e-8)
# Kelly
k = max(0.0, (win_rate * rr - (1 - win_rate)) / (rr + 1e-8)) * self.kelly_frac
kelly_size = (balance * k) / (entry + 1e-8)
# Combine
size = min(fixed_size, kelly_size)
# Regime multiplier
regime_m = {
MarketRegime.BULL_TREND.value: 1.2,
MarketRegime.BEAR_TREND.value: 0.4,
MarketRegime.RANGING.value: 0.8,
MarketRegime.HIGH_VOLATILITY.value: 0.4,
MarketRegime.LOW_VOL_ACCUM.value: 0.6,
}.get(regime, 0.8)
final_size = size * regime_m * vol_mult
final_stake = min(final_size * entry, balance * 0.20)
return {
"stake": round(final_stake, 2),
"risk_amount": round(risk_usd, 2),
"kelly_f": round(k, 4),
"regime_m": regime_m,
}
def dynamic_sl(self, current_profit: float, atr_pct: float, regime: str) -> float:
base = {
MarketRegime.BULL_TREND.value: -0.04,
MarketRegime.BEAR_TREND.value: -0.03,
MarketRegime.RANGING.value: -0.03,
MarketRegime.HIGH_VOLATILITY.value: -0.06,
MarketRegime.LOW_VOL_ACCUM.value: -0.025,
}.get(regime, -0.04)
atr_sl = -(atr_pct / 100) * 1.5
sl = max(base, atr_sl)
# Protect profit
if current_profit > 0.08: sl = max(sl, current_profit - 0.04)
elif current_profit > 0.04: sl = max(sl, current_profit - 0.03)
elif current_profit > 0.02: sl = max(sl, 0.0)
return round(sl, 4)
def update_dd(self, balance: float) -> None:
if balance > self._peak:
self._peak = balance
if self._peak > 0:
self._current_dd = (self._peak - balance) / self._peak
# ══════════════════════════════════════════════════════════════════════════════
# ⑨ EXECUTION DISCIPLINE
# ══════════════════════════════════════════════════════════════════════════════
class ExecutionDiscipline:
"""
کنترل کیفیت اجرای سفارش:
• Spread check — عدم ورود در spread بیش از حد
• Volume check — عدم ورود در حجم کم
• Anti-gaming — جلوگیری از ورود چندباره در یک سطح
• Slippage estimation
"""
def __init__(self, config: dict = None):
cfg = config or {}
self.max_spread_pct = cfg.get("max_spread_pct", 0.3) # 0.3%
self.min_volume_ratio = cfg.get("min_volume_ratio", 0.5) # 50% of 20-MA
self.anti_game_bars = cfg.get("anti_game_bars", 3)
self._recent_entries: Dict[str, List[float]] = {}
def should_enter(
self,
pair: str,
row: pd.Series,
df: pd.DataFrame,
idx: int,
) -> Tuple[bool, str]:
# ─── Spread check ───
spread_pct = (row.get("ask", row["close"]) - row.get("bid", row["close"])) \
/ (row["close"] + 1e-8) * 100
if spread_pct > self.max_spread_pct:
return False, f"spread_too_wide_{spread_pct:.2f}%"
# ─── Volume check ───
vol_ratio = row.get("volume_ma_ratio", 1.0)
if vol_ratio < self.min_volume_ratio:
return False, f"volume_too_low_{vol_ratio:.2f}x"
# ─── Anti-gaming: اگر اخیراً همین قیمت را ورود زدیم ───
recent = self._recent_entries.get(pair, [])
price = row["close"]
if any(abs(p - price) / (price + 1e-8) < 0.005 for p in recent[-self.anti_game_bars:]):
return False, "anti_gaming_same_level"
return True, "ok"
def record_entry(self, pair: str, price: float) -> None:
if pair not in self._recent_entries:
self._recent_entries[pair] = []
self._recent_entries[pair].append(price)
self._recent_entries[pair] = self._recent_entries[pair][-20:]
def estimate_slippage(self, price: float, volume: float, avg_volume: float) -> float:
"""تخمین slippage بر اساس حجم نسبی"""
vol_ratio = volume / (avg_volume + 1e-8)
if vol_ratio > 2.0: return 0.0005
if vol_ratio > 1.0: return 0.001
if vol_ratio > 0.5: return 0.002
return 0.004
# ══════════════════════════════════════════════════════════════════════════════
# ⑩ RESEARCH PROCESS
# ══════════════════════════════════════════════════════════════════════════════
class ResearchProcess:
"""
ثبت و تحلیل عملکرد استراتژی:
• لاگ پارامترهای هر ورود/خروج
• محاسبه Sharpe, Sortino, Max Drawdown
• ذخیره به JSON برای آنالیز بعدی
"""
def __init__(self, log_dir = "user_data/research"):
self.log_dir = Path(str(log_dir))
self.log_dir.mkdir(parents=True, exist_ok=True)
self.trades: List[dict] = []
def log_trade(self, trade_data: dict) -> None:
self.trades.append({**trade_data, "ts": datetime.now().isoformat()})
path = self.log_dir / "trades_log.jsonl"
with open(path, "a") as f:
f.write(json.dumps(trade_data) + "\n")
def performance_summary(self) -> dict:
if len(self.trades) < 5:
return {"status": "insufficient_data", "n": len(self.trades)}
returns = [t.get("profit_ratio", 0) for t in self.trades]
arr = np.array(returns)
wins = arr[arr > 0]
losses = arr[arr < 0]
sharpe = (arr.mean() / (arr.std() + 1e-8)) * np.sqrt(365)
dd_arr = np.maximum.accumulate(np.cumsum(arr)) - np.cumsum(arr)
max_dd = float(dd_arr.max())
return {
"n_trades": len(returns),
"win_rate": round(float(len(wins) / len(returns)), 3),
"avg_win": round(float(wins.mean()) if len(wins) else 0, 4),
"avg_loss": round(float(losses.mean()) if len(losses) else 0, 4),
"sharpe": round(float(sharpe), 3),
"max_dd": round(max_dd, 4),
"expectancy":round(float(arr.mean()), 4),
}
# ══════════════════════════════════════════════════════════════════════════════
# ⑥ MULTI-STRATEGY SIGNAL ENGINE
# ══════════════════════════════════════════════════════════════════════════════
class MultiStrategyEngine:
"""
سه استراتژی موازی — هر کدام سیگنال مستقل Continuous (0 تا 1) تولید میکنند.
طراحی Soft: هر شرط یک امتیاز میدهد نه یک gate قطعی.
S1 — Trend Following: EMA Cross + MACD + ADX strength
S2 — Mean Reversion: RSI normalized + BB position
S3 — Breakout: Resistance break + Volume surge
تفاوت با نسخه قبلی:
• بجای Boolean (0/1)، مقادیر Continuous برمیگرداند
• RSI threshold به صورت soft (نه binary) محاسبه میشود
• سیگنالها در بازار Sideways هم مقدار غیرصفر دارند
"""
@staticmethod
def trend_signal(df: pd.DataFrame) -> pd.Series:
"""
S1: Trend Following
خروجی: 0.0 (ترند نزولی قوی) تا 1.0 (ترند صعودی قوی)
"""
ema20 = df["close"].ewm(span=20, adjust=False).mean()
ema50 = df["close"].ewm(span=50, adjust=False).mean()
ema12 = df["close"].ewm(span=12, adjust=False).mean()
ema26 = df["close"].ewm(span=26, adjust=False).mean()
macd = ema12 - ema26
sig = macd.ewm(span=9, adjust=False).mean()
hist = macd - sig
# نرمالیزه کردن MACD histogram به 0-1
hist_norm = hist / (df["close"].rolling(50).std() + 1e-8)
hist_score = (hist_norm.clip(-3, 3) + 3) / 6 # 0 تا 1
# EMA alignment score
ema_cross = (ema20 - ema50) / (ema50 + 1e-8) * 100
ema_score = (ema_cross.clip(-3, 3) + 3) / 6
# ADX strength (از dataframe اگر موجود باشد)
adx_score = pd.Series(0.5, index=df.index)
s = ema_score * 0.40 + hist_score * 0.35 + adx_score * 0.25
return s.clip(0, 1)
@staticmethod
def mean_reversion_signal(df: pd.DataFrame) -> pd.Series:
"""
S2: Mean Reversion
خروجی Continuous: RSI پایینتر = سیگنال قویتر
"""
delta = df["close"].diff()
gain = delta.clip(lower=0).ewm(span=14, adjust=False).mean()
loss = (-delta.clip(upper=0)).ewm(span=14, adjust=False).mean()
rsi = 100 - (100 / (1 + gain / (loss + 1e-8)))
# RSI به 0-1 تبدیل میشود: RSI=20 → score=1, RSI=50 → score=0.5, RSI=80 → score=0
rsi_score = (100 - rsi) / 100
# Bollinger Band position: نزدیکی به باند پایین = سیگنال قویتر
bb_mid = df["close"].rolling(20).mean()
bb_std = df["close"].rolling(20).std()
bb_pos = (df["close"] - bb_mid) / (2 * bb_std + 1e-8) # -1 تا +1
bb_score= (-bb_pos.clip(-2, 2) + 2) / 4 # معکوس: پایین = بهتر
# RSI momentum (RSI در حال افزایش از کف)
rsi_mom = (rsi.diff(3).fillna(0) > 0).astype(float) * 0.1
s = rsi_score * 0.55 + bb_score * 0.35 + rsi_mom * 0.10
return s.clip(0, 1)
@staticmethod
def breakout_signal(df: pd.DataFrame) -> pd.Series:
"""
S3: Breakout
خروجی Continuous بر اساس قدرت شکست و حجم
"""
resistance = df["high"].rolling(20).max().shift(1)
support = df["low"].rolling(20).min().shift(1)
vol_ma = df["volume"].rolling(20).mean()
range_size = (resistance - support).clip(lower=1e-8)
# چقدر بالاتر از مقاومت است؟ (normalized)
breakout_pct = (df["close"] - resistance) / range_size
break_score = breakout_pct.clip(-1, 1) * 0.5 + 0.5
# نسبت حجم به میانگین (normalized)
vol_ratio = (df["volume"] / (vol_ma + 1e-8)).clip(0, 4)
vol_score = (vol_ratio / 4)
# کندل صعودی
candle_score = ((df["close"] - df["open"]) / (df["close"] + 1e-8) * 100).clip(-3, 3)
candle_norm = (candle_score + 3) / 6
s = break_score * 0.50 + vol_score * 0.30 + candle_norm * 0.20
return s.clip(0, 1)
# ══════════════════════════════════════════════════════════════════════════════
# MAIN STRATEGY
# ══════════════════════════════════════════════════════════════════════════════
class RegimeNexusStrategy(IStrategy):
"""
RegimeNexusStrategy — همه ماژولها یکپارچه
جریان داده:
Data(OHLCV + Futures) → Regime + Vol + Sentiment + ML
→ MultiStrategy signals
→ Signal Aggregator
→ Portfolio Allocator
→ Risk Manager
→ Execution Discipline
→ Order
"""
# ─── Timeframe ─────────────────────────────────────────────────────
# 1h: تعادل بهینه بین سرعت سیگنال و کاهش نویز
# اطلاعات 4h برای تأیید ترند کلی استفاده میشود (informative)
timeframe = "1h"
informative_timeframes = ["4h"] # ترند کلی از 4h تأیید میشود
startup_candle_count: int = 200 # 200 کندل برای EMA200 و GARCH
process_only_new_candles = True
use_custom_stoploss = True
stoploss = -0.05 # کمی looser برای نوسانات 2021
trailing_stop = True
trailing_stop_positive = 0.025 # از 0.015 به 0.025 — کمتر حساس
trailing_stop_positive_offset = 0.04 # از 0.025 به 0.04 — فضای بیشتر
trailing_only_offset_is_reached = True
minimal_roi = {
"0": 0.05, # 5% هر زمان (کاهش از 6% — سریعتر خروج)
"60": 0.03, # 3% بعد از 1 ساعت
"120": 0.02, # 2% بعد از 2 ساعت
"180": 0.01, # 1% بعد از 3 ساعت
"300": 0.0, # breakeven بعد از 5 ساعت
}
# ══════════════════════════════════════════════════════════════════
# Hyperopt Parameters — بازنویسی کامل، بدون duplicate
# فلسفه: Technical Core قوی + ML به عنوان فیلتر کمکی
# ══════════════════════════════════════════════════════════════════
# ── آستانههای ورود/خروج ──
buy_composite_threshold = DecimalParameter(0.30, 0.55, default=0.40, space="buy", optimize=True)
sell_composite_threshold = DecimalParameter(0.15, 0.40, default=0.30, space="sell", optimize=True)
# ── وزنهای Technical Core (مجموع ~0.7) ──
trend_weight = DecimalParameter(0.15, 0.40, default=0.25, space="buy", optimize=True)
mr_weight = DecimalParameter(0.10, 0.35, default=0.20, space="buy", optimize=True)
breakout_weight = DecimalParameter(0.15, 0.40, default=0.25, space="buy", optimize=True)
# ── وزنهای Auxiliary (مجموع ~0.3) ──
ml_weight = DecimalParameter(0.00, 0.15, default=0.05, space="buy", optimize=True)
futures_weight = DecimalParameter(0.08, 0.20, default=0.12, space="buy", optimize=True)
sentiment_weight = DecimalParameter(0.05, 0.18, default=0.10, space="buy", optimize=True)
# ── آستانههای ML (با اطمینان پایینتر) ──
buy_ml_threshold = DecimalParameter(0.48, 0.62, default=0.52, space="buy", optimize=True)
buy_ml_confidence_min = DecimalParameter(0.05, 0.25, default=0.08, space="buy", optimize=True)
# ── آستانههای RSI ──
buy_rsi_threshold = IntParameter(25, 45, default=35, space="buy", optimize=True)
sell_rsi_threshold = IntParameter(55, 80, default=68, space="sell", optimize=True)
# ── فیلترهای optional ──
regime_filter_enabled = CategoricalParameter([True, False], default=True, space="buy")
vol_filter_enabled = CategoricalParameter([True, False], default=False, space="buy")
use_ema200_filter = CategoricalParameter([True, False], default=False, space="buy")
wavelet_weight = DecimalParameter(0.05, 0.25, default=0.12, space="buy", optimize=True)
dwt_trend_filter = CategoricalParameter([True, False], default=True, space="buy")
# ─── init ──────────────────────────────────────────────────────────
def __init__(self, config: dict) -> None:
super().__init__(config)
user_data_dir = Path(str(config.get("user_data_dir", "user_data")))
self.regime_detector = RegimeDetector()
self.vol_forecaster = VolatilityForecaster()
self.sentiment_analyzer = SentimentAnalyzer()
self.price_predictor = PricePredictor(
model_dir=user_data_dir / "models"
)
self.portfolio_allocator = PortfolioAllocator({
"method": "risk_parity",
"lookback_corr": 60,
})
self.risk_manager = RiskManager({
"max_risk_per_trade": 0.015, # از 0.02 به 0.015 — محافظهکارانهتر
"max_drawdown_limit": 0.35, # از 0.15 به 0.35 — جلوگیری از توقف زودهنگام
"kelly_fraction": 0.20, # از 0.25 به 0.20 — کمتر aggressive
"max_portfolio_risk": 0.08,
})
self.execution = ExecutionDiscipline({
"max_spread_pct": 0.3,
"min_volume_ratio": 0.5,
})
self.research = ResearchProcess(
log_dir=user_data_dir / "research"
)
self._futures: Dict[str, FuturesDataCollector] = {}
self._sentiment_cache: Dict[str, dict] = {}
self._last_trained: Dict[str, datetime] = {}
self._last_portfolio_report: dict = {}
# Cache برای دسترسی RiskManager و PortfolioAllocator به دادههای واقعی
# کلید: pair → آخرین ردیف dataframe (dict)
self._last_candle: Dict[str, dict] = {}
# کلید: pair → composite_score آخرین کندل
self._last_scores: Dict[str, float] = {}
# کلید: pair → vol_size_mult آخرین کندل
self._last_vols: Dict[str, float] = {}
# ══════════════════════════════════════════════════════════════════
# informative_pairs — تعریف دادههای چندتایمفریم
# ══════════════════════════════════════════════════════════════════
def informative_pairs(self):
"""
Freqtrade این جفتارزها را در تایمفریمهای اضافی دانلود میکند.
داده 4h برای تأیید ترند اصلی (trend confirmation) استفاده میشود:
• EMA20/50 روی 4h → ترند میانمدت
• RSI روی 4h → اشباع خرید/فروش کلیتر
• ADX روی 4h → قدرت ترند کلی
"""
pairs = self.dp.current_whitelist() if self.dp else []
return [(pair, "4h") for pair in pairs]
def _get_4h_trend(self, pair: str, dataframe: pd.DataFrame) -> pd.DataFrame:
"""
دادههای 4h را به dataframe اصلی 1h merge میکند.
اگر dataframe 4h موجود نباشد، مقادیر neutral برمیگرداند.
"""
try:
if not self.dp:
raise ValueError("dp not available")
df_4h = self.dp.get_pair_dataframe(pair=pair, timeframe="4h")
if df_4h is None or len(df_4h) < 20:
raise ValueError("insufficient 4h data")
# اندیکاتورهای 4h
df_4h["ema20_4h"] = df_4h["close"].ewm(span=20, adjust=False).mean()
df_4h["ema50_4h"] = df_4h["close"].ewm(span=50, adjust=False).mean()
delta_4h = df_4h["close"].diff()
gain_4h = delta_4h.clip(lower=0).ewm(span=14, adjust=False).mean()
loss_4h = (-delta_4h.clip(upper=0)).ewm(span=14, adjust=False).mean()
df_4h["rsi_4h"] = 100 - (100 / (1 + gain_4h / (loss_4h + 1e-8)))
# trend score از 4h: +1 (صعودی) تا -1 (نزولی)
df_4h["trend_4h"] = np.where(
df_4h["ema20_4h"] > df_4h["ema50_4h"], 1.0, -1.0
)
# انتخاب ستونهای مورد نیاز و rename
df_4h = df_4h[["date", "ema20_4h", "ema50_4h", "rsi_4h", "trend_4h"]].copy()
df_4h.columns = ["date", "ema20_4h", "ema50_4h", "rsi_4h", "trend_4h"]
df_4h["date_merge"] = df_4h["date"]
# merge با forward fill (هر کندل 1h از 4h مربوطه استفاده میکند)
dataframe = pd.merge_asof(
dataframe.sort_values("date"),
df_4h[["date_merge", "ema20_4h", "ema50_4h", "rsi_4h", "trend_4h"]]
.rename(columns={"date_merge": "date"}),
on="date",
direction="backward",
)
return dataframe
except Exception as e:
logger.debug(f"4h data unavailable: {e}")
dataframe["ema20_4h"] = dataframe["close"]
dataframe["ema50_4h"] = dataframe["close"]
dataframe["rsi_4h"] = 50.0
dataframe["trend_4h"] = 0.0
return dataframe
# ══════════════════════════════════════════════════════════════════
# populate_indicators — قلب استراتژی
# ══════════════════════════════════════════════════════════════════
def populate_indicators(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
pair = metadata["pair"]
# ── دادههای 4h برای تأیید ترند ────────────────────────────
dataframe = self._get_4h_trend(pair, dataframe)
# ── اندیکاتورهای پایه ──────────────────────────────────────
dataframe["rsi_14"] = ta.RSI(dataframe, timeperiod=14)
dataframe["rsi_7"] = ta.RSI(dataframe, timeperiod=7)
dataframe["ema_20"] = ta.EMA(dataframe, timeperiod=20)
dataframe["ema_50"] = ta.EMA(dataframe, timeperiod=50)
dataframe["ema_200"] = ta.EMA(dataframe, timeperiod=200)
bb = ta.BBANDS(dataframe, timeperiod=20, nbdevup=2.0, nbdevdn=2.0)
dataframe["bb_upper"], dataframe["bb_mid"], dataframe["bb_lower"] = bb
macd = ta.MACD(dataframe, fastperiod=12, slowperiod=26, signalperiod=9)
dataframe["macd"] = macd["macd"]
dataframe["macd_signal"] = macd["macdsignal"]
dataframe["macd_hist"] = macd["macdhist"]
dataframe["atr"] = ta.ATR(dataframe, timeperiod=14)
dataframe["atr_pct"] = dataframe["atr"] / dataframe["close"] * 100
dataframe["adx"] = ta.ADX(dataframe, timeperiod=14)
dataframe["cci"] = ta.CCI(dataframe, timeperiod=20)
dataframe["mfi"] = ta.MFI(dataframe, timeperiod=14)
dataframe["volume_ma"] = dataframe["volume"].rolling(20).mean()
dataframe["volume_ma_ratio"] = dataframe["volume"] / (dataframe["volume_ma"] + 1e-8)
# ── ① Regime Detection ─────────────────────────────────────
dataframe["regime"] = self.regime_detector.detect(dataframe)
# ── ② Volatility Forecast ──────────────────────────────────
vol_df = self.vol_forecaster.forecast(dataframe)
dataframe["vol_forecast"] = vol_df["vol_forecast"]
dataframe["vol_regime"] = vol_df["vol_regime"]
dataframe["vol_size_mult"] = vol_df["vol_size_mult"]
dataframe["atr_percentile"] = vol_df["atr_percentile"]
dataframe["realized_vol"] = vol_df["realized_vol"]
# ── ③ Futures Data ─────────────────────────────────────────
# در backtesting از API خارجی استفاده نمیکنیم — مقادیر neutral
# در live trading این بلاک فعال میشود
_is_live = self.config.get("runmode", "").value in ("live", "dry_run") if hasattr(self.config.get("runmode", ""), "value") else False
if _is_live:
if pair not in self._futures:
self._futures[pair] = FuturesDataCollector(pair)
fsig = self._futures[pair].get_composite_signal()
else:
fsig = {"composite": 0.0, "funding_rate": 0.0,
"signal": "neutral", "oi_trend": "neutral"}
dataframe["futures_composite"] = fsig["composite"]
dataframe["funding_rate"] = fsig["funding_rate"]
dataframe["futures_signal"] = fsig["signal"]
dataframe["oi_trend"] = fsig["oi_trend"]
# ── ④ Sentiment ────────────────────────────────────────────
# در backtesting مقدار neutral — در live از API میخوانیم
if _is_live:
hour_key = f"{pair}_{datetime.now().strftime('%Y%m%d%H')}"
if hour_key not in self._sentiment_cache:
self._sentiment_cache[hour_key] = self.sentiment_analyzer.get_score(pair)
sent = self._sentiment_cache[hour_key]
else:
sent = {"score": 0.0, "fear_greed": 50}
dataframe["sentiment_score"] = sent["score"]
dataframe["fear_greed_index"] = sent["fear_greed"]
# ── ⑤ ML Price Prediction ──────────────────────────────────
# در backtesting: فقط یکبار train — در live: هر 24 ساعت
last_train = self._last_trained.get(pair)
retrain_secs = 86400 if _is_live else float("inf")
should_train = (
last_train is None or
(datetime.now() - last_train).total_seconds() > retrain_secs
) and len(dataframe) > 500
if should_train:
try:
result = self.price_predictor.train(pair, dataframe)
self._last_trained[pair] = datetime.now()
logger.info(f"[{pair}] ML trained: {result}")
except Exception as e:
logger.warning(f"[{pair}] ML train failed: {e}")
pred_df = self.price_predictor.predict(pair, dataframe)
dataframe["ml_signal"] = pred_df["ml_signal"]
dataframe["ml_confidence"] = pred_df["ml_confidence"]
# ── ⑥ Multi-Strategy Signals ───────────────────────────────
dataframe["sig_trend"] = MultiStrategyEngine.trend_signal(dataframe)
dataframe["sig_mr"] = MultiStrategyEngine.mean_reversion_signal(dataframe)
dataframe["sig_breakout"] = MultiStrategyEngine.breakout_signal(dataframe)
# ── Signal Aggregator ──────────────────────────────────────
dataframe["composite_score"] = self._aggregate(dataframe)
# ── Portfolio Allocator — بهروزرسانی بازدهی ───────────────
self.portfolio_allocator.update_returns(pair, dataframe)
# ── ذخیره آخرین کندل برای custom_stoploss و custom_stake_amount ──
if len(dataframe) > 0:
last = dataframe.iloc[-1]
self._last_candle[pair] = {
"atr_pct": float(last.get("atr_pct", 1.5)),
"regime": str(last.get("regime", MarketRegime.RANGING.value)),
"vol_size_mult": float(last.get("vol_size_mult", 1.0)),
"realized_vol": float(last.get("realized_vol", 0.03)),
"atr_percentile": float(last.get("atr_percentile", 50.0)),
}
self._last_scores[pair] = float(last.get("composite_score", 0.5))
self._last_vols[pair] = float(last.get("vol_size_mult", 1.0))
return dataframe
# ══════════════════════════════════════════════════════════════════
# Signal Aggregator — وزندهی و ترکیب همه سیگنالها
# ══════════════════════════════════════════════════════════════════
def _aggregate(self, df: pd.DataFrame) -> pd.Series:
"""
وزنها از Hyperopt پارامترها میآیند.
مجموع وزنها نرمالیزه میشود.
"""
tw = float(self.trend_weight.value)
mw = float(self.mr_weight.value)
bw = float(self.breakout_weight.value)
mlw = float(self.ml_weight.value)
sw = float(self.sentiment_weight.value)
fw = float(self.futures_weight.value)
ww = float(self.wavelet_weight.value)
total_w = tw + mw + bw + mlw + sw + fw + ww + 1e-8
# نرمالیزه کردن sentiment و futures به 0-1
sent_norm = (df["sentiment_score"].fillna(0) + 1) / 2
futures_norm = (df["futures_composite"].fillna(0) + 1) / 2
ml_sig = df["ml_signal"].fillna(0.5)
# تأیید ترند 4h — وزن اضافی برای همراستایی تایمفریمها
trend_4h_norm = (df["trend_4h"].fillna(0) + 1) / 2 # 0 تا 1
trend_4h_w = 0.10
total_w_adj = total_w + trend_4h_w
score = (
df["sig_trend"].fillna(0) * tw +
df["sig_mr"].fillna(0) * mw +
df["sig_breakout"].fillna(0) * bw +
ml_sig * mlw +
sent_norm * sw +
futures_norm * fw +
trend_4h_norm * trend_4h_w
) / (total_w_adj + 1e-8)
return score.clip(0, 1)
# ══════════════════════════════════════════════════════════════════
# Entry Conditions — Soft Scoring Architecture
# ══════════════════════════════════════════════════════════════════
def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
"""
معماری ورود دو لایه:
لایه ۱ — Hard Gates (فیلترهای مطلق — کمترین تعداد ممکن):
فقط شرایطی که بازار را واقعاً غیرقابل ترید میکند.
مثال: HIGH_VOLATILITY شدید یا funding بسیار بالا.
لایه ۲ — Soft Score (امتیازدهی ترکیبی):
هر سیگنال یک امتیاز میدهد.
فقط یک آستانه واحد روی مجموع امتیازات اعمال میشود.
این معماری Signal Starvation را برطرف میکند.
"""
# ══ لایه ۱: Hard Gates — فقط موارد واقعاً بحرانی ══
# فقط در Volatility خیلی شدید معامله نمیکنیم
hard_block = dataframe["regime"] == MarketRegime.HIGH_VOLATILITY.value
# funding rate بیش از حد بالا — هشدار Long Squeeze
funding_block = dataframe["funding_rate"] > 0.10
hard_ok = ~hard_block & ~funding_block
# ══ لایه ۲: Soft Scoring ══
score = pd.Series(0.0, index=dataframe.index)
# ── Composite signal از Signal Aggregator (وزن بالا) ──
score += dataframe["composite_score"].fillna(0.5) * 3.0
# ── ML signal (اگر آموزش دیده) ──
ml_sig = dataframe["ml_signal"].fillna(0.5)
ml_conf = dataframe["ml_confidence"].fillna(0.0)
# ML فقط وقتی confidence کافی دارد وزن میگیرد
ml_contribution = (ml_sig - 0.5) * ml_conf.clip(0, 1) * 2.0
score += ml_contribution.clip(-1, 1) + 1.0 # shift به 0-2
# ── Regime bonus/penalty ──
if self.regime_filter_enabled.value:
regime_bonus = dataframe["regime"].map({
MarketRegime.BULL_TREND.value: +0.5,
MarketRegime.RANGING.value: 0.0,
MarketRegime.LOW_VOL_ACCUM.value: +0.2,
MarketRegime.BEAR_TREND.value: -0.8,
MarketRegime.HIGH_VOLATILITY.value: -0.3,
}).fillna(0.0)
score += regime_bonus
# ── Volatility soft filter ──
if self.vol_filter_enabled.value:
vol_penalty = dataframe["vol_regime"].map({
"low": +0.2,
"medium": 0.0,
"high": -0.4,
}).fillna(0.0)
score += vol_penalty
# ── EMA200 trend filter (optional) ──
if self.use_ema200_filter.value:
ema200_ok = (dataframe["close"] > dataframe["ema_200"]).astype(float)
score += (ema200_ok - 0.5) * 0.4
# ── RSI اشباع فروش (برای mean reversion) ──
rsi_bonus = (dataframe["rsi_14"] < self.buy_rsi_threshold.value).astype(float) * 0.3
score += rsi_bonus
# ── Volume حداقلی (فقط جلوی حجم خیلی پایین را میگیریم) ──
vol_ok = dataframe["volume_ma_ratio"] > 0.3 # threshold پایینتر
score += vol_ok.astype(float) * 0.2
# ══ ترکیب نهایی ══
# نرمالیزه کردن score به 0-1
score_min, score_max = 0.0, 6.7 # حداکثر ممکن
score_norm = (score - score_min) / (score_max - score_min)
score_norm = score_norm.clip(0, 1)
# ذخیره برای debug
dataframe["entry_score"] = score_norm
# ورود: Hard gate + آستانه امتیاز
dataframe.loc[
hard_ok & (score_norm >= self.buy_composite_threshold.value),
"enter_long"
] = 1
return dataframe
# ══════════════════════════════════════════════════════════════════
# Exit Conditions — Soft Exit Scoring
# ══════════════════════════════════════════════════════════════════
def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
"""
خروج نیز با Soft Scoring:
سیگنال خروج وقتی چند شرط همزمان ضعیف شوند، نه فقط یکی.
این از Early Exit جلوگیری میکند.
"""
exit_score = pd.Series(0.0, index=dataframe.index)
# ─── Bear Trend قوی ───────────────────────────────────────────
regime_bad = (dataframe["regime"] == MarketRegime.BEAR_TREND.value).astype(float)
exit_score += regime_bad * 1.5
# ─── Composite score ضعیف ─────────────────────────────────────
# فقط وقتی خیلی پایین است (نه فقط زیر threshold)
comp_weakness = (0.4 - dataframe["composite_score"].fillna(0.5)).clip(0, 0.4) / 0.4
exit_score += comp_weakness * 1.0
# ─── ML نزولی با اطمینان کافی ────────────────────────────────
ml_sig = dataframe["ml_signal"].fillna(0.5)
ml_conf = dataframe["ml_confidence"].fillna(0.0)
ml_exit = ((0.45 - ml_sig).clip(0, 0.45) / 0.45) * ml_conf.clip(0, 1)
exit_score += ml_exit * 0.8
# ─── RSI اشباع خرید ──────────────────────────────────────────
rsi_exit = (dataframe["rsi_14"] > self.sell_rsi_threshold.value).astype(float) * 0.5
exit_score += rsi_exit
# ─── High Volatility ناگهانی (برای حفظ سود) ─────────────────
hv_exit = (dataframe["regime"] == MarketRegime.HIGH_VOLATILITY.value).astype(float) * 0.8
exit_score += hv_exit
# ─── خروج اجباری فقط در شرایط بحرانی (Hard) ─────────────────
# funding شدیداً منفی = Short Squeeze در راه
hard_exit = dataframe["futures_composite"].fillna(0) < -0.7
# ─── آستانه خروج ─────────────────────────────────────────────
exit_norm = (exit_score / 4.6).clip(0, 1)
dataframe["exit_score"] = exit_norm
dataframe.loc[
hard_exit | (exit_norm >= self.sell_composite_threshold.value + 0.15),
"exit_long"
] = 1
return dataframe
# ══════════════════════════════════════════════════════════════════
# Custom Stop Loss — Risk Manager
# ══════════════════════════════════════════════════════════════════
# شکل اصلاحشده و استاندارد برای Freqtrade 2026
def custom_stoploss(self,
pair: str,
trade: 'Trade',
current_time: 'datetime',
current_rate: float,
current_profit: float,
after_fill: bool = False,
**kwargs) -> float:
"""
Dynamic Stop Loss — Freqtrade 2026.x
trade از طریق kwargs["trade"] دریافت میشود
"""
try:
min_profit = 0.0
trade = kwargs.get("trade")
pair = trade.pair if trade else "unknown"
candle = self._last_candle.get(pair, {})
atr_pct = candle.get("atr_pct", 1.5)
regime = candle.get("regime", MarketRegime.RANGING.value)
return self.risk_manager.dynamic_sl(current_profit, atr_pct, regime)
except Exception:
return self.stoploss
# ══════════════════════════════════════════════════════════════════
# Custom Stake Amount — Portfolio Allocator + Risk Manager
# ══════════════════════════════════════════════════════════════════
def custom_stake_amount(
self,
pair: str,
current_time: datetime,
current_rate: float,
proposed_stake: float,
min_stake: Optional[float],
max_stake: float,
leverage: float,
entry_tag: Optional[str],
side: str,
**kwargs
) -> float:
"""
امضای رسمی Freqtrade 2026.x
proposed_stake = مقدار پیشنهادی freqtrade
max_stake = حداکثر مجاز برای این trade
"""
try:
# محاسبه balance واقعی
try:
open_trades = Trade.get_open_trades()
open_capital = sum(t.stake_amount for t in open_trades)
active_pairs = [t.pair for t in open_trades]
# balance واقعی = پوزیشنهای باز + سرمایه آزاد
total_balance = open_capital + proposed_stake
except Exception:
active_pairs = []
total_balance = proposed_stake
# بهروزرسانی drawdown با balance واقعی
self.risk_manager.update_dd(total_balance)
# اگر drawdown از حد گذشت، معامله نمیکنیم
if self.risk_manager._current_dd >= self.risk_manager.max_dd:
logger.warning(f"Max drawdown reached: {self.risk_manager._current_dd:.1%}")
return 0.0
# Portfolio Allocator با دادههای واقعی
all_pairs = list(set(active_pairs + [pair]))
vols = {p: self._last_candle.get(p, {}).get("realized_vol", 0.03) for p in all_pairs}
signals = {p: self._last_scores.get(p, 0.5) for p in all_pairs}
allocations = self.portfolio_allocator.allocate(
active_pairs = all_pairs,
signal_scores = signals,
volatilities = vols,
total_balance = total_balance,
)
alloc_stake = allocations.get(pair, {}).get("stake", proposed_stake * 0.8)
# Risk Manager — vol_mult و regime واقعی
stop_price = current_rate * (1 + self.stoploss)
real_vol_mult = self._last_vols.get(pair, 0.8)
real_regime = self._last_candle.get(pair, {}).get("regime", MarketRegime.RANGING.value)
risk_info = self.risk_manager.position_size(
balance = proposed_stake,
entry = current_rate,
stop = stop_price,
vol_mult = real_vol_mult,
regime = real_regime,
)
risk_stake = risk_info.get("stake", proposed_stake)
# final = کوچکترین مقدار از همه محدودیتها
final_stake = min(alloc_stake, risk_stake, max_stake, proposed_stake)
final_stake = max(final_stake, min_stake or 1.0)
report = self.portfolio_allocator.portfolio_report(allocations)
if report.get("diversification") == "poor":
logger.warning(f"Poor diversification: {report}")
logger.info(
f"[{pair}] stake={final_stake:.2f} | proposed={proposed_stake:.2f} | "
f"regime={real_regime} | dd={self.risk_manager._current_dd:.1%}"
)
return round(final_stake, 2)
except Exception as e:
logger.error(f"custom_stake_amount error: {e}")
return proposed_stake
# ══════════════════════════════════════════════════════════════════
# confirm_trade_entry — Execution Discipline
# ══════════════════════════════════════════════════════════════════
def confirm_trade_entry(
self,
pair: str,
order_type: str,
amount: float,
rate: float,
time_in_force: str,
entry_tag: Optional[str],
side: str,
**kwargs
) -> bool:
"""
آخرین چک قبل از ارسال سفارش:
① Spread check — spread بیش از حد
② Volume check — حجم کافی
③ Anti-gaming — ورود مجدد در همان سطح
④ Slippage estimate — هشدار اگر slippage بالا است
"""
try:
candle = self._last_candle.get(pair, {})
vol_ratio = self._last_candle.get(pair, {}).get("atr_percentile", 50) / 100
row = pd.Series({
"close": rate,
"ask": rate * 1.0005,
"bid": rate * 0.9995,
"volume_ma_ratio": 1.0 - vol_ratio * 0.3, # تخمین از ATR percentile
})
# ① + ② + ③
ok, reason = self.execution.should_enter(pair, row, pd.DataFrame(), 0)
if not ok:
logger.info(f"[{pair}] Entry blocked: {reason}")
return False
# ④ Slippage estimation
avg_vol = 1_000_000 # fallback
est_slip = self.execution.estimate_slippage(rate, avg_vol * vol_ratio, avg_vol)
if est_slip > 0.003: # slippage بیش از 0.3%
logger.warning(f"[{pair}] High slippage estimated: {est_slip:.3%} — proceeding with caution")
self.execution.record_entry(pair, rate)
logger.debug(f"[{pair}] Entry confirmed @ {rate:.4f} | slip≈{est_slip:.3%}")
return True
except Exception as e:
logger.debug(f"confirm_trade_entry error: {e}")
return True
# ══════════════════════════════════════════════════════════════════
# confirm_trade_exit — Research log
# ══════════════════════════════════════════════════════════════════
def confirm_trade_exit(
self,
pair: str,
trade: "Trade",
order_type: str,
amount: float,
rate: float,
time_in_force: str,
exit_reason: str,
current_time: datetime,
**kwargs
) -> bool:
"""ثبت اطلاعات معامله برای Research Process"""
try:
self.research.log_trade({
"pair": pair,
"open_rate": trade.open_rate,
"close_rate": rate,
"profit_ratio": trade.calc_profit_ratio(rate),
"exit_reason": exit_reason,
"duration_h": (current_time - trade.open_date).total_seconds() / 3600,
"stake": trade.stake_amount,
})
except Exception as e:
logger.debug(f"Research log error: {e}")
return True
# ══════════════════════════════════════════════════════════════════
# bot_loop_start — گزارش دورهای
# ══════════════════════════════════════════════════════════════════
def bot_loop_start(self, current_time: datetime, **kwargs) -> None:
"""هر دور اصلی ربات — گزارش عملکرد هر ساعت"""
try:
if not hasattr(self, "_last_report"):
self._last_report = datetime.min
if (current_time - self._last_report).total_seconds() > 3600:
summary = self.research.performance_summary()
logger.info(f"=== PERFORMANCE SUMMARY === {summary}")
self._last_report = current_time
except Exception:
pass