Timeframe
15m
Direction
Long & Short
Stoploss
-2.0%
Trailing Stop
Yes
ROI
0m: 4.0%, 480m: 2.5%, 1440m: 1.5%, 2880m: 0.0%
Interface Version
3
Startup Candles
2000
Indicators
9
freqtrade/freqtrade-strategies
Strategy 003 author@: Gerald Lonlas github@: https://github.com/freqtrade/freqtrade-strategies
"""
XGBoost Rolling Scoring Strategy
- Initial training on first 2000 candles
- Retrain every 500 candles (approx 5 days for 15m)
- Predicts direction probability for every candle
- Entry: confidence > 0.55
"""
from pandas import DataFrame
import talib.abstract as ta
import numpy as np
from freqtrade.strategy import IStrategy
class XGBoostRollingStrategy(IStrategy):
INTERFACE_VERSION = 3
timeframe = '15m'
can_short = True
stoploss = -0.02
trailing_stop = True
trailing_stop_positive = 0.005
trailing_stop_positive_offset = 0.012
trailing_only_offset_is_reached = True
minimal_roi = {"0": 0.04, "480": 0.025, "1440": 0.015, "2880": 0}
max_open_trades = 6
startup_candle_count = 2000 # Minimal warmup, training handles its own data needs
process_only_new_candles = True # Live mode: train once, retrain periodically
use_exit_signal = True
# Per-pair model cache
_models = {}; _scaler_m = {}; _scaler_s = {}; _feature_cols = {}; _trained = {}
def _build_features(self, df: DataFrame) -> DataFrame:
d = df.copy()
for p in [1, 3, 6, 12, 24, 48]:
d[f'r{p}'] = d['close'].pct_change(p)
d['hlr'] = (d['high'] - d['low']) / d['close']
d['cpos'] = (d['close'] - d['low']) / (d['high'] - d['low'] + 0.0001)
for p in [9, 21, 55]:
d[f'e{p}'] = ta.EMA(d, timeperiod=p)
d[f'e{p}d'] = (d['close'] - d[f'e{p}']) / d[f'e{p}'] * 100
macd = ta.MACD(d); d['md'] = macd['macd']; d['ms'] = macd['macdsignal']
d['rsi'] = ta.RSI(d, timeperiod=14)
bb = ta.BBANDS(d, timeperiod=20)
d['bw'] = (bb['upperband'] - bb['lowerband']) / bb['middleband']
d['atr'] = ta.ATR(d, timeperiod=14) / d['close'] * 100
d['vr'] = d['volume'] / ta.SMA(d['volume'], timeperiod=20)
d['adx'] = ta.ADX(d, timeperiod=14)
stoch = ta.STOCH(d); d['sk'] = stoch['slowk']; d['sd'] = stoch['slowd']
tp = (d['high'] + d['low'] + d['close']) / 3
mf = tp * d['volume']
pf = mf.where(tp > tp.shift(1), 0).rolling(14).sum()
nf = mf.where(tp < tp.shift(1), 0).rolling(14).sum()
d['mfi'] = 100 - 100 / (1 + pf / (nf + 0.0001))
return d
def _get_X(self, df: DataFrame):
# Get feature matrix from dataframe
fcols = [c for c in df.columns if c in [
'r1','r3','r6','r12','r24','r48','hlr','cpos',
'e9d','e21d','e55d','md','rsi','bw','atr','vr','adx','sk','sd','mfi'
]]
X = df[fcols].values
# Fill NaN with 0
X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
return X, fcols
def _train(self, df: DataFrame, pair: str) -> bool:
"""Train on all available data up to current point"""
import xgboost as xgb
df = self._build_features(df.copy())
# Target: 1 if close 12 candles later is up > 0.5%, -1 if down > 0.5%
fwd = df['close'].shift(-12) / df['close'] - 1
y = np.where(fwd > 0.005, 2, np.where(fwd < -0.005, 0, 1)) # XGBoost needs [0,1,2]
X, fcols = self._get_X(df)
# Use all rows except last 12 (no forward target)
valid = np.arange(len(df) - 12)
valid = valid[valid < len(X)]
X_train = X[valid]
y_train = y[valid]
if len(X_train) < 200: return False
# Scale
sm = np.mean(X_train, axis=0); ss = np.std(X_train, axis=0) + 0.0001
Xs = (X_train - sm) / ss
# Class weights
u, c = np.unique(y_train, return_counts=True)
w = {u[i]: len(y_train)/(len(u)*c[i]) for i in range(len(u))}
sw = np.array([w[label] for label in y_train])
# Train 5 models
models = []
for seed in [42, 73, 99, 17, 55]:
m = xgb.XGBClassifier(n_estimators=200, max_depth=4, learning_rate=0.05,
subsample=0.8, colsample_bytree=0.7, min_child_weight=3,
reg_alpha=0.2, reg_lambda=1.0, random_state=seed,
eval_metric='mlogloss', use_label_encoder=False, verbosity=0)
m.fit(Xs, y_train, sample_weight=sw, verbose=False)
models.append(m)
self._models[pair] = models; self._scaler_m[pair] = sm; self._scaler_s[pair] = ss
self._feature_cols[pair] = fcols; self._trained[pair] = len(df)
return True
def _predict(self, df: DataFrame, pair: str) -> DataFrame:
if pair not in self._models or len(df) < 1: return df
df2 = self._build_features(df)
X, fcols = self._get_X(df2)
sm = self._scaler_m[pair][:len(fcols)]
ss = self._scaler_s[pair][:len(fcols)]
Xs = (X - sm) / (ss + 0.0001)
all_probs = np.mean([m.predict_proba(Xs) for m in self._models[pair]], axis=0)
classes = self._models[pair][0].classes_
long_idx = list(classes).index(2) if 2 in classes else -1
short_idx = list(classes).index(0) if 0 in classes else -1
df['xg_long'] = all_probs[:, long_idx] if long_idx >= 0 else 0
df['xg_short'] = all_probs[:, short_idx] if short_idx >= 0 else 0
df['xg_conf'] = np.maximum(df['xg_long'].values, df['xg_short'].values)
return df
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata['pair']
n = len(dataframe)
# Live mode: train first 2000, retrain every 500
if pair not in self._trained and n >= 2100:
self._train(dataframe.iloc[:2000].copy(), pair)
elif pair in self._trained and n - self._trained[pair] >= 500:
self._train(dataframe.copy(), pair)
# Predict
if pair in self._models:
last_only = self.process_only_new_candles and n <= self._trained.get(pair, 0) + 1
dataframe = self._predict(dataframe, pair)
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
if 'xg_conf' not in dataframe.columns: return dataframe
dataframe.loc[(dataframe['xg_conf'] > 0.55) & (dataframe['xg_long'] > dataframe['xg_short']),
['enter_long','enter_tag']] = (1, 'xgL')
dataframe.loc[(dataframe['xg_conf'] > 0.55) & (dataframe['xg_short'] > dataframe['xg_long']),
['enter_short','enter_tag']] = (1, 'xgS')
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
if 'xg_conf' not in dataframe.columns: return dataframe
dataframe.loc[dataframe['xg_conf'] < 0.45, ['exit_long','exit_short','exit_tag']] = (1, 1, 'xgE')
return dataframe