Timeframe
5m
Direction
Long Only
Stoploss
-10.0%
Trailing Stop
Yes
ROI
0m: 99900.0%
Interface Version
N/A
Startup Candles
N/A
Indicators
7
freqtrade/freqtrade-strategies
Strategy 003 (author: Gerald Lonlas; GitHub: https://github.com/freqtrade/freqtrade-strategies)
import math
from typing import Callable
import numpy as np # noqa
import pandas as pd # noqa
from pandas import DataFrame
from freqtrade.strategy import DecimalParameter, IntParameter
from freqtrade.exchange import timeframe_to_minutes
from freqtrade.strategy import IStrategy
import talib.abstract as ta
import freqtrade.vendor.qtpylib.indicators as qtpylib
from freqtrade.strategy import IntParameter
from pandas import Series
from numpy.typing import ArrayLike
from datetime import datetime, timedelta
import technical.indicators as indicators
from freqtrade.exchange import timeframe_to_prev_date
# NowoIchimoku5mV2
# source: https://github.com/davidzr/freqtrade-strategies/blob/9623c1f3d8c7f60c8b411010fa26377e6ca99ab9/strategies/NowoIchimoku5mV2/NowoIchimoku5mV2.py
class NowoI2(IStrategy):
    """Long-only Ichimoku strategy on 5m candles driven by 1h informative data.

    Indicators (Ichimoku lines and cloud, an HMA-based upper Bollinger band
    and a stochastic RSI) are computed on the 1h timeframe and merged into
    the 5m frame.  Entries are written to the ``buy`` column; exits are
    handled exclusively by ``custom_stoploss`` (``use_exit_signal`` is off
    and ``populate_exit_trend`` never signals).
    """

    # Optimal timeframe for the strategy
    timeframe = "5m"
    informative_timeframe = "1h"
    # Number of 5m candles contained in one 1h candle.
    time_factor = int(60 / 5)
    startup_candle_count = int(100 * time_factor)

    use_exit_signal = False
    use_custom_stoploss = True
    trailing_stop = True
    minimal_roi = {"0": 999}
    stoploss = -0.1

    # Minimum profit required before a stochastic RSI %K above 80 tightens the stop.
    srsi_k_min_profit = DecimalParameter(
        0.01, 0.99, decimals=3, default=0.036, space="sell", optimize=True
    )
    # Minimum profit required before a break above the upper band tightens the stop.
    above_upper_min_profit = DecimalParameter(
        0.001, 0.5, decimals=3, default=0.011, space="sell"
    )
    # Multiplier of the distance (open rate - shifted lower cloud) used as profit limit.
    limit_factor = DecimalParameter(
        0.5, 5, decimals=3, default=1.918, space="sell", optimize=True
    )
    # Factor applied to the shifted lower cloud to obtain the downside exit level.
    lower_cloud_factor = DecimalParameter(
        0.5, 1.5, decimals=3, default=0.971, space="sell", optimize=True
    )
    # Factor applied to the shifted upper cloud in the entry condition.
    close_above_shifted_upper_cloud = DecimalParameter(
        0.5, 2, decimals=3, default=0.603, space="buy", optimize=True
    )

    def informative_pairs(self):
        """Return ``(pair, informative_timeframe)`` for every whitelisted pair."""
        return [
            (pair, self.informative_timeframe)
            for pair in self.dp.current_whitelist()
        ]

    def populate_indicators(self, df_5m: DataFrame, metadata: dict) -> DataFrame:
        """Compute the 1h indicators and merge them into the 5m dataframe.

        :param df_5m: base timeframe candles for the pair.
        :param metadata: dict containing at least the ``"pair"`` key.
        :return: the 5m frame extended with the 1h indicator columns.  The
            ``_1h`` suffix is stripped from indicator columns; the 1h
            date/OHLCV columns keep it.
        """
        # Get the informative pair
        df_1h = self.dp.get_pair_dataframe(
            pair=metadata["pair"], timeframe=self.informative_timeframe
        )

        df_1h["upper"] = bollinger_bands(
            df_1h["close"], moving_average="hma", length=20, mult=2.5
        )["upper"]

        ichi_1h = indicators.ichimoku(df_1h)
        df_1h["conversion_line"] = ichi_1h["tenkan_sen"]
        df_1h["base_line"] = ichi_1h["kijun_sen"]
        df_1h["lead_1"] = ichi_1h["leading_senkou_span_a"]
        df_1h["lead_2"] = ichi_1h["leading_senkou_span_b"]
        df_1h["is_cloud_green"] = ichi_1h["cloud_green"]

        # Upper/lower cloud edge: lead_1 where it is strictly above/below
        # lead_2, otherwise lead_2.
        df_1h["upper_cloud"] = df_1h["lead_1"].where(
            df_1h["lead_1"] > df_1h["lead_2"], df_1h["lead_2"]
        )
        df_1h["lower_cloud"] = df_1h["lead_1"].where(
            df_1h["lead_1"] < df_1h["lead_2"], df_1h["lead_2"]
        )
        # Cloud edges delayed by 25 informative (1h) candles.
        df_1h["shifted_upper_cloud"] = df_1h["upper_cloud"].shift(25)
        df_1h["shifted_lower_cloud"] = df_1h["lower_cloud"].shift(25)

        # Stochastic RSI: RSI rescaled into its own rolling min/max range,
        # then smoothed into %K and %D.
        smooth_k = 3
        smooth_d = 3
        length_rsi = 14
        length_stoch = 14
        df_1h["rsi"] = ta.RSI(df_1h, timeperiod=length_rsi)
        rsi = df_1h["rsi"]
        rsi_low = rsi.rolling(length_stoch).min()
        rsi_high = rsi.rolling(length_stoch).max()
        stochrsi = (rsi - rsi_low) / (rsi_high - rsi_low)
        df_1h["srsi_k"] = stochrsi.rolling(smooth_k).mean() * 100
        df_1h["srsi_d"] = df_1h["srsi_k"].rolling(smooth_d).mean()

        df = merge_informative_pair(
            df_5m, df_1h, self.timeframe, self.informative_timeframe, ffill=True
        )

        # don't overwrite the base dataframe's OHLCV information
        suffix = "_{}".format(self.informative_timeframe)
        skip_columns = {
            s + suffix for s in ["date", "open", "high", "low", "close", "volume"]
        }
        df.rename(
            columns=lambda s: s if s in skip_columns else s.replace(suffix, ""),
            inplace=True,
        )
        return df

    def custom_stoploss(
        self,
        pair: str,
        trade: "Trade",
        current_time: datetime,
        current_rate: float,
        current_profit: float,
        **kwargs,
    ) -> float:
        """Return a tight stop (-0.0001) when an exit condition holds, else -0.99.

        Exit conditions, checked in order:

        1. ``srsi_k`` above 80 with profit above ``srsi_k_min_profit``;
        2. the earlier reference candle closed below the upper band, the
           current rate is above the later reference candle's upper band and
           profit exceeds ``above_upper_min_profit``;
        3. the current rate exceeds the open rate plus ``limit_factor`` times
           the distance between open rate and the entry candle's shifted
           lower cloud;
        4. the current rate is below the entry candle's shifted lower cloud
           scaled by ``lower_cloud_factor``.

        -0.99 is also returned when the analyzed dataframe is too short for
        the reference candles or the entry candle cannot be found.
        """
        fallback = -0.99
        tight_stop = -0.0001

        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)

        # The reference candles are looked up time_factor and 2 * time_factor
        # rows from the end; without this guard a short (or empty) frame
        # raises IndexError.
        if len(dataframe) < 2 * self.time_factor:
            return fallback

        # NOTE(review): the "last" candle is time_factor rows back rather than
        # the newest row - presumably to read the previous 1h values; confirm.
        last_candle = dataframe.iloc[-1 * self.time_factor].squeeze()
        previous_candle = dataframe.iloc[-2 * self.time_factor].squeeze()

        # In dry/live runs trade open date will not match candle open date
        # therefore it must be rounded.
        trade_date = timeframe_to_prev_date(self.timeframe, trade.open_date_utc)
        # Look up trade candle.
        trade_candle = dataframe.loc[dataframe["date"] == trade_date]
        # trade_candle may be empty for trades that just opened as it is
        # still incomplete.
        if trade_candle.empty:
            return fallback
        trade_candle = trade_candle.squeeze()

        if (
            last_candle["srsi_k"] > 80
            and current_profit > self.srsi_k_min_profit.value
        ):
            return tight_stop

        if (
            previous_candle["close"] < previous_candle["upper"]
            and current_rate > last_candle["upper"]
            and current_profit > self.above_upper_min_profit.value
        ):
            return tight_stop

        entry_lower_cloud = trade_candle["shifted_lower_cloud"]
        limit = trade.open_rate + (
            (trade.open_rate - entry_lower_cloud) * self.limit_factor.value
        )
        if current_rate > limit:
            return tight_stop

        if current_rate < entry_lower_cloud * self.lower_cloud_factor.value:
            return tight_stop

        return fallback

    def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
        """Fill the ``should_buy``, ``buy`` and ``buy_allowed`` columns.

        ``should_buy`` is the raw entry condition.  ``buy`` additionally
        requires ``buy_allowed``: after a buy signal further buys are blocked
        until a row where ``is_cloud_green`` is falsy re-enables them.  The
        first row never produces a buy.
        """
        double_shifted_upper_cloud = df["upper_cloud"].shift(50 * self.time_factor)
        close_above_shifted_upper_cloud = (
            df["close"]
            > df["shifted_upper_cloud"] * self.close_above_shifted_upper_cloud.value
        )
        close_above_shifted_lower_cloud = df["close"] > df["shifted_lower_cloud"]
        close_above_double_shifted_upper_cloud = (
            df["close"] > double_shifted_upper_cloud
        )
        conversion_line_above_base_line = df["conversion_line"] > df["base_line"]
        close_above_shifted_conversion_line = df["close"] > df["conversion_line"].shift(
            25 * self.time_factor
        )

        df["should_buy"] = (
            (df["close"] > df["open"])
            & close_above_shifted_upper_cloud
            & close_above_shifted_lower_cloud
            & df["is_cloud_green"]
            & conversion_line_above_base_line
            & close_above_shifted_conversion_line
            & close_above_double_shifted_upper_cloud
        )

        # The gate depends on the previous row's result, so it is evaluated
        # sequentially - on plain Python lists instead of per-row label
        # lookups, which keeps it fast and independent of the index labels.
        should_buy = [bool(value) for value in df["should_buy"].tolist()]
        cloud_green = [bool(value) for value in df["is_cloud_green"].tolist()]
        row_count = len(df)
        buy = [False] * row_count
        buy_allowed = [True] * row_count

        allowed = True
        bought_previous = False
        for i in range(1, row_count):
            if bought_previous:
                allowed = False
            if not cloud_green[i]:
                allowed = True
            buy_allowed[i] = allowed
            bought_previous = allowed and should_buy[i]
            buy[i] = bought_previous

        df["buy"] = buy
        df["buy_allowed"] = buy_allowed
        return df

    def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
        """Never emit an exit signal; exits are driven by ``custom_stoploss``."""
        df["sell"] = 0
        return df
