Advanced strategy using GPT model for making trading decisions in futures markets with long/short positions and dynamic leverage. Version: 2025-05-25.2
Timeframe
5m
Direction
Long & Short
Stoploss
-2.5%
Trailing Stop
Yes
ROI
0m: 1.0%, 30m: 0.5%, 60m: 0.25%, 120m: 0.0%
Interface Version
3
Startup Candles
100
Indicators
7
freqtrade/freqtrade-strategies
Strategy 003 — author: Gerald Lonlas, GitHub: https://github.com/freqtrade/freqtrade-strategies
# --- Do not remove these libs ---
from freqtrade.strategy import IStrategy, IntParameter
from typing import Dict, List, Optional, Tuple, Any
from pandas import DataFrame
import numpy as np
import pandas as pd
import talib.abstract as ta
import freqtrade.vendor.qtpylib.indicators as qtpylib
import logging
from datetime import datetime, timedelta
from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter,
IStrategy, IntParameter)
import requests
import os
import re
import json
from pathlib import Path
import time
# Per the documentation: https://www.freqtrade.io/en/stable/strategy-customization/
logger = logging.getLogger(__name__)
class GPTFuturesStrategy(IStrategy):
    """
    Advanced strategy using GPT model for making trading decisions in futures markets
    with long/short positions and dynamic leverage.
    Version: 2025-05-25.2
    """
    # Strategy interface version (freqtrade IStrategy v3)
    INTERFACE_VERSION = 3
    # ROI table - minimum profit thresholds (as ratios) for closing trades,
    # keyed by trade age in minutes
    minimal_roi = {
        "0": 0.01,    # 1% profit required to close a trade immediately
        "30": 0.005,  # After 30 minutes, 0.5% is enough
        "60": 0.0025, # After 60 minutes, 0.25% is enough
        "120": 0      # After 120 minutes, close regardless of profit
    }
    # Stoploss settings
    stoploss = -0.025  # -2.5% stoploss
    # Trailing stoploss settings: trail by 0.5% once profit reaches the 1% offset
    trailing_stop = True
    trailing_stop_positive = 0.005  # 0.5%
    trailing_stop_positive_offset = 0.01  # 1%
    trailing_only_offset_is_reached = True
    # Strategy timeframes: trade on 5m candles, use 1h candles for context
    timeframe = '5m'
    informative_timeframe = '1h'
    # Order types settings - market orders everywhere, stoploss handled by the bot
    order_types = {
        'entry': 'market',
        'exit': 'market',
        'stoploss': 'market',
        'stoploss_on_exchange': False
    }
    # Leverage settings
    leverage_configuration = True
    max_leverage = 5.0
    # GPT parameters: how many recent candles to include in the prompt (hyperoptable)
    candles_to_analyze = IntParameter(5, 50, default=20, space="buy")
    # Position modes - futures: both directions, with position adjustment enabled
    can_short = True
    position_adjustment_enable = True
    # Startup candle count for indicators (EMA100 is the longest lookback used)
    startup_candle_count = 100
    # Process settings
    process_only_new_candles = True
    use_exit_signal = True
    exit_profit_only = False
    ignore_roi_if_entry_signal = False
    # GPT cache parameters (seconds) - see calculate_dynamic_cache_time()
    standard_cache_time = 300  # 5 minutes (timeframe 5m)
    position_cache_time = 150  # 2.5 minutes, for pairs with open positions
    price_change_threshold = 0.02  # 2% price move invalidates the cache early
    stoploss_cache_time = 600  # 10 minutes
    # Safety limits enforced in apply_safety_checks()
    max_daily_loss = 0.05  # 5% max daily loss
    max_volatility_threshold = 0.05  # 5% max volatility
    safe_leverage_threshold = 3.0  # Max leverage in high volatility
    # Strategy version tracking
    strategy_version = "2025-05-25.2"
