This is an example strategy that uses the LSTMRegressor model to predict the target score. Use at your own risk. This is a simple example strategy and should be used for educational purposes only.
Timeframe
1h
Direction
Long Only
Stoploss
-10.0%
Trailing Stop
Yes
ROI
0m: 0.0%, 600m: 0.0%
Interface Version
N/A
Startup Candles
20
Indicators
0
freqtrade/freqtrade-strategies
freqtrade/freqtrade-strategies
freqtrade/freqtrade-strategies
this is an example class, implementing a PSAR based trailing stop loss you are supposed to take the `custom_stoploss()` and `populate_indicators()` parts and adapt it to your own strategy
freqtrade/freqtrade-strategies
import inspect
import json
import logging
import os
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from functools import reduce
from typing import DefaultDict, Dict, List
import numpy as np
import pandas as pd
import talib.abstract as ta
from confluent_kafka import Consumer
from freqtrade.exchange.exchange_utils import *
from freqtrade.strategy import (DecimalParameter, IntParameter, IStrategy,
RealParameter)
from pandas import DataFrame
from technical import qtpylib
from utils.config import get_collector, get_sentiment_collector
collector = get_collector()
sentiment_collector = get_sentiment_collector()
logger = logging.getLogger(__name__)
class ExampleLSTMStrategy(IStrategy):
"""
This is an example strategy that uses the LSTMRegressor model to predict the target score.
Use at your own risk.
This is a simple example strategy and should be used for educational purposes only.
"""
# ROI table:
minimal_roi = {
"0": 0.0003,
# "600": 0 # we let the model decide when to exit
}
# Stoploss:
stoploss = -0.10
# Trailing stop:
trailing_stop = True
trailing_stop_positive = 0.001
trailing_stop_positive_offset = 0.0139
trailing_only_offset_is_reached = True
# timeframe = "1h" #sus
can_short = False
use_exit_signal = True
process_only_new_candles = True
# fetch_steps = 0
startup_candle_count = 20
threshold_buy = DecimalParameter(0.0001, 0.001, default=0.0003, space="buy")
threshold_sell = DecimalParameter(0.0001, 0.001, default=0.0003, space="sell")
# custom_data_source: CustomDataCollectorManager
def __init__(self, config):
super().__init__(config)
def feature_engineering_standard(
self, dataframe: DataFrame, metadata: Dict, **kwargs
):
# print(dataframe[['date','close']].tail())
# logging.info(f"kwargs: {kwargs}")
# logging.info(f"len: {len(dataframe)}")
# logging.info(f"{dataframe.columns}")
start_time = time.time()
timeout = 1 # seconds
dataframe['date'] = pd.to_datetime(dataframe['date'])
# Closed price from data collector + 1m datetime offset
while True:
request_date: pd.Timestamp = dataframe["date"].iloc[-1] # + np.timedelta64(1, 'm')
closed_price = collector.get_pair_data(metadata["pair"], request_date.to_datetime64())
if closed_price is None:
# logging.info(f"Waiting for closed price for {metadata['pair']} at {request_date}")
elapsed = time.time() - start_time
if elapsed > timeout:
logging.warning(f"Timeout reached ({timeout}s) while waiting for closed price of {metadata['pair']} at {request_date}")
break
time.sleep(0.25)
else:
logging.info(f"Data collector in use for {request_date}")
dataframe.loc[dataframe.index[-1], "close"] = closed_price
break
dataframe["%-raw_price"] = dataframe["close"]
dfs = sentiment_collector.bulk_get_pair_data(metadata["pair"], dataframe["date"].values)
for col in dfs.columns:
dataframe["%-" + col] = dfs[col].values
return dataframe
def set_freqai_targets(
self, dataframe: DataFrame, metadata: Dict, **kwargs
) -> DataFrame:
# print(dataframe.columns)
dataframe["&-target"] = dataframe["close"]
return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
self.freqai_info = self.config["freqai"]
dataframe = self.freqai.start(dataframe, metadata, self)
# One can define indicators here if needed and add logic to populate_entry_trend and populate_exit_trend
return dataframe
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
enter_long_conditions = [
df["do_predict"] == 1,
(df["&-target"] - df["close"]) / df["close"] > self.threshold_buy.value,
]
if enter_long_conditions:
df.loc[
reduce(lambda x, y: x & y, enter_long_conditions),
["enter_long", "enter_tag"],
] = (1, "long")
# enter_short_conditions = [
# df["do_predict"] == 1,
# (df['&-target'] - df['close']) / df['close'] < -self.threshold_sell.value
# ]
# if enter_short_conditions:
# df.loc[
# reduce(lambda x, y: x & y, enter_short_conditions), ["enter_short", "enter_tag"]
# ] = (1, "short")
return df
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
# exit_long_conditions = [
# df["do_predict"] == 1,
# (df['&-target'] - df['close']) / df['close'] < -1.618 * self.threshold_buy.value * 2
# ]
# if exit_long_conditions:
# df.loc[
# reduce(lambda x, y: x & y, exit_long_conditions), ["exit_long", "exit_tag"]
# ] = (1, "exit_long")
# exit_short_conditions = [
# df["do_predict"] == 1,
# (df['&-target'] - df['close']) / df['close'] > 1.618 * self.threshold_sell.value * 2
# ]
# if exit_short_conditions:
# df.loc[
# reduce(lambda x, y: x & y, exit_short_conditions), ["exit_short", "exit_tag"]
# ] = (1, "exit_short")
return df