def merge_informative_pair(
    dataframe: pd.DataFrame,
    informative: pd.DataFrame,
    timeframe: str,
    timeframe_inf: str,
    ffill: bool = True,
) -> pd.DataFrame:
    """Left-merge an informative (slower) dataframe onto the base dataframe.

    Every informative column is suffixed with ``_<timeframe_inf>``.  When the
    informative timeframe is slower, its dates are moved forward by
    (informative minutes - base minutes) before merging, so merging is not
    delayed by one small candle.

    :param dataframe: base timeframe candles; must contain a ``date`` column.
    :param informative: informative candles; must contain a ``date`` column.
        All of its indicators must already be calculated.  It is not modified.
    :param timeframe: timeframe of ``dataframe``.
    :param timeframe_inf: timeframe of ``informative``.
    :param ffill: forward-fill the merged result when True.
    :return: the merged dataframe.
    :raises ValueError: if ``timeframe_inf`` is faster than ``timeframe``.
    """
    minutes_inf = timeframe_to_minutes(timeframe_inf)
    minutes = timeframe_to_minutes(timeframe)

    if minutes > minutes_inf:
        raise ValueError(
            "Tried to merge a faster timeframe to a slower timeframe. "
            "This would create new rows, and can throw off your regular indicators."
        )

    # Work on a copy so the caller's informative frame keeps its columns.
    informative = informative.copy()

    if minutes == minutes_inf:
        # No need to forwardshift if the timeframes are identical
        informative["date_merge"] = informative["date"]
    else:
        # Subtract "small" timeframe so merging is not delayed by 1 small candle
        # Detailed explanation in https://github.com/freqtrade/freqtrade/issues/4073
        informative["date_merge"] = (
            informative["date"]
            + pd.to_timedelta(minutes_inf, "m")
            - pd.to_timedelta(minutes, "m")
        )

    # Rename columns to be unique
    informative.columns = [f"{col}_{timeframe_inf}" for col in informative.columns]
    merge_key = f"date_merge_{timeframe_inf}"

    # Combine the 2 dataframes
    # all indicators on the informative sample MUST be calculated before this point
    dataframe = pd.merge(
        dataframe,
        informative,
        left_on="date",
        right_on=merge_key,
        how="left",
    )
    dataframe = dataframe.drop(merge_key, axis=1)

    if ffill:
        dataframe = dataframe.ffill()

    return dataframe