def __init__(self, config: dict) -> None:
    """
    Initialize the strategy.

    Sets up the OpenAI API key, per-pair GPT/stoploss response caches,
    runtime statistics, trade-history persistence and daily safety tracking.

    :param config: Bot configuration dictionary (freqtrade config).
    """
    super().__init__(config)
    # Check for OpenAI API key
    self.openai_api_key = os.environ.get('OPENAI_API_KEY')
    if not self.openai_api_key:
        logger.error("OPENAI_API_KEY not set in environment variables!")
    else:
        logger.info("OPENAI_API_KEY found in environment variables.")
    # Cache for GPT responses:
    # {pair: {'timestamp': datetime, 'recommendation': dict, 'dataframe_close': float}}
    self.gpt_cache = {}
    # Separate cache for stoploss values:
    # {pair: {'timestamp': datetime, 'stoploss': float, 'dataframe_close': float}}
    self.stoploss_cache = {}
    # Statistics counters
    self.api_calls = 0
    self.cache_hits = 0
    self.parse_errors = 0
    # Flag for cache clearing after a strategy update/restart
    self.clear_cache_on_next_run = True
    # Create trade history directory.
    # parents=True so startup does not fail when user_data_dir has not been created yet.
    self.trade_history = []
    self.history_dir = Path(config['user_data_dir']) / 'strategy_history'
    self.history_dir.mkdir(parents=True, exist_ok=True)
    self.history_file = self.history_dir / f"gpt_trade_history_{int(time.time())}.json"
    # Market conditions tracking:
    # {pair: {'trend': 'up/down', 'volatility': 'high/medium/low'}}
    self.market_conditions = {}
    # Daily tracking for the max-daily-loss safety limit
    self.daily_stats = {
        'date': datetime.utcnow().date(),
        'trades': 0,
        'profit': 0.0,
        'wins': 0,
        'losses': 0
    }
    # Performance metrics (success-rate priors; presumably updated by trade
    # callbacks outside this view - confirm)
    self.performance = {
        'long_success_rate': 0.5,
        'short_success_rate': 0.5,
        'high_volatility_success': 0.5,
        'low_volatility_success': 0.5
    }
    # Additional logging of the effective configuration
    logger.info("Strategy initialized with parameters:")
    logger.info(f"Version: {self.strategy_version}")
    logger.info(f"Stoploss: {self.stoploss}")
    logger.info(f"Trailing stop: {self.trailing_stop}")
    logger.info(f"Can short: {self.can_short}")
    logger.info(f"Position adjustment: {self.position_adjustment_enable}")
    logger.info(f"Timeframe: {self.timeframe}")
    logger.info(f"Informative timeframe: {self.informative_timeframe}")
    logger.info(f"Process only new candles: {self.process_only_new_candles}")
    logger.info(f"Standard cache time: {self.standard_cache_time}s")
    logger.info(f"Position cache time: {self.position_cache_time}s")
    logger.info(f"Stoploss cache time: {self.stoploss_cache_time}s")
    logger.info(f"Price change threshold: {self.price_change_threshold * 100}%")
    logger.info(f"Safety limits: daily loss {self.max_daily_loss*100}%, max volatility {self.max_volatility_threshold*100}%")
