Donchian 돌파 + 거래량 배수 + (옵션) Fear&Greed 게이트 - 룩어헤드 방지: Donchian 상단은 shift(1) - 파라미터 우선순위: 1) config.strategy_parameters[TestDonchianFearGreedStrategy] 2) Hyperopt Parameter .value 3) buy_params (디폴트)
Timeframe
5m
Direction
Long Only
Stoploss
-8.0%
Trailing Stop
No
ROI
0m: 5.0%, 60m: 2.0%, 240m: 0.0%
Interface Version
N/A
Startup Candles
120
Indicators
2
freqtrade/freqtrade-strategies
# -*- coding: utf-8 -*-
from typing import Dict, Any, List
from pathlib import Path
import json
import pandas as pd
import numpy as np
from freqtrade.strategy.interface import IStrategy
from freqtrade.strategy import IntParameter, DecimalParameter, RealParameter
class TestDonchianFearGreedStrategy(IStrategy):
"""
Donchian 돌파 + 거래량 배수 + (옵션) Fear&Greed 게이트
- 룩어헤드 방지: Donchian 상단은 shift(1)
- 파라미터 우선순위:
1) config.strategy_parameters[TestDonchianFearGreedStrategy]
2) Hyperopt Parameter .value
3) buy_params (디폴트)
"""
timeframe = "5m"
startup_candle_count = 120
# ---- 디폴트 파라미터 ----
buy_params = {
"use_fg": 1,
"buy_fg_threshold": -999, # config/JSON로 주입 권장
"buy_vol_mult": 1.95,
"buy_dc_period": 19,
}
# ---- 하이퍼옵트/실행 파라미터 ----
use_fg = IntParameter(0, 1, default=buy_params["use_fg"], space="buy", optimize=False)
buy_fg_threshold = IntParameter(45, 80, default=buy_params["buy_fg_threshold"], space="buy", optimize=False)
buy_vol_mult = DecimalParameter(1.00, 3.00, default=buy_params["buy_vol_mult"], decimals=2, space="buy", optimize=True)
buy_dc_period = IntParameter(12, 40, default=buy_params["buy_dc_period"], space="buy", optimize=True)
# ROI/SL (config 오버라이드가 있으면 거기가 우선)
minimal_roi = {"0": 0.05, "60": 0.02, "240": 0.0}
stoploss = -0.08
# ---------------- 내부 유틸 ----------------
def informative_pairs(self):
# ✅ 레짐 필터용 BTC 데이터
return [("BTC/USDT", self.timeframe)]
@staticmethod
def _pair_safe(pair: str) -> str:
return pair.replace("/", "_").replace(":", "_")
def _strategy_overrides(self) -> Dict[str, Any]:
cfg = self.config or {}
sp = cfg.get("strategy_parameters", {})
return sp.get(self.__class__.__name__, {}) if isinstance(sp, dict) else {}
def _param(self, key: str, cast=None):
# 1) config override
ov = self._strategy_overrides()
if key in ov:
v = ov[key]
return v if v is None or cast is None else cast(v)
# 2) Hyperopt Parameter .value
if hasattr(self, key):
attr = getattr(self, key)
if hasattr(attr, "value"):
v = attr.value
return v if v is None or cast is None else cast(v)
# 3) defaults
v = self.buy_params.get(key, None)
return v if v is None or cast is None else cast(v)
# ---------------- 내부: ENTRY param 읽기 (config 우선) ----------------
def _entry_param(self, key: str, cast=None, default=None, **kwargs):
"""
✅ ENTRY 단일 소스(IGNORE) 강제:
- 오직 config[strategy_parameters][TestDonchianFearGreedStrategyFG]만 읽는다.
- key 제한 없음 (use_trend_filter, trend_ema_period 포함)
- 없으면 Parameter.value → default(있으면) → buy_params 순으로 폴백
"""
cfg = self.config or {}
sp = cfg.get("strategy_parameters", {})
if isinstance(sp, dict):
fg = sp.get("TestDonchianFearGreedStrategyFG", {})
if isinstance(fg, dict) and key in fg:
v = fg.get(key)
try:
return cast(v) if cast else v
except Exception:
return default
# fallback: Parameter .value
if hasattr(self, key):
attr = getattr(self, key)
if hasattr(attr, "value"):
v = attr.value
try:
return cast(v) if cast else v
except Exception:
return default
# fallback: default
if default is not None:
try:
return cast(default) if cast else default
except Exception:
return default
# fallback: buy_params
defaults = getattr(self, "buy_params", {}) or {}
v = defaults.get(key, None)
try:
return cast(v) if cast else v
except Exception:
return v
def _candidate_fg_paths(self, pair: str) -> List[Path]:
userdir = Path(self.config.get("user_data_dir", "/freqtrade/user_data"))
safe = self._pair_safe(pair)
return [
userdir / "collectors" / "data" / safe / "fear_greed.json",
userdir / "collectors" / "data" / "fear_greed.json",
userdir / "data" / "custom" / "fear_greed.csv",
userdir / "data" / "custom" / "fear_greed.json",
]
def _load_fg_series(self, pair: str) -> pd.Series:
"""
다양한 포맷(csv/json/jsonl)을 수용해서 FG 시계열 반환(UTC, float)
파일 없거나 파싱 실패 → 상수 50 폴백
"""
for p in self._candidate_fg_paths(pair):
if not p.exists():
continue
try:
if p.suffix.lower() == ".csv":
df = pd.read_csv(p)
else:
txt = p.read_text(encoding="utf-8").strip()
try:
data = json.loads(txt) # list or dict
if isinstance(data, dict):
data = data.get("data", [])
except Exception:
data = [json.loads(line) for line in txt.splitlines() if line.strip()]
df = pd.DataFrame(data)
# 컬럼 정규화
if "timestamp" in df.columns:
t = pd.to_datetime(df["timestamp"], unit="s", utc=True, errors="coerce")
elif "date" in df.columns:
t = pd.to_datetime(df["date"], utc=True, errors="coerce")
else:
continue
s = pd.Series(df["value"].astype(float).values, index=t, name="fg").sort_index()
s = s.dropna().ffill()
if len(s):
return s
except Exception:
continue
# 폴백
idx = pd.date_range("2000-01-01", periods=1, tz="UTC")
return pd.Series([50.0], index=idx, name="fg")
# ---------------- 지표 ----------------
def populate_indicators(self, dataframe: pd.DataFrame, metadata: Dict[str, Any]) -> pd.DataFrame:
df = dataframe.copy()
df["date"] = pd.to_datetime(df["date"], utc=True, errors="coerce")
p = int(self._entry_param('buy_dc_period', int, default=19))
vol_mult = float(self._entry_param('buy_vol_mult', float, default=1.95))
# Donchian
df["dc_high"] = df["high"].rolling(p, min_periods=p).max().shift(1)
df["dc_low"] = df["low"].rolling(p, min_periods=p).min().shift(1)
# 거래량 배수
vol_ma = df["volume"].rolling(p, min_periods=p).mean()
df["vol_ok"] = df["volume"] > (vol_ma * vol_mult)
pair = metadata.get("pair", "?")
true_rate = float(df["vol_ok"].mean() * 100.0)
import logging
logging.getLogger("freqtrade.strategy").info(
"[VOL][FINAL] pair=%s param=%.2f used=%.2f true_rate=%.1f%%", pair, float(getattr(self.buy_vol_mult, "value", -1)), vol_mult, true_rate
)
# Fear & Greed 병합
try:
s = self._load_fg_series(metadata.get("pair", "BTC/USDT"))
fgdf = s.rename("fg").to_frame().reset_index().rename(columns={"index": "date"})
df = pd.merge_asof(
df.sort_values("date"),
fgdf.sort_values("date"),
on="date",
direction="backward",
tolerance=pd.Timedelta("30D"),
)
df["fg"] = df["fg"].fillna(50.0).astype(float)
except Exception:
df["fg"] = 50.0
return df
# ---------------- 매수 ----------------
def populate_buy_trend(self, dataframe: pd.DataFrame, metadata: Dict[str, Any]) -> pd.DataFrame:
df = dataframe.copy()
# 조건
cond_dc = (df['close'] > df['dc_high'])
cond_vol = df['vol_ok']
# FG 게이트
use_fg = int(self._entry_param('use_fg', int, default=1)) == 1
th = self._read_fg_thr()
import logging, builtins
if not getattr(self, '_fg_logged', False):
logging.getLogger('freqtrade.strategy').info('[FG][FINAL] thr=%s', th)
builtins.print(f"[FG][FINAL] thr={th}")
self._fg_logged = True
cond_fg = (df['fg'] >= th) if use_fg else True
use_trend = int(self._entry_param("use_trend_filter", int) or 0) == 1
trend_p = int(self._entry_param("trend_ema_period", int) or 200)
# trend_ema 없으면 여기서라도 생성(안전장치)
if "trend_ema" not in df.columns:
df["trend_ema"] = df["close"].ewm(span=trend_p, adjust=False).mean()
# 룩어헤드 방지: 확정된 직전캔들로 판단
cond_trend = (df["close"].shift(1) > df["trend_ema"].shift(1)) if use_trend else True
# 과열(Overheat) 필터:
# - 최근 over_lb 캔들 최저가(roll_low) 대비 "직전 종가" 상승률(runup)가 over_thr 초과면 진입 금지
# - 룩어헤드 방지: roll_low/close 모두 shift(1) 기반으로 계산
use_overheat = int(self._entry_param("use_overheat_filter", int) or 0) == 1
over_lb = int(self._entry_param("overheat_lookback", int) or 48)
over_thr = float(self._entry_param("overheat_thr", float) or 0.04)
cond_overheat_ok = True
if use_overheat:
src_low = df["low"] if "low" in df.columns else df["close"]
roll_low = src_low.rolling(over_lb, min_periods=over_lb).min().shift(1)
roll_low = roll_low.replace(0, np.nan)
close_prev = df["close"].shift(1)
runup = (close_prev / roll_low) - 1.0
# over_thr 초과면 과열 → 진입 차단
cond_overheat_ok = (runup <= over_thr) | roll_low.isna()
# 디버그용 컬럼 (불편하면 나중에 제거 가능)
df["overheat_runup"] = runup
df["overheat_ok"] = cond_overheat_ok.astype(int)
# 최종 엔트리
# ===== 시장 급락 레짐(BTC) 필터 =====
use_regime = int(self._entry_param('use_regime_filter', int) or 0) == 1
cond_regime_ok = True
if use_regime and hasattr(self, 'dp') and self.dp is not None and 'date' in df.columns:
try:
rp = str(self._entry_param('regime_pair', str) or 'BTC/USDT')
lb = int(self._entry_param('regime_lookback', int) or 48)
thr = float(self._entry_param('regime_drop_thr', float) or 0.02)
bdf = self.dp.get_pair_dataframe(rp, self.timeframe)
if bdf is not None and not bdf.empty and 'date' in bdf.columns and 'close' in bdf.columns:
b = bdf[['date','close']].copy()
b['close'] = b['close'].astype(float)
b['btc_ret_lb'] = (b['close'] / b['close'].shift(lb) - 1.0)
b = b[['date','btc_ret_lb']].sort_values('date')
df = df.sort_values('date')
df = pd.merge_asof(df, b, on='date', direction='backward')
cond_regime_ok = (df['btc_ret_lb'] >= -thr) | df['btc_ret_lb'].isna()
df['regime_ok'] = cond_regime_ok.astype(int)
except Exception:
cond_regime_ok = True
final_entry = (cond_dc & cond_vol & cond_fg & cond_trend & cond_overheat_ok & cond_regime_ok)
df['buy'] = 0
df.loc[final_entry, 'buy'] = 1
if 'enter_tag' not in df.columns:
df['enter_tag'] = np.nan
tag = 'donchian&vol' + ('&fg' if use_fg else '') + ('&heat' if use_overheat else '') + ('®' if use_regime else '')
df.loc[final_entry, 'enter_tag'] = tag
return df
# ---------------- 매도 ----------------
def populate_sell_trend(self, dataframe: pd.DataFrame, metadata: Dict[str, Any]) -> pd.DataFrame:
df = dataframe.copy()
cond_exit = (df["close"] < df["dc_low"] * 0.999) & (df["close"].shift(1) < df["dc_low"].shift(1) * 0.999) & (df["volume"] > 0)
# EMA exit는 custom_exit로 이동 (✅ 보유시간/손실 게이트로 과발동 차단)
df["sell"] = 0
sp = (self.config.get("strategy_parameters") or {})
_v = sp.get("use_dc_low_break2_exit", None)
if _v is None:
_v = 1
if int(_v) == 1:
df.loc[cond_exit, "sell"] = 1
df.loc[cond_exit, "sell_tag"] = "dc_low_break2"
df.loc[cond_exit, "exit_tag"] = "dc_low_break2"
return df
# ---------------- 보호장치 ----------------
@property
def protections(self):
return [
{"method": "CooldownPeriod", "stop_duration_candles": 5},
{"method": "MaxDrawdown", "lookback_period_candles": 288,
"trade_limit": 20, "stop_duration_candles": 12, "max_allowed_drawdown": 0.10},
]
# ---------------- 내부: FG 임계값 로깅 ----------------
def _read_fg_thr(self):
eff = int(self._entry_param("buy_fg_threshold", int, default=50))
if not getattr(self, "_fg_debug_once", False):
try:
import logging, builtins
logging.getLogger("freqtrade.strategy").info("[FG][DEBUG] eff=%s", eff)
builtins.print(f"[FG][DEBUG] eff={eff}")
except Exception:
pass
self._fg_debug_once = True
return eff
def confirm_trade_entry(
self,
pair,
order_type,
amount,
rate,
time_in_force,
current_time,
**kwargs,
) -> bool:
"""
추가 ENTRY 필터:
- 단기 과열(최근 N캔들 대비 과도한 상승) 구간에서는 돌파 진입을 막는다.
- False breakout + 즉시 -1% 스탑 맞는 구간을 줄이는 게 목적.
"""
# 1) 전략 파라미터 읽기 (없으면 기본값)
try:
sp = (self.config or {}).get("strategy_parameters") or {}
except Exception:
sp = {}
# 토글 스위치: 0이면 아무것도 안 함
use_overheat = int(sp.get("use_overheat_filter") or 0)
if use_overheat != 1:
return True # 필터 OFF면 진입 허용
# 2) dp 없으면 필터 못 씀 → 진입 허용 (fail open)
if not hasattr(self, "dp") or self.dp is None:
return True
# 기본값: 4시간(48캔들), 4% 과열 기준
try:
lookback = int(sp.get("overheat_lookback") or 48)
except Exception:
lookback = 48
try:
thr = float(sp.get("overheat_thr") or 0.04) # 4%
except Exception:
thr = 0.04
# 3) 히스토리 가져오기
try:
df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
except Exception:
return True
if df is None or df.empty or "close" not in df.columns:
return True
# 스누핑 방지: current_time 까지만 사용
if "date" in df.columns:
df2 = df[df["date"] <= current_time].copy()
else:
df2 = df.copy()
if len(df2) < max(lookback, 20):
# 히스토리가 너무 짧으면 필터 적용하지 않음
return True
close = df2["close"].astype(float)
# 4) lookback 구간에서 현재까지의 최소값/현재값 비교
recent = close.iloc[-lookback:]
if recent.empty:
return True
curr = recent.iloc[-1]
min_recent = recent.min()
if min_recent <= 0:
# 데이터 이상 방지
return True
run_up = (curr / min_recent) - 1.0
# 5) 과열이면 진입 차단
if run_up > thr:
# print(f"[{pair}] overheat filter block: run_up={run_up:.3%}") # 필요하면 디버그
return False
# 나머지는 진입 허용
return True
def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
"""EMA 하락 교차 기반 손실방어 청산(최종/스누핑 방지).
- strategy_parameters에서 값 읽음(백테/드라이런 일치)
- current_time 기준으로 df 컷 → 미래데이터 사용 금지
"""
# LOSS-CUT: exit earlier than stoploss
try:
sp = (self.config or {}).get('strategy_parameters') or {}
use_lc = int(sp.get('use_loss_cut_exit', 0) or 0)
lc_thr = float(sp.get('loss_cut_thr', -0.006))
lc_min = int(sp.get('loss_cut_min_hold_min', 15))
except Exception:
use_lc, lc_thr, lc_min = 0, -0.006, 15
if use_lc and current_profit is not None:
try:
open_dt = getattr(trade, 'open_date_utc', None) or getattr(trade, 'open_date', None)
held_min = int((current_time - open_dt).total_seconds() / 60) if open_dt else 0
except Exception:
held_min = 0
if held_min >= lc_min and current_profit <= lc_thr:
return 'loss_cut_exit'
try:
sp = (self.config or {}).get("strategy_parameters") or {}
use_ema_exit = int(sp.get("use_ema_exit") or 0)
if use_ema_exit != 1:
return None
if not hasattr(self, "dp") or self.dp is None:
return None
# 최소 보유시간(분)
raw_min_hold = sp.get("ema_exit_min_hold_min")
min_hold_min = int(raw_min_hold) if raw_min_hold is not None else 120
od = getattr(trade, "open_date_utc", None)
if od is not None:
held_sec = (current_time - od).total_seconds()
if held_sec < min_hold_min * 60:
return None
# 손실 구간에서만 EMA exit 허용 (0.0이면 '수익>0'일 때만 차단)
loss_thr = float(sp.get("ema_exit_loss_thr") if sp.get("ema_exit_loss_thr") is not None else -0.015)
if current_profit > loss_thr:
return None
fast = int(sp.get("exit_fast_ema") or 3)
slow = int(sp.get("exit_slow_ema") or 5)
price_buf = float(sp.get("ema_exit_price_buf") or 0.0)
df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if df is None or df.empty or "close" not in df.columns:
return None
# 스누핑 방지: current_time까지 컷
if "date" in df.columns:
df2 = df[df["date"] <= current_time].copy()
else:
df2 = df.copy()
if len(df2) < max(fast, slow) + 2:
return None
close = df2["close"].astype(float)
ema_fast = close.ewm(span=fast, adjust=False).mean()
ema_slow = close.ewm(span=slow, adjust=False).mean()
# EMA cross_down: 직전봉은 fast>=slow, 현재봉은 fast<slow
# ✅ 교차를 놓친 경우 대비: fast<slow 상태가 2캔들 연속이면 EXIT 허용
cond_now = ema_fast.iloc[-1] < ema_slow.iloc[-1]
cond_prev = ema_fast.iloc[-2] >= ema_slow.iloc[-2]
cond_below_prev = ema_fast.iloc[-2] < ema_slow.iloc[-2]
cond_slow_down = ema_slow.iloc[-1] < ema_slow.iloc[-2]
cross_down = (cond_now and cond_prev) or (cond_now and cond_below_prev and cond_slow_down)
if cross_down:
# 가격이 slow EMA 아래(버퍼 적용 가능)
if close.iloc[-1] < ema_slow.iloc[-1] * (1 - price_buf):
return "ema_cross_exit"
return None
except Exception:
return None