Timeframe: 5m
Direction: Long Only
Stoploss: -6.0%
Trailing Stop: No
ROI: 0m: 13.8%, 29m: 5.4%, 65m: 2.6%, 179m: 0.0%
Interface Version: N/A
Startup Candles: N/A
Indicators: 2
freqtrade/freqtrade-strategies
Strategy 003 (author: Gerald Lonlas, github: https://github.com/freqtrade/freqtrade-strategies)
# pragma pylint: disable=W0105, C0103, C0114, C0115, C0116, C0301, C0302, C0303, C0325, C0411, C0413, W1203, W291
"""
####################################################################################
EQUAL - base class for 'simple' time series prediction
Handles most of the logic for time series prediction. Subclasses should
override the model-related functions
Note that I use gain rather than price because it is a normalised value, and works better with prediction
algorithms. I use the actual (future) gain to train a base model, which is then further refined for each
individual pair.
The model is created if it does not exist, and is trained on all available data before being saved.
Models are saved in user_data/strategies/GYN/models/<class>/<class>.sav, where <class> is the name
of the current class (Shifted if running this directly, or the name of the subclass).
If the model already exists, then it is just loaded and used.
So, it makes sense to do initial training over a long period of time to create the base model.
If training, then no backtesting or tuning for individual pairs is performed (way faster).
If you want to retrain (e.g. you changed indicators), then delete the model and run the strategy over a
long time period
####################################################################################
"""
import copy
import cProfile
import os
import pstats
import sys
import traceback
from datetime import datetime
from functools import reduce
from pathlib import Path
from typing import Optional
import random
import logging
import warnings
import time
import joblib
import numpy as np
import pandas as pd
import pywt
from pandas import DataFrame, Series
import talib.abstract as ta
import finta
import technical.indicators as ftt
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.preprocessing import RobustScaler
from freqtrade import leverage
import freqtrade.vendor.qtpylib.indicators as qtpylib
from freqtrade.persistence import Trade
from freqtrade.strategy import CategoricalParameter, DecimalParameter, IStrategy
from lightgbm import LGBMRegressor
from sklearn.linear_model import PassiveAggressiveRegressor, SGDRegressor
from xgboost import XGBRegressor
group_dir = str(Path(__file__).parent)
strat_dir = str(Path(__file__).parent.parent)
sys.path.append(strat_dir)
sys.path.append(group_dir)
import utils.custom_indicators as cta
import utils.Wavelets as Wavelets
import utils.Forecasters as Forecasters
from utils.DataframeUtils import DataframeUtils, ScalerType # pylint: disable=E0401
logger = logging.getLogger(__name__)
# logger.setLevel(logging.DEBUG)
warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=UserWarning)
pd.options.mode.chained_assignment = None # default='warn'
class tsp0chicken(IStrategy):
# Do *not* hyperopt for the roi and stoploss spaces
plot_config = {
"main_plot": {
"close": {"color": "cornflowerblue"},
},
"subplots": {
"Diff": {
"predicted_gain": {"color": "purple"},
"shifted_pred": {"color": "skyblue"},
# "squeeze": {"color": "red"},
"gain": {"color": "green"},
"target_profit": {"color": "lightgreen"},
"target_loss": {"color": "lightsalmon"},
"buy_region": {"color": "darkseagreen"},
"sell_region": {"color": "darksalmon"},
},
},
}
# Trailing stop:
trailing_stop = False
trailing_stop_positive = None
trailing_stop_positive_offset = 0.0
trailing_only_offset_is_reached = False
timeframe = "5m"
use_custom_stoploss = False
can_short = False
leverage_input = 15.0
# if setting can-short to True, remember to update the config file:
# "trading_mode": "futures",
# "margin_mode": "isolated",
# Recommended
use_exit_signal = True
exit_profit_only = False
ignore_roi_if_entry_signal = True
# Required
startup_candle_count: int = 128 # must be power of 2
process_only_new_candles = True
custom_trade_info = {} # pair-specific data
curr_pair = ""
###################################
# Strategy Specific Variable Storage
## Hyperopt Variables
lookahead = 0
df_coeffs: DataFrame = None
coeff_table = None
coeff_array = None
gain_data = None
merge_indicators = False # set to False to not merge indicators into prediction data
use_rolling = False # True = rolling (slow but realistic), False = Jumping (much faster, less realistic)
single_col_prediction = False # True = use only gain. False = use all columns (better, but much slower)
wavelet_type: Wavelets.WaveletType = Wavelets.WaveletType.DWT
wavelet = None
forecaster_type: Forecasters.ForecasterType = Forecasters.ForecasterType.PA
# forecaster_type:Forecasters.ForecasterType = Forecasters.ForecasterType.SGD
# forecaster_type:Forecasters.ForecasterType = Forecasters.ForecasterType.SVR
forecaster = None
data = None
dc = 0
har = []
wavelet_size = 64 # needed for consistently-sized transforms
win_size = wavelet_size # this can vary (64)
train_min_len = wavelet_size # longer = slower (64)
train_len = min(128, wavelet_size * 4) # longer = slower 256 (64*4)
# scale_len = wavelet_size // 2 # no. recent candles to use when scaling
scale_len = min(8, wavelet_size // 2) # no. recent candles to use when scaling (64/2)
win_size = min(32, wavelet_size) # (64)
model_window = wavelet_size # longer = slower (64)
# setting this to some form of harmonic ptp
profit_nstd = 1.0
loss_nstd = 1.0
training_data = None
training_labels = None
training_mode = False # do not set manually
supports_incremental_training = True
model_per_pair = False
combine_models = True
model_trained = False
new_model = False
detrend_data = False
scale_results = False
norm_data = False # changing this requires new models
dataframeUtils = None
scaler = RobustScaler()
model = None
base_forecaster = None
curr_dataframe: DataFrame = None
target_profit = 0.0
target_loss = 0.0
# hyperparams
# Buy hyperspace params:
buy_params = {
"cexit_min_profit_th": 0.5,
"cexit_profit_nstd": 1.1,
"enable_bb_check": False,
"enable_squeeze": True,
"entry_bb_factor": 1.0,
"entry_bb_width": 0.02,
"entry_guard_metric": 0.0,
"enable_guard_metric": True, # value loaded from strategy
}
# Sell hyperspace params:
sell_params = {
"cexit_loss_nstd": 2.8,
"cexit_metric_overbought": 0.56,
"cexit_metric_take_profit": 0.61,
"cexit_min_loss_th": -0.9,
"exit_bb_factor": 1.02,
"exit_guard_metric": 0.4,
"enable_exit_signal": False, # value loaded from strategy
}
# ROI table:
minimal_roi = {
"0": 0.138,
"29": 0.054,
"65": 0.026,
"179": 0
}
# Stoploss:
stoploss = -0.06
# Entry
# the following flags apply to both entry and exit
enable_guard_metric = CategoricalParameter([True, False], default=True, space="buy", load=True, optimize=False)
enable_bb_check = CategoricalParameter([True, False], default=True, space="buy", load=True, optimize=True)
enable_squeeze = CategoricalParameter([True, False], default=True, space="buy", load=True, optimize=True)
entry_guard_metric = DecimalParameter(-0.8, 0.0, default=-0.2, decimals=1, space="buy", load=True, optimize=True)
entry_bb_width = DecimalParameter(0.020, 0.20, default=0.02, decimals=3, space="buy", load=True, optimize=True)
entry_bb_factor = DecimalParameter(0.90, 1.20, default=1.0, decimals=2, space="buy", load=True, optimize=True)
# Exit
# use exit signal? If disabled, just rely on the custom exit checks (or stoploss) to get out
enable_exit_signal = CategoricalParameter([True, False], default=True, space="sell", load=True, optimize=False)
exit_guard_metric = DecimalParameter(0.4, 0.8, default=0.4, decimals=1, space="sell", load=True, optimize=True)
exit_bb_factor = DecimalParameter(0.90, 1.10, default=1.02, decimals=2, space="sell", load=True, optimize=True)
# Custom Exit
# No. Standard Deviations of profit/loss for target, and lower limit
cexit_min_profit_th = DecimalParameter(0.5, 1.5, default=0.7, decimals=1, space="buy", load=True, optimize=True)
cexit_profit_nstd = DecimalParameter(1.0, 3.0, default=1.1, decimals=1, space="buy", load=True, optimize=True)
cexit_min_loss_th = DecimalParameter(-1.5, -0.5, default=-0.9, decimals=1, space="sell", load=True, optimize=True)
cexit_loss_nstd = DecimalParameter(1.0, 3.0, default=2.8, decimals=1, space="sell", load=True, optimize=True)
# Guard metric sell limits - used to bail out when in profit
cexit_metric_overbought = DecimalParameter(0.55, 0.99, default=0.56, decimals=2, space="sell", load=True, optimize=True)
cexit_metric_take_profit = DecimalParameter(0.55, 0.99, default=0.61, decimals=2, space="sell", load=True, optimize=True)
###################################
@property
def protections(self):
return [
{
"method": "CooldownPeriod",
"stop_duration_candles": 5
},
{
"method": "MaxDrawdown",
"lookback_period_candles": 48,
"trade_limit": 20,
"stop_duration_candles": 4,
"max_allowed_drawdown": 0.2
},
{
"method": "StoplossGuard",
"lookback_period_candles": 24,
"trade_limit": 4,
"stop_duration_candles": 2,
"only_per_pair": False
},
{
"method": "LowProfitPairs",
"lookback_period_candles": 6,
"trade_limit": 2,
"stop_duration_candles": 60,
"required_profit": 0.02
},
{
"method": "LowProfitPairs",
"lookback_period_candles": 24,
"trade_limit": 4,
"stop_duration_candles": 2,
"required_profit": 0.01
}
]
def leverage(self, pair: str, current_time: datetime, current_rate: float,
proposed_leverage: float, max_leverage: float, entry_tag: Optional[str], side: str,
**kwargs) -> float:
if self.leverage_input > max_leverage:
return max_leverage
return self.leverage_input
def bot_start(self, **kwargs) -> None:
if self.dataframeUtils is None:
self.dataframeUtils = DataframeUtils()
self.dataframeUtils.set_scaler_type(ScalerType.Robust)
if self.wavelet is None:
self.wavelet = Wavelets.make_wavelet(self.wavelet_type)
if self.forecaster is None:
self.forecaster = Forecasters.make_forecaster(self.forecaster_type)
self.forecaster.set_detrend(self.detrend_data)
if (not self.forecaster.supports_multiple_columns()) and (not self.single_col_prediction):
logger.info(" ****")
logger.info(f" **** ERROR: forecaster ({self.forecaster_type.name}) does not support multiple indicators")
logger.info(" ****")
if not self.forecaster.supports_retrain():
logger.info(" ****")
logger.info(f" **** WARNING: forecaster ({self.forecaster_type.name}) does not support retrainings")
logger.info(" ****")
# Just initialise these variables here; they are overwritten per pair in populate_indicators()
self.win_size = self.wavelet_size # this can vary
self.train_min_len = self.wavelet_size # longer = slower
self.train_len = min(128, self.wavelet_size * 4) # longer = slower
# scale_len = wavelet_size // 2 # no. recent candles to use when scaling
self.scale_len = min(16, self.wavelet_size // 2) # no. recent candles to use when scaling
self.win_size = min(32, self.wavelet_size)
self.model_window = self.wavelet_size # longer = slower
logger.info("")
logger.info(f" wavelet_type: {self.wavelet_type.name} ({self.wavelet_size})")
logger.info(f" win_size: {self.win_size}")
logger.info(f" forecaster_type: {self.forecaster.get_name()}")
logger.info(f" detrend_data: {self.forecaster.detrend_data}")
logger.info("")
return
# update saved data based on current pairlist
def update_pairlist_data(self):
# this only makes sense in 'live' modes
if self.dp.runmode.value in ("backtest", "plot", "hyperopt"):
return
# current pairlist
curr_pairlist = np.array(self.dp.current_whitelist())
# pairlist from previous calls
saved_pairlist = np.array(list(self.custom_trade_info.keys()))
# get the pairs that are no longer in the list
removed_pairs = np.setdiff1d(saved_pairlist, curr_pairlist)
added_pairs = np.setdiff1d(curr_pairlist, saved_pairlist)
if len(removed_pairs) > 0:
logger.info(" Pairlist changed:")
logger.info(f" old pairs: {saved_pairlist}")
logger.info(f" new pairs: {curr_pairlist}")
logger.info(f" pairs removed: {removed_pairs}")
logger.info(f" pairs added: {added_pairs}")
for pair in removed_pairs:
logger.info(f" Removing historical data for: {pair}")
del self.custom_trade_info[pair]
###################################
"""
Indicator Definitions
"""
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Ch1cK3N t4c0s 09/2/2024 - Use FFT to find the optimal wavelet size for each pair.
# Use Hurst Dominant Cycle and Harmonics to generate envelopes and calculate wavelet
# average movement for trailing stoploss.
# NOTE: if you change the indicators, you need to regenerate the model
start_time = time.time()
pair = metadata['pair']
dataframe = dataframe.copy()
dataframe['zero'] = 0
dataframe['ha_close'] = (dataframe['high'] + dataframe['low'] + dataframe['close']) / 3
dataframe['ha_open'] = dataframe['ha_close'].shift(1)
dataframe['ha_high'] = dataframe[['high', 'ha_open', 'ha_close']].max(axis=1)
dataframe['ha_low'] = dataframe[['low', 'ha_open', 'ha_close']].min(axis=1)
if self.dp.runmode.value in ("dry_run",):
window_size = 200 # self.window_size.value # Adjust this value as appropriate
else:
window_size = None
if len(dataframe) < 200: # self.window_size.value:
raise ValueError(f"Insufficient data points for FFT: {len(dataframe)}. Need at least 200 data points.")
# Perform FFT to identify cycles with a rolling window
freq, power = perform_fft(dataframe['ha_close'], window_size=window_size)
if len(freq) == 0 or len(power) == 0:
raise ValueError("FFT resulted in zero or invalid frequencies. Check the data or the FFT implementation.")
# Filter out the zero-frequency component and keep only cycles shorter than 30 candles
positive_mask = (freq > 0) & (1 / freq < 30)
positive_freqs = freq[positive_mask]
positive_power = power[positive_mask]
# Convert frequencies to periods
cycle_periods = 1 / positive_freqs
# Set a threshold to filter out insignificant cycles based on power
power_threshold = 0.01 * np.max(positive_power)
significant_indices = positive_power > power_threshold
significant_periods = cycle_periods[significant_indices]
significant_power = positive_power[significant_indices]
# Identify the dominant cycle
dominant_freq_index = np.argmax(significant_power)
dominant_freq = positive_freqs[dominant_freq_index]
cycle_period = int(np.abs(1 / dominant_freq)) if dominant_freq != 0 else np.inf
if cycle_period == np.inf:
raise ValueError("No dominant frequency found. Check the data or the method used.")
# Calculate harmonics for the dominant cycle
harmonics = [cycle_period / (i + 1) for i in range(1, 4)]
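# e.g. a dominant cycle of 48 candles gives harmonics [24.0, 16.0, 12.0] (1/2, 1/3 and 1/4 of the cycle)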
dataframe['dc_EWM'] = dataframe['ha_close'].ewm(span=int(cycle_period)).mean()
dataframe['dc_1/2'] = dataframe['ha_close'].ewm(span=int(harmonics[0])).mean()
dataframe['dc_1/3'] = dataframe['ha_close'].ewm(span=int(harmonics[1])).mean()
dataframe['dc_1/4'] = dataframe['ha_close'].ewm(span=int(harmonics[2])).mean()
# Base pair dataframe timeframe indicators
curr_pair = metadata["pair"]
window_size = int(cycle_period)
# set global variables.
self.curr_dataframe = dataframe
self.curr_pair = curr_pair
self.lookahead = int(harmonics[2])
self.dc = int(cycle_period)
self.har = harmonics
self.wavelet_size = int(cycle_period) # needed for consistently-sized transforms
self.win_size = self.wavelet_size # this can vary (64)
self.train_min_len = self.wavelet_size # longer = slower (64)
self.train_len = self.wavelet_size * 4 # longer = slower 256 (64*4)
self.scale_len = int(harmonics[0]) # no. recent candles to use when scaling (64/2)
self.win_size = self.wavelet_size # (64)
self.model_window = self.wavelet_size # longer = slower (64)
self.update_pairlist_data()
# backward looking gain
dataframe["gain"] = (
100.0
* (dataframe["ha_close"] - dataframe["ha_close"].shift(self.lookahead))
/ dataframe["ha_close"].shift(self.lookahead)
)
dataframe["gain"].fillna(0.0, inplace=True)
dataframe["gain"] = dataframe["gain"].round(4)
# need to save the gain data for later scaling
self.gain_data = dataframe["gain"].to_numpy().copy()
# target profit/loss thresholds
dataframe["profit"] = dataframe["gain"].clip(lower=0.0)
dataframe["loss"] = dataframe["gain"].clip(upper=0.0)
dataframe = self.update_gain_targets(dataframe)
# Bollinger Bands
bollinger = qtpylib.bollinger_bands(dataframe["close"], window=window_size, stds=2)
dataframe["bb_lowerband"] = bollinger["lower"]
dataframe["bb_middleband"] = bollinger["mid"]
dataframe["bb_upperband"] = bollinger["upper"]
dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"]
dataframe["bb_gain"] = (dataframe["bb_upperband"] - dataframe["close"]) / dataframe["close"]
dataframe["bb_loss"] = (dataframe["bb_lowerband"] - dataframe["close"]) / dataframe["close"]
# RSI
dataframe["rsi"] = ta.RSI(dataframe, timeperiod=window_size)
# init prediction column
dataframe["predicted_gain"] = 0.0
# RMI: https://www.tradingview.com/script/kwIt9OgQ-Relative-Momentum-Index/
dataframe["rmi"] = cta.RMI(dataframe, length=window_size, mom=5)
# scaled version for use as guard metric
dataframe["srmi"] = 2.0 * (dataframe["rmi"] - 50.0) / 100.0
# guard metric must be in range [-1,+1], with -ve values indicating oversold and +ve values overbought
dataframe["guard_metric"] = dataframe["srmi"]
# create and init the model, if first time (dataframe has to be populated first)
if self.model is None:
# print(" Loading model")
self.load_model(np.shape(dataframe))
# add the predictions
# print(" Making predictions...")
dataframe = self.add_predictions(dataframe)
dataframe.fillna(0.0, inplace=True)
# #DBG (cannot include this in 'real' strat because it's forward looking):
# dataframe['dwt'] = self.get_dwt(dataframe['gain'])
# logger.info(f'{pair} - Gain: {dataframe['gain'].iloc[-1]} | Profit: {dataframe['profit'].iloc[-1]:.2f} | Loss: {dataframe['loss'].iloc[-1]:.2f}')
logger.info(f'{pair} - DC: {cycle_period:.2f} | 1/2: {harmonics[0]:.2f} | 1/3: {harmonics[1]:.2f} | 1/4: {harmonics[2]:.2f}')
end_time = time.time()
logger.info(f"Indicators done for {pair} in {end_time - start_time:.2f} secs")
return dataframe
def update_gain_targets(self, dataframe):
# win_size = max(self.lookahead, 6)
win_size = self.scale_len
self.profit_nstd = float(self.cexit_profit_nstd.value)
self.loss_nstd = float(self.cexit_loss_nstd.value)
dataframe["target_profit"] = (
dataframe["profit"].rolling(window=win_size).mean()
+ self.profit_nstd * dataframe["profit"].rolling(window=win_size).std()
)
dataframe["target_loss"] = dataframe["loss"].rolling(window=win_size).mean() - self.loss_nstd * abs(
dataframe["loss"].rolling(window=win_size).std()
)
dataframe["target_profit"] = dataframe["target_profit"].clip(lower=float(self.cexit_min_profit_th.value))
dataframe["target_loss"] = dataframe["target_loss"].clip(upper=float(self.cexit_min_loss_th.value))
dataframe["target_profit"] = np.nan_to_num(dataframe["target_profit"])
dataframe["target_loss"] = np.nan_to_num(dataframe["target_loss"])
dataframe["local_mean"] = dataframe["close"].rolling(window=win_size).mean()
dataframe["local_min"] = dataframe["close"].rolling(window=win_size).min()
dataframe["local_max"] = dataframe["close"].rolling(window=win_size).max()
return dataframe
def smooth(self, y, window):
box = np.ones(window) / window
y_smooth = np.convolve(y, box, mode="same")
# Hack: constrain to 3 decimal places (should be elsewhere, but convenient here)
y_smooth = np.round(y_smooth, decimals=3)
return np.nan_to_num(y_smooth)
# -----------------------
# look ahead to get future gain. Do *not* put this into the main dataframe!
def get_future_gain(self, dataframe):
df = self.convert_dataframe(dataframe)
future_gain = df["gain"].shift(-self.lookahead).to_numpy()
future_gain[-self.lookahead :] = 0.0
future_gain = np.round(future_gain, decimals=3)
future_gain = np.nan_to_num(future_gain)
# future_gain = dataframe['gain'].shift(-self.lookahead).to_numpy()
# return self.smooth(future_gain, 8)
return future_gain
# -------------
# Normalisation
array_scaler = RobustScaler()
def update_scaler(self, data):
if self.array_scaler is None:
self.array_scaler = RobustScaler()
self.array_scaler.fit(data.reshape(-1, 1))
def norm_array(self, a):
return self.array_scaler.transform(a.reshape(-1, 1))
def denorm_array(self, a):
return self.array_scaler.inverse_transform(a.reshape(-1, 1)).squeeze()
# scales array data, based on array target
def scale_array(self, target, data):
# detrend the input arrays
t = np.arange(0, len(target))
t_poly = np.polyfit(t, target, 1)
t_line = np.polyval(t_poly, t)
x = target - t_line
t = np.arange(0, len(data))
d_poly = np.polyfit(t, data, 1)
d_line = np.polyval(d_poly, t)
y = data - d_line
# scale the detrended data to match the target's distribution
self.update_scaler(x)
y_scaled = self.denorm_array(y)
# retrend
y_scaled = y_scaled + d_line
return y_scaled
def convert_dataframe(self, dataframe: DataFrame) -> DataFrame:
df = dataframe.copy()
# convert date column so that it can be scaled.
if "date" in df.columns:
dates = pd.to_datetime(df["date"], utc=True)
df["date"] = dates.astype("int64")
df.fillna(0.0, inplace=True)
df.set_index("date")
df.reindex()
# print(f' norm_data:{self.norm_data}')
if self.norm_data:
# scale the dataframe
self.scaler.fit(df)
df = pd.DataFrame(self.scaler.transform(df), columns=df.columns)
return df
###################################
# Model-related funcs. Override in subclass to use a different type of model
def get_model_path(self, pair):
category = self.__class__.__name__
root_dir = group_dir + "/models/" + category
model_name = category
if self.model_per_pair and (len(pair) > 0):
model_name = model_name + "_" + pair.split("/")[0]
path = root_dir + "/" + model_name + ".sav"
return path
def load_model(self, df_shape):
model_path = self.get_model_path("")
# load from file or create new model
if os.path.exists(model_path):
# use joblib to reload model state
print(" loading from: ", model_path)
self.model = joblib.load(model_path)
self.model_trained = True
self.new_model = False
self.training_mode = False
# set the model in the forecaster
self.forecaster.set_model(self.model)
else:
self.model = self.forecaster.get_model()
self.model_trained = False
self.new_model = True
self.training_mode = True
# sklearn family of regressors sometimes support starting with an existing model (warm_start),
# or incremental training (partial_fit())
if hasattr(self.model, "warm_start"):
self.model.warm_start = True
self.supports_incremental_training = True # override default
if hasattr(self.model, "partial_fit"):
self.supports_incremental_training = True # override default
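# Minimal sketch (illustration only, not used directly by this strategy) of the two sklearn mechanisms
# named above, using the SGDRegressor already imported at the top of this file:
#
#   reg = SGDRegressor(warm_start=True)   # fit() continues from the previous coefficients
#   reg.fit(X_batch1, y_batch1)
#   reg.fit(X_batch2, y_batch2)           # refines, rather than discards, the earlier fit
#   reg.partial_fit(X_batch3, y_batch3)   # true incremental update on a single mini-batch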
# if self.model is None:
# print("*** ERR: model was not created properly ***")
return
# -------------
def save_model(self):
# save trained model
model_path = self.get_model_path("")
# create directory if it doesn't already exist
save_dir = os.path.dirname(model_path)
if not os.path.exists(save_dir):
os.makedirs(save_dir)
# extract underlying model from forecaster
model = self.forecaster.get_model()
# use joblib to save model state
if model_path is not None:
logger.info(f"Saving to: {model_path}")
joblib.dump(model, model_path)
return
# -------------
# train the model. Override if not an sklearn-compatible algorithm
# set save_model=False if you don't want to save the model (needed for ML algorithms)
def train_model(self, forecaster: Forecasters.base_forecaster, data: np.array, results: np.array, save_model):
if forecaster is None:
logger.info("***ERR: no forecaster ***")
return
x = np.nan_to_num(data)
y = np.nan_to_num(results)
forecaster.train(x, y, incremental=True)
# print(f' train_model() data:{np.shape(data)} results:{np.shape(results)}')
return
# -------------
# initial training of the model
def init_model(self, dataframe: DataFrame):
# if model is not yet trained, or this is a new model and we want to combine across pairs, then train
if (not self.model_trained) or (self.new_model and self.combine_models):
df = dataframe
future_gain_data = self.get_future_gain(df)
data = self.get_data(df)
if self.single_col_prediction:
training_data = dataframe["gain"].to_numpy()
# training_data = self.smooth(training_data, 2)
training_data = training_data.reshape(-1, 1)
else:
training_data = data.copy()
training_data = training_data[: -self.lookahead - 1]
training_labels = future_gain_data[: -self.lookahead - 1].copy()
if not self.model_trained:
logger.info(f"Initial training ({self.curr_pair})")
else:
logger.info(f"Incremental training ({self.curr_pair})")
if self.forecaster.supports_retrain():
# loop through data and train on self.wavelet_length amounts of data
start = 0
end = self.train_len - 1
num_buffs = int((np.shape(training_data)[0]) / self.train_len)
for i in range(num_buffs):
logger.info(f'Training start:{start} end:{end} self.train_len:{self.train_len}')
self.train_model(self.forecaster, training_data[start:end], training_labels[start:end], True)
start = start + self.train_len
end = end + self.train_len
else:
self.train_model(self.forecaster, training_data, training_labels, True)
self.model_trained = True
if self.new_model:
self.save_model()
# print(f' model_trained:{self.model_trained} new_model:{self.new_model} combine_models:{self.combine_models}')
return
# -------------
# set the data for this strategy. Override if necessary
def get_data(self, dataframe):
# default is to just normalise the dataframe and convert to numpy array
self.curr_dataframe = dataframe
df = dataframe.copy()
gain = df["gain"].to_numpy()
# gain = self.smooth(gain, 2)
df["gain"] = gain
self.data = np.array(self.convert_dataframe(df))
return self.data
# -------------
# generate predictions for an np array (intended to be overridden if needed)
def predict_data(self, forecaster: Forecasters.base_forecaster, data):
x = np.nan_to_num(data)
preds = forecaster.forecast(x, self.lookahead)
# print(f' data:{np.shape(data)} preds:{np.shape(preds)}')
# # smooth predictions to try and avoid drastic changes
# preds = self.smooth(preds, 2)
# scale the results to generally match the input characteristics
if self.scale_results:
preds = self.scale_array(data[-8:], preds)
preds = np.clip(preds, -3.0, 3.0)
return preds
# -------------
# single prediction (for use in rolling calculation)
def predict(self, gain, dataframe) -> float:
# Get the start and end index labels of the series
start = gain.index[0]
end = gain.index[-1]
# Get the integer positions of the labels in the dataframe index
start_row = dataframe.index.get_loc(start)
end_row = dataframe.index.get_loc(end) + 1 # +1 because the end of a slice is exclusive
# if end_row < (self.wavelet_size + self.lookahead):
if start_row < (self.wavelet_size + self.lookahead): # need buffer for training
return 0.0
# train on previous data
train_end = start_row - self.lookahead - 1
train_start = max(0, train_end - self.train_len)
scale_start = max(0, end - self.scale_len)
if (not self.training_mode) and (self.supports_incremental_training):
train_data = self.training_data[train_start : start - 1].copy()
train_results = self.training_labels[train_start : start - 1].copy()
# pair_forecaster = copy.deepcopy(self.forecaster) # reset to avoid over-training
pair_forecaster = self.forecaster
self.train_model(pair_forecaster, train_data, train_results, False)
else:
pair_forecaster = self.forecaster
# predict for current window
dslice = self.training_data[start:end].copy()
self.gain_data = np.array(dataframe["gain"].iloc[scale_start:end]) # needed for scaling
y_pred = self.predict_data(pair_forecaster, dslice)
return y_pred[-1]
# -------------
# alternate rolling prediction approach. The pandas rolling mechanism seems to have issues for some reason
def rolling_predict(self, gain, window_size):
win_size = window_size
x = np.nan_to_num(np.array(gain))
preds = np.zeros(len(x), dtype=float)
nrows = np.shape(self.training_data)[0]
start = 0
end = start + win_size
scale_start = max(0, end - self.scale_len)
# train_end = max(0, start - self.lookahead - 1)
# train_end = max(0, start - 1)
# train_end = max(0, start - self.lookahead - 1)
train_end = min(end - self.lookahead - 1, nrows - self.lookahead - 2) # potential lookahead problem
train_start = max(0, train_end - self.train_len)
# get the forecaster for this pair
if self.custom_trade_info[self.curr_pair]["forecaster"] is None:
# make a deep copy so that we don't override the baseline model
pair_forecaster = copy.deepcopy(self.forecaster)
self.custom_trade_info[self.curr_pair]["forecaster"] = pair_forecaster
else:
pair_forecaster = self.custom_trade_info[self.curr_pair]["forecaster"]
# loop through each row
while end <= len(x):
if start < (self.wavelet_size + self.lookahead): # need buffer for training
preds[end - 1] = 0.0
else:
# (re-)train the model on prior data and get predictions
if (not self.training_mode) and (self.supports_incremental_training):
train_data = self.training_data[train_start:train_end].copy()
train_results = self.training_labels[train_start:train_end].copy()
# pair_forecaster = copy.deepcopy(self.forecaster) # reset to avoid over-training
self.train_model(pair_forecaster, train_data, train_results, False)
logger.info(f'start:{start} end:{end} train_start:{train_start} train_end:{train_end}')
# rebuild data up to end of current window
dslice = self.training_data[start:end].copy()
self.gain_data = x[scale_start:end] # needed for scaling
forecast = self.predict_data(pair_forecaster, dslice)
logger.info(f'Forecast:{forecast}')
preds[end - 1] = forecast[-1]
# move the window to the next segment
end = end + 1
start = start + 1
# train_end = start - self.lookahead - 1
# train_end = start - 1
train_end = min(end - self.lookahead - 1, nrows - self.lookahead - 2) # potential lookahead problem
train_start = max(0, train_end - self.train_len)
# save the updated/trained forecaster
self.custom_trade_info[self.curr_pair]["forecaster"] = pair_forecaster
return preds
# ----------
# add predictions in a jumping fashion. This is a compromise - the rolling version is very slow
# Note: you probably need to manually tune the parameters, since there is some limited lookahead here
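# Illustrative sketch (indices simplified, not part of the original logic) of how the two modes advance:
#   rolling (use_rolling=True):  window [0..63] -> [1..64] -> [2..65] -> ...    one new prediction per candle
#   jumping (use_rolling=False): window [0..63] -> [64..127] -> [128..191] ...  win_size predictions per hop
# In jumping mode each window's predictions are produced in one pass using data up to the end of that
# window, so earlier rows within the window effectively see slightly future data (the limited lookahead
# mentioned above).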
def add_jumping_predictions(self, dataframe: DataFrame) -> DataFrame:
df = dataframe
# roll through the close data and predict for each step
nrows = np.shape(df)[0]
# set up training data
future_gain_data = self.get_future_gain(df)
data = self.get_data(dataframe)
self.training_data = data.copy()
self.training_labels = np.zeros(np.shape(future_gain_data), dtype=float)
self.training_labels = future_gain_data.copy()
# initialise the prediction array, using the close data
pred_array = np.zeros(np.shape(future_gain_data), dtype=float)
win_size = self.model_window
# loop until we get to/past the end of the buffer
# start = win_size
start = self.lookahead + self.train_len
end = start + win_size - 1
# train_end = max(0, start - self.lookahead - 1)
# train_end = max(0, end - self.lookahead - 1)
train_size = self.train_len
# train_start = max(0, train_end - train_size)
scale_start = max(0, end - self.scale_len)
train_end = min(end - self.lookahead - 1, nrows - self.lookahead - 2) # potential lookahead problem
train_start = max(0, train_end - self.train_len)
# get the forecaster for this pair
if self.custom_trade_info[self.curr_pair]["forecaster"] is None:
# make a deep copy so that we don't override the baseline model
pair_forecaster = copy.deepcopy(self.forecaster)
else:
pair_forecaster = self.custom_trade_info[self.curr_pair]["forecaster"]
# loop through the rows
while end < nrows:
# extract the data and coefficients from the current window
# (re-)train the model on prior data and get predictions
if (not self.training_mode) and (self.supports_incremental_training):
train_data = self.training_data[train_start:train_end].copy()
train_results = self.training_labels[train_start:train_end].copy()
pair_forecaster = copy.deepcopy(self.forecaster) # reset to avoid over-training
self.train_model(pair_forecaster, train_data, train_results, False)
# logger.info(f'train_data: {np.shape(train_data)}')
# logger.info(f'train_results: {np.shape(train_results)}')
# rebuild data up to end of current window
dslice = self.training_data[start:end].copy()
self.gain_data = np.array(dataframe["gain"].iloc[scale_start:end]) # needed for scaling
preds = self.predict_data(pair_forecaster, dslice)
# print(f'dslice: {np.shape(dslice)}')
# print(f'preds: {np.shape(preds)}')
# copy the predictions for this window into the main predictions array
pred_array[start:end] = preds.copy()
# move the window to the next segment
end = end + win_size
start = start + win_size
train_end = end - self.lookahead - 1
train_start = max(0, train_end - train_size)
# make sure the last section gets processed (the loop above may not exactly fit the data)
# Note that we cannot use the last section for training because we don't have forward looking data
# predict for last window
dslice = self.training_data[-win_size:]
# preds = self.forecaster.predict(dslice)
slen = win_size
self.gain_data = np.array(dataframe["gain"].iloc[-slen:]) # needed for scaling
preds = self.predict_data(pair_forecaster, dslice)
pred_array[-len(preds) :] = preds.copy()
dataframe["predicted_gain"] = pred_array.copy()
# save the updated/trained forecaster
self.custom_trade_info[self.curr_pair]["forecaster"] = pair_forecaster
return dataframe
# -------------
def add_rolling_predictions(self, dataframe: DataFrame) -> DataFrame:
try:
# set up training data
future_gain_data = self.get_future_gain(dataframe)
data = self.get_data(dataframe)
if self.single_col_prediction:
self.training_data = dataframe["gain"].to_numpy()
# self.training_data = self.smooth(self.training_data, 1)
self.training_data = self.training_data.reshape(-1, 1)
else:
self.training_data = data.copy()
self.training_labels = np.zeros(np.shape(future_gain_data), dtype=float)
self.training_labels = future_gain_data.copy()
# dataframe['predicted_gain'] = dataframe['gain'].rolling(window=self.model_window).apply(self.predict, args=(dataframe,))
dataframe["predicted_gain"] = self.rolling_predict(dataframe["gain"], self.model_window)
# dataframe['predicted_gain'] = self.smooth(dataframe['predicted_gain'], 2)
except Exception as e:
print("*** Exception in add_rolling_predictions()")
print(e) # prints the error message
print(traceback.format_exc()) # prints the full traceback
return dataframe
# -------------
# add the latest prediction, and update training periodically
def add_latest_prediction(self, dataframe: DataFrame) -> DataFrame:
df = dataframe
try:
# set up training data
# TODO: see if we can do this incrementally instead of rebuilding every time, or just use portion of data
future_gain_data = self.get_future_gain(df)
data = self.get_data(dataframe)
plen = len(self.custom_trade_info[self.curr_pair]["predictions"])
dlen = len(dataframe["gain"])
clen = min(plen, dlen)
# self.training_data = data[-clen:].copy()
# self.training_labels = future_gain_data[-clen:].copy()
self.training_data = data
self.training_labels = future_gain_data
pred_array = np.zeros(clen, dtype=float)
logger.info(f"[predictions]:{np.shape(self.custom_trade_info[self.curr_pair]['predictions'])} pred_array:{np.shape(pred_array)}")
# copy previous predictions and shift down by 1
pred_array[-clen:] = self.custom_trade_info[self.curr_pair]["predictions"][-clen:].copy()
pred_array = np.roll(pred_array, -1)
pred_array[-1] = 0.0
# train on previous data
# train_end = clen - self.model_window - self.lookahead
train_end = np.shape(self.training_data)[0] - self.lookahead - 2
train_start = max(0, train_end - self.train_len)
# cannot use last portion because we are looking ahead
tslice = self.training_data[train_start:train_end]
lslice = self.training_labels[train_start:train_end]
# get the forecaster for this pair
if self.custom_trade_info[self.curr_pair]["forecaster"] is None:
# make a deep copy so that we don't override the baseline model
pair_forecaster = copy.deepcopy(self.forecaster)
# forecaster should already be there, so print warning
print(f"*** WARNING: No pre-existing forecaster. Creating from model")
else:
pair_forecaster = self.custom_trade_info[self.curr_pair]["forecaster"]
# update forecaster and get predictions
self.train_model(pair_forecaster, tslice, lslice, False)
slen = min(clen, self.scale_len)
self.gain_data = np.array(dataframe["gain"].iloc[-slen:]) # needed for scaling
preds = self.predict_data(pair_forecaster, self.training_data[-self.model_window :])
# self.forecaster = copy.deepcopy(base_forecaster) # restore original model
# only replace last prediction (i.e. don't overwrite the historical predictions)
pred_array[-1] = preds[-1]
dataframe["predicted_gain"] = 0.0
dataframe["predicted_gain"][-clen:] = pred_array[-clen:].copy()
self.custom_trade_info[self.curr_pair]["predictions"][-clen:] = pred_array[-clen:].copy()
# save the updated/trained forecaster
self.custom_trade_info[self.curr_pair]["forecaster"] = pair_forecaster
""""""
# Debug: logger.info info if in buy or sell region (nothing otherwise)
pg = preds[-1]
if pg <= dataframe["target_loss"].iloc[-1]:
logger.info(f"Downwards predict {pg:6.2f}% loss for: {self.curr_pair}")
elif pg >= dataframe["target_profit"].iloc[-1]:
logger.info(f"Upwards predict {pg:6.2f}% profit for: {self.curr_pair}")
""""""
except Exception as e:
print("*** Exception in add_latest_prediction()")
print(e) # prints the error message
print(traceback.format_exc()) # prints the full traceback
return dataframe
# -------------
# add predictions to dataframe['predicted_gain']
def add_predictions(self, dataframe: DataFrame) -> DataFrame:
# print(f" {self.curr_pair} adding predictions")
run_profiler = False
if run_profiler:
prof = cProfile.Profile()
prof.enable()
self.scaler = RobustScaler() # reset scaler each time
self.init_model(dataframe)
if self.curr_pair not in self.custom_trade_info:
self.custom_trade_info[self.curr_pair] = {
"forecaster": None,
"initialised": False,
"predictions": None,
"curr_prediction": 0.0,
"curr_target": 0.0,
}
if self.training_mode:
logger.info(f"Training mode. Skipping backtest for {self.curr_pair}")
dataframe["predicted_gain"] = 0.0
else:
"""
if not self.custom_trade_info[self.curr_pair]["initialised"]:
logger.info(f" backtesting {self.curr_pair}")
if self.use_rolling:
dataframe = self.add_rolling_predictions(dataframe)
else:
dataframe = self.add_jumping_predictions(dataframe)
self.custom_trade_info[self.curr_pair]["initialised"] = True
self.custom_trade_info[self.curr_pair]["predictions"] = dataframe["predicted_gain"].copy()
else:
# logger.info(f' updating latest prediction for: {self.curr_pair}')
dataframe = self.add_latest_prediction(dataframe)
# save latest prediction and threshold for later use (where dataframe is not available)
self.custom_trade_info[self.curr_pair]["curr_prediction"] = dataframe["predicted_gain"].iloc[-1]
self.custom_trade_info[self.curr_pair]["curr_target"] = dataframe["target_profit"].iloc[-1]
"""
logger.info(f"Backtesting {self.curr_pair}")
if self.use_rolling:
dataframe = self.add_rolling_predictions(dataframe)
else:
dataframe = self.add_jumping_predictions(dataframe)
# predictions can spike, so constrain range
dataframe["predicted_gain"] = dataframe["predicted_gain"].clip(lower=-3.0, upper=3.0)
# save target rate for later use
dataframe["curr_target"] = dataframe["close"] * (1.0 + dataframe["predicted_gain"] / 100.0)
# TODO: really should set target to value predicted at previous buy signal
# save latest prediction and threshold for later use (where dataframe is not available)
curr_prediction = dataframe["predicted_gain"].iloc[-1]
curr_target = dataframe["close"].iloc[-1] * (1.0 + curr_prediction / 100.0)
self.custom_trade_info[self.curr_pair]["curr_prediction"] = curr_prediction
self.custom_trade_info[self.curr_pair]["curr_target"] = curr_target
# add shifted version, for debug only
dataframe["shifted_pred"] = dataframe["predicted_gain"].shift(self.lookahead)
if run_profiler:
prof.disable()
# print profiling output
stats = pstats.Stats(prof).strip_dirs().sort_stats("cumtime")
stats.print_stats(20) # top 20 rows
return dataframe
###################################
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
conditions_long = []
conditions_short = []
guard_conditions_long = []
guard_conditions_short = []
dataframe.loc[:, "enter_tag"] = ""
dataframe["enter_long"] = 0
dataframe["buy_region"] = 0
dataframe["enter_short"] = 0
dataframe["sell_region"] = 0
if self.training_mode:
return dataframe
# Update the gain targets here so that we can use hyperopt parameters
dataframe = self.update_gain_targets(dataframe)
# Check trading volume
conditions_long.append(dataframe["volume"] > 1.0)
conditions_short.append(dataframe["volume"] > 1.0)
# Protection metric (guard metric)
if self.enable_guard_metric.value:
# Guard metric in the oversold region
guard_conditions_long.append(dataframe["guard_metric"] < self.entry_guard_metric.value)
guard_conditions_short.append(dataframe["guard_metric"] > self.entry_guard_metric.value)
# Bollinger Bands check
if self.enable_bb_check.value:
lower_limit = dataframe["bb_middleband"] - self.entry_bb_factor.value * (dataframe["bb_middleband"] - dataframe["bb_lowerband"])
upper_limit = dataframe["bb_middleband"] + self.entry_bb_factor.value * (dataframe["bb_upperband"] - dataframe["bb_middleband"])
dataframe["bullish"] = np.where((dataframe["close"] <= lower_limit), 1, 0)
dataframe["bearish"] = np.where((dataframe["close"] >= upper_limit), 1, 0)
guard_conditions_long.append(dataframe["bullish"] > 0)
guard_conditions_short.append(dataframe["bearish"] > 0)
if self.enable_squeeze.value:
if not ("squeeze_long" in dataframe.columns):
dataframe["squeeze_long"] = np.where((dataframe["bb_width"] >= self.entry_bb_width.value), 1, 0)
if not ("squeeze_short" in dataframe.columns):
dataframe["squeeze_short"] = np.where((dataframe["bb_width"] >= self.entry_bb_width.value), 1, 0)
guard_conditions_long.append(dataframe["squeeze_long"] > 0)
guard_conditions_short.append(dataframe["squeeze_short"] > 0)
# Add a column that combines the guard conditions (for plotting)
if guard_conditions_long:
dataframe.loc[reduce(lambda x, y: x & y, guard_conditions_long), "buy_region"] = 1
if guard_conditions_short:
dataframe.loc[reduce(lambda x, y: x & y, guard_conditions_short), "sell_region"] = -1
# Conditions for long positions
model_cond_long = (
(dataframe["buy_region"] > 0) &
(
qtpylib.crossed_above(dataframe["predicted_gain"], dataframe["target_profit"]) |
(
(dataframe["predicted_gain"] > dataframe["target_profit"]) &
(dataframe["predicted_gain"].shift() < dataframe["target_profit"].shift())
)
)
)
conditions_long.append(model_cond_long)
# Conditions for short positions
model_cond_short = (
(dataframe["sell_region"] < 0) &
(
qtpylib.crossed_below(dataframe["predicted_gain"], dataframe["target_loss"]) |
(
(dataframe["predicted_gain"] < dataframe["target_loss"]) &
(dataframe["predicted_gain"].shift() > dataframe["target_loss"].shift())
)
)
)
conditions_short.append(model_cond_short)
# Set entry tags
dataframe.loc[model_cond_long, "enter_tag"] += "model_long "
dataframe.loc[model_cond_short, "enter_tag"] += "model_short"
if conditions_long:
dataframe.loc[reduce(lambda x, y: x & y, conditions_long), "enter_long"] = 1
if conditions_short:
dataframe.loc[reduce(lambda x, y: x & y, conditions_short), "enter_short"] = 1
return dataframe
def confirm_trade_entry(
self,
pair: str,
order_type: str,
amount: float,
rate: float,
time_in_force: str,
current_time: datetime,
entry_tag: Optional[str],
side: str,
**kwargs,
) -> bool:
# this only makes sense in 'live' modes
if self.dp.runmode.value in ("backtest", "plot", "hyperopt"):
return True
# in 'real' systems, there is often a delay between the signal and the trade
# double-check that predicted gain is still above threshold
if pair in self.custom_trade_info:
curr_pred = self.custom_trade_info[pair]["curr_prediction"]
# check rate against target
curr_target = self.custom_trade_info[pair]["curr_target"]
if rate >= curr_target:
if self.dp.runmode.value not in ("backtest", "plot", "hyperopt"):
logger.info("")
logger.info(f" *** {pair} Trade cancelled. Rate ({rate:.2f}) above target ({curr_target:.2f}) ")
logger.info("")
return False
# just debug
if self.dp.runmode.value not in ("backtest", "plot", "hyperopt"):
logger.info("")
logger.info(
f" Trade Entry: {pair}, rate: {rate:.4f} Predicted gain: {curr_pred:.2f}% Target: {curr_target:.2f}"
)
logger.info("")
return True
###################################
"""
exit Signal
"""
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
conditions_long = []
conditions_short = []
guard_conditions_long = []
guard_conditions_short = []
dataframe.loc[:, "exit_tag"] = ""
dataframe["exit_long"] = 0
dataframe["sell_region"] = 0
dataframe["exit_short"] = 0
dataframe["sell_region"] = 0
if self.training_mode or (not self.enable_exit_signal.value):
return dataframe
dataframe["sell_region"] = 0
# Update the gain targets here so that we can use hyperopt parameters
dataframe = self.update_gain_targets(dataframe)
# Check trading volume
conditions_short.append(dataframe["volume"] > 1.0)
if self.enable_guard_metric.value:
# Guard metric in the overbought region
guard_conditions_short.append(dataframe["guard_metric"] > self.entry_guard_metric.value)
if self.enable_bb_check.value:
upper_limit = dataframe["bb_middleband"] + self.exit_bb_factor.value * (dataframe["bb_upperband"] - dataframe["bb_middleband"])
dataframe["bearish"] = np.where((dataframe["close"] >= upper_limit), 1, 0)
# Bearish region
guard_conditions_short.append(dataframe["bearish"] > 0)
if self.enable_squeeze.value:
if not ("squeeze_short" in dataframe.columns):
dataframe["squeeze_short"] = np.where((dataframe["bb_width"] >= self.entry_bb_width.value), 1, 0)
guard_conditions_short.append(dataframe["squeeze_short"] > 0)
if guard_conditions_short:
dataframe.loc[reduce(lambda x, y: x & y, guard_conditions_short), "sell_region"] = -1
# Model triggers
model_cond_short = (
(dataframe["sell_region"] < 0) &
(
qtpylib.crossed_below(dataframe["predicted_gain"], dataframe["target_loss"]) |
(
(dataframe["predicted_gain"] < dataframe["target_loss"]) &
(dataframe["predicted_gain"].shift() < dataframe["target_loss"].shift())
)
)
)
else:
model_cond_short = qtpylib.crossed_below(dataframe["predicted_gain"], dataframe["target_loss"])
conditions_short.append(model_cond_short)
dataframe.loc[model_cond_short, "exit_tag"] += "model_exit_long "
if conditions_short:
dataframe.loc[reduce(lambda x, y: x & y, conditions_short), "exit_long"] = 1
# Conditions for exiting long positions
dataframe["buy_region"] = 0
conditions_long.append(dataframe["volume"] > 1.0)
if self.enable_guard_metric.value:
guard_conditions_long.append(dataframe["guard_metric"] < self.entry_guard_metric.value)
if self.enable_bb_check.value:
lower_limit = dataframe["bb_middleband"] - self.exit_bb_factor.value * (dataframe["bb_middleband"] - dataframe["bb_lowerband"])
dataframe["bullish"] = np.where((dataframe["close"] <= lower_limit), 1, 0)
guard_conditions_long.append(dataframe["bullish"] > 0)
if self.enable_squeeze.value:
if not ("squeeze_long" in dataframe.columns):
dataframe["squeeze_long"] = np.where((dataframe["bb_width"] >= self.entry_bb_width.value), 1, 0)
guard_conditions_long.append(dataframe["squeeze_long"] > 0)
if guard_conditions_long:
dataframe.loc[reduce(lambda x, y: x & y, guard_conditions_long), "buy_region"] = 1
model_cond_long = (
(dataframe["buy_region"] > 0) &
(
qtpylib.crossed_above(dataframe["predicted_gain"], dataframe["target_profit"]) |
(
(dataframe["predicted_gain"] > dataframe["target_profit"]) &
(dataframe["predicted_gain"].shift() > dataframe["target_profit"].shift())
)
)
)
else:
model_cond_long = qtpylib.crossed_above(dataframe["predicted_gain"], dataframe["target_profit"])
conditions_long.append(model_cond_long)
dataframe.loc[model_cond_long, "exit_tag"] += "model_exit_short"
if conditions_long:
dataframe.loc[reduce(lambda x, y: x & y, conditions_long), "exit_long"] = 1
return dataframe
###################################
def confirm_trade_exit(
self,
pair: str,
trade: Trade,
order_type: str,
amount: float,
rate: float,
time_in_force: str,
exit_reason: str,
current_time: datetime,
**kwargs,
) -> bool:
if self.dp.runmode.value not in ("backtest", "plot", "hyperopt"):
logger.info("")
logger.info(f" Trade Exit: {pair}, rate: {rate:.4f)}")
logger.info("")
return True
###################################
"""
Custom Stoploss
"""
"""
def custom_stoploss(
self,
pair: str,
trade: Trade,
current_time: datetime,
current_rate: float,
current_profit: float,
after_fill: bool,
**kwargs,
) -> float:
# Assuming trade.side correctly identifies 'long' or 'short' trades via your custom setup.
trade_side = getattr(trade, 'side', 'long') # Default to 'long' if 'side' is not set
# Adjust stoploss based on the side of the trade
if trade_side == 'long':
# For long trades, use a trailing stop loss that becomes tighter as profit increases
if current_profit > 0.05: # If profit is over 5%
# Calculate new stop loss as 2.5% below the current rate
new_stop_loss = -(0.025 * current_rate)
return max(new_stop_loss, -0.1) # Ensure stop loss does not exceed 10%
else:
return max(self.stoploss, -0.1) # Use default or -10% as stoploss
elif trade_side == 'short':
# For short trades, adjust the stoploss inversely
if current_profit > 0.05: # If profit is over 5%
# Calculate new stop loss as 2.5% above the current rate
new_stop_loss = 0.025 * current_rate
return min(new_stop_loss, 0.1) # Ensure stop loss does not exceed 10%
else:
return min(self.stoploss, 0.1) # Use default or 10% as stoploss
return self.stoploss
"""
###################################
"""
Custom Exit
(Note that this runs even if use_custom_stoploss is False)
"""
# simplified version of custom exit
def custom_exit(self, pair: str, trade: Trade, current_time: "datetime", current_rate: float, current_profit: float, **kwargs):
dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
if not self.use_custom_stoploss:
return None
# Check volume as a basic filter
if last_candle["volume"] <= 1.0:
logger.info(f"Exiting {pair} due to insufficient volume: {last_candle['volume']}")
return None
# Define the action based on the type of trade: long or short
if trade.is_short:
return self.handle_short_exit(pair, trade, current_time, current_rate, current_profit, last_candle)
else:
return self.handle_long_exit(pair, trade, current_time, current_rate, current_profit, last_candle)
def handle_short_exit(self, pair, trade, current_time, current_rate, current_profit, last_candle):
# Handle exit conditions for short trades
if current_profit > 0.0:
if last_candle["guard_metric"] <= self.cexit_metric_overbought.value:
logger.info(f"Covering short for {pair} as overbought guard metric reached: {last_candle['guard_metric']}")
return "cover_overbought"
if current_profit > 0.005:
if last_candle["guard_metric"] <= self.cexit_metric_take_profit.value:
logger.info(f"Taking profit for short on {pair} as guard metric reached: {last_candle['guard_metric']}")
return "cover_take_profit"
if last_candle["predicted_loss"] >= last_candle["target_profit"]:
logger.info(f"Predicting rise for {pair}, covering short.")
return "predict_rise"
if last_candle["exit_short"] > 0:
logger.info(f"Exit signal triggered for short on {pair}")
return "exit_signal_short"
return self.check_common_exit_conditions(trade, current_time, current_profit)
def handle_long_exit(self, pair, trade, current_time, current_rate, current_profit, last_candle):
# Handle exit conditions for long trades
if current_profit > 0.0:
if last_candle["guard_metric"] >= self.cexit_metric_overbought.value:
logger.info(f"Selling {pair} as overbought guard metric reached: {last_candle['guard_metric']}")
return "metric_overbought"
if current_profit > 0.005:
if last_candle["guard_metric"] >= self.cexit_metric_take_profit.value:
logger.info(f"Taking profit on {pair} as take profit guard metric reached: {last_candle['guard_metric']}")
return "take_profit"
if last_candle["predicted_gain"] <= last_candle["target_loss"]:
logger.info(f"Predicting drop for {pair}, selling for potential market drop.")
return "predict_drop"
if last_candle["exit_long"] > 0:
logger.info(f"Exit signal triggered for long on {pair}")
return "exit_signal"
return self.check_common_exit_conditions(trade, current_time, current_profit)
def check_common_exit_conditions(self, trade, current_time, current_profit):
time_delta = current_time - trade.open_date_utc
num_hours = time_delta.total_seconds() / 3600
num_days = time_delta.days
# Check common time-based exit conditions
if num_hours >= 12 and current_profit > 0.001:
logger.info(f"Unclogging position held over 12 hours with profit.")
return "unclog_12h"
if num_days >= 1 and current_profit > 0:
logger.info(f"Unclogging position held over 1 day with profit.")
return "unclog_1d"
if num_days >= 7:
logger.info(f"Unclogging position held over 7 days, at a loss or profit.")
return "unclog_7d"
return None
def perform_fft(price_data, window_size=None):
if window_size is not None:
# Apply rolling window to smooth the data
price_data = price_data.rolling(window=window_size, center=True).mean().dropna()
normalized_data = (price_data - np.mean(price_data)) / np.std(price_data)
n = len(normalized_data)
fft_data = np.fft.fft(normalized_data)
freq = np.fft.fftfreq(n)
power = np.abs(fft_data) ** 2
power[np.isinf(power)] = 0
return freq, power
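# Usage sketch for perform_fft() (illustration only; the synthetic series below is an assumption):
#
#   t = np.arange(1024)
#   prices = pd.Series(100.0 + np.sin(2.0 * np.pi * t / 32.0))   # clean 32-candle cycle
#   freq, power = perform_fft(prices)
#   mask = freq > 0
#   dominant_period = 1.0 / freq[mask][np.argmax(power[mask])]   # ~32 candles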