def informative_pairs(self) -> List[Tuple[str, str]]:
    """
    Request the informative (1h) timeframe for every whitelisted pair.
    """
    informative_tf = self.informative_timeframe
    whitelist = self.dp.current_whitelist()
    return [(whitelisted, informative_tf) for whitelisted in whitelist]
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """
    Calculate technical indicators with optimized computation.

    Cheap indicators (EMA5/20, RSI) are computed for every pair; the full
    indicator set (Bollinger, MACD, ATR, candlestick patterns, volume
    support/resistance, 1h context) is only computed for active pairs
    (whitelisted or with an open position).

    :param dataframe: OHLCV candles for the pair/timeframe.
    :param metadata: Strategy metadata; 'pair' is the traded pair.
    :return: Dataframe with indicator columns appended.
    """
    # Log the data amount
    pair = metadata['pair']
    logger.info(f"Populate indicators for {pair} - candles: {len(dataframe)}")
    if len(dataframe) == 0:
        logger.warning(f"Empty dataframe for {pair}")
        return dataframe
    # Always calculate basic indicators for all pairs
    dataframe['ema5'] = ta.EMA(dataframe, timeperiod=5)
    dataframe['ema20'] = ta.EMA(dataframe, timeperiod=20)
    dataframe['rsi'] = ta.RSI(dataframe, timeperiod=14)
    # Additional indicators only for active pairs or pairs with open positions
    if self.is_active_pair(pair):
        # EMA indicators (longer lookbacks; EMA100 motivates startup_candle_count=100)
        dataframe['ema50'] = ta.EMA(dataframe, timeperiod=50)
        dataframe['ema100'] = ta.EMA(dataframe, timeperiod=100)
        # VWAP
        dataframe['vwap'] = qtpylib.rolling_vwap(dataframe)
        # Bollinger Bands (20-period, 2 std-dev) on typical price
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
        dataframe['bb_lowerband'] = bollinger['lower']
        dataframe['bb_middleband'] = bollinger['mid']
        dataframe['bb_upperband'] = bollinger['upper']
        # Volume indicators
        dataframe['volume_mean'] = dataframe['volume'].rolling(window=20).mean()
        dataframe['volume_change'] = dataframe['volume'] / dataframe['volume'].shift(1)
        dataframe['volume_ma_ratio'] = dataframe['volume'] / dataframe['volume'].rolling(window=20).mean()
        # ATR (Average True Range) - also used by the safety/volatility helpers
        dataframe['atr'] = ta.ATR(dataframe, timeperiod=14)
        # MACD
        macd = ta.MACD(dataframe)
        dataframe['macd'] = macd['macd']
        dataframe['macdsignal'] = macd['macdsignal']
        dataframe['macdhist'] = macd['macdhist']
        # Stochastic
        stoch = ta.STOCH(dataframe)
        dataframe['slowk'] = stoch['slowk']
        dataframe['slowd'] = stoch['slowd']
        # Candlestick patterns (boolean columns)
        dataframe['doji'] = self.detect_doji(dataframe)
        dataframe['engulfing'] = self.detect_engulfing(dataframe)
        dataframe['hammer'] = self.detect_hammer(dataframe)
        # Support/Resistance based on volume
        dataframe['vol_support'] = self.detect_volume_support(dataframe)
        dataframe['vol_resistance'] = self.detect_volume_resistance(dataframe)
        # Market trend detection via EMA stacking (5 > 20 > 50 = uptrend)
        dataframe['uptrend'] = (dataframe['ema5'] > dataframe['ema20']) & (dataframe['ema20'] > dataframe['ema50'])
        dataframe['downtrend'] = (dataframe['ema5'] < dataframe['ema20']) & (dataframe['ema20'] < dataframe['ema50'])
        # Volatility measures (ATR normalized by close)
        dataframe['volatility'] = dataframe['atr'] / dataframe['close']
        dataframe['volatility_ma'] = dataframe['volatility'].rolling(window=10).mean()
        # Dynamic 1h data creation from 5m data
        # First try to get 1h data directly
        informative = self.dp.get_pair_dataframe(pair, self.informative_timeframe)
        if len(informative) > 0:
            logger.info(f"Retrieved {len(informative)} 1h candles directly for {pair}")
            # Add indicators on informative timeframe
            informative['ema20_1h'] = ta.EMA(informative, timeperiod=20)
            informative['ema50_1h'] = ta.EMA(informative, timeperiod=50)
            informative['rsi_1h'] = ta.RSI(informative, timeperiod=14)
            informative['atr_1h'] = ta.ATR(informative, timeperiod=14)
            # Copy dataframe to avoid SettingWithCopy warnings
            df_copy = dataframe.copy()
            # Add informative columns to dataframe (NaN where no 1h match exists)
            for col in ['ema20_1h', 'ema50_1h', 'rsi_1h', 'atr_1h']:
                df_copy[col] = np.nan
            # Check index type - handle different index types
            # NOTE(review): this per-row merge is O(n) individual .loc writes;
            # presumably acceptable for ~100 startup candles - confirm before
            # increasing the candle history.
            # For each position in base dataframe
            for i in range(len(df_copy)):
                # Get date/time for this index
                if 'date' in df_copy.columns:
                    # Use 'date' column if it exists
                    date_val = pd.to_datetime(df_copy['date'].iloc[i])
                elif isinstance(df_copy.index, pd.DatetimeIndex):
                    # Use index if it's DatetimeIndex
                    date_val = df_copy.index[i]
                else:
                    # If no date available, skip this iteration
                    logger.warning(f"Cannot get date for index {i}")
                    continue
                # Round down to full hour to find the owning 1h candle
                hour_idx = date_val.floor('1h')
                # Get index for current iteration
                current_idx = df_copy.index[i]
                # If date exists in informative dataframe
                # NOTE(review): assumes informative.index is a DatetimeIndex
                # aligned to hour starts - confirm with the data provider.
                if hour_idx in informative.index:
                    # Assign values
                    df_copy.loc[current_idx, 'ema20_1h'] = informative.loc[hour_idx, 'ema20_1h']
                    df_copy.loc[current_idx, 'ema50_1h'] = informative.loc[hour_idx, 'ema50_1h']
                    df_copy.loc[current_idx, 'rsi_1h'] = informative.loc[hour_idx, 'rsi_1h']
                    df_copy.loc[current_idx, 'atr_1h'] = informative.loc[hour_idx, 'atr_1h']
            # Replace original dataframe
            dataframe = df_copy
        else:
            # If no 1h data available, resample 5m data to 1h
            logger.info(f"No 1h data for {pair}, creating dynamically from 5m data")
            # Make sure dataframe has date column
            if 'date' not in dataframe.columns:
                logger.warning(f"No 'date' column in dataframe for {pair}")
                # Add empty informative columns so downstream code can rely on them
                dataframe['ema20_1h'] = np.nan
                dataframe['ema50_1h'] = np.nan
                dataframe['rsi_1h'] = np.nan
                dataframe['atr_1h'] = np.nan
                return dataframe
            # Create copy with datetime index
            df_temp = dataframe.copy()
            df_temp.set_index('date', inplace=True)
            # Resample to 1h and calculate indicators
            try:
                df_1h = df_temp.resample('1h').agg({
                    'open': 'first',
                    'high': 'max',
                    'low': 'min',
                    'close': 'last',
                    'volume': 'sum'
                })
                # Calculate 1h indicators
                df_1h['ema20_1h'] = ta.EMA(df_1h['close'], timeperiod=20)
                df_1h['ema50_1h'] = ta.EMA(df_1h['close'], timeperiod=50)
                df_1h['rsi_1h'] = ta.RSI(df_1h['close'], timeperiod=14)
                df_1h['atr_1h'] = ta.ATR(df_1h, timeperiod=14)
                logger.info(f"Created {len(df_1h)} 1h candles from 5m data for {pair}")
                # Add informative columns to dataframe
                dataframe['ema20_1h'] = np.nan
                dataframe['ema50_1h'] = np.nan
                dataframe['rsi_1h'] = np.nan
                dataframe['atr_1h'] = np.nan
                # For each position in dataframe (same O(n) merge as above)
                for i in range(len(dataframe)):
                    # Get date for this row
                    date_val = pd.to_datetime(dataframe['date'].iloc[i])
                    # Round down to full hour
                    hour_idx = date_val.floor('1h')
                    # If date exists in 1h dataframe
                    if hour_idx in df_1h.index:
                        # Assign values
                        dataframe.loc[dataframe.index[i], 'ema20_1h'] = df_1h.loc[hour_idx, 'ema20_1h']
                        dataframe.loc[dataframe.index[i], 'ema50_1h'] = df_1h.loc[hour_idx, 'ema50_1h']
                        dataframe.loc[dataframe.index[i], 'rsi_1h'] = df_1h.loc[hour_idx, 'rsi_1h']
                        dataframe.loc[dataframe.index[i], 'atr_1h'] = df_1h.loc[hour_idx, 'atr_1h']
            except Exception as e:
                logger.error(f"Error resampling data to 1h: {e}")
                # Add empty columns so later column lookups do not KeyError
                dataframe['ema20_1h'] = np.nan
                dataframe['ema50_1h'] = np.nan
                dataframe['rsi_1h'] = np.nan
                dataframe['atr_1h'] = np.nan
        # Update market conditions (requires the 'atr' column computed above)
        self.update_market_conditions(dataframe, pair)
    # Log completed indicators
    logger.info(f"Indicators calculated for {pair}")
    return dataframe