def wma(series: Series, length: int) -> Series:
    """Linearly weighted moving average of ``series`` over ``length`` values.

    The current value gets weight ``length``, the value one step back
    ``length - 1``, and so on down to weight 1 for the oldest value in the
    window.  The first ``length - 1`` results are NaN because the window is
    not yet full.

    :param series: input values.
    :param length: window size, at least 1.
    :return: the weighted average, aligned with ``series``.
    :raises ValueError: if ``length`` is smaller than 1.
    """
    if length < 1:
        raise ValueError("length must be a positive integer")

    weighted_total = 0
    weight_total = 0
    # Lags 0 .. length - 1: the window includes the current value and
    # spans exactly ``length`` values.
    for lag in range(length):
        weight = length - lag
        weight_total += weight
        weighted_total = weighted_total + series.shift(lag) * weight
    return weighted_total / weight_total
def hma(series: Series, length: int) -> Series:
    """Hull moving average built from three ``wma`` passes.

    Computes ``2 * wma(series, floor(length / 2)) - wma(series, length)`` and
    smooths that difference with ``wma`` over ``floor(sqrt(length))`` values.

    :param series: input values.
    :param length: base window size.
    :return: the smoothed series.
    """
    half_window = math.floor(length / 2)
    smoothing_window = math.floor(math.sqrt(length))

    fast_average = wma(series, half_window)
    slow_average = wma(series, length)
    raw_hull = 2 * fast_average - slow_average

    return wma(raw_hull, smoothing_window)
def bollinger_bands(
    series: Series, moving_average="sma", length=20, mult=2.0
) -> DataFrame:
    """Return the upper Bollinger band of ``series``.

    :param series: input values.
    :param moving_average: basis type, ``"sma"`` (``ta.SMA``) or ``"hma"``
        (this file's ``hma``).
    :param length: window used for both the basis and the deviation.
    :param mult: multiplier applied to ``ta.STDDEV(series, length)``.
    :return: DataFrame with a single ``upper`` column (basis + deviation).
    :raises ValueError: if ``moving_average`` is neither ``"sma"`` nor ``"hma"``.
    """
    # Reject an unknown basis type before doing any computation.
    if moving_average not in ("sma", "hma"):
        raise ValueError("moving_average has to be sma or hma")

    if moving_average == "sma":
        basis = ta.SMA(series, length)
    else:
        basis = hma(series, length)

    dev = mult * ta.STDDEV(series, length)
    return DataFrame({"upper": basis + dev})