This is an example strategy that uses the LSTMRegressor model to predict the target score. Use at your own risk. This is a simple example strategy and should be used for educational purposes only.
Timeframe
1h
Direction
Long & Short
Stoploss
-30.5%
Trailing Stop
No
ROI
0m: 23.9%, 79m: 5.8%, 231m: 2.9%, 543m: 0.0%
Interface Version
N/A
Startup Candles
0
Indicators
15
freqtrade/freqtrade-strategies
Strategy 003 author@: Gerald Lonlas github@: https://github.com/freqtrade/freqtrade-strategies
import numpy as np
import pandas_ta as pta
from typing import Optional
import math
import freqtrade.vendor.qtpylib.indicators as qtpylib
import logging
from functools import reduce
from typing import Dict
from datetime import timedelta, datetime, timezone
from freqtrade.persistence import Trade
from pandas import DataFrame, Series
import talib.abstract as ta
from pandas import DataFrame
from freqtrade.strategy import DecimalParameter, IntParameter, IStrategy,BooleanParameter
from technical.pivots_points import pivots_points
from scipy.signal import argrelextrema
import logging
from functools import reduce
import numpy as np
import pandas as pd
import talib.abstract as ta
from pandas import DataFrame
from technical import qtpylib
from freqtrade.exchange.exchange_utils import *
from freqtrade.strategy import IStrategy, RealParameter
from technical.pivots_points import pivots_points
logger = logging.getLogger(__name__)
"""
ifconfig
sudo ifconfig wlan1 down
source ./.venv/bin/activate
freqtrade download-data -c config-torch.json --timerange 20240101-20240909 --timeframes 5m
freqtrade download-data -c config-torch.json --timerange 20220101-20240909 --timeframes 1h 2h 4h
freqtrade backtesting --cache none -c config-torch.json --breakdown day week month --timeframe-detail 5m --timerange 20240201-20240301
freqtrade trade --strategy AlexStrategyFinalV83Hyper --config config-torch.json --freqaimodel PyTorchLSTMRegressor
freqtrade backtesting --breakdown day week --cache none -c config-torch.json --timerange 20240209-20240220
freqtrade backtesting --strategy AlexStrategyFinalV83 -c config-torch.json --freqaimodel PyTorchLSTMRegressor --timerange 20240801-20240820
freqtrade backtesting --strategy AlexStrategyFinalV83 -c config-torch.json --freqaimodel XGBoostRegressor --breakdown day week month --timerange 20240801-20240810
freqtrade lookahead-analysis --strategy AlexStrategyFinalV83 -c config-torch.json --timerange 20240101-20240820
CHANGELOG:
V83 - Corrected Market Remige Filter R, and Dynamic Weights Adjustment
"""
class AlexStrategyFinalV9Hyper(IStrategy):
"""
This is an example strategy that uses the LSTMRegressor model to predict the target score.
Use at your own risk.
This is a simple example strategy and should be used for educational purposes only.
"""
# Hyperspace parameters:
buy_params = {
"threshold_buy": 0.59453,
"w0": 0.54347,
"w1": 0.82226,
"w2": 0.56675,
"w3": 0.77918,
"w4": 0.98488,
"w5": 0.31368,
"w6": 0.75916,
"w7": 0.09226,
"w8": 0.85667,
}
sell_params = {
"threshold_sell": 0.80573,
}
# ROI table:
minimal_roi = {
# "0": 0.239,
# "79": 0.058,
# "231": 0.029,
# "543": 0
}
# To hyperopt the initial value, just hyperopt minimal_roi and copy the values here, then disable the hyperoptable minimal_roi.
roi_default = {
"0": 0.239,
"79": 0.058,
"231": 0.029,
"543": 0
}
pair_self_controller = {}
can_two_side = BooleanParameter(default=False, space="sell")
self_roi_change_rate = DecimalParameter(0.001, 0.05, default=0.01, space="sell", decimals=3, optimize=True)
self_roi_penalty_factor = DecimalParameter(0.1, 10, default=1, space="sell", decimals=1, optimize=True)
self_roi_0_limit = DecimalParameter(0.01, 0.5, default=0.05, space="sell", decimals=2, optimize=True)
# Stoploss:
stoploss = -0.305 # Were letting the model decide when to sell
timeframe = "1h"
can_short = True
use_exit_signal = True
process_only_new_candles = True
exit_profit_only = False
exit_profit_offset = 0.01
ignore_roi_if_entry_signal = False
startup_candle_count = 0
leverage_value = 5.0
# Trailing stop:
#trailing_stop = False
#trailing_only_offset_is_reached = False
# Hyperoptable buy/sell thresholds
threshold_buy = RealParameter(-1, 1, default=0, space='buy')
threshold_sell = RealParameter(-1, 1, default=0, space='sell')
# Hyperoptable indicator weights
w0 = RealParameter(0, 1, default=0.10, space='buy') # moving average (normalized_ma)
w1 = RealParameter(0, 1, default=0.15, space='buy') # MACD (normalized_macd)
w2 = RealParameter(0, 1, default=0.10, space='buy') # Rate of Change (ROC)
w3 = RealParameter(0, 1, default=0.15, space='buy') # RSI (normalized_rsi)
w4 = RealParameter(0, 1, default=0.10, space='buy') # Bollinger Band width
w5 = RealParameter(0, 1, default=0.10, space='buy') # CCI (normalized_cci)
w6 = RealParameter(0, 1, default=0.10, space='buy') # OBV (normalized_obv)
w7 = RealParameter(0, 1, default=0.05, space='buy') # ATR (normalized_atr)
w8 = RealParameter(0, 1, default=0.10, space='buy') # Stochastic Oscillator (normalized_stoch)
def feature_engineering_expand_all(self, dataframe: DataFrame, period: int,
metadata: Dict, **kwargs):
# #----------------------------------
# dataframe["%-mfi-period"] = ta.MFI(dataframe, timeperiod=period)
# dataframe["%-adx-period"] = ta.ADX(dataframe, window=period)
# dataframe["%-er-period"] = pta.er(dataframe['close'], length=period)
# dataframe["%-rocr-period"] = ta.ROCR(dataframe, timeperiod=period)
# dataframe["%-cmf-period"] = chaikin_mf(dataframe, periods=period)
# dataframe["%-tcp-period"] = top_percent_change(dataframe, period)
# dataframe["%-cti-period"] = pta.cti(dataframe['close'], length=period)
# dataframe["%-chop-period"] = qtpylib.chopiness(dataframe, period)
# dataframe["%-linear-period"] = ta.LINEARREG_ANGLE(dataframe['close'], timeperiod=period)
# dataframe["%-atr-period"] = ta.ATR(dataframe, timeperiod=period)
# dataframe["%-atr-periodp"] = dataframe["%-atr-period"] / dataframe['close'] * 1000
# #----------------------------------
dataframe["%-cci-period"] = ta.CCI(dataframe, timeperiod=20)
dataframe["%-rsi-period"] = ta.RSI(dataframe, timeperiod=10)
dataframe["%-momentum-period"] = ta.MOM(dataframe, timeperiod=4)
dataframe['%-ma-period'] = ta.SMA(dataframe, timeperiod=10)
dataframe['%-macd-period'], dataframe['%-macdsignal-period'], dataframe['%-macdhist-period'] = ta.MACD(
dataframe['close'], slowperiod=12,
fastperiod=26)
dataframe['%-roc-period'] = ta.ROC(dataframe, timeperiod=2)
bollinger = qtpylib.bollinger_bands(
qtpylib.typical_price(dataframe), window=period, stds=2.2
)
dataframe["bb_lowerband-period"] = bollinger["lower"]
dataframe["bb_middleband-period"] = bollinger["mid"]
dataframe["bb_upperband-period"] = bollinger["upper"]
dataframe["%-bb_width-period"] = (
dataframe["bb_upperband-period"]
- dataframe["bb_lowerband-period"]
) / dataframe["bb_middleband-period"]
dataframe["%-close-bb_lower-period"] = (
dataframe["close"] / dataframe["bb_lowerband-period"]
)
return dataframe
def feature_engineering_expand_basic(self, dataframe: DataFrame, metadata: Dict, **kwargs):
dataframe["%-pct-change"] = dataframe["close"].pct_change()
dataframe["%-raw_volume"] = dataframe["volume"]
dataframe["%-raw_price"] = dataframe["close"]
return dataframe
def feature_engineering_standard(self, dataframe: DataFrame, metadata: Dict, **kwargs):
dataframe['date'] = pd.to_datetime(dataframe['date'])
dataframe["%-day_of_week"] = dataframe["date"].dt.dayofweek
dataframe["%-hour_of_day"] = dataframe["date"].dt.hour
return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe['ma'] = ta.SMA(dataframe, timeperiod=10)
dataframe['roc'] = ta.ROC(dataframe, timeperiod=2)
dataframe['macd'], dataframe['macdsignal'], dataframe['macdhist'] = ta.MACD(dataframe['close'], slowperiod=12,
fastperiod=26)
dataframe['momentum'] = ta.MOM(dataframe, timeperiod=4)
dataframe['rsi'] = ta.RSI(dataframe, timeperiod=10)
bollinger = ta.BBANDS(dataframe, timeperiod=20)
dataframe['bb_upperband'] = bollinger['upperband']
dataframe['bb_middleband'] = bollinger['middleband']
dataframe['bb_lowerband'] = bollinger['lowerband']
dataframe['cci'] = ta.CCI(dataframe, timeperiod=20)
dataframe['stoch'] = ta.STOCH(dataframe)['slowk']
dataframe['atr'] = ta.ATR(dataframe, timeperiod=14)
dataframe['obv'] = ta.OBV(dataframe)
dataframe['ma_100'] = ta.SMA(dataframe, timeperiod=100)
# Step 1: Normalize Indicators:
# Why? Normalizing the indicators will make them comparable and allow us to assign weights to them.
# How? We will calculate the z-score of each indicator by subtracting the rolling mean and dividing by the
# rolling standard deviation. This will give us a normalized value that is centered around 0 with a standard
# deviation of 1.
dataframe['normalized_stoch'] = (dataframe['stoch'] - dataframe['stoch'].rolling(window=14).mean()) / dataframe['stoch'].rolling(window=14).std()
dataframe['normalized_atr'] = (dataframe['atr'] - dataframe['atr'].rolling(window=14).mean()) / dataframe['atr'].rolling(window=14).std()
dataframe['normalized_obv'] = (dataframe['obv'] - dataframe['obv'].rolling(window=14).mean()) / dataframe['obv'].rolling(window=14).std()
dataframe['normalized_ma'] = (dataframe['close'] - dataframe['close'].rolling(window=10).mean()) / dataframe['close'].rolling(window=10).std()
dataframe['normalized_macd'] = (dataframe['macd'] - dataframe['macd'].rolling(window=26).mean()) / dataframe['macd'].rolling(window=26).std()
dataframe['normalized_roc'] = (dataframe['roc'] - dataframe['roc'].rolling(window=2).mean()) / dataframe['roc'].rolling(window=2).std()
dataframe['normalized_momentum'] = (dataframe['momentum'] - dataframe['momentum'].rolling(window=4).mean()) / \
dataframe['momentum'].rolling(window=4).std()
dataframe['normalized_rsi'] = (dataframe['rsi'] - dataframe['rsi'].rolling(window=10).mean()) / dataframe['rsi'].rolling(window=10).std()
dataframe['normalized_bb_width'] = (dataframe['bb_upperband'] - dataframe['bb_lowerband']).rolling(
window=20).mean() / (dataframe['bb_upperband'] - dataframe['bb_lowerband']).rolling(window=20).std()
dataframe['normalized_cci'] = (dataframe['cci'] - dataframe['cci'].rolling(window=20).mean()) / dataframe['cci'].rolling(window=20).std()
# Dynamic Weights Adjustment
# Calculate trend strength as the absolute difference between MA and close price
trend_strength = abs(dataframe['ma'] - dataframe['close'])
# Calculate rolling mean and stddev once to avoid redundancy
rolling_mean = trend_strength.rolling(window=14).mean()
rolling_stddev = trend_strength.rolling(window=14).std()
# Calculate a more dynamic strong trend threshold
strong_trend_threshold = rolling_mean + 1.5 * rolling_stddev
# Determine strong trend condition
is_strong_trend = trend_strength > strong_trend_threshold
# Apply dynamic weight adjustment, could also consider a more gradual adjustment
dataframe['w_momentum'] = self.w3.value * (1 + 0.5 * (trend_strength / strong_trend_threshold))
# Optional: Clip the w_momentum values to prevent extreme cases
dataframe['w_momentum'] = dataframe['w_momentum'].clip(lower=self.w3.value, upper=self.w3.value * 2)
# Step 2: Calculate aggregate score S
# Calculate aggregate score S
w = [self.w0.value, self.w1.value, self.w2.value, self.w3.value, self.w4.value, self.w5.value, self.w6.value, self.w7.value, self.w8.value]
dataframe['S'] = w[0] * dataframe['normalized_ma'] + \
w[1] * dataframe['normalized_macd'] + \
w[2] * dataframe['normalized_roc'] + \
w[3] * dataframe['normalized_rsi'] + \
w[4] * dataframe['normalized_bb_width'] + \
w[5] * dataframe['normalized_cci'] + \
dataframe['w_momentum'] * dataframe['normalized_momentum'] + \
self.w8.value * dataframe['normalized_stoch'] + \
self.w7.value * dataframe['normalized_atr'] + \
self.w6.value * dataframe['normalized_obv']
# Step 3: Market Regime Filter R
dataframe['R'] = 0
dataframe.loc[dataframe['close'] > dataframe['bb_upperband'], 'R'] = 1
dataframe.loc[dataframe['close'] < dataframe['bb_lowerband'], 'R'] = -1
# Additional Market Regime Filter based on long-term MA
#dataframe['ma_100'] = ta.SMA(dataframe, timeperiod=100)
#dataframe['R2'] = np.where(dataframe['close'] > dataframe['ma_100'], 1, -1)
#dataframe['R2'].fillna(0, inplace=True) # Fill NaNs with 0 or another default value
buffer_pct = 0.01 # 1% buffer
dataframe['R2'] = np.where(dataframe['close'] > dataframe['ma_100'] * (1 + buffer_pct), 1,
np.where(dataframe['close'] < dataframe['ma_100'] * (1 - buffer_pct), -1, np.nan))
# Step 4: Volatility Adjustment V
# EXPLANATION: Calculate the Bollinger Band width and assign it to V. The Bollinger Band width is the
# difference between the upper and lower Bollinger Bands divided by the middle Bollinger Band. The idea is
# that when the Bollinger Bands are wide, the market is volatile, and when the Bollinger Bands are narrow,
# the market is less volatile. So we are using the Bollinger Band width as a measure of volatility. You can
# use other indicators to measure volatility as well. For example, you can use the ATR (Average True Range)
# Calculate Bollinger Band Width
bb_width = (dataframe['bb_upperband'] - dataframe['bb_lowerband']) / dataframe['bb_middleband']
dataframe['V_mean'] = 1 / (bb_width + 1e-8) # Avoid division by zero
# ATR-based Volatility Measure
dataframe['V2_mean'] = 1 / (dataframe['atr'] + 1e-8) # Avoid division by zero
# Rolling window size for adaptive normalization
rolling_window = 50
# Normalize V_mean using a rolling window
mean_v = dataframe['V_mean'].rolling(window=rolling_window).mean()
std_v = dataframe['V_mean'].rolling(window=rolling_window).std()
dataframe['V_norm'] = (dataframe['V_mean'] - mean_v) / std_v
dataframe['V_norm'] = dataframe['V_norm'].fillna(0)
# Normalize V2_mean using a rolling window
mean_v2 = dataframe['V2_mean'].rolling(window=rolling_window).mean()
std_v2 = dataframe['V2_mean'].rolling(window=rolling_window).std()
dataframe['V2_norm'] = (dataframe['V2_mean'] - mean_v2) / std_v2
dataframe['V2_norm'] = dataframe['V2_norm'].fillna(0)
# Signal assignment using hysteresis
upper_threshold = 1.0
lower_threshold = -1.0
dataframe['V'] = np.where(dataframe['V_norm'] > upper_threshold, 1,
np.where(dataframe['V_norm'] < lower_threshold, -1, np.nan))
dataframe['V2'] = np.where(dataframe['V2_norm'] > upper_threshold, 1,
np.where(dataframe['V2_norm'] < lower_threshold, -1, np.nan))
# Forward-fill to maintain the last state of the signal
dataframe['V'] = dataframe['V'].ffill() # Correct ffill usage
dataframe['V2'] = dataframe['V2'].ffill() # Correct ffill usage
# Get Final Target Score to incorporate new calculations
dataframe['T'] = dataframe['S'] * dataframe['R'] * dataframe['R2'] * dataframe['V'] * dataframe['V2']
# Assign the target score T to the AI target column
target_horizon = 1 # Define your prediction horizon here
dataframe['&-target'] = dataframe['T'].shift(-target_horizon)
return dataframe
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
enter_long_conditions = [
df['&-target'] > self.threshold_buy.value, #
df['volume'] > 0
]
enter_short_conditions = [
df['&-target'] < self.threshold_sell.value,
df["volume"] > 0
]
df.loc[
reduce(lambda x, y: x & y, enter_long_conditions), ["enter_long", "enter_tag"]
] = (1, "long")
df.loc[
reduce(lambda x, y: x & y, enter_short_conditions), ["enter_short", "enter_tag"]
] = (1, "short")
return df
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
exit_long_conditions = [
df['&-target'] < self.threshold_sell.value
]
exit_short_conditions = [
df['&-target'] > self.threshold_buy.value
]
if exit_long_conditions:
df.loc[
reduce(lambda x, y: x & y, exit_long_conditions), ["exit_long", "exit_tag"]
] = (1, "exit_long")
if exit_short_conditions:
df.loc[
reduce(lambda x, y: x & y, exit_short_conditions), ["exit_short", "exit_tag"]
] = (1, "exit_short")
return df
def leverage(self, pair: str, current_time: 'datetime', current_rate: float, proposed_leverage: float, **kwargs) -> float:
return self.leverage_value
# # #--------------------------------------
# overbuy_factor = 1.295 #2
# position_adjustment_enable = True
# initial_safety_order_trigger = -0.02
# max_so_multiplier_orig = 3
# safety_order_step_scale = 2
# safety_order_volume_scale = 1.8
# # just for initialization, now we calculate it...
# max_so_multiplier = max_so_multiplier_orig
# # We will store the size of stake of each trade's first order here
# cust_proposed_initial_stakes = {}
# # Amount the strategy should compensate previously partially filled orders for successive safety orders (0.0 - 1.0)
# partial_fill_compensation_scale = 1
# if (max_so_multiplier_orig > 0):
# if (safety_order_volume_scale > 1):
# # print(safety_order_volume_scale * (math.pow(safety_order_volume_scale,(max_so_multiplier - 1)) - 1))
# firstLine = (safety_order_volume_scale *
# (math.pow(safety_order_volume_scale, (max_so_multiplier_orig - 1)) - 1))
# divisor = (safety_order_volume_scale - 1)
# max_so_multiplier = (2 + firstLine / divisor)
# # max_so_multiplier = (2 +
# # (safety_order_volume_scale *
# # (math.pow(safety_order_volume_scale, (max_so_multiplier - 1)) - 1) /
# # (safety_order_volume_scale - 1)))
# elif (safety_order_volume_scale < 1):
# firstLine = safety_order_volume_scale * \
# (1 - math.pow(safety_order_volume_scale, (max_so_multiplier_orig - 1)))
# divisor = 1 - safety_order_volume_scale
# max_so_multiplier = (2 + firstLine / divisor)
# # max_so_multiplier = (2 + (safety_order_volume_scale * (
# # 1 - math.pow(safety_order_volume_scale, (max_so_multiplier - 1))) / (
# # 1 - safety_order_volume_scale)))
# # Since stoploss can only go up and can't go down, if you set your stoploss here, your lowest stoploss will always be tied to the first buy rate
# # So disable the hard stoploss here, and use custom_sell or custom_stoploss to handle the stoploss trigger
# stoploss = -1
def custom_exit(self, pair: str, trade: 'Trade', current_time: 'datetime', current_rate: float,
current_profit: float, **kwargs):
# Determine the direction of the trade
direction = 'all' if self.can_two_side.value else ('short' if trade.is_short else 'long')
# Initialize pair_self_controller for each pair
if pair not in self.pair_self_controller:
self.pair_self_controller[pair] = {}
if direction not in self.pair_self_controller[pair]:
self.pair_self_controller[pair][direction] = {
'self_roi': self.roi_default.copy(),
'self_total_count': 0
}
# Get all closed trades for the pair
trades_origin = Trade.get_trades_proxy(pair=pair, is_open=False)
# Sort the trade list by close date
trades = sorted(trades_origin, key=lambda trade: trade.close_date)
# Check if there are new closed trades
need_change = len(trades) > self.pair_self_controller[pair][direction]['self_total_count']
if need_change:
# If the last trade was exited using 'self_roi' (indicating a profit)
if trades[-1].exit_reason == "self_roi":
# Modify the ROI table to seek greater profits
profit = trades[-1].close_profit
# Find out which stage of ROI it might have exited at
for minutes, roi in sorted(self.pair_self_controller[pair][direction]['self_roi'].items(), key=lambda x: int(x[0])):
# Increase the profit threshold for those stages to expect greater profits next time
if profit >= roi:
self.pair_self_controller[pair][direction]['self_roi'][minutes] += self.self_roi_change_rate.value
# The ROI for the 0th minute should not be less than the limit value
if self.pair_self_controller[pair][direction]['self_roi']['0'] < 0.05:
self.pair_self_controller[pair][direction]['self_roi']['0'] = self.self_roi_0_limit.value
# Print information
# print(f"Modified ROI table for {pair}({direction}): Increased threshold for {minutes} minutes to {self.pair_self_controller[pair][direction]['self_roi'][minutes]}")
# Decrease the profit threshold for stages not reached to expect quicker profits next time
else:
self.pair_self_controller[pair][direction]['self_roi'][minutes] -= self.self_roi_change_rate.value
if self.pair_self_controller[pair][direction]['self_roi'][minutes] < 0:
self.pair_self_controller[pair][direction]['self_roi'][minutes] = 0.01
# The ROI for the 0th minute should not be less than the limit value
if self.pair_self_controller[pair][direction]['self_roi']['0'] < 0.05:
self.pair_self_controller[pair][direction]['self_roi']['0'] = self.self_roi_0_limit.value
# Print information
# print(f"Modified ROI table for {pair}({direction}): Decreased threshold for {minutes} minutes to {self.pair_self_controller[pair][direction]['self_roi'][minutes]}")
# If the trade was a loss
else:
profit = trades[-1].close_profit
if(profit < 0):
# Decrease the thresholds of self_roi (not less than 0) to seek more stable profits
for minutes in self.pair_self_controller[pair][direction]['self_roi']:
self.pair_self_controller[pair][direction]['self_roi'][minutes] -= self.self_roi_change_rate.value * self.self_roi_penalty_factor.value
if self.pair_self_controller[pair][direction]['self_roi'][minutes] < 0:
self.pair_self_controller[pair][direction]['self_roi'][minutes] = 0.01
# The ROI for the 0th minute should not be less than the limit value
if self.pair_self_controller[pair][direction]['self_roi']['0'] < 0.05:
self.pair_self_controller[pair][direction]['self_roi']['0'] = self.self_roi_0_limit.value
# Print information
# print(f"Modified ROI table for {pair}({direction}): Decreased threshold for {minutes} minutes to {self.pair_self_controller[pair][direction]['self_roi'][minutes]}")
# Refresh self_total_count
self.pair_self_controller[pair][direction]['self_total_count'] = len(trades)
# Decide whether to issue an exit signal based on the self_roi attribute
hold_minutes = (current_time - trade.open_date_utc).total_seconds() / 60
for minutes, roi in sorted(self.pair_self_controller[pair][direction]['self_roi'].items(), key=lambda x: int(x[0])):
if hold_minutes >= int(minutes) and current_profit >= roi:
return 'self_roi'
# def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
# rate: float, time_in_force: str, exit_reason: str,
# current_time: datetime, **kwargs) -> bool:
# # remove pair from custom initial stake dict only if full exit
# if trade.amount == amount and pair in self.cust_proposed_initial_stakes:
# del self.cust_proposed_initial_stakes[pair]
# return True
# # Let unlimited stakes leave funds open for DCA orders
# def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
# proposed_stake: float, min_stake: float, max_stake: float,
# **kwargs) -> float:
# custom_stake = proposed_stake / self.max_so_multiplier * self.overbuy_factor
# self.cust_proposed_initial_stakes[pair] = custom_stake # Setting of first stake size just before each first order of a trade
# return custom_stake # set to static 10 to simulate partial fills of 10$, etc
# def adjust_trade_position(self, trade: Trade, current_time: datetime,
# current_rate: float, current_profit: float, min_stake: float,
# max_stake: float, **kwargs) -> Optional[float]:
# if current_profit > self.initial_safety_order_trigger:
# return None
# filled_buys = trade.select_filled_orders(trade.entry_side)
# count_of_buys = len(filled_buys)
# if 1 <= count_of_buys <= self.max_so_multiplier_orig:
# # if (1 <= count_of_buys) and (open_trade_value < self.stake_amount * self.overbuy_factor):
# safety_order_trigger = (abs(self.initial_safety_order_trigger) * count_of_buys)
# if self.safety_order_step_scale > 1:
# safety_order_trigger = abs(self.initial_safety_order_trigger) + (
# abs(self.initial_safety_order_trigger) * self.safety_order_step_scale * (
# math.pow(self.safety_order_step_scale, (count_of_buys - 1)) - 1) / (self.safety_order_step_scale - 1))
# elif self.safety_order_step_scale < 1:
# safety_order_trigger = abs(self.initial_safety_order_trigger) + (
# abs(self.initial_safety_order_trigger) * self.safety_order_step_scale * (
# 1 - math.pow(self.safety_order_step_scale, (count_of_buys - 1))) / (1 - self.safety_order_step_scale))
# if current_profit <= (-1 * abs(safety_order_trigger)):
# try:
# # This returns first order actual stake size
# actual_initial_stake = filled_buys[0].cost
# # Fallback for when the initial stake was not set for whatever reason
# stake_amount = actual_initial_stake
# already_bought = sum(filled_buy.cost for filled_buy in filled_buys)
# if trade.pair in self.cust_proposed_initial_stakes:
# if self.cust_proposed_initial_stakes[trade.pair] > 0:
# # This calculates the amount of stake that will get used for the current safety order,
# # including compensation for any partial buys
# proposed_initial_stake = self.cust_proposed_initial_stakes[trade.pair]
# current_actual_stake = already_bought * math.pow(self.safety_order_volume_scale,
# (count_of_buys - 1))
# current_stake_preposition = proposed_initial_stake * math.pow(self.safety_order_volume_scale,
# (count_of_buys - 1))
# current_stake_preposition_compensation = current_stake_preposition + abs(
# current_stake_preposition - current_actual_stake)
# total_so_stake = lerp(current_actual_stake, current_stake_preposition_compensation,
# self.partial_fill_compensation_scale)
# # Set the calculated stake amount
# stake_amount = total_so_stake
# else:
# # Fallback stake amount calculation
# stake_amount = stake_amount * math.pow(self.safety_order_volume_scale, (count_of_buys - 1))
# else:
# # Fallback stake amount calculation
# stake_amount = stake_amount * math.pow(self.safety_order_volume_scale, (count_of_buys - 1))
# # amount = stake_amount / current_rate
# # logger.info(
# # f"Initiating safety order buy #{count_of_buys} "
# # f"for {trade.pair} with stake amount of {stake_amount}. "
# # f"which equals {amount}. "
# # f"Previously bought: {already_bought}. "
# # f"Now overall:{already_bought + stake_amount}. ")
# return stake_amount
# except Exception as exception:
# # logger.info(f'Error occured while trying to get stake amount for {trade.pair}: {str(exception)}')
# # print(f'Error occured while trying to get stake amount for {trade.pair}: {str(exception)}')
# return None
# return None
# def lerp(a: float, b: float, t: float) -> float:
# """Linear interpolate on the scale given by a to b, using t as the point on that scale.
# Examples
# --------
# 50 == lerp(0, 100, 0.5)
# 4.2 == lerp(1, 5, 0.8)
# """
# return (1 - t) * a + t * b
# #--------------------------------------
def chaikin_mf(df, periods=20):
close = df["close"]
low = df["low"]
high = df["high"]
volume = df["volume"]
mfv = ((close - low) - (high - close)) / (high - low)
mfv = mfv.fillna(0.0)
mfv *= volume
cmf = mfv.rolling(periods).sum() / volume.rolling(periods).sum()
return Series(cmf, name="cmf")
def top_percent_change(dataframe: DataFrame, length: int) -> float:
if length == 0:
return (dataframe["open"] - dataframe["close"]) / dataframe["close"]
else:
return (dataframe["open"].rolling(length).max() - dataframe["close"]) / dataframe["close"]