def is_active_pair(self, pair: str) -> bool:
    """
    Return True when the pair is whitelisted or has an open position.
    """
    if pair in self.dp.current_whitelist():
        return True
    # Otherwise, active only if an open trade exists for this pair
    return any(trade.pair == pair for trade in Trade.get_trades_proxy(is_open=True))
def detect_doji(self, dataframe: DataFrame) -> pd.Series:
    """
    Flag doji candles: the real body is tiny compared to the full range.
    """
    real_body = (dataframe['close'] - dataframe['open']).abs()
    full_range = dataframe['high'] - dataframe['low']
    # Body below 10% of the range, and a non-degenerate (non-zero-range) candle
    return (real_body / full_range < 0.1) & (full_range > 0)
def detect_engulfing(self, dataframe: DataFrame) -> pd.Series:
    """
    Flag bullish or bearish engulfing candles: the current candle's body
    completely covers the previous candle's body, with opposite direction.
    """
    prev_open = dataframe['open'].shift(1)
    prev_close = dataframe['close'].shift(1)
    curr_open = dataframe['open']
    curr_close = dataframe['close']
    # Bullish engulfing: bullish candle swallowing a prior bearish body
    bullish = (
        (curr_close > curr_open)
        & (prev_close < prev_open)
        & (curr_close > prev_open)
        & (curr_open < prev_close)
    )
    # Bearish engulfing: bearish candle swallowing a prior bullish body
    bearish = (
        (curr_close < curr_open)
        & (prev_close > prev_open)
        & (curr_close < prev_open)
        & (curr_open > prev_close)
    )
    return bullish | bearish
def detect_hammer(self, dataframe: DataFrame) -> pd.Series:
    """
    Detect hammer and inverted hammer patterns.

    Bug fix: the original called builtin min()/max() on two pandas Series,
    which raises "The truth value of a Series is ambiguous". Element-wise
    np.minimum/np.maximum are used instead.

    :param dataframe: OHLCV candles.
    :return: Boolean Series, True where a hammer or inverted hammer occurs.
    """
    # Hammer has a small body at the top and a long lower shadow
    body = abs(dataframe['close'] - dataframe['open'])
    shadow = dataframe['high'] - dataframe['low']
    # Element-wise min/max of open/close; builtin min/max would raise on Series
    lower_shadow = np.minimum(dataframe['open'], dataframe['close']) - dataframe['low']
    upper_shadow = dataframe['high'] - np.maximum(dataframe['open'], dataframe['close'])
    hammer = (
        (body / shadow < 0.3) &  # Small body
        (lower_shadow / body > 2) &  # Long lower shadow
        (upper_shadow / body < 0.5)  # Short upper shadow
    )
    inverted_hammer = (
        (body / shadow < 0.3) &  # Small body
        (upper_shadow / body > 2) &  # Long upper shadow
        (lower_shadow / body < 0.5)  # Short lower shadow
    )
    return hammer | inverted_hammer
def detect_volume_support(self, dataframe: DataFrame) -> pd.Series:
    """
    Detect potential support levels based on volume.

    Bug fix: the original compared each low against a rolling min that
    included the current candle, so the "new low" condition could never be
    True. The low is now compared against the minimum of the *previous*
    5 candles via shift(1).

    :param dataframe: OHLCV candles.
    :return: Boolean Series, True where a volume-backed support forms.
    """
    # Support levels often form when price bounces off a level with high volume
    volume_threshold = dataframe['volume'].rolling(window=20).mean() * 1.5
    return (
        (dataframe['volume'] > volume_threshold) &  # High volume
        (dataframe['close'] > dataframe['open']) &  # Bullish candle
        (dataframe['low'] < dataframe['low'].shift(1).rolling(window=5).min())  # New low vs previous 5 candles
    )
def detect_volume_resistance(self, dataframe: DataFrame) -> pd.Series:
    """
    Detect potential resistance levels based on volume.

    Bug fix: the original compared each high against a rolling max that
    included the current candle, so the "new high" condition could never be
    True. The high is now compared against the maximum of the *previous*
    5 candles via shift(1).

    :param dataframe: OHLCV candles.
    :return: Boolean Series, True where a volume-backed resistance forms.
    """
    # Resistance levels often form when price fails to break a level with high volume
    volume_threshold = dataframe['volume'].rolling(window=20).mean() * 1.5
    return (
        (dataframe['volume'] > volume_threshold) &  # High volume
        (dataframe['close'] < dataframe['open']) &  # Bearish candle
        (dataframe['high'] > dataframe['high'].shift(1).rolling(window=5).max())  # New high vs previous 5 candles
    )
def update_market_conditions(self, dataframe: DataFrame, pair: str) -> None:
    """
    Refresh the cached trend/volatility/volume snapshot for a pair.
    Requires at least 20 candles and the 'ema5'/'ema20'/'atr' columns.
    """
    if len(dataframe) < 20:
        return
    latest = dataframe.iloc[-1]
    # Trend from the short EMAs on the latest candle
    trend = 'up' if latest['ema5'] > latest['ema20'] else 'down'
    # Normalized ATR over the last 5 candles → coarse volatility bucket
    recent_volatility = dataframe['atr'].iloc[-5:].mean() / latest['close']
    volatility_level = ('high' if recent_volatility > 0.005   # above 0.5%
                        else 'low' if recent_volatility < 0.002  # below 0.2%
                        else 'medium')
    # Short-term vs medium-term average volume
    recent_volume = dataframe['volume'].iloc[-5:].mean()
    baseline_volume = dataframe['volume'].iloc[-20:].mean()
    ratio = recent_volume / baseline_volume
    if ratio > 1.2:
        volume_trend = 'increasing'
    elif ratio < 0.8:
        volume_trend = 'decreasing'
    else:
        volume_trend = 'stable'
    # Store the snapshot for prompt building and cache-time tuning
    self.market_conditions[pair] = {
        'trend': trend,
        'volatility': volatility_level,
        'volume_trend': volume_trend,
        'updated_at': datetime.utcnow()
    }
def calculate_dynamic_cache_time(self, pair: str, current_time: datetime, dataframe: DataFrame, cache_type: str = 'standard') -> float:
    """
    Derive a cache lifetime (seconds) from recent market volatility.

    Stoploss caches use their own base time; standard caches use a shorter
    base when the pair has an open position. High recent volatility halves
    the lifetime (floor 60s), very low volatility extends it (cap 900s),
    and the stored market-condition snapshot nudges it by +/-20%.
    """
    if cache_type == 'stoploss':
        base = self.stoploss_cache_time
    else:
        # Shorter cache for pairs we currently hold a position in
        open_for_pair = [t for t in Trade.get_trades_proxy(is_open=True) if t.pair == pair]
        base = self.position_cache_time if open_for_pair else self.standard_cache_time
    # Volatility-aware adjustment only when ATR data is available
    usable = len(dataframe) >= 5 and 'atr' in dataframe.columns and 'close' in dataframe.columns
    if not usable:
        cache_time = base
    else:
        recent_volatility = dataframe['atr'].iloc[-5:].mean() / dataframe['close'].iloc[-1]
        if recent_volatility > 0.005:  # above 0.5% -> refresh faster
            cache_time = max(60, base / 2)
        elif recent_volatility < 0.001:  # below 0.1% -> keep longer
            cache_time = min(900, base * 1.5)
        else:
            cache_time = base
        # Nudge by the cached per-pair market snapshot, when present
        if pair in self.market_conditions:
            conditions = self.market_conditions[pair]
            if conditions.get('volatility') == 'high' or conditions.get('volume_trend') == 'increasing':
                cache_time *= 0.8
            if conditions.get('volatility') == 'low' and conditions.get('volume_trend') == 'stable':
                cache_time *= 1.2
    logger.info(f"Dynamic cache time for {pair}: {cache_time:.1f}s")
    return cache_time
def should_use_gpt_cache(self, pair: str, current_time: datetime, dataframe: DataFrame, cache_type: str = 'standard') -> bool:
    """
    Decide whether a cached GPT answer is still fresh enough to reuse.

    Returns False when there is no entry for the pair, when price has moved
    more than price_change_threshold since the entry was stored, or when
    the entry is older than the dynamic cache lifetime.
    """
    cache = self.stoploss_cache if cache_type == 'stoploss' else self.gpt_cache
    entry = cache.get(pair)
    if entry is None:
        return False
    age = (current_time - entry['timestamp']).total_seconds()
    lifetime = self.calculate_dynamic_cache_time(pair, current_time, dataframe, cache_type)
    # Invalidate early on a significant price move since the cached analysis
    if len(dataframe) >= 2:
        last_price = dataframe['close'].iloc[-1]
        cached_price = entry.get('dataframe_close', 0)
        if cached_price > 0:
            move = abs((last_price - cached_price) / cached_price)
            if move > self.price_change_threshold:
                logger.info(f"Significant price change for {pair}: {move:.2%} - refreshing GPT analysis")
                return False
    return age < lifetime
