Timeframe
5m
Direction
Long & Short
Stoploss
-5.0%
Trailing Stop
Yes
ROI
N/A
Interface Version
N/A
Startup Candles
300
Indicators
2
freqtrade/freqtrade-strategies
import logging
import pandas as pd
import numpy as np
import talib.abstract as ta # Added for trend EMAs
from freqtrade.strategy import IStrategy, CategoricalParameter, DecimalParameter
from pandas import DataFrame
from pathlib import Path
logger = logging.getLogger(__name__)
class SekkaCVD(IStrategy):
timeframe = '5m'
can_short = True
# DISABLED: Exits rely entirely on trailing stop and stoploss
minimal_roi = {}
## -- TRAILING SETUP --
trailing_stop = True
trailing_stop_positive_offset = 0.03
trailing_stop_positive = 0.01
trailing_only_offset_is_reached = True
# Note: Optimize this via hyperopt (--spaces stoploss) to find a better balance
stoploss = -0.05
startup_candle_count = 300 # > largest rolling window (288)
# Minimum CVD divergence threshold (filter out paper-thin divergences)
CVD_DIV_THRESHOLD = CategoricalParameter(
[0.003, 0.001, 0.005, 0.007, 0.01, 0.015, 0.02, 0.03], default=0.005, space='buy', optimize=True)
# Proximity Buffer: How close to the absolute high/low must the price be? (0.1%)
PROXIMITY_LONG = 1.001 # Price can be up to 0.1% above the absolute low
PROXIMITY_SHORT = 0.999 # Price can be up to 0.1% below the absolute high
# Rolling window sizes (in 5m candles)
WINDOWS = {
'macro': 288, # 24h (6 × 4h candles equivalent)
'medium': 48, # 4h (4 × 1h candles equivalent)
'short': 12, # 1h (4 × 15m candles equivalent)
'micro': 6, # 30m (6 × 5m candles equivalent)
}
def bot_start(self, **kwargs):
"""Preload and cache CVD data at startup"""
self.cvd_cache = {}
def compute_true_cvd(self, pair: str) -> pd.DataFrame:
"""Load raw trades and compute true CVD aggregated to 5m candles"""
pair_filename = pair.replace('/', '_').replace(':', '_')
data_dir = Path(self.config['datadir'])
trading_mode = self.config.get('trading_mode', 'spot')
if trading_mode == 'futures' and data_dir.name != 'futures':
data_dir = data_dir / 'futures'
trades_path = data_dir / f"{pair_filename}-trades.feather"
if not trades_path.exists():
raise FileNotFoundError(f"Trades file not found: {trades_path}")
trades_df = pd.read_feather(trades_path)
if not pd.api.types.is_datetime64_any_dtype(trades_df['timestamp']):
trades_df['timestamp'] = pd.to_datetime(trades_df['timestamp'], unit='ms', utc=True)
trades_df.set_index('timestamp', inplace=True)
trades_df['taker_buy'] = trades_df['side'] == 'buy'
trades_df['buy_vol'] = trades_df['amount'].where(trades_df['taker_buy'], 0)
trades_df['sell_vol'] = trades_df['amount'].where(~trades_df['taker_buy'], 0)
resampled = trades_df.resample('5min').agg(
buy_vol=('buy_vol', 'sum'),
sell_vol=('sell_vol', 'sum')
)
resampled = resampled.fillna(0)
resampled['delta'] = resampled['buy_vol'] - resampled['sell_vol']
resampled['total_vol'] = resampled['buy_vol'] + resampled['sell_vol']
resampled['cvd'] = resampled['delta'].cumsum()
return resampled[['delta', 'cvd', 'total_vol']]
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata['pair']
if pair not in self.cvd_cache:
self.cvd_cache[pair] = self.compute_true_cvd(pair)
cvd_df = self.cvd_cache[pair]
dataframe.index = pd.to_datetime(dataframe['date'], utc=True)
dataframe = dataframe.join(cvd_df[['delta', 'cvd', 'total_vol']], how='left')
dataframe['cvd'] = dataframe['cvd'].ffill()
dataframe['total_vol'] = dataframe['total_vol'].fillna(0)
dataframe.reset_index(drop=True, inplace=True)
# --- Trend Definition (For 5m Flexibility) ---
dataframe['ema50'] = ta.EMA(dataframe, timeperiod=50)
dataframe['ema200'] = ta.EMA(dataframe, timeperiod=200)
# --- Calculate Daily Anchored VWAP ---
dataframe['typical_price'] = (dataframe['high'] + dataframe['low'] + dataframe['close']) / 3
dataframe['pv'] = dataframe['typical_price'] * dataframe['volume']
dataframe['day'] = dataframe['date'].dt.date
daily_groups = dataframe.groupby('day')
cumulative_pv = daily_groups['pv'].cumsum()
cumulative_vol = daily_groups['volume'].cumsum().clip(lower=1e-8)
dataframe['vwap_daily'] = cumulative_pv / cumulative_vol
# Compute raw divergence values per window
for name, window in self.WINDOWS.items():
price_high = dataframe['high'].rolling(window).max()
price_low = dataframe['low'].rolling(window).min()
cvd_high = dataframe['cvd'].rolling(window).max()
cvd_low = dataframe['cvd'].rolling(window).min()
rolling_vol = dataframe['total_vol'].rolling(window).sum().clip(lower=1)
# Price at rolling extreme (Boolean with 0.1% Proximity Buffer applied)
dataframe[f'price_at_low_{name}'] = (dataframe['low'] <= (price_low * self.PROXIMITY_LONG))
dataframe[f'price_at_high_{name}'] = (dataframe['high'] >= (price_high * self.PROXIMITY_SHORT))
# CVD diverging from extreme (boolean)
dataframe[f'cvd_above_low_{name}'] = (dataframe['cvd'] > cvd_low)
dataframe[f'cvd_below_high_{name}'] = (dataframe['cvd'] < cvd_high)
# Divergence strength normalized by volume
dataframe[f'cvd_div_bull_{name}'] = (dataframe['cvd'] - cvd_low) / rolling_vol
dataframe[f'cvd_div_bear_{name}'] = (cvd_high - dataframe['cvd']) / rolling_vol
# Cleanup
dataframe.drop(columns=['typical_price', 'pv', 'day'], inplace=True, errors='ignore')
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
threshold = self.CVD_DIV_THRESHOLD.value
# 1. Higher Timeframe (HTF) Strict Alignment
bullish_htf = True
bearish_htf = True
# We only strictly loop the larger timeframes now
htf_windows = ['macro', 'medium', 'short']
for name in htf_windows:
bullish_htf = bullish_htf & (
dataframe[f'price_at_low_{name}'] &
dataframe[f'cvd_above_low_{name}'] &
(dataframe[f'cvd_div_bull_{name}'] > threshold)
)
bearish_htf = bearish_htf & (
dataframe[f'price_at_high_{name}'] &
dataframe[f'cvd_below_high_{name}'] &
(dataframe[f'cvd_div_bear_{name}'] > threshold)
)
# 2. Micro (5m) Flexibility Logic
# Define the clear trend
uptrend = dataframe['ema50'] > dataframe['ema200']
downtrend = dataframe['ema50'] < dataframe['ema200']
# Define strict micro divergence
micro_bull_strict = (
dataframe['price_at_low_micro'] &
dataframe['cvd_above_low_micro'] &
(dataframe['cvd_div_bull_micro'] > threshold)
)
micro_bear_strict = (
dataframe['price_at_high_micro'] &
dataframe['cvd_below_high_micro'] &
(dataframe['cvd_div_bear_micro'] > threshold)
)
# Combine HTF with Micro Flexibility:
# Require HTF alignment. For Micro, either have strict divergence OR be trading with the clear trend.
bullish_all = bullish_htf & (micro_bull_strict | uptrend)
bearish_all = bearish_htf & (micro_bear_strict | downtrend)
# 3. Apply the VWAP Fair Value Filter
bullish_all = bullish_all & (dataframe['close'] < dataframe['vwap_daily'])
bearish_all = bearish_all & (dataframe['close'] > dataframe['vwap_daily'])
dataframe.loc[
bullish_all,
['enter_long', 'enter_tag']
] = (1, 'cvd_bullish_div_vwap')
dataframe.loc[
bearish_all,
['enter_short', 'enter_tag']
] = (1, 'cvd_bearish_div_vwap')
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Exits are managed via ROI, trailing stop, and stoploss.
return dataframe