Timeframe
15m
Direction
Long Only
Stoploss
-99.0%
Trailing Stop
No
ROI
0m: 50.0%, 60m: 45.0%, 120m: 40.0%, 240m: 30.0%
Interface Version
N/A
Startup Candles
N/A
Indicators
5
freqtrade/freqtrade-strategies
Strategy 003 author@: Gerald Lonlas github@: https://github.com/freqtrade/freqtrade-strategies
import warnings
warnings.filterwarnings('ignore')
import logging
from functools import reduce
import datetime
import talib.abstract as ta
import pandas_ta as pta
import logging
import numpy as np
import pandas as pd
import freqtrade.vendor.qtpylib.indicators as qtpylib
from technical import qtpylib
from datetime import timedelta, datetime, timezone
from pandas import DataFrame, Series
from typing import Optional
from freqtrade.strategy.interface import IStrategy
from technical.pivots_points import pivots_points
from freqtrade.exchange import timeframe_to_prev_date, timeframe_to_minutes
from freqtrade.persistence import Trade
from freqtrade.strategy import (
BooleanParameter,
CategoricalParameter,
DecimalParameter,
IStrategy,
IntParameter,
RealParameter,
merge_informative_pair,
)
from scipy.signal import argrelextrema
import warnings
import math
warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
logger = logging.getLogger(__name__)
class NOTankAi_15_Cleaned(IStrategy):
exit_profit_only = True
trailing_stop = False
position_adjustment_enable = True
ignore_roi_if_entry_signal = True
max_entry_position_adjustment = 2
max_dca_multiplier = 1
process_only_new_candles = True
can_short = False
use_exit_signal = True
startup_candle_count: int = 200
stoploss = -0.99
timeframe = "15m"
# DCA
position_adjustment_enable = True
initial_safety_order_trigger = DecimalParameter(
low=-0.02, high=-0.01, default=-0.018, decimals=3, space="entry", optimize=True, load=True
)
max_safety_orders = IntParameter(1, 6, default=2, space="entry", optimize=True)
safety_order_step_scale = DecimalParameter(
low=1.05, high=1.5, default=1.25, decimals=2, space="entry", optimize=True, load=True
)
safety_order_volume_scale = DecimalParameter(
low=1.1, high=2, default=1.4, decimals=1, space="entry", optimize=True, load=True
)
# Custom Functions
increment = DecimalParameter(
low=1.0005, high=1.002, default=1.001, decimals=4, space="entry", optimize=True, load=True
)
last_entry_price = None
# Protections
cooldown_lookback = IntParameter(2, 48, default=1, space="protection", optimize=True)
stop_duration = IntParameter(12, 200, default=4, space="protection", optimize=True)
use_stop_protection = BooleanParameter(default=True, space="protection", optimize=True)
minimal_roi = {
"0": 0.5,
"60": 0.45,
"120": 0.4,
"240": 0.3,
"360": 0.25,
"720": 0.2,
"1440": 0.15,
"2880": 0.1,
"3600": 0.05,
"7200": 0.02,
}
plot_config = {
"main_plot": {},
"subplots": {
"extrema": {
"&s-extrema": {"color": "#f53580", "type": "line"},
"&s-minima_sort_threshold": {"color": "#4ae747", "type": "line"},
"&s-maxima_sort_threshold": {"color": "#5b5e4b", "type": "line"},
},
"min_max": {
"maxima": {"color": "#a29db9", "type": "line"},
"minima": {"color": "#ac7fc", "type": "line"},
"maxima_check": {"color": "#a29db9", "type": "line"},
"minima_check": {"color": "#ac7fc", "type": "line"},
},
},
}
@property
def protections(self):
prot = []
prot.append(
{"method": "CooldownPeriod", "stop_duration_candles": self.cooldown_lookback.value}
)
if self.use_stop_protection.value:
prot.append(
{
"method": "StoplossGuard",
"lookback_period_candles": 24 * 3,
"trade_limit": 2,
"stop_duration_candles": self.stop_duration.value,
"only_per_pair": False,
}
)
return prot
def custom_stake_amount(
self,
pair: str,
current_time: datetime,
current_rate: float,
proposed_stake: float,
min_stake: Optional[float],
max_stake: float,
leverage: float,
entry_tag: Optional[str],
side: str,
**kwargs,
) -> float:
return proposed_stake / self.max_dca_multiplier
def custom_entry_price(
self,
pair: str,
trade: Optional["Trade"],
current_time: datetime,
proposed_rate: float,
entry_tag: Optional[str],
side: str,
**kwargs,
) -> float:
dataframe, last_updated = self.dp.get_analyzed_dataframe(
pair=pair, timeframe=self.timeframe
)
entry_price = (dataframe["close"].iat[-1] + dataframe["open"].iat[-1] + proposed_rate) / 3
if proposed_rate < entry_price:
entry_price = proposed_rate
if self.last_entry_price is not None and abs(entry_price - self.last_entry_price) < 0.0005:
entry_price *= self.increment.value
self.last_entry_price = entry_price
return entry_price
def confirm_trade_exit(
self,
pair: str,
trade: Trade,
order_type: str,
amount: float,
rate: float,
time_in_force: str,
exit_reason: str,
current_time: datetime,
**kwargs,
) -> bool:
if exit_reason == "partial_exit" and trade.calc_profit_ratio(rate) < 0:
logger.info(f"{trade.pair} partial exit is below 0")
self.dp.send_msg(f"{trade.pair} partial exit is below 0")
return False
if exit_reason == "trailing_stop_loss" and trade.calc_profit_ratio(rate) < 0:
logger.info(f"{trade.pair} trailing stop price is below 0")
self.dp.send_msg(f"{trade.pair} trailing stop price is below 0")
return False
return True
def adjust_trade_position(
self,
trade: Trade,
current_time: datetime,
current_rate: float,
current_profit: float,
min_stake: Optional[float],
max_stake: float,
current_entry_rate: float,
current_exit_rate: float,
current_entry_profit: float,
current_exit_profit: float,
**kwargs,
) -> Optional[float]:
dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
filled_entries = trade.select_filled_orders(trade.entry_side)
count_of_entries = trade.nr_of_successful_entries
trade_duration = (current_time - trade.open_date_utc).seconds / 60
if current_profit > 0.25 and trade.nr_of_successful_exits == 0:
return -(trade.stake_amount / 4)
if current_profit > 0.40 and trade.nr_of_successful_exits == 1:
return -(trade.stake_amount / 3)
if current_profit > -0.15 and trade.nr_of_successful_entries == 1:
return None
if current_profit > -0.3 and trade.nr_of_successful_entries == 2:
return None
if current_profit > -0.6 and trade.nr_of_successful_entries == 3:
return None
try:
stake_amount = filled_entries[0].cost
if count_of_entries == 1:
stake_amount = stake_amount * 1
elif count_of_entries == 2:
stake_amount = stake_amount * 1
elif count_of_entries == 3:
stake_amount = stake_amount * 1
else:
stake_amount = stake_amount
return stake_amount
except Exception as exception:
return None
return None
def leverage(
self,
pair: str,
current_time: "datetime",
current_rate: float,
proposed_leverage: float,
max_leverage: float,
side: str,
**kwargs,
) -> float:
window_size = 50
dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
historical_close_prices = dataframe["close"].tail(window_size)
historical_high_prices = dataframe["high"].tail(window_size)
historical_low_prices = dataframe["low"].tail(window_size)
base_leverage = 10
rsi_values = ta.RSI(historical_close_prices, timeperiod=14)
atr_values = ta.ATR(
historical_high_prices, historical_low_prices, historical_close_prices, timeperiod=14
)
macd_line, signal_line, _ = ta.MACD(
historical_close_prices, fastperiod=12, slowperiod=26, signalperiod=9
)
sma_values = ta.SMA(historical_close_prices, timeperiod=20)
current_rsi = rsi_values[-1] if len(rsi_values) > 0 else 50.0
current_atr = atr_values[-1] if len(atr_values) > 0 else 0.0
current_macd = (
macd_line[-1] - signal_line[-1] if len(macd_line) > 0 and len(signal_line) > 0 else 0.0
)
current_sma = sma_values[-1] if len(sma_values) > 0 else 0.0
dynamic_rsi_low = (
np.nanmin(rsi_values)
if len(rsi_values) > 0 and not np.isnan(np.nanmin(rsi_values))
else 30.0
)
dynamic_rsi_high = (
np.nanmax(rsi_values)
if len(rsi_values) > 0 and not np.isnan(np.nanmax(rsi_values))
else 70.0
)
dynamic_atr_low = (
np.nanmin(atr_values)
if len(atr_values) > 0 and not np.isnan(np.nanmin(atr_values))
else 0.002
)
dynamic_atr_high = (
np.nanmax(atr_values)
if len(atr_values) > 0 and not np.isnan(np.nanmax(atr_values))
else 0.005
)
long_increase_factor = 1.5
long_decrease_factor = 0.5
short_increase_factor = 1.5
short_decrease_factor = 0.5
volatility_decrease_factor = 0.8
if side == "long":
if current_rsi < dynamic_rsi_low:
base_leverage *= long_increase_factor
elif current_rsi > dynamic_rsi_high:
base_leverage *= long_decrease_factor
if current_atr > (current_rate * 0.03):
base_leverage *= volatility_decrease_factor
if current_macd > 0:
base_leverage *= long_increase_factor
if current_rate < current_sma:
base_leverage *= long_decrease_factor
elif side == "short":
if current_rsi > dynamic_rsi_high:
base_leverage *= short_increase_factor
elif current_rsi < dynamic_rsi_low:
base_leverage *= short_decrease_factor
if current_atr > (current_rate * 0.03):
base_leverage *= volatility_decrease_factor
if current_macd < 0:
base_leverage *= short_increase_factor
if current_rate > current_sma:
base_leverage *= short_decrease_factor
adjusted_leverage = max(min(base_leverage, max_leverage), 1.0)
return adjusted_leverage
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe["rsi"] = ta.RSI(dataframe)
dataframe["DI_values"] = ta.PLUS_DI(dataframe) - ta.MINUS_DI(dataframe)
dataframe["DI_cutoff"] = 0
maxima = np.zeros(len(dataframe))
minima = np.zeros(len(dataframe))
maxima[argrelextrema(dataframe["close"].values, np.greater, order=5)] = 1
minima[argrelextrema(dataframe["close"].values, np.less, order=5)] = 1
dataframe["maxima"] = maxima
dataframe["minima"] = minima
dataframe["&s-extrema"] = 0
min_peaks = argrelextrema(dataframe["close"].values, np.less, order=5)[0]
max_peaks = argrelextrema(dataframe["close"].values, np.greater, order=5)[0]
dataframe.loc[min_peaks, "&s-extrema"] = -1
dataframe.loc[max_peaks, "&s-extrema"] = 1
dataframe["DI_catch"] = np.where(dataframe["DI_values"] > dataframe["DI_cutoff"], 0, 1)
dataframe["minima_sort_threshold"] = dataframe["close"].rolling(window=10).min()
dataframe["maxima_sort_threshold"] = dataframe["close"].rolling(window=10).max()
dataframe["min_threshold_mean"] = dataframe["minima_sort_threshold"].expanding().mean()
dataframe["max_threshold_mean"] = dataframe["maxima_sort_threshold"].expanding().mean()
dataframe["maxima_check"] = (
dataframe["maxima"].rolling(4).apply(lambda x: int((x != 1).all()), raw=True).fillna(0)
)
dataframe["minima_check"] = (
dataframe["minima"].rolling(4).apply(lambda x: int((x != 1).all()), raw=True).fillna(0)
)
return dataframe
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
# Condições para entrada long
df.loc[
(
(df["DI_catch"] == 1) # Condição DI_catch
& (df["maxima_check"] == 1) # Condição maxima_check
& (df["&s-extrema"] < 0) # Condição extrema
& (df["minima"].shift(1) == 1) # Condição minima anterior
& (df["volume"] > 0) # Volume maior que 0
& (df["rsi"] < 30) # RSI abaixo de 30 (condição adicional para limitar entradas)
),
["enter_long", "enter_tag"],
] = (1, "Minima")
df.loc[
(
(df["minima_check"] == 0) # Condição minima_check
& (df["volume"] > 0) # Volume maior que 0
& (df["rsi"] < 30) # RSI abaixo de 30 (condição adicional para limitar entradas)
),
["enter_long", "enter_tag"],
] = (1, "Minima Full Send")
df.loc[
(
(df["DI_catch"] == 1) # Condição DI_catch
& (df["minima_check"] == 0) # Condição minima_check
& (df["minima_check"].shift(5) == 1) # Condição minima_check anterior
& (df["volume"] > 0) # Volume maior que 0
& (df["rsi"] < 30) # RSI abaixo de 30 (condição adicional para limitar entradas)
),
["enter_long", "enter_tag"],
] = (1, "Minima Check")
# Condições para entrada short
df.loc[
(
(df["DI_catch"] == 1) # Condição DI_catch
& (df["minima_check"] == 1) # Condição minima_check
& (df["&s-extrema"] > 0) # Condição extrema
& (df["maxima"].shift(1) == 1) # Condição maxima anterior
& (df["volume"] > 0) # Volume maior que 0
& (df["rsi"] > 70) # RSI acima de 70 (condição adicional para limitar entradas)
),
["enter_short", "enter_tag"],
] = (1, "Maxima")
df.loc[
(
(df["maxima_check"] == 0) # Condição maxima_check
& (df["volume"] > 0) # Volume maior que 0
& (df["rsi"] > 70) # RSI acima de 70 (condição adicional para limitar entradas)
),
["enter_short", "enter_tag"],
] = (1, "Maxima Full Send")
df.loc[
(
(df["DI_catch"] == 1) # Condição DI_catch
& (df["maxima_check"] == 0) # Condição maxima_check
& (df["maxima_check"].shift(5) == 1) # Condição maxima_check anterior
& (df["volume"] > 0) # Volume maior que 0
& (df["rsi"] > 70) # RSI acima de 70 (condição adicional para limitar entradas)
),
["enter_short", "enter_tag"],
] = (1, "Maxima Check")
return df
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
df.loc[((df["maxima_check"] == 0) & (df["volume"] > 0)), ["exit_long", "exit_tag"]] = (
1,
"Maxima Check",
)
df.loc[
(
(df["DI_catch"] == 1)
& (df["&s-extrema"] > 0)
& (df["maxima"].shift(1) == 1)
& (df["volume"] > 0)
),
["exit_long", "exit_tag"],
] = (1, "Maxima")
df.loc[((df["maxima_check"] == 0) & (df["volume"] > 0)), ["exit_long", "exit_tag"]] = (
1,
"Maxima Full Send",
)
df.loc[((df["minima_check"] == 0) & (df["volume"] > 0)), ["exit_short", "exit_tag"]] = (
1,
"Minima Check",
)
df.loc[
(
(df["DI_catch"] == 1)
& (df["&s-extrema"] < 0)
& (df["minima"].shift(1) == 1)
& (df["volume"] > 0)
),
["exit_short", "exit_tag"],
] = (1, "Minima")
df.loc[((df["minima_check"] == 0) & (df["volume"] > 0)), ["exit_short", "exit_tag"]] = (
1,
"Minima Full Send",
)
return df