def get_gpt_recommendation(self, dataframe: DataFrame, metadata: dict, current_time: datetime, open_trades: list) -> dict:
    """
    Get a trading recommendation from the GPT model, using caching.

    Returns a dict with at least 'action', 'reasoning' and 'leverage';
    successful, non-error paths also carry 'stop_loss' and 'take_profit'.
    Falls back to a safe 'hold' recommendation on missing data, a missing
    API key, or any API failure.

    :param dataframe: Indicator-enriched candles for the pair.
    :param metadata: Strategy metadata; 'pair' is the traded pair.
    :param current_time: Time used for cache freshness checks.
    :param open_trades: All currently open trades (filtered per pair below).
    """
    pair = metadata['pair']
    # Check if we have enough candles for analysis
    if len(dataframe) == 0:
        logger.warning(f"Empty dataframe for {pair} - cannot create GPT prompt")
        return {
            'action': 'hold',
            'reasoning': 'No data available for analysis',
            'leverage': 1.0,
            'stop_loss': self.stoploss,
            'take_profit': 0.02
        }
    # Reuse a cached recommendation when still fresh (saves API cost)
    if self.should_use_gpt_cache(pair, current_time, dataframe):
        self.cache_hits += 1
        cache_entry = self.gpt_cache[pair]
        time_diff = (current_time - cache_entry['timestamp']).total_seconds()
        logger.info(f"Using cached recommendation for {pair} (age: {time_diff:.1f}s, cache hits: {self.cache_hits})")
        return cache_entry['recommendation']
    # If no API key, return default recommendation
    if not self.openai_api_key:
        logger.warning("No OPENAI_API_KEY - skipping GPT consultation")
        return {
            'action': 'hold',
            'reasoning': 'GPT API key not configured',
            'leverage': 1.0,
            'stop_loss': self.stoploss,
            'take_profit': 0.02
        }
    # Additional diagnostic logging
    logger.info(f"Starting GPT consultation for {pair}")
    # Get latest candles for analysis (bounded by the hyperoptable parameter)
    candles_count = min(len(dataframe), self.candles_to_analyze.value)
    if candles_count == 0:
        logger.warning(f"Not enough candles for {pair} - cannot create GPT prompt")
        return {
            'action': 'hold',
            'reasoning': 'Not enough data for analysis',
            'leverage': 1.0,
            'stop_loss': self.stoploss,
            'take_profit': 0.02
        }
    latest_candles = dataframe.tail(candles_count).copy()
    # Prepare open positions data for this pair
    pair_trades = [trade for trade in open_trades if trade.pair == pair]
    # Prepare prompt for GPT with market conditions
    prompt = self._prepare_compact_prompt(latest_candles, pair, current_time, pair_trades)
    # Log the prompt
    logger.info(f"GPT Prompt:\n{prompt}")
    try:
        # Call OpenAI API (Chat Completions endpoint)
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.openai_api_key}"
        }
        data = {
            "model": "gpt-4-turbo",
            "messages": [
                {
                    "role": "system",
                    "content": "You are an experienced futures scalper specializing in cryptocurrency markets. Your task is to analyze market data and provide precise trading recommendations."
                },
                {"role": "user", "content": prompt}
            ],
            # Low temperature for more deterministic recommendations
            "temperature": 0.3,
            "max_tokens": 1500
        }
        logger.info(f"Sending request to OpenAI API... (API call #{self.api_calls + 1})")
        # NOTE(review): synchronous HTTP call inside the strategy loop -
        # presumably acceptable at the 5m timeframe; confirm it cannot
        # starve other pairs.
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=data,
            timeout=30  # Add timeout to prevent long waits
        )
        self.api_calls += 1
        if response.status_code != 200:
            logger.error(f"Error from GPT API: {response.status_code} - {response.text}")
            return {'action': 'hold', 'reasoning': f'API error: {response.status_code}', 'leverage': 1.0}
        # Process response
        response_data = response.json()
        gpt_response = response_data['choices'][0]['message']['content']
        # Log the response
        logger.info(f"GPT Response:\n{gpt_response}")
        # Parse GPT response (parse failures are counted in self.parse_errors)
        recommendation = self._parse_gpt_response(gpt_response)
        # Apply safety checks to recommendation (daily loss, volatility, leverage)
        recommendation = self.apply_safety_checks(pair, recommendation, dataframe)
        # Save recommendation to cache, with the close used for staleness checks
        self.gpt_cache[pair] = {
            'timestamp': current_time,
            'recommendation': recommendation,
            'dataframe_close': dataframe['close'].iloc[-1] if len(dataframe) > 0 else 0
        }
        logger.info(f"Final recommendation: {recommendation} (API calls: {self.api_calls}, cache hits: {self.cache_hits})")
        return recommendation
    except Exception as e:
        logger.error(f"Error calling GPT API: {e}")
        return {'action': 'hold', 'reasoning': f'API error: {str(e)}', 'leverage': 1.0}
def apply_safety_checks(self, pair: str, recommendation: dict, dataframe: DataFrame) -> dict:
    """
    Clamp a GPT recommendation to the strategy's hard safety limits.

    - Forces HOLD once the daily loss limit is exhausted.
    - In high volatility, caps leverage and tightens the stop loss.
    - Never allows leverage above max_leverage.

    Robustness fix: recommendation keys are read with .get() defaults so a
    partially parsed GPT response cannot raise KeyError here.

    :param pair: Traded pair (used for volatility calculation).
    :param recommendation: Parsed GPT recommendation (mutated in place).
    :param dataframe: Indicator-enriched candles for the pair.
    :return: The (possibly adjusted) recommendation dict.
    """
    action = recommendation.get('action', 'hold')
    # 1. Check daily loss limit
    daily_loss = self.calculate_daily_loss()
    if daily_loss < -self.max_daily_loss:
        logger.warning(f"Daily loss limit exceeded ({daily_loss:.2%}), forcing HOLD recommendation")
        recommendation['action'] = 'hold'
        recommendation['reasoning'] = f"Safety override: daily loss limit exceeded ({daily_loss:.2%})"
        return recommendation
    # 2. Check market volatility
    volatility = self.calculate_market_volatility(pair, dataframe)
    if volatility > self.max_volatility_threshold:
        logger.warning(f"High market volatility ({volatility:.2%}), reducing leverage and risk")
        # If entering a new position in high volatility, reduce leverage
        if action in ['long', 'short']:
            recommendation['leverage'] = min(recommendation.get('leverage', 1.0), self.safe_leverage_threshold)
            # Make stop loss tighter in high volatility
            stop_loss = recommendation.get('stop_loss', self.stoploss)
            if action == 'long':
                recommendation['stop_loss'] = max(stop_loss, -0.015)  # Max 1.5% loss
            else:  # short
                recommendation['stop_loss'] = min(stop_loss, 0.015)  # Max 1.5% loss
    # 3. Enforce maximum leverage (re-read: step 2 may have changed it)
    leverage = recommendation.get('leverage', 1.0)
    if leverage > self.max_leverage:
        logger.warning(f"Reducing leverage from {leverage} to maximum {self.max_leverage}")
        recommendation['leverage'] = self.max_leverage
    return recommendation
def calculate_daily_loss(self) -> float:
    """
    Return today's cumulative profit (negative = loss), resetting the daily
    statistics whenever a new UTC day has started.
    """
    today = datetime.utcnow().date()
    if self.daily_stats['date'] == today:
        # Same day: report the running total
        return self.daily_stats['profit']
    # New day: start a fresh daily record
    self.daily_stats = {
        'date': today,
        'trades': 0,
        'profit': 0.0,
        'wins': 0,
        'losses': 0
    }
    return 0.0
def calculate_market_volatility(self, pair: str, dataframe: DataFrame) -> float:
    """
    Estimate volatility as the mean of the normalized ATR (last 10 candles)
    and the 20-candle high-low range relative to the latest close.
    Returns 0.0 when there is not enough data or no ATR column.
    """
    if len(dataframe) < 20 or 'atr' not in dataframe.columns:
        return 0.0
    latest_close = dataframe['close'].iloc[-1]
    # ATR-based component (ATR as percentage of price)
    atr_component = dataframe['atr'].iloc[-10:].mean() / latest_close
    # Price-range component over the last 20 candles
    window_high = dataframe['high'].iloc[-20:].max()
    window_low = dataframe['low'].iloc[-20:].min()
    range_component = (window_high - window_low) / latest_close
    # Average the two measures
    return (atr_component + range_component) / 2
def calculate_dynamic_position_size(self, dataframe: DataFrame, pair: str, recommendation: dict) -> float:
    """
    Scale the base stake by recent volatility and the GPT signal strength.
    Higher volatility shrinks the stake; stronger signals grow it.
    """
    # Normalized ATR of the latest candle, or a 1% default without ATR data
    if 'atr' in dataframe.columns and len(dataframe) > 0:
        volatility = dataframe['atr'].iloc[-1] / dataframe['close'].iloc[-1]
    else:
        volatility = 0.01
    # Base stake from config, defaulting to 1% of portfolio
    base_stake = float(self.config['stake_amount']) if 'stake_amount' in self.config else 0.01
    # Volatility dampener clamped to [0.5, 1.0]
    volatility_factor = max(0.5, min(1.0, 1.0 - (volatility * 10)))
    # Optional signal strength from the recommendation (defaults to neutral)
    signal_strength = recommendation.get('signal_strength', 1.0)
    position_size = base_stake * volatility_factor * signal_strength
    logger.info(f"Dynamic position size for {pair}: {position_size:.2f} (vol: {volatility:.4f}, strength: {signal_strength:.2f})")
    return position_size
def _prepare_compact_prompt(self, candles: DataFrame, pair: str, current_time: datetime, open_trades: list) -> str:
"""
Prepare optimized, compact prompt for GPT model in English.
"""
# Extract symbol and base value for scaling
symbol = pair.split('/')[0]
base_value = int(candles['close'].iloc[0] // 1000 * 1000) # Round to thousands for base value
scale_factor = 1000 if base_value >= 10000 else 1
# Format header
data_header = f"{symbol} 5m Base={base_value/scale_factor:.1f}K:"
# Format tabular data
table = "| Idx | O/C | H/L | %Chg | Vol | RSI | EMA5/20 | 1h-RSI |\n"
table += "|-----|-----|-----|------|-----|-----|---------|--------|\n"
# Add every other candle to the table (for space efficiency)
step = 2 # Can change to 1 to show all candles
for i in range(0, len(candles), step):
candle = candles.iloc[i]
# Relative price change
pct_change = ((candle['close'] - candle['open']) / candle['open'] * 100)
# Difference from base value (for compactness)
o_rel = (candle['open'] - base_value) / scale_factor
c_rel = (candle['close'] - base_value) / scale_factor
h_rel = (candle['high'] - base_value) / scale_factor
l_rel = (candle['low'] - base_value) / scale_factor
# EMA trend (↑ when EMA5 > EMA20, ↓ when EMA5 < EMA20)
ema_trend = "↑" if candle['ema5'] > candle['ema20'] else "↓"
# Format table row - handle different index types
if 'date' in candles.columns:
idx_display = pd.to_datetime(candle['date']).strftime('%H:%M')
elif isinstance(candles.index, pd.DatetimeIndex):
idx_display = candles.index[i].strftime('%H:%M')
else:
idx_display = str(i) # Use numeric index if no better option
rsi_1h = candle['rsi_1h'] if 'rsi_1h' in candle and not pd.isna(candle['rsi_1h']) else "-"
table += f"| {idx_display} "
table += f"| {o_rel:.1f}/{c_rel:.1f} | {h_rel:.1f}/{l_rel:.1f} | {pct_change:.1f} "
table += f"| {int(candle['volume'])} | {int(candle['rsi'])} | {ema_trend} | {int(rsi_1h) if isinstance(rsi_1h, (int, float)) else '-'} |\n"
# Technical analysis in one line
sup_level = (candles['low'].min() - base_value) / scale_factor
res_level = (candles['high'].max() - base_value) / scale_factor
current_close = (candles['close'].iloc[-1] - base_value) / scale_factor
tech_analysis = (
f"TREND: {'DOWN' if candles['ema5'].iloc[-1] < candles['ema20'].iloc[-1] else 'UP'}, "
f"SUP: {sup_level:.1f}K, RES: {res_level:.1f}K, "
f"RSI: {candles['rsi'].iloc[-1]:.0f}({candles['rsi'].min():.0f}-{candles['rsi'].max():.0f}), "
f"CURR: {current_close:.1f}K"
)
# Additional indicators if available
advanced_indicators = ""
if 'macd' in candles.columns:
advanced_indicators += f"MACD: {candles['macd'].iloc[-1]:.2f}, "
if 'macdsignal' in candles.columns:
advanced_indicators += f"Signal: {candles['macdsignal'].iloc[-1]:.2f}, "
if 'volatility' in candles.columns:
advanced_indicators += f"Volatility: {candles['volatility'].iloc[-1]*100:.2f}%, "
# Market conditions if available
market_context = ""
if pair in self.market_conditions:
conditions = self.market_conditions[pair]
market_context = f"\n## Market Conditions\nTrend: {conditions.get('trend', 'unknown').upper()}, Volatility: {conditions.get('volatility', 'medium').upper()}, Volume: {conditions.get('volume_trend', 'stable').upper()}"
# Positions information
positions = "NO OPEN POSITIONS" if not open_trades else ", ".join(
[f"{'LONG' if not t.is_short else 'SHORT'}@{(t.open_rate-base_value)/scale_factor:.1f}K({t.calc_profit_ratio(t.open_rate)*100:.1f}%)"
for t in open_trades]
)
# Full prompt
prompt = f"""
# {pair} Analysis [{current_time.strftime('%Y-%m-%d %H:%M')}]
{data_header}
{table}
## Technical Analysis
{tech_analysis}
{advanced_indicators}
## Current Positions
{positions}
{market_context}
## Task
As an experienced futures scalper, analyze the market data above and provide a trading recommendation.
1. Analyze short-term (5-minute) and medium-term (1-hour) trends
2. Identify key support and resistance levels
3. Evaluate market momentum based on RSI, EMA and candlestick patterns
4. Consider current trading volume
5. Present 2-3 price scenarios and specific entry/exit strategies
## Response Format (JSON)
```json
{{
"action": "LONG|SHORT|CLOSE|HOLD",
"leverage": 1.0-5.0,
"stop_loss_pct": 0.5-3.0,
"take_profit_pct": 1.0-10.0,
"reasoning": "Your concise analysis here",
"scenarios": ["Scenario 1", "Scenario 2", "Scenario 3"]
}}