Timeframe
1d
Direction
Long & Short
Stoploss
-52.0%
Trailing Stop
No
ROI
0m: 1000.0%
Interface Version
3
Startup Candles
N/A
Indicators
5
freqtrade/freqtrade-strategies
Strategy 003 author@: Gerald Lonlas github@: https://github.com/freqtrade/freqtrade-strategies
# =============================================================================
# Promoted: 2026-05-24 15:16:49 UTC
# Source:
# generator = strategies_generator_v14_short
# fthypt = strategy_strategies_generator_v14_short_2026-05-24_12-05-22.fthypt
# best_epoch = 122
# iteration = 1
# seed = 17277
# config = /home/moutonneux/freqtrade/backtest_configs/hl_150pairs_mot3.json
#
# Validation results (out-of-sample):
# timerange = 20241117-20260517
# trades = 11
# profit = 1.22%
# max_drawdown = 3.76%
# sharpe = 0.021215455189529442
# deflated_sharpe = None
# p-value = None
# Walk-forward windows:
# wf_1 (20241117-20250518): 3 trades, profit=-1.99%, dd=3.76%, sharpe=-0.06962282031353315
# wf_2 (20250518-20251116): 6 trades, profit=+2.89%, dd=2.20%, sharpe=0.2388390101139699
# wf_3 (20251116-20260517): 3 trades, profit=+0.13%, dd=2.15%, sharpe=0.00768069624898791
# positive_ratio = 2/3
# Validation thresholds used:
# max_drawdown = 0.25
# max_permutation_pvalue = 0.15
# min_dsr = 0.5
# min_oos_trades = 7
# min_positive_windows = 0.65
# slippage_bps = 5.0
# =============================================================================
# =============================================================================
# edge_strategy9 - standalone v14 short strategy
# Generated by strategies_generator pipeline (codegen/writer.py)
#
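# This file is intended to be SELF-CONTAINED with respect to the generator:
# - No import from external_indicators_v11
# - No import from strategies_generator_v14_*
# - All indicator classes are inlined verbatim from the source library
# - All DCA / leverage / ROI / exit / volume / volatility logic is inlined
# with chosen hyperopt parameters frozen as class constants
# NOTE: the import section below still contains
# `from external_indicators_v14_addons import *` as well as imports of
# tvDatafeed, scikit-learn and `technical`; those modules must be importable
# for this file to load.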
#
# By construction this strategy produces IDENTICAL backtest results to running
# strategies_generator_v14_short with the same buy_params/sell_params dict.
# =============================================================================
from __future__ import annotations
import math
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Union
from functools import reduce
import logging
import numpy as np
import pandas as pd
from pandas import DataFrame, concat
from scipy import stats, signal
import warnings
warnings.simplefilter("ignore")
from ta.volatility import AverageTrueRange
import ta as clean_ta
import talib.abstract as ta
from freqtrade.persistence import Trade
from freqtrade.strategy import (
BooleanParameter,
CategoricalParameter,
DecimalParameter,
IntParameter,
IStrategy,
timeframe_to_minutes,
)
# ----- Imports replicated from external_indicators_v11 (needed by inlined indicators/helpers) -----
from datetime import datetime, timedelta, timezone
import os
from datetime import datetime
import time
from technical import qtpylib
from tvDatafeed import TvDatafeed, Interval
from datetime import datetime, timedelta
from pytz import UTC
from freqtrade.strategy import CategoricalParameter
from technical.util import resample_to_interval, resampled_merge
from scipy.signal import argrelextrema
from sklearn.cluster import DBSCAN
from collections import defaultdict
from sklearn.cluster import DBSCAN, KMeans
import talib
from pandas import DataFrame
from sklearn.linear_model import LinearRegression
from external_indicators_v14_addons import *
logger = logging.getLogger(__name__)
def lerp(a: float, b: float, t: float) -> float:
    """Linearly interpolate between ``a`` (at ``t == 0``) and ``b`` (at ``t == 1``).

    ``t`` is not clamped, so values outside ``[0, 1]`` extrapolate.
    """
    weight_a = 1 - t
    return weight_a * a + t * b
class _FrozenParam:
    """Constant stand-in for a freqtrade hyperopt parameter.

    The inlined indicator classes only read two attributes from a parameter:
    ``value`` (the selected choice) and ``range`` (the choices to iterate
    over).  This class exposes both, with ``range`` being a one-element list
    that holds the frozen choice, so no optimisation machinery is required.
    """

    __slots__ = ("value", "range")

    def __init__(self, val):
        self.value, self.range = val, [val]
# ===== Inlined Indicateur base class =====
class Indicateur:
    """Base class of the inlined indicators; stores descriptive metadata only.

    Attributes:
        name: Indicator name.
        types: List of category labels the indicator is registered under.
        enable: Flag stored as given by the subclass.
    """

    def __init__(self, name: str, types: list, enable: bool):
        self.name, self.types, self.enable = name, types, enable
# ----- Inlined indicateur_VWAP -----
class indicateur_VWAP(Indicateur):
    """Rolling volume-weighted average price (VWAP) indicator.

    ``get_dataframe`` writes one column per size combination, named
    ``{type}_VWAP_{size1}_{size2}_{size3}``.  The rolling window is
    ``int(s1 * s3)``, never below 1; ``size2`` only contributes to the column
    name.  ``get_conditions`` appends at most one boolean Series per call.
    """

    # Size tables: parameter key -> numeric value.
    sizes = {
        "s1": {f"p{i}": i for i in range(1, 16)},
        "s2": {f"p{i}": i for i in range(1, 16)},
        "s3": {"p1": 1.0, "p2": 1.5, "p3": 2.0, "p4": 2.5, "p5": 3.0, "p6": 3.5, "p7": 4.0, "p8": 4.25, "p9": 0.85, "p10": 0.6, "p11": 0.4, "p12": 0.3, "p13": 0.5, "p14": 0.75, "p15": 0.25},
    }

    # Every condition (1..10) on both sides uses the same look-back choices;
    # each entry is its own dict object.
    conditions_values = {"buy": {}, "sell": {}}
    for _side in ("buy", "sell"):
        for _cv in range(1, 11):
            conditions_values[_side][_cv] = {"p1": 1, "p2": 3, "p3": 5, "p4": 8, "p5": 15, "p6": 20, "p7": 30, "p8": 40, "p9": 50, "p10": 75}
    del _side, _cv

    def __init__(self):
        Indicateur.__init__(self, name="VWAP", types=["trend", "over"], enable=True)

    def get_dataframe(
        self,
        dataframe,
        ps1: CategoricalParameter,
        ps2: CategoricalParameter,
        ps3: CategoricalParameter,
        type: str,
    ):
        """Add one VWAP column per (size1, size2, size3) combination.

        Args:
            dataframe: OHLCV frame with ``high``, ``low``, ``close``, ``volume``.
            ps1, ps2, ps3: Objects whose ``range`` yields size keys (``"p1"`` ...).
            type: Prefix used in the generated column names.

        Returns:
            The same dataframe with the new columns added.
        """
        for key1 in ps1.range:
            base = self.sizes["s1"].get(key1)
            for key2 in ps2.range:
                for key3 in ps3.range:
                    factor = self.sizes["s3"].get(key3)
                    span = max(1, int(base * factor))
                    vwap = clean_ta.volume.VolumeWeightedAveragePrice(
                        dataframe['high'],
                        dataframe['low'],
                        dataframe['close'],
                        dataframe['volume'],
                        window=span,
                    )
                    dataframe[f'{type}_VWAP_{key1}_{key2}_{key3}'] = vwap.volume_weighted_average_price()
        return dataframe

    def get_conditions(self, dataframe, s1, s2, s3, type, condition_type, condition_value, conditions, value):
        """Append the boolean Series for one VWAP condition to ``conditions``.

        Args:
            dataframe: Frame already holding the ``{type}_VWAP_{s1}_{s2}_{s3}`` column.
            s1, s2, s3: Size keys identifying the column.
            type: Column-name prefix.
            condition_type: ``"buy"`` or ``"sell"``.
            condition_value: Condition number; 0 or a number without a table
                entry leaves ``conditions`` untouched.
            conditions: List that receives the new Series.
            value: Key (``"p1"`` ...) selecting the look-back ``v``.

        Returns:
            ``conditions`` (the same list object).

        Raises:
            SyntaxError: If ``condition_type`` is neither ``"buy"`` nor ``"sell"``.
        """
        if condition_type not in ["buy", "sell"]:
            raise SyntaxError
        lookbacks = self.conditions_values.get(condition_type, {}).get(condition_value)
        if lookbacks is None or condition_value == 0:
            return conditions
        v = lookbacks.get(value)
        column = f'{type}_VWAP_{s1}_{s2}_{s3}'

        def vwap():
            return dataframe[column]

        def close():
            return dataframe['close']

        # Builders are evaluated lazily so only the selected rule touches the frame.
        rules = {
            1: lambda: vwap().shift(v) <= vwap(),
            2: lambda: vwap().shift(v) >= vwap(),
            3: lambda: vwap().shift(v) > vwap(),
            4: lambda: vwap().shift(v) < vwap(),
            5: lambda: (vwap() > vwap().shift(v)) & (close() > vwap()),
            6: lambda: (vwap() < vwap().shift(v)) & (close() < vwap()),
            7: lambda: vwap().shift(v) < close(),
            8: lambda: vwap().shift(v) > close(),
            9: lambda: (close() > vwap()) & (close().shift(v) <= vwap().shift(v)),
            10: lambda: (close() < vwap()) & (close().shift(v) >= vwap().shift(v)),
        }
        build = rules.get(condition_value)
        if build is not None:
            conditions.append(build())
        return conditions
# ----- Inlined indicateur_MFI -----
class indicateur_MFI(Indicateur):
    """Money Flow Index: volume-weighted RSI.

    For each size combination two columns are written:
    ``{type}_MFI_{size1}_{size2}_{size3}_mfi`` (raw MFI, period from ``s1``,
    at least 2) and ``..._smooth`` (rolling mean of the MFI over ``s3`` bars,
    at least 1).  ``size2`` only contributes to the column name.
    """

    # Shared look-back ladder; every use below takes its own copy.
    _LOOKBACKS = {"p1": 1, "p2": 2, "p3": 3, "p4": 5, "p5": 7, "p6": 10, "p7": 14, "p8": 18, "p9": 21, "p10": 25, "p11": 30, "p12": 40, "p13": 50, "p14": 60, "p15": 75}

    sizes = {
        "s1": {"p1": 7, "p2": 10, "p3": 14, "p4": 18, "p5": 21, "p6": 25, "p7": 28, "p8": 35, "p9": 42, "p10": 50, "p11": 60, "p12": 70, "p13": 80, "p14": 100, "p15": 120},
        "s2": dict(_LOOKBACKS),
        "s3": {"p1": 1, "p2": 2, "p3": 3, "p4": 4, "p5": 5, "p6": 7, "p7": 10, "p8": 14,
               "p9": 18, "p10": 21, "p11": 25, "p12": 30, "p13": 40, "p14": 50, "p15": 60},
    }

    conditions_values = {
        "buy": {
            1: {f"p{i}": 5 + 3 * i for i in range(1, 11)},    # MFI < v (8..35)
            2: {f"p{i}": 60 + 4 * i for i in range(1, 11)},   # MFI > v (64..100)
            3: dict(_LOOKBACKS),
            4: dict(_LOOKBACKS),
            5: {f"p{i}": 10 + 3 * i for i in range(1, 11)},   # threshold for cross from below
            6: {f"p{i}": 60 + 4 * i for i in range(1, 11)},   # threshold for cross from above
            7: dict(_LOOKBACKS),
            8: {f"p{i}": 5 + 5 * i for i in range(1, 11)},    # extreme deviation from 50
            9: dict(_LOOKBACKS),
            10: dict(_LOOKBACKS),
        },
    }
    # The sell side shares the very same table object as the buy side.
    conditions_values["sell"] = conditions_values["buy"]
    del _LOOKBACKS

    def __init__(self):
        Indicateur.__init__(self, name="MFI", types=["over", "momentum"], enable=True)

    def get_dataframe(self, dataframe, ps1, ps2, ps3, type):
        """Add the raw and smoothed MFI columns for every size combination.

        Args:
            dataframe: OHLCV frame handed to ``ta.MFI``.
            ps1, ps2, ps3: Objects whose ``range`` yields size keys.
            type: Prefix used in the generated column names.

        Returns:
            The same dataframe with the new columns added.
        """
        for key1 in ps1.range:
            period = max(2, int(self.sizes["s1"].get(key1)))
            raw = ta.MFI(dataframe, timeperiod=period)
            for key3 in ps3.range:
                span = max(1, int(self.sizes["s3"].get(key3)))
                smoothed = raw.rolling(span).mean()
                for key2 in ps2.range:
                    stem = f"{type}_MFI_{key1}_{key2}_{key3}"
                    dataframe[f"{stem}_mfi"] = raw
                    dataframe[f"{stem}_smooth"] = smoothed
        return dataframe

    def get_conditions(self, dataframe, s1, s2, s3, type, condition_type, condition_value, conditions, value):
        """Append the boolean Series for one MFI condition to ``conditions``.

        Args:
            dataframe: Frame holding the ``_mfi`` / ``_smooth`` columns and ``close``.
            s1, s2, s3: Size keys identifying the columns.
            type: Column-name prefix.
            condition_type: ``"buy"`` or ``"sell"``.
            condition_value: Condition number (1..10); 0 appends nothing and a
                number without a table entry returns before reading any column.
            conditions: List that receives the new Series.
            value: Key selecting the threshold / look-back ``v``.

        Returns:
            ``conditions`` (the same list object).

        Raises:
            SyntaxError: If ``condition_type`` is neither ``"buy"`` nor ``"sell"``.
        """
        if condition_type not in ("buy", "sell"):
            raise SyntaxError
        table = self.conditions_values.get(condition_type, {}).get(condition_value)
        if condition_value != 0 and table is None:
            return conditions
        v = None if condition_value == 0 else table.get(value)

        stem = f"{type}_MFI_{s1}_{s2}_{s3}"
        mfi = dataframe[f"{stem}_mfi"]
        sm = dataframe[f"{stem}_smooth"]
        c = dataframe["close"]

        rules = {
            1: lambda: mfi < v,                                  # MFI oversold
            2: lambda: mfi > v,                                  # MFI overbought
            3: lambda: mfi > mfi.shift(v),                       # rising
            4: lambda: mfi < mfi.shift(v),                       # falling
            5: lambda: (mfi > v) & (mfi.shift(1) <= v),          # cross v from below
            6: lambda: (mfi < v) & (mfi.shift(1) >= v),          # cross v from above
            7: lambda: (sm > 50) & (sm.shift(v) <= 50),          # smooth cross 50 up
            8: lambda: (mfi - 50).abs() > v,                     # extreme deviation from 50
            9: lambda: (mfi > 70) & (c > c.shift(v)),            # MFI overbought + price up
            10: lambda: (mfi < 30) & (c < c.shift(v)),           # MFI oversold + price down
        }
        build = rules.get(condition_value)
        if build is not None:
            conditions.append(build())
        return conditions
# ----- Inlined indicateur_BollingerBands -----
class indicateur_BollingerBands(Indicateur):
    """Bollinger Bands: rolling mean +/- k * rolling standard deviation.

    For each size combination five columns are written under the prefix
    ``{type}_BB_{size1}_{size2}_{size3}``: ``_mid``, ``_high``, ``_low``,
    ``_pctb`` (position of close inside the band) and ``_bw`` (band width
    relative to the mid line).  ``size3`` only contributes to the column name.
    """

    # Shared look-back ladder; every use below takes its own copy.
    _LOOKBACKS = {"p1": 1, "p2": 2, "p3": 3, "p4": 5, "p5": 7, "p6": 10, "p7": 14, "p8": 18, "p9": 21, "p10": 25, "p11": 30, "p12": 40, "p13": 50, "p14": 60, "p15": 75}

    sizes = {
        "s1": {"p1": 10, "p2": 14, "p3": 16, "p4": 18, "p5": 20, "p6": 24, "p7": 28,
               "p8": 32, "p9": 40, "p10": 50, "p11": 60, "p12": 80, "p13": 100, "p14": 120, "p15": 150},
        "s2": {"p1": 1.0, "p2": 1.25, "p3": 1.5, "p4": 1.75, "p5": 2.0, "p6": 2.25, "p7": 2.5,
               "p8": 2.75, "p9": 3.0, "p10": 3.5, "p11": 1.5, "p12": 2.0, "p13": 2.5, "p14": 1.8, "p15": 2.2},
        "s3": dict(_LOOKBACKS),
    }

    conditions_values = {
        "buy": {
            1: {f"p{i}": 1 for i in range(1, 11)},
            2: {f"p{i}": 1 for i in range(1, 11)},
            3: {"p1": 0.0, "p2": 0.05, "p3": 0.1, "p4": 0.15, "p5": 0.2, "p6": 0.25, "p7": 0.3, "p8": 0.35, "p9": 0.4, "p10": 0.5},
            4: {"p1": 0.5, "p2": 0.6, "p3": 0.65, "p4": 0.7, "p5": 0.75, "p6": 0.8, "p7": 0.85, "p8": 0.9, "p9": 0.95, "p10": 1.0},
            5: {"p1": 0.005, "p2": 0.01, "p3": 0.015, "p4": 0.02, "p5": 0.025, "p6": 0.03, "p7": 0.04, "p8": 0.05, "p9": 0.06, "p10": 0.08},
            6: {"p1": 0.02, "p2": 0.03, "p3": 0.04, "p4": 0.05, "p5": 0.06, "p6": 0.08, "p7": 0.1, "p8": 0.12, "p9": 0.15, "p10": 0.2},
            7: dict(_LOOKBACKS),
            8: dict(_LOOKBACKS),
            9: dict(_LOOKBACKS),
            10: dict(_LOOKBACKS),
            11: {"p1": 10, "p2": 14, "p3": 20, "p4": 25, "p5": 30, "p6": 40, "p7": 50, "p8": 60, "p9": 75, "p10": 96},
            12: {"p1": 2, "p2": 3, "p3": 4, "p4": 5, "p5": 6, "p6": 7, "p7": 8, "p8": 10, "p9": 12, "p10": 15},
        },
    }
    # The sell side shares the very same table object as the buy side.
    conditions_values["sell"] = conditions_values["buy"]
    del _LOOKBACKS

    def __init__(self):
        Indicateur.__init__(self, name="BollingerBands", types=["volatility", "over"], enable=True)

    def get_dataframe(self, dataframe, ps1, ps2, ps3, type):
        """Add the five Bollinger columns for every size combination.

        Args:
            dataframe: Frame with a ``close`` column.
            ps1, ps2, ps3: Objects whose ``range`` yields size keys.
            type: Prefix used in the generated column names.

        Returns:
            The same dataframe with the new columns added.
        """
        for key1 in ps1.range:
            length = self.sizes["s1"].get(key1)
            for key2 in ps2.range:
                width = self.sizes["s2"].get(key2)
                rolling_close = dataframe["close"].rolling(max(2, int(length)))
                mid = rolling_close.mean()
                spread = dataframe["close"].rolling(max(2, int(length))).std()
                upper = mid + width * spread
                lower = mid - width * spread
                # Zero denominators become NaN instead of producing inf.
                bands = {
                    "mid": mid,
                    "high": upper,
                    "low": lower,
                    "pctb": (dataframe["close"] - lower) / (upper - lower).replace(0, np.nan),
                    "bw": (upper - lower) / mid.replace(0, np.nan),
                }
                for key3 in ps3.range:
                    stem = f"{type}_BB_{key1}_{key2}_{key3}"
                    for suffix, series in bands.items():
                        dataframe[f"{stem}_{suffix}"] = series
        return dataframe

    def get_conditions(self, dataframe, s1, s2, s3, type, condition_type, condition_value, conditions, value):
        """Append the boolean Series for one Bollinger condition to ``conditions``.

        Args:
            dataframe: Frame holding the Bollinger columns and ``close``.
            s1, s2, s3: Size keys identifying the columns.
            type: Column-name prefix.
            condition_type: ``"buy"`` or ``"sell"``.
            condition_value: Condition number (1..12); 0 appends nothing and a
                number without a table entry returns immediately.
            conditions: List that receives the new Series.
            value: Key selecting the threshold / look-back ``v``.

        Returns:
            ``conditions`` (the same list object).

        Raises:
            SyntaxError: If ``condition_type`` is neither ``"buy"`` nor ``"sell"``.
        """
        if condition_type not in ("buy", "sell"):
            raise SyntaxError
        table = self.conditions_values.get(condition_type, {}).get(condition_value)
        if condition_value != 0 and table is None:
            return conditions
        v = None if condition_value == 0 else table.get(value)

        stem = f"{type}_BB_{s1}_{s2}_{s3}"
        c = dataframe["close"]

        def band(suffix):
            return dataframe[f"{stem}_{suffix}"]

        rules = {
            1: lambda: dataframe[c.name] < band("low"),
            2: lambda: dataframe[c.name] > band("high"),
            3: lambda: band("pctb") < v,
            4: lambda: band("pctb") > v,
            5: lambda: band("bw") < v,
            6: lambda: band("bw") > v,
            7: lambda: (c > band("mid")) & (c.shift(v) <= band("mid").shift(v)),
            8: lambda: (c < band("mid")) & (c.shift(v) >= band("mid").shift(v)),
            9: lambda: (c < band("low")) & (c.shift(v) >= band("low").shift(v)),
            10: lambda: (c > band("high")) & (c.shift(v) <= band("high").shift(v)),
            # Band width below its own rolling 20 % quantile.
            11: lambda: band("bw") < band("bw").rolling(max(2, v)).quantile(0.2),
            # Lowest recent close still above the mid line.
            12: lambda: c.rolling(max(2, v)).min() > band("mid"),
        }
        build = rules.get(condition_value)
        if build is not None:
            conditions.append(build())
        return conditions
# ----- Inlined indicateur_CoppockCurve -----
class indicateur_CoppockCurve(Indicateur):
    """Coppock-style momentum curve.

    The column ``{type}_CoppockCurve_{size1}_{size2}_{size3}`` holds the
    rolling mean (window ``s1``) of the sum of two ``close.pct_change`` series
    taken over ``s2`` and ``s3`` bars.
    """

    sizes = {}
    for _axis in ("s1", "s2", "s3"):
        sizes[_axis] = {"p1": 3, "p2": 6, "p3": 9, "p4": 14, "p5": 20, "p6": 24, "p7": 28, "p8": 32, "p9": 40, "p10": 48, "p11": 55, "p12": 70, "p13": 90, "p14": 100, "p15": 120}
    del _axis

    # Conditions 1 and 4 compare the curve with a level; all others use a
    # look-back.  Buy and sell tables are separate objects with equal content.
    # NOTE(review): the levels (-75..100) look percent-scaled while pct_change
    # yields fractional changes - confirm the intended scale.
    conditions_values = {}
    for _side in ("buy", "sell"):
        conditions_values[_side] = {}
        for _cv in range(1, 11):
            if _cv in (1, 4):
                conditions_values[_side][_cv] = {"p1": -75, "p2": -60, "p3": -40, "p4": -20, "p5": 0, "p6": 20, "p7": 40, "p8": 60, "p9": 75, "p10": 100}
            else:
                conditions_values[_side][_cv] = {"p1": 1, "p2": 3, "p3": 5, "p4": 8, "p5": 15, "p6": 20, "p7": 30, "p8": 40, "p9": 50, "p10": 75}
    del _side, _cv

    def __init__(self):
        Indicateur.__init__(self, name="CoppockCurve", types=["momentum", "trend"], enable=True)

    def get_dataframe(
        self,
        dataframe,
        ps1: CategoricalParameter,
        ps2: CategoricalParameter,
        ps3: CategoricalParameter,
        type: str,
    ):
        """Add one curve column per (size1, size2, size3) combination.

        Args:
            dataframe: Frame with a ``close`` column.
            ps1, ps2, ps3: Objects whose ``range`` yields size keys.
            type: Prefix used in the generated column names.

        Returns:
            The same dataframe with the new columns added.
        """
        for key1 in ps1.range:
            smoothing = self.sizes["s1"].get(key1)
            for key2 in ps2.range:
                first_span = self.sizes["s2"].get(key2)
                for key3 in ps3.range:
                    second_span = self.sizes["s3"].get(key3)
                    close = dataframe["close"]
                    roc_sum = close.pct_change(int(first_span)) + dataframe["close"].pct_change(int(second_span))
                    dataframe[f"{type}_CoppockCurve_{key1}_{key2}_{key3}"] = roc_sum.rolling(window=smoothing).mean()
        return dataframe

    def get_conditions(self, dataframe, s1, s2, s3, type, condition_type, condition_value, conditions, value):
        """Append the boolean Series for one curve condition to ``conditions``.

        Args:
            dataframe: Frame already holding the curve column.
            s1, s2, s3: Size keys identifying the column.
            type: Column-name prefix.
            condition_type: ``"buy"`` or ``"sell"``.
            condition_value: Condition number (1..10); 0 appends nothing and a
                number without a table entry returns before reading the column.
            conditions: List that receives the new Series.
            value: Key selecting the level / look-back ``v``.

        Returns:
            ``conditions`` (the same list object).

        Raises:
            SyntaxError: If ``condition_type`` is neither ``"buy"`` nor ``"sell"``.
        """
        if condition_type not in ["buy", "sell"]:
            raise SyntaxError
        table = self.conditions_values.get(condition_type, {}).get(condition_value)
        if condition_value != 0 and table is None:
            return conditions
        v = None if condition_value == 0 else table.get(value)

        cc = dataframe[f"{type}_CoppockCurve_{s1}_{s2}_{s3}"]
        rules = {
            1: lambda: cc > v,
            2: lambda: cc > cc.shift(v),
            3: lambda: (cc.diff(1) > 0) & (cc.diff(1) > cc.diff(1).shift(v)),
            4: lambda: cc < v,
            5: lambda: cc < cc.shift(v),
            6: lambda: (cc.diff(1) < 0) & (cc.diff(1) < cc.diff(1).shift(v)),
            7: lambda: (cc > 0) & (cc.shift(v) <= 0),
            8: lambda: (cc < 0) & (cc.shift(v) >= 0),
            9: lambda: cc.diff() > cc.diff().shift(v),
            10: lambda: cc.diff() < cc.diff().shift(v),
        }
        build = rules.get(condition_value)
        if build is not None:
            conditions.append(build())
        return conditions
# ===== Strategy class =====
# Frozen freqtrade strategy: the hyperopt selections of the generator run are
# stored below as `_`-prefixed constants and read by the callbacks.
class edge_strategy9(IStrategy):
# ----- freqtrade settings -----
INTERFACE_VERSION = 3
can_short = True
timeframe = "1d"
process_only_new_candles = True
use_exit_signal = True
exit_profit_only = False
ignore_roi_if_entry_signal = True
startup_candle_count: int = 400
position_adjustment_enable = True
# A single ROI step of 10.0 (shown as 1000.0% in the page header above).
minimal_roi = {'0': 10.0}
stoploss = -0.52
trailing_stop = False
trailing_stop_positive = None
trailing_stop_positive_offset = 0.0
trailing_only_offset_is_reached = False
max_open_trades = 6
order_types = {'entry': 'limit', 'exit': 'limit', 'stoploss': 'market', 'stoploss_on_exchange': False}
order_time_in_force = {'entry': 'GTC', 'exit': 'GTC'}
# pair -> initial stake stored by custom_stake_amount and read by
# adjust_trade_position.
# NOTE(review): a mutable dict defined at class level - presumably shared by
# all instances; confirm that is intended.
cust_proposed_initial_stakes: Dict[str, float] = {}
# ----- Frozen buy params -----
_adjust_need_entry_signal = True
_adjust_require_volume_spike = False
_cumulative_entry_call = 8
_dd_throttle_floor_pct = 0.9
_entry_filter_mode = 'block_weekend'
_exit_conditions_type = 'AND'
_initial_safety_order_trigger = -0.178
_leverage_value = 2
_max_dca_age_candles = 200
_max_exposure_ratio = 0.5
_max_so_multiplier_orig = 2
_min_volume_filter = 5000000
# NOTE(review): 14 has no entry in indicateur_CoppockCurve.conditions_values
# (keys 1..10); get_conditions returns the list unchanged for such a value.
_momentum_buy_conditions = 14
_momentum_buy_value = 'p9'
_momentum_indicator = 'CoppockCurve'
_momentum_size1 = 'p9'
_momentum_size2 = 'p6'
_momentum_size3 = 'p10'
_need_cumulative_entry_call = True
_over_buy_conditions = 0
_over_buy_value = 'p5'
_over_indicator = 'MFI'
_over_size1 = 'p9'
_over_size2 = 'p12'
_over_size3 = 'p12'
_overbuy_factor = 0.7
_pair_cooldown_hours = 16
_partial_fill_compensation_scale = 1.0
_safety_order_step_scale = 15
_safety_order_volume_scale = 1.2
_so_power_w = 0.5
_tradable_balance_ratio = 0.5
_trailing_so_atr_period = 21
# NOTE(review): 16 has no entry in indicateur_VWAP.conditions_values (keys 1..10).
_trend_buy_conditions = 16
_trend_buy_value = 'p8'
_trend_indicator = 'VWAP'
_trend_size1 = 'p6'
_trend_size2 = 'p14'
_trend_size3 = 'p10'
_use_cumulative_end_entry = False
_use_custom_leverage = False
_use_custom_stake = True
_use_dca_age_limit = False
_use_entry_filter = True
_use_max_exposure_cap = True
_use_min_volume_filter = True
_use_pair_cooldown = False
_use_position_adjustment = False
_use_power_so_scaling = False
_use_trailing_so_trigger = True
_use_vol_sizing = False
_use_volatility_filter = True
_vol_ref_window = 500
_vol_sizing_atr_period = 7
# NOTE(review): 15 has no entry in indicateur_BollingerBands.conditions_values
# (keys 1..12).
_volatility_buy_conditions = 15
_volatility_buy_value = 'p1'
_volatility_indicator = 'BollingerBands'
_volatility_size = 20
_volatility_size1 = 'p3'
_volatility_size2 = 'p4'
_volatility_size3 = 'p3'
_volatility_threshold = 0.053
_volume_rolling = 96
_volume_spike_threshold = 2.5
_volume_spike_window = 15
# ----- Frozen sell params -----
_csl_atr_mult = 1.8
_csl_giveback_ratio = 0.83
_csl_mode = 'profit_lock'
_csl_profit_threshold = 0.09
_exit_only_profit = False
_max_hold_hours = 750
_momentum_sell_conditions = 8
_momentum_sell_value = 'p8'
_momentum_use_sell = True
_my_custom_stoploss = -0.56
# NOTE(review): 16 has no entry in indicateur_MFI.conditions_values (keys 1..10).
_over_sell_conditions = 16
_over_sell_value = 'p1'
_over_use_sell = False
_pair_lock_hours = 4
_profit_ratio_needed = -0.078
_timestop_hours = 48
_timestop_profit_threshold = -0.28
# NOTE(review): 12 has no entry in indicateur_VWAP.conditions_values (keys 1..10).
_trend_sell_conditions = 12
_trend_sell_value = 'p10'
_trend_use_sell = False
_use_csl = False
_use_my_custom_stoploss = True
_use_pair_lock = False
_use_timestop = True
_use_timestop_profit_condition = True
_volatility_sell_conditions = 0
_volatility_sell_value = 'p5'
_volatility_use_sell = True
# Start-up hook: creates the bookkeeping containers used by the other callbacks.
# NOTE(review): the original indentation is not visible here, so it cannot be
# told which of the assignments below are guarded by the run-mode check.
# `_entry_atr_cache` is read without a guard in confirm_trade_exit, so it
# presumably has to exist in every run mode - confirm against the generator.
def bot_start(self, **kwargs) -> None:
if self.dp.runmode.value in ("backtest", "hyperopt"):
self._open_trades = []
self.scaled_entries = {}
self.trailing_buy_data = {}
self.daily_profit_tracker = {}
self.daily_trades_closed = {}
self.partial_exits = {}
# pair -> ATR/close ratio stored by custom_stake_amount.
self._entry_atr_cache = {}
def custom_exit(self, pair: str, trade: 'Trade', current_time: 'datetime', current_rate: float, current_profit: float, **kwargs):
    """Return an exit-reason tag when the time stop or the soft stop-loss fires.

    Two checks run in order:

    1. Time stop - once the trade has been open for at least
       ``_timestop_hours`` and its profit is below
       ``_timestop_profit_threshold``, the tag ``timestop_<hours>h`` is
       returned.
    2. Soft stop-loss - unless ``_use_my_custom_stoploss`` is false, a profit
       at or below ``_my_custom_stoploss`` returns ``stop_loss (<entry_tag>)``.

    The time-stop limits are read from the class constants instead of being
    repeated as literals; 48 hours / -0.28 remain the fallbacks when the
    constants are absent.

    Args:
        pair: Pair of the trade (unused).
        trade: Trade object; ``open_date_utc`` and ``entry_tag`` are read.
        current_time: Current timestamp, comparable with ``trade.open_date_utc``.
        current_rate: Current rate (unused).
        current_profit: Current profit ratio of the trade.

    Returns:
        The exit-reason string, or ``None`` when no exit is requested.
    """
    timestop_hours = getattr(self, '_timestop_hours', 48)
    timestop_profit = getattr(self, '_timestop_profit_threshold', -0.28)

    hours_open = (current_time - trade.open_date_utc).total_seconds() / 3600
    if hours_open >= timestop_hours and current_profit < timestop_profit:
        return f'timestop_{int(hours_open)}h'

    if not getattr(self, '_use_my_custom_stoploss', True):
        return None

    if current_profit <= self._my_custom_stoploss:
        # A missing or None entry tag is reported as 'empty'.
        entry_tag = getattr(trade, 'entry_tag', None)
        if entry_tag is None:
            entry_tag = 'empty'
        return f'stop_loss ({entry_tag})'
    return None
# Exit-confirmation hook: may veto an exit (returns False) and removes the
# per-pair entries this strategy keeps for the trade; returns True otherwise.
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time: datetime, **kwargs) -> bool:
# With _exit_only_profit set, an exit whose profit ratio at `rate` is below
# _profit_ratio_needed is refused, unless the trade has been open for at
# least _max_hold_hours.
if self._exit_only_profit:
trade_duration_hours = (current_time - trade.open_date_utc).total_seconds() / 3600
if trade_duration_hours >= self._max_hold_hours:
pass
elif trade.calc_profit_ratio(rate) < self._profit_ratio_needed:
return False
# Forget the stored initial stake when the exit covers the whole trade amount.
if trade.amount == amount and pair in self.cust_proposed_initial_stakes:
del self.cust_proposed_initial_stakes[pair]
# NOTE(review): indentation was lost; it is unclear whether this cleanup is
# nested under the full-exit check above or runs on every confirmed exit.
if pair in self._entry_atr_cache:
del self._entry_atr_cache[pair]
return True
def leverage(self, pair: str, current_time: datetime, current_rate: float, proposed_leverage: float, max_leverage: float, entry_tag: str | None, side: str, **kwargs) -> float:
    """Return the leverage for a new trade.

    The frozen ``_leverage_value`` (as a float) is used when
    ``_use_custom_leverage`` is set; otherwise the result is ``1.0``.  None of
    the call arguments influence the result.
    """
    return float(self._leverage_value) if self._use_custom_leverage else 1.0
# Stake-sizing hook: derives the initial stake, caps it by a per-trade wallet
# quota, enforces a floor of 15.0 and a ceiling of max_stake, and stores the
# latest 'vol_atr_pct_21' value for the pair.
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float:
# Custom stake: proposed stake divided by the safety-order multiplier and
# scaled by _overbuy_factor; the result is stored per pair.
if self._use_custom_stake:
base_stake = proposed_stake / self._get_max_so_multiplier() * self._overbuy_factor
self.cust_proposed_initial_stakes[pair] = base_stake
else:
base_stake = proposed_stake
# NOTE(review): indentation was lost; it is unclear whether the quota cap
# below applies to both branches or only to the `else` branch - confirm.
ratio = self._tradable_balance_ratio
max_trades = self.config.get("max_open_trades", 1)
available_wallet = self.wallets.get_total_stake_amount()
quota = (available_wallet * ratio) / max_trades
base_stake = min(base_stake, quota)
# The adjustment is only logged outside hyperopt/backtest run modes.
if self.dp.runmode.value not in ("hyperopt", "backtest") and base_stake < 15.0:
logger.info(f"Stake amount for {pair} is too small ({base_stake:.2f} < 15.0), adjusting to 15.0.")
stake_final = max(15.0, base_stake)
stake_final = min(stake_final, max_stake)
# Cache entry ATR for trailing SO trigger
# Best effort: any exception raised here is swallowed and the stake is still
# returned.
try:
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if dataframe is not None and not dataframe.empty:
_entry_atr = dataframe['vol_atr_pct_21'].iloc[-1]
if _entry_atr > 0 and np.isfinite(_entry_atr):
self._entry_atr_cache[pair] = _entry_atr
except Exception:
pass
return stake_final
# Position-adjustment (safety order / DCA) hook: returns the stake for an
# additional entry order, or None when no order should be placed.
# NOTE(review): the original indentation is not visible here; the comments
# below describe the statements in reading order without asserting how deeply
# each one is nested.
def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs) -> Optional[float]:
# Feature switch.
if not self._use_position_adjustment:
return None
filled_buys = trade.select_filled_orders(trade.entry_side)
count_of_buys = len(filled_buys)
# Nothing to do while the profit is above the initial trigger.
if current_profit > self._initial_safety_order_trigger:
return None
dataframe = None
# `or True` makes this condition always true.
if self._adjust_need_entry_signal or self._adjust_require_volume_spike or True:
dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
# Optional gate: the last analyzed candle must carry an entry signal
# ("enter_short" when can_short is set, otherwise "enter_long").
if self._adjust_need_entry_signal:
if dataframe is None or dataframe.empty:
return None
entry_col = "enter_short" if self.can_short else "enter_long"
if dataframe.iloc[-1].get(entry_col, 0) != 1:
return None
# Optional gate: last volume must reach threshold x rolling mean volume.
if self._adjust_require_volume_spike:
if dataframe is None or dataframe.empty:
return None
vol_window = int(self._volume_spike_window)
vol_threshold = self._volume_spike_threshold
if len(dataframe) < vol_window + 1:
return None
recent_volume = dataframe['volume'].iloc[-1]
avg_volume = dataframe['volume'].rolling(vol_window).mean().iloc[-1]
if recent_volume < avg_volume * vol_threshold:
return None
# Only while the number of filled entries is within 1.._max_so_multiplier_orig.
if 1 <= count_of_buys <= self._max_so_multiplier_orig:
# Trigger distance: geometric series in the step scale; the scale == 1
# case is linear in the number of filled entries.
isot = abs(self._initial_safety_order_trigger)
sss = self._safety_order_step_scale
if sss > 1:
safety_order_trigger = isot + (isot * sss * (math.pow(sss, (count_of_buys - 1)) - 1) / (sss - 1))
elif sss < 1:
safety_order_trigger = isot + (isot * sss * (1 - math.pow(sss, (count_of_buys - 1))) / (1 - sss))
else:
safety_order_trigger = isot * count_of_buys
# Trailing SO trigger: adjust spacing by current_ATR / entry_ATR
# The ratio is clamped to the range [0.5, 3.0].
if dataframe is not None and not dataframe.empty:
_entry_atr = self._entry_atr_cache.get(trade.pair, 0)
if _entry_atr > 0:
_cur_atr = dataframe['vol_atr_pct_21'].iloc[-1]
if _cur_atr > 0 and np.isfinite(_cur_atr):
_atr_ratio = max(0.5, min(3.0, _cur_atr / _entry_atr))
safety_order_trigger *= _atr_ratio
if current_profit <= (-1 * abs(safety_order_trigger)):
try:
# Volume scale grows geometrically with the number of filled entries.
_so_scale = math.pow(self._safety_order_volume_scale, (count_of_buys - 1))
actual_initial_stake = filled_buys[0].cost
stake_amount = actual_initial_stake
already_bought = sum(filled_buy.cost for filled_buy in filled_buys)
# When a positive initial stake was stored for the pair, the order size is
# interpolated (lerp) between the scaled filled cost and a compensated
# target, weighted by _partial_fill_compensation_scale.
if trade.pair in self.cust_proposed_initial_stakes:
if self.cust_proposed_initial_stakes[trade.pair] > 0:
proposed_initial_stake = self.cust_proposed_initial_stakes[trade.pair]
current_actual_stake = already_bought * _so_scale
current_stake_preposition = proposed_initial_stake * _so_scale
current_stake_preposition_compensation = current_stake_preposition + abs(current_stake_preposition - current_actual_stake)
total_so_stake = lerp(current_actual_stake, current_stake_preposition_compensation, self._partial_fill_compensation_scale)
stake_amount = total_so_stake
else:
stake_amount = stake_amount * _so_scale
else:
stake_amount = stake_amount * _so_scale
return stake_amount
# Any failure is logged at info level and no order is placed.
except Exception as exc:
logger.info(f'Error adjusting position for {trade.pair}: {exc}')
return None
return None
def _get_max_so_multiplier(self) -> float:
if self._max_so_multiplier_orig > 0:
sovs = self._safety_order_volume_scale
if sovs > 1:
first_line = sovs * (math.pow(sovs, self._max_so_multiplier_orig - 1) - 1)
divisor = sovs - 1
return 2 + first_line / divisor
if sovs < 1:
first_line = sovs * (1 - math.pow(sovs, self._max_so_multiplier_orig - 1))
divisor = 1 - sovs
return 2 + first_line / divisor
return self._max_so_multiplier_orig
return self._max_so_multiplier_orig
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Attach every indicator column this strategy needs.

    Adds ``volume_mean_rolling`` (rolling mean of ``volume`` over
    ``self._volume_rolling`` rows) and ``vol_atr_pct_21`` (21-period average
    true range divided by ``close``), then runs the four inlined indicator
    classes in a fixed order, each receiving the dataframe returned by the
    previous one.

    Args:
        dataframe: Candle data; the ``high``, ``low``, ``close`` and
            ``volume`` columns are read here.
        metadata: Not used by this method.

    Returns:
        The dataframe returned by the last indicator's ``get_dataframe``.
    """
    closes = dataframe['close']
    dataframe['volume_mean_rolling'] = dataframe['volume'].rolling(self._volume_rolling).mean()
    atr_indicator = AverageTrueRange(high=dataframe['high'], low=dataframe['low'], close=closes, window=21)
    dataframe['vol_atr_pct_21'] = atr_indicator.average_true_range() / closes

    # (indicator class, frozen size parameters ps1..ps3, indicator family).
    # Order is significant: it mirrors the generated call sequence.
    indicator_plan = (
        (indicateur_VWAP, ('p6', 'p14', 'p10'), 'trend'),
        (indicateur_MFI, ('p9', 'p12', 'p12'), 'over'),
        (indicateur_BollingerBands, ('p3', 'p4', 'p3'), 'volatility'),
        (indicateur_CoppockCurve, ('p9', 'p6', 'p10'), 'momentum'),
    )
    for indicator_cls, sizes, family in indicator_plan:
        indicator = indicator_cls()
        size_1, size_2, size_3 = (_FrozenParam(size) for size in sizes)
        dataframe = indicator.get_dataframe(
            dataframe=dataframe, ps1=size_1, ps2=size_2, ps3=size_3, type=family,
        )
    return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Populate the ``enter_short`` signal column.

    Entry conditions are the optional volatility filter (ATR / close below
    ``self._volatility_threshold``) and the optional minimum-volume filter
    (``volume_mean_rolling`` above ``self._min_volume_filter``), AND-ed
    together. With no active condition the column is all zeros.

    Optional post-processing:
        * ``self._need_cumulative_entry_call``: keep a signal only where it
          held on each of the last ``self._cumulative_entry_call`` rows
          (rolling minimum; the first ``window - 1`` rows become NaN).
        * ``self._use_cumulative_end_entry``: signal only on the row where
          such a run has just ended (current value 0, previous value 1).

    Args:
        dataframe: Candle data carrying the columns written by
            ``populate_indicators``.
        metadata: Not used by this method.

    Returns:
        The same dataframe with ``enter_short`` set.
    """
    signal_col = 'enter_short'
    conditions = []
    # WARNING: trend buy cv=16 has NO branch in indicateur_VWAP.get_conditions (valid: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    #   -> the generator selected a silent no-op condition; skipping the call to keep the strategy self-documenting.
    # WARNING: volatility buy cv=15 has NO branch in indicateur_BollingerBands.get_conditions (valid: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12])
    #   -> the generator selected a silent no-op condition; skipping the call to keep the strategy self-documenting.
    # WARNING: momentum buy cv=14 has NO branch in indicateur_CoppockCurve.get_conditions (valid: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    #   -> the generator selected a silent no-op condition; skipping the call to keep the strategy self-documenting.
    if self._use_volatility_filter:
        window_size = int(self._volatility_size)
        # Skip the filter entirely when there are fewer rows than the ATR window.
        if len(dataframe) >= window_size:
            atr = AverageTrueRange(high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=window_size).average_true_range()
            atr_pct = atr / dataframe['close']
            conditions.append(atr_pct < self._volatility_threshold)
    if self._use_min_volume_filter:
        conditions.append(dataframe['volume_mean_rolling'] > self._min_volume_filter)

    # Start from explicit zeros. Writing only the matching rows through .loc
    # on a column that does not exist yet leaves NaN everywhere else, and the
    # end-of-run test below (`== 0`) can then never be true.
    dataframe[signal_col] = 0
    if not conditions:
        return dataframe

    try:
        entry_mask = reduce(lambda x, y: x & y, conditions)
        dataframe.loc[entry_mask, signal_col] = 1
        if self._need_cumulative_entry_call:
            dataframe[signal_col] = dataframe[signal_col].rolling(window=self._cumulative_entry_call).min()
            # NOTE(review): nesting reconstructed from a flattened source;
            # confirm the end-entry step belongs under the cumulative branch.
            if self._use_cumulative_end_entry:
                run_is_over = dataframe[signal_col] == 0
                run_was_active = dataframe[signal_col].shift(1) == 1
                dataframe[signal_col] = (run_is_over & run_was_active).astype(int)
    except TypeError:
        dataframe[signal_col] = 0
    return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Populate the ``exit_short`` signal column.

    Asks ``indicateur_CoppockCurve.get_conditions`` for the momentum sell
    condition (cv=8, value=p8), starting from an empty list, and combines
    whatever it returns with OR when ``self._exit_conditions_type`` is
    ``"OR"`` and with AND otherwise. Rows matching the combined mask get
    ``exit_short = 1``. If no condition comes back, or combining raises
    ``TypeError``, the whole column is set to 0.

    Args:
        dataframe: Candle data carrying the indicator columns.
        metadata: Not used by this method.

    Returns:
        The same dataframe with ``exit_short`` set.
    """
    # momentum sell condition cv=8 value=p8
    coppock = indicateur_CoppockCurve()
    # presumably returns the list it was given with boolean masks appended;
    # verify against indicateur_CoppockCurve.get_conditions
    exit_conditions = coppock.get_conditions(
        dataframe=dataframe, s1='p9', s2='p6', s3='p10', type='momentum',
        condition_type='sell', condition_value=8, value='p8', conditions=[],
    )
    try:
        if not exit_conditions:
            dataframe['exit_short'] = 0
            return dataframe
        match_any = self._exit_conditions_type == "OR"
        combined = reduce(
            (lambda x, y: x | y) if match_any else (lambda x, y: x & y),
            exit_conditions,
        )
        dataframe.loc[combined, 'exit_short'] = 1
    except TypeError:
        dataframe['exit_short'] = 0
    return dataframe
def confirm_trade_entry(self, pair, order_type, amount, rate, time_in_force,
                        current_time, entry_tag, side, **kwargs):
    """Decide whether a pending entry order may be placed.

    Two gates are applied in order:

    1. Weekend block: entries are refused when ``current_time.weekday()``
       is 5 or 6 (Saturday or Sunday).
    2. Portfolio exposure cap: the leveraged stake of all open trades
       (``stake_amount * leverage``, with missing values treated as 0 and 1
       respectively) divided by the total stake amount reported by
       ``self.wallets`` must not exceed 0.5.

    The exposure check is best-effort: if it raises, the entry is still
    allowed, but the failure is logged instead of being silently dropped.

    Args:
        pair: Pair being entered; used only in the log message.
        current_time: Datetime of the entry decision.
        order_type, amount, rate, time_in_force, entry_tag, side, **kwargs:
            Accepted for interface compatibility; not read here.

    Returns:
        ``False`` to reject the entry, ``True`` to allow it.
    """
    # Entry gate (active because use_entry_filter=True at generation).
    if current_time.weekday() in (5, 6):  # block weekend
        return False
    # Portfolio exposure cap (ratio > 0.5 = block)
    try:
        open_trades = Trade.get_open_trades()
        total_stake = sum(float(t.stake_amount or 0) * float(t.leverage or 1) for t in open_trades)
        wallet = self.wallets.get_total_stake_amount()
        if wallet > 0 and (total_stake / wallet) > 0.5:
            return False
    except Exception as exc:
        # Fail open, as before, but leave a trace: a broken exposure check
        # means the cap is not being enforced.
        logger.warning("Exposure cap check failed for %s, allowing entry: %s", pair, exc)
    return True