Timeframe
5m
Direction
Long Only
Stoploss
-100.0%
Trailing Stop
No
ROI
0m: 2.8%, 10m: 1.8%, 30m: 1.0%, 40m: 0.5%
Interface Version
2
Startup Candles
N/A
Indicators
2
freqtrade/freqtrade-strategies
Strategy 003 author@: Gerald Lonlas github@: https://github.com/freqtrade/freqtrade-strategies
# --- Do not remove these libs ---
from freqtrade.strategy.interface import IStrategy
from typing import Dict, List
from functools import reduce
from pandas import DataFrame
# --------------------------------
import logging
import math
from datetime import datetime, timedelta, timezone
from freqtrade.persistence import Trade
import time
from typing import Dict, List, Optional
# ---
import sys
import talib.abstract as ta
import numpy as np
import freqtrade.vendor.qtpylib.indicators as qtpylib
import datetime
from technical.util import resample_to_interval, resampled_merge
from datetime import datetime, timedelta
from freqtrade.persistence import Trade
from freqtrade.strategy import stoploss_from_open, DecimalParameter, IntParameter, \
CategoricalParameter
import technical.indicators as ftt
######################################## Warning ########################################
# You won't get a lot of benefits by simply changing to this strategy #
# with the HyperOpt values changed. #
# #
# You should test it closely, trying backtesting and dry running, and we recommend #
# customizing the terms of sale and purchase as well. #
# #
# You should always be careful in real trading! #
#########################################################################################
def EWO(dataframe, ema_length=5, ema2_length=35):
# df = dataframe.copy()
ema1 = ta.EMA(dataframe, timeperiod=ema_length)
ema2 = ta.EMA(dataframe, timeperiod=ema2_length)
emadif = (ema1 - ema2) / dataframe['close'] * 100
return emadif
def lerp(a: float, b: float, t: float) -> float:
"""Linear interpolate on the scale given by a to b, using t as the point on that scale.
Examples
--------
50 == lerp(0, 100, 0.5)
4.2 == lerp(1, 5, 0.8)
"""
return (1 - t) * a + t * b
logger = logging.getLogger(__name__)
class SMAOffset_Hippocritical_dca(IStrategy):
# Original: SMAOffsetProtectOptV1 by kkeue, shared on the freqtrade discord at 2021-06-19
# Added dca of Stash86, modified and optimized it.
# Added jorikito#2815 's partial fill compensation
# Use this option with caution!
# enables a full 1st slot and base+safety-order for the 2nd slot before you run out of money in your wallet.
# (combo from the .ods-file: 4+3, see rows "Overbuy calculation")
overbuy_factor = 1.295
position_adjustment_enable = True
initial_safety_order_trigger = -0.02
max_so_multiplier_orig = 3
safety_order_step_scale = 2
safety_order_volume_scale = 1.8
# just for initialization, now we calculate it...
max_so_multiplier = max_so_multiplier_orig
# We will store the size of stake of each trade's first order here
cust_proposed_initial_stakes = {}
# Amount the strategy should compensate previously partially filled orders for successive safety orders (0.0 - 1.0)
partial_fill_compensation_scale = 1
if (max_so_multiplier_orig > 0):
if (safety_order_volume_scale > 1):
# print(safety_order_volume_scale * (math.pow(safety_order_volume_scale,(max_so_multiplier - 1)) - 1))
firstLine = (safety_order_volume_scale *
(math.pow(safety_order_volume_scale, (max_so_multiplier_orig - 1)) - 1))
divisor = (safety_order_volume_scale - 1)
max_so_multiplier = (2 + firstLine / divisor)
# max_so_multiplier = (2 +
# (safety_order_volume_scale *
# (math.pow(safety_order_volume_scale, (max_so_multiplier - 1)) - 1) /
# (safety_order_volume_scale - 1)))
elif (safety_order_volume_scale < 1):
firstLine = safety_order_volume_scale * \
(1 - math.pow(safety_order_volume_scale, (max_so_multiplier_orig - 1)))
divisor = 1 - safety_order_volume_scale
max_so_multiplier = (2 + firstLine / divisor)
# max_so_multiplier = (2 + (safety_order_volume_scale * (
# 1 - math.pow(safety_order_volume_scale, (max_so_multiplier - 1))) / (
# 1 - safety_order_volume_scale)))
# Since stoploss can only go up and can't go down, if you set your stoploss here, your lowest stoploss will always be tied to the first buy rate
# So disable the hard stoploss here, and use custom_sell or custom_stoploss to handle the stoploss trigger
stoploss = -1
def custom_exit(self, pair: str, trade: 'Trade', current_time: 'datetime', current_rate: float,
current_profit: float, **kwargs):
tag = super().custom_sell(pair, trade, current_time, current_rate, current_profit, **kwargs)
if tag:
return tag
entry_tag = 'empty'
if hasattr(trade, 'entry_tag') and trade.entry_tag is not None:
entry_tag = trade.entry_tag
if current_profit <= -0.35:
return f'stop_loss ({entry_tag})'
return None
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
rate: float, time_in_force: str, exit_reason: str,
current_time: datetime, **kwargs) -> bool:
# remove pair from custom initial stake dict only if full exit
if trade.amount == amount and pair in self.cust_proposed_initial_stakes:
del self.cust_proposed_initial_stakes[pair]
return True
# Let unlimited stakes leave funds open for DCA orders
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: float, max_stake: float,
**kwargs) -> float:
custom_stake = proposed_stake / self.max_so_multiplier * self.overbuy_factor
self.cust_proposed_initial_stakes[
pair] = custom_stake # Setting of first stake size just before each first order of a trade
return custom_stake # set to static 10 to simulate partial fills of 10$, etc
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float, min_stake: float,
max_stake: float, **kwargs) -> Optional[float]:
if current_profit > self.initial_safety_order_trigger:
return None
filled_buys = trade.select_filled_orders(trade.entry_side)
count_of_buys = len(filled_buys)
if 1 <= count_of_buys <= self.max_so_multiplier_orig:
# if (1 <= count_of_buys) and (open_trade_value < self.stake_amount * self.overbuy_factor):
safety_order_trigger = (abs(self.initial_safety_order_trigger) * count_of_buys)
if self.safety_order_step_scale > 1:
safety_order_trigger = abs(self.initial_safety_order_trigger) + (
abs(self.initial_safety_order_trigger) * self.safety_order_step_scale * (
math.pow(self.safety_order_step_scale, (count_of_buys - 1)) - 1) / (
self.safety_order_step_scale - 1))
elif self.safety_order_step_scale < 1:
safety_order_trigger = abs(self.initial_safety_order_trigger) + (
abs(self.initial_safety_order_trigger) * self.safety_order_step_scale * (
1 - math.pow(self.safety_order_step_scale, (count_of_buys - 1))) / (
1 - self.safety_order_step_scale))
if current_profit <= (-1 * abs(safety_order_trigger)):
try:
# This returns first order actual stake size
actual_initial_stake = filled_buys[0].cost
# Fallback for when the initial stake was not set for whatever reason
stake_amount = actual_initial_stake
already_bought = sum(filled_buy.cost for filled_buy in filled_buys)
if trade.pair in self.cust_proposed_initial_stakes:
if self.cust_proposed_initial_stakes[trade.pair] > 0:
# This calculates the amount of stake that will get used for the current safety order,
# including compensation for any partial buys
proposed_initial_stake = self.cust_proposed_initial_stakes[trade.pair]
current_actual_stake = already_bought * math.pow(self.safety_order_volume_scale,
(count_of_buys - 1))
current_stake_preposition = proposed_initial_stake * math.pow(self.safety_order_volume_scale,
(count_of_buys - 1))
current_stake_preposition_compensation = current_stake_preposition + abs(
current_stake_preposition - current_actual_stake)
total_so_stake = lerp(current_actual_stake, current_stake_preposition_compensation,
self.partial_fill_compensation_scale)
# Set the calculated stake amount
stake_amount = total_so_stake
else:
# Fallback stake amount calculation
stake_amount = stake_amount * math.pow(self.safety_order_volume_scale, (count_of_buys - 1))
else:
# Fallback stake amount calculation
stake_amount = stake_amount * math.pow(self.safety_order_volume_scale, (count_of_buys - 1))
amount = stake_amount / current_rate
logger.info(
f"Initiating safety order buy #{count_of_buys} "
f"for {trade.pair} with stake amount of {stake_amount}. "
f"which equals {amount}. "
f"Previously bought: {already_bought}. "
f"Now overall:{already_bought + stake_amount}. ")
return stake_amount
except Exception as exception:
logger.info(f'Error occured while trying to get stake amount for {trade.pair}: {str(exception)}')
# print(f'Error occured while trying to get stake amount for {trade.pair}: {str(exception)}')
return None
return None
# Modified Buy / Sell params - 20210619
# Buy hyperspace params:
buy_params = {
"base_nb_candles_buy": 16,
"ewo_high": 5.672,
"ewo_low": -19.931,
"low_offset": 0.973,
"rsi_buy": 59,
}
# Sell hyperspace params:
sell_params = {
"base_nb_candles_sell": 20,
"high_offset": 1.010,
}
INTERFACE_VERSION = 2
# Modified ROI - 20210620
# ROI table:
minimal_roi = {
"0": 0.028,
"10": 0.018,
"30": 0.010,
"40": 0.005
}
# SMAOffset
base_nb_candles_buy = IntParameter(
5, 80, default=buy_params['base_nb_candles_buy'], space='buy', optimize=True)
base_nb_candles_sell = IntParameter(
5, 80, default=sell_params['base_nb_candles_sell'], space='sell', optimize=True)
low_offset = DecimalParameter(
0.9, 0.99, default=buy_params['low_offset'], space='buy', optimize=True)
high_offset = DecimalParameter(
0.99, 1.1, default=sell_params['high_offset'], space='sell', optimize=True)
# Protection
fast_ewo = 50
slow_ewo = 200
ewo_low = DecimalParameter(-20.0, -8.0,
default=buy_params['ewo_low'], space='buy', optimize=True)
ewo_high = DecimalParameter(
2.0, 12.0, default=buy_params['ewo_high'], space='buy', optimize=True)
rsi_buy = IntParameter(30, 70, default=buy_params['rsi_buy'], space='buy', optimize=True)
# Trailing stop:
trailing_stop = False
trailing_stop_positive = 0.001
trailing_stop_positive_offset = 0.01
trailing_only_offset_is_reached = True
# Sell signal
use_exit_signal = True
exit_profit_only = False
exit_profit_offset = 0.01
ignore_roi_if_entry_signal = False
# Optimal timeframe for the strategy
timeframe = '5m'
process_only_new_candles = True
startup_candle_count: int = 1000
plot_config = \
{
"main_plot":
{},
"subplots":
{
"sub":
{
"rsi":
{
"color": "#80dea8",
"type": "line"
}
},
"sub2":
{
"ma_buy_16":
{
"color": "#db1ea2",
"type": "line"
},
"ma_sell_20":
{
"color": "#645825",
"type": "line"
},
"EWO":
{
"color": "#1e5964",
"type": "line"
},
"missing_data":
{
"color": "#26b08d",
"type": "line"
}
}
}
}
use_custom_stoploss = False
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Calculate all ma_buy values
# moved into entry / exit functions to not have redundant calculations being done.
# backtesting will be slower but who cares about a few seconds during backtests right?
# All that counts is dry / live performance, not backtesting speed imo ...
# If you want to put in the work with conditions be my guest ;)
"""
for val in self.base_nb_candles_buy.range:
dataframe[f'ma_buy_{val}'] = ta.EMA(dataframe, timeperiod=val)
# Calculate all ma_sell values
for val in self.base_nb_candles_sell.range:
dataframe[f'ma_sell_{val}'] = ta.EMA(dataframe, timeperiod=val)
"""
# Elliot
dataframe['EWO'] = EWO(dataframe, self.fast_ewo, self.slow_ewo)
# RSI
dataframe['rsi'] = ta.RSI(dataframe, timeperiod=14)
candles_bidaily_of_5m: int = 576
# Check for 0 volume candles in the last day
dataframe['missing_data'] = \
(dataframe['volume'] <= 0).rolling(
window=candles_bidaily_of_5m,
min_periods=candles_bidaily_of_5m).sum()
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Speed optimization for dry / live runs, not looping through for ... values with it, nothing else.
dataframe[f'ma_buy_{self.base_nb_candles_buy.value}'] = \
ta.EMA(dataframe, timeperiod=self.base_nb_candles_buy.value)
conditions = []
conditions.append(
(dataframe['close'] < (
dataframe[f'ma_buy_{self.base_nb_candles_buy.value}'] * self.low_offset.value)) &
(dataframe['EWO'] > self.ewo_high.value) &
(dataframe['rsi'] < self.rsi_buy.value) &
(dataframe['missing_data'] < 1)
)
conditions.append(
(dataframe['close'] < (
dataframe[f'ma_buy_{self.base_nb_candles_buy.value}'] * self.low_offset.value)) &
(dataframe['EWO'] < self.ewo_low.value) &
(dataframe['missing_data'] < 1)
)
if conditions:
dataframe.loc[
reduce(lambda x, y: x | y, conditions),
'enter_long'
] = 1
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Speed optimization for dry / live runs, not looping through for ... values with it, nothing else.
dataframe[f'ma_sell_{self.base_nb_candles_sell.value}'] = \
ta.EMA(dataframe, timeperiod=self.base_nb_candles_sell.value)
conditions = []
conditions.append(
(
(dataframe['close'] > (
dataframe[f'ma_sell_{self.base_nb_candles_sell.value}'] * self.high_offset.value)) &
(dataframe['volume'] > 0)
)
)
if conditions:
dataframe.loc[
reduce(lambda x, y: x | y, conditions),
'exit_long'
] = 1
return dataframe