Timeframe
5m
Direction
Long Only
Stoploss
N/A
Trailing Stop
No
ROI
N/A
Interface Version
N/A
Startup Candles
N/A
Indicators
0
freqtrade/freqtrade-strategies
author@: lenik
import pandas as pd
from freqtrade.strategy import (IStrategy, IntParameter)
from freqtrade.persistence import Trade
import logging
import os
import json
import time
from pathlib import Path
import csv
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from copy import deepcopy
logger = logging.getLogger(__name__)
# Address of the wallet whose perpetual positions are copied; used as the default for COPY_HL.ADDRESS_TO_TRACK.
ADDRESS_TO_TRACK_TOP = "0x4b66f4048a0a90fd5ff44abbe5d68332656b78b8"
# Constants for position tracking - read by PositionTracker._detect_changes, kept ahead of the class for readability
POSITION_CHANGE_DETECTION_THRESHOLD = 1e-8 # Minimum absolute change in size (also applied to leverage) that counts as a change
PRICE_CHANGE_DETECTION_THRESHOLD = 1e-6 # Minimum absolute change in entry price that counts as a modification
#####################################################################################################################################################################################################
# Classes used to manage the copied wallet position tracking
#####################################################################################################################################################################################################
@dataclass
class PositionSnapshot:
    """One position of the tracked wallet as seen in a single poll.

    The field names double as the column names of the snapshot CSV files, and
    the field order is the positional-constructor order, so neither may change.
    """

    coin: str  # asset identifier, used as the dictionary key everywhere
    size: float  # signed size: positive is long, negative is short
    entry_price: float  # entry price reported for the position
    position_value: float  # reported value of the position
    unrealized_pnl: float  # reported unrealized profit or loss
    leverage: float  # leverage reported for the position
    margin_used: float  # margin reported as used by the position
    timestamp: int  # time of the poll in epoch milliseconds
@dataclass
class PositionChange:
    """A single detected difference between two consecutive polls.

    ``old_size`` and ``old_position_value`` are ``None`` when the position did
    not exist in the previous poll. The field names double as the column names
    of the changes CSV file.
    """

    coin: str  # asset identifier of the position that changed
    # One of: opened_long, opened_short, closed, increased, decreased,
    # flipped, modified.
    change_type: str
    old_size: Optional[float]  # signed size before the change, None if newly opened
    new_size: float  # signed size after the change, 0.0 if closed
    old_position_value: Optional[float]  # value before the change, None if newly opened
    new_position_value: float  # value after the change, 0.0 if closed
    timestamp: int  # time of detection in epoch milliseconds
    human_time: str  # the same instant, pre-formatted for logs and CSV
class PositionTracker:
    """Track a wallet's positions between polls and persist the result as CSV.

    Three files are kept inside ``data_dir``:

    * ``positions_history.csv`` - every snapshot recorded when a position changed
    * ``changes_log.csv``       - every detected change
    * ``last_positions.csv``    - the positions seen in the most recent poll

    Existing files are loaded on construction, so a tracker created on each bot
    loop continues where the previous one stopped.
    """

    # Column layout shared by positions_history.csv and last_positions.csv.
    _SNAPSHOT_COLUMNS = (
        'coin', 'size', 'entry_price', 'position_value',
        'unrealized_pnl', 'leverage', 'margin_used', 'timestamp', 'human_time',
    )
    # Column layout of changes_log.csv.
    _CHANGE_COLUMNS = (
        'coin', 'change_type', 'old_size', 'new_size',
        'old_position_value', 'new_position_value', 'timestamp', 'human_time',
    )

    def __init__(self, data_dir: str = "position_data"):
        """Create the data directory if needed and load any persisted state.

        Args:
            data_dir: Directory holding the CSV files. A ``pathlib.Path`` is
                accepted too and is normalised to ``str``.
        """
        data_dir = os.fspath(data_dir)
        self.data_dir = data_dir
        self.positions_file = os.path.join(data_dir, "positions_history.csv")
        self.changes_file = os.path.join(data_dir, "changes_log.csv")
        self.last_positions_file = os.path.join(data_dir, "last_positions.csv")
        self.position_history: Dict[str, List[PositionSnapshot]] = {}
        self.last_positions: Dict[str, PositionSnapshot] = {}
        self.changes_log: List[PositionChange] = []
        # Create data directory if it doesn't exist
        os.makedirs(data_dir, exist_ok=True)
        # Load existing data if available
        self._load_data()

    # ------------------------------------------------------------------
    # Persistence helpers
    # ------------------------------------------------------------------
    def _write_csv(self, path: str, header, rows, description: str) -> None:
        """Write ``header`` and ``rows`` to ``path``; log instead of raising.

        The data goes to a temporary sibling file that is then swapped in with
        ``os.replace``. A crash in the middle of a write therefore cannot leave
        a truncated file behind (a truncated last_positions.csv would make
        every position look newly opened on the next poll).
        """
        tmp_path = f"{path}.tmp"
        try:
            with open(tmp_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(header)
                writer.writerows(rows)
            os.replace(tmp_path, path)
        except Exception as e:
            logger.error(f"Failed to save {description}: {e}")

    def _snapshot_row(self, pos: PositionSnapshot) -> list:
        """Return ``pos`` as a CSV row in ``_SNAPSHOT_COLUMNS`` order."""
        return [
            pos.coin, pos.size, pos.entry_price, pos.position_value,
            pos.unrealized_pnl, pos.leverage, pos.margin_used,
            pos.timestamp, self._timestamp_to_human(pos.timestamp),
        ]

    @staticmethod
    def _snapshot_from_row(row: Dict[str, str]) -> PositionSnapshot:
        """Build a snapshot from a ``csv.DictReader`` row (human_time is ignored)."""
        return PositionSnapshot(
            coin=row['coin'],
            size=float(row['size']),
            entry_price=float(row['entry_price']),
            position_value=float(row['position_value']),
            unrealized_pnl=float(row['unrealized_pnl']),
            leverage=float(row['leverage']),
            margin_used=float(row['margin_used']),
            timestamp=int(row['timestamp']),
        )

    def _save_positions_history(self) -> None:
        """Save position history to CSV"""
        rows = (
            self._snapshot_row(pos)
            for positions in self.position_history.values()
            for pos in positions
        )
        self._write_csv(self.positions_file, self._SNAPSHOT_COLUMNS, rows, "positions history")

    def _save_last_positions(self) -> None:
        """Save last positions to CSV"""
        rows = (self._snapshot_row(pos) for pos in self.last_positions.values())
        self._write_csv(self.last_positions_file, self._SNAPSHOT_COLUMNS, rows, "last positions")

    def _save_changes_log(self) -> None:
        """Save changes log to CSV"""
        def blank_if_none(value):
            # Only None means "no previous value"; a legitimate 0.0 must survive
            # the round trip, otherwise it reloads as None.
            return '' if value is None else value

        rows = (
            [
                change.coin, change.change_type,
                blank_if_none(change.old_size), change.new_size,
                blank_if_none(change.old_position_value), change.new_position_value,
                change.timestamp, change.human_time,
            ]
            for change in self.changes_log
        )
        self._write_csv(self.changes_file, self._CHANGE_COLUMNS, rows, "changes log")

    def _save_data(self) -> None:
        """Save all tracking data to CSV files"""
        self._save_positions_history()
        self._save_last_positions()
        self._save_changes_log()

    def _load_positions_history(self) -> None:
        """Load position history from CSV"""
        if not os.path.exists(self.positions_file):
            return
        try:
            with open(self.positions_file, 'r', newline='', encoding='utf-8') as f:
                self.position_history = {}
                for row in csv.DictReader(f):
                    snapshot = self._snapshot_from_row(row)
                    self.position_history.setdefault(snapshot.coin, []).append(snapshot)
        except Exception as e:
            logger.error(f"Failed to load position history: {e}")

    def _load_last_positions(self) -> None:
        """Load last positions from CSV"""
        if not os.path.exists(self.last_positions_file):
            return
        try:
            with open(self.last_positions_file, 'r', newline='', encoding='utf-8') as f:
                self.last_positions = {}
                for row in csv.DictReader(f):
                    snapshot = self._snapshot_from_row(row)
                    self.last_positions[snapshot.coin] = snapshot
        except Exception as e:
            logger.error(f"Failed to load last positions: {e}")

    def _load_changes_log(self) -> None:
        """Load changes log from CSV"""
        if not os.path.exists(self.changes_file):
            return

        def float_or_none(text: str) -> Optional[float]:
            # An empty cell is how a missing previous value is stored.
            return float(text) if text else None

        try:
            with open(self.changes_file, 'r', newline='', encoding='utf-8') as f:
                self.changes_log = []
                for row in csv.DictReader(f):
                    self.changes_log.append(PositionChange(
                        coin=row['coin'],
                        change_type=row['change_type'],
                        old_size=float_or_none(row['old_size']),
                        new_size=float(row['new_size']),
                        old_position_value=float_or_none(row['old_position_value']),
                        new_position_value=float(row['new_position_value']),
                        timestamp=int(row['timestamp']),
                        human_time=row['human_time'],
                    ))
        except Exception as e:
            logger.error(f"Failed to load changes log: {e}")

    def _load_data(self) -> None:
        """Load all tracking data from CSV files"""
        self._load_positions_history()
        self._load_last_positions()
        self._load_changes_log()
        if self.last_positions or self.changes_log:
            logger.info(f"Loaded tracking data: {len(self.last_positions)} current positions, "
                        f"{len(self.changes_log)} historical changes from {self.data_dir}/")
        else:
            logger.info(f"No existing data found. Starting with fresh tracking data in {self.data_dir}/")

    def export_to_json(self, filename: Optional[str] = None) -> str:
        """Export tracking data to JSON format for easy viewing.

        Args:
            filename: Target path. Defaults to a timestamped ``export_*.json``
                inside the data directory.

        Returns:
            The path written, or ``""`` when the export failed.
        """
        if filename is None:
            filename = os.path.join(self.data_dir, f"export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        # Convert dataclasses to dictionaries for JSON serialization
        export_data = {
            'position_history': {
                coin: [asdict(pos) for pos in positions]
                for coin, positions in self.position_history.items()
            },
            'last_positions': {
                coin: asdict(pos) for coin, pos in self.last_positions.items()
            },
            'changes_log': [asdict(change) for change in self.changes_log],
            'export_timestamp': datetime.now().isoformat(),
            'total_tracked_coins': len(self.position_history),
            'total_changes': len(self.changes_log),
        }
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(export_data, f, indent=2, default=str)
            logger.info(f"Data exported to {filename}")
            return filename
        except Exception as e:
            # A failed export is an error, not routine information.
            logger.error(f"Failed to export data: {e}")
            return ""

    def _timestamp_to_human(self, timestamp: int) -> str:
        """Convert an epoch-milliseconds timestamp to ``YYYY-MM-DD HH:MM:SS`` (local time)."""
        return datetime.fromtimestamp(timestamp / 1000).strftime('%Y-%m-%d %H:%M:%S')

    # ------------------------------------------------------------------
    # Change detection
    # ------------------------------------------------------------------
    def _extract_positions(self, data: Dict[str, Any]) -> Dict[str, PositionSnapshot]:
        """Extract the non-zero ``oneWay`` positions from an account-state response.

        Args:
            data: Response dict; reads ``time`` and ``assetPositions``.

        Returns:
            Mapping of coin to snapshot. Zero-size positions are left out.
        """
        positions: Dict[str, PositionSnapshot] = {}
        timestamp = data.get('time')
        if timestamp is None:
            # Without a server time use "now" rather than 0, which would stamp
            # every record with 1970.
            timestamp = int(datetime.now().timestamp() * 1000)
        for asset_pos in data.get('assetPositions', []):
            if asset_pos.get('type') != 'oneWay' or 'position' not in asset_pos:
                continue
            pos = asset_pos['position']
            # Convert size to float, handle both string and numeric values
            size = float(pos['szi'])
            # Skip positions with zero size
            if size == 0:
                continue
            leverage = pos['leverage']
            if isinstance(leverage, dict):
                leverage = leverage['value']
            coin = pos['coin']
            positions[coin] = PositionSnapshot(
                coin=coin,
                size=size,
                entry_price=float(pos['entryPx']),
                position_value=float(pos['positionValue']),
                unrealized_pnl=float(pos['unrealizedPnl']),
                leverage=float(leverage),
                margin_used=float(pos['marginUsed']),
                timestamp=timestamp,
            )
        return positions

    def _detect_changes(self, current_positions: Dict[str, PositionSnapshot]) -> List[PositionChange]:
        """Compare ``current_positions`` with ``self.last_positions``.

        Closed positions are reported first, followed by opened, resized and
        modified ones in the iteration order of ``current_positions``. Pure
        P&L movement (value, unrealized P&L or margin changing while size,
        leverage and entry price stay put) is not reported.
        """
        changes: List[PositionChange] = []
        # Get timestamp from position data or use current time as fallback
        if current_positions:
            timestamp = next(iter(current_positions.values())).timestamp
        else:
            timestamp = int(datetime.now().timestamp() * 1000)
        human_time = self._timestamp_to_human(timestamp)

        def record(coin: str, change_type: str,
                   old_pos: Optional[PositionSnapshot],
                   new_pos: Optional[PositionSnapshot]) -> None:
            changes.append(PositionChange(
                coin=coin,
                change_type=change_type,
                old_size=None if old_pos is None else old_pos.size,
                new_size=0.0 if new_pos is None else new_pos.size,
                old_position_value=None if old_pos is None else old_pos.position_value,
                new_position_value=0.0 if new_pos is None else new_pos.position_value,
                timestamp=timestamp,
                human_time=human_time,
            ))

        # Check for closed positions
        for coin, old_pos in self.last_positions.items():
            if coin not in current_positions:
                record(coin, 'closed', old_pos, None)

        # Check for new, modified, increased, or decreased positions
        for coin, current_pos in current_positions.items():
            old_pos = self.last_positions.get(coin)
            if old_pos is None:
                side = "long" if current_pos.size > 0 else "short"
                record(coin, f'opened_{side}', None, current_pos)
            elif abs(current_pos.size - old_pos.size) > POSITION_CHANGE_DETECTION_THRESHOLD:
                record(coin, self._determine_change_type(old_pos.size, current_pos.size),
                       old_pos, current_pos)
            elif (abs(current_pos.leverage - old_pos.leverage) > POSITION_CHANGE_DETECTION_THRESHOLD
                  or abs(current_pos.entry_price - old_pos.entry_price) > PRICE_CHANGE_DETECTION_THRESHOLD):
                # Same size, but leverage or entry price moved.
                record(coin, 'modified', old_pos, current_pos)
        return changes

    def _determine_change_type(self, old_size: float, new_size: float) -> str:
        """Classify a size change as ``flipped``, ``increased``, ``decreased`` or ``modified``."""
        # Check for direction flip (long to short or short to long)
        if (old_size > 0 and new_size < 0) or (old_size < 0 and new_size > 0):
            return 'flipped'
        # Same direction changes
        if old_size > 0 and new_size > 0:  # Both long
            return 'increased' if new_size > old_size else 'decreased'
        if old_size < 0 and new_size < 0:  # Both short
            # For shorts: more negative = larger short position
            return 'increased' if abs(new_size) > abs(old_size) else 'decreased'
        # Reached only when one of the sizes is exactly zero.
        return 'modified'

    def track_positions(self, position_data: Dict[str, Any]) -> List[PositionChange]:
        """
        Main function to track positions and detect changes
        Args:
            position_data: JSON data containing position information
        Returns:
            List of detected changes
        """
        current_positions = self._extract_positions(position_data)
        changes = self._detect_changes(current_positions)
        if changes:
            # Only coins that actually changed get a new history entry; coins
            # whose value merely moved with the market are not recorded (and no
            # empty history list is created for them).
            changed_coins = {change.coin for change in changes}
            for coin, position in current_positions.items():
                if coin in changed_coins:
                    self.position_history.setdefault(coin, []).append(position)
            self.changes_log.extend(changes)
        # Always update last positions (for tracking future changes)
        self.last_positions = deepcopy(current_positions)
        if changes:
            self._save_data()
        else:
            # Still need to save last_positions for change detection, but not full history
            self._save_last_positions()
        return changes

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def print_changes(self, changes: List[PositionChange]) -> None:
        """Log the detected changes in a readable format."""
        if not changes:
            logger.info("No position changes detected.")
            return
        logger.info(f"\n=== Position Changes Detected ({len(changes)} changes) ===")
        for change in changes:
            logger.info(f"\n[{change.human_time}] {change.coin} - {change.change_type.upper()}")
            if change.change_type.startswith('opened'):
                direction = "LONG" if change.new_size > 0 else "SHORT"
                logger.info(f" New {direction} position: {abs(change.new_size):,.4f} (${change.new_position_value:,.2f})")
            elif change.change_type == 'closed':
                old_direction = "LONG" if change.old_size > 0 else "SHORT"
                logger.info(f" Closed {old_direction} position: {abs(change.old_size):,.4f} (was ${change.old_position_value:,.2f})")
            elif change.change_type == 'flipped':
                old_direction = "LONG" if change.old_size > 0 else "SHORT"
                new_direction = "LONG" if change.new_size > 0 else "SHORT"
                logger.info(f" Position flipped from {old_direction} to {new_direction}")
                logger.info(f" Size: {change.old_size:,.4f} → {change.new_size:,.4f}")
                logger.info(f" Value: ${change.old_position_value:,.2f} → ${change.new_position_value:,.2f}")
            elif change.change_type in ('increased', 'decreased'):
                direction = "LONG" if change.new_size > 0 else "SHORT"
                size_diff = change.new_size - change.old_size
                value_diff = change.new_position_value - change.old_position_value
                # For display purposes, show absolute values but indicate direction
                logger.info(f" {direction} position {change.change_type}")
                logger.info(f" Size: {change.old_size:,.4f} → {change.new_size:,.4f} ({size_diff:+,.4f})")
                logger.info(f" Value: ${change.old_position_value:,.2f} → ${change.new_position_value:,.2f} ({value_diff:+,.2f})")
            elif change.change_type == 'modified':
                direction = "LONG" if change.new_size > 0 else "SHORT"
                logger.info(f" {direction} position modified (same size: {change.new_size:,.4f})")
                logger.info(f" Value: ${change.old_position_value:,.2f} → ${change.new_position_value:,.2f}")

    def _get_position_info(self, size: float) -> str:
        """Return ``LONG``, ``SHORT`` or ``CLOSED`` for a signed size."""
        if size > 0:
            return "LONG"
        if size < 0:
            return "SHORT"
        return "CLOSED"

    def get_current_positions(self) -> Dict[str, PositionSnapshot]:
        """Return a shallow copy of the positions seen in the latest poll."""
        return self.last_positions.copy()

    def get_position_history(self, coin: Optional[str] = None) -> Dict[str, List[PositionSnapshot]]:
        """Get position history for a specific coin or all coins"""
        if coin:
            return {coin: self.position_history.get(coin, [])}
        return self.position_history.copy()

    def clear_data(self, confirm: bool = False) -> None:
        """Clear all tracking data, in memory and on disk (use with caution).

        Args:
            confirm: Must be ``True`` for anything to be deleted.
        """
        if not confirm:
            logger.info("Use clear_data(confirm=True) to actually clear the data.")
            return
        self.position_history = {}
        self.last_positions = {}
        self.changes_log = []
        # Remove CSV files; a file that is already gone is not an error.
        for file_path in (self.positions_file, self.changes_file, self.last_positions_file):
            try:
                os.remove(file_path)
            except FileNotFoundError:
                pass
        logger.info(f"All tracking data cleared from {self.data_dir}/")

    def get_stats(self) -> Dict[str, Any]:
        """Get statistics about tracked data"""
        stats: Dict[str, Any] = {
            'total_coins_tracked': len(self.position_history),
            'current_active_positions': len(self.last_positions),
            'total_changes': len(self.changes_log),
            'data_directory': self.data_dir,
            'csv_files': {
                'positions_history': os.path.exists(self.positions_file),
                'changes_log': os.path.exists(self.changes_file),
                'last_positions': os.path.exists(self.last_positions_file),
            },
        }
        if self.changes_log:
            stats['first_change'] = self._timestamp_to_human(self.changes_log[0].timestamp)
            stats['last_change'] = self._timestamp_to_human(self.changes_log[-1].timestamp)
        return stats
############################################################################################################################################################################################################
# End of classes used to manage the copied wallet position tracking
############################################################################################################################################################################################################
# =============================================================================
# STRATEGY CONSTANTS - Modify these to adjust strategy behavior
# =============================================================================
# Trading parameters
CHANGE_THRESHOLD = 0.5 # in % of the copied account - positions below this share are flagged as not significant in the summary
ADJUSTMENT_THRESHOLD = (1.0/3.0)*10.0 # in % (about 3.33) - Threshold for position corrections
# Timing and caching (synchronized values)
API_CACHE_DURATION = 5 # seconds - How long an account-state API response is reused before refetching
COOLDOWN_DRY_RUN = 5 # seconds - Cooldown after position change (dry run)
COOLDOWN_LIVE = 10 # seconds - Cooldown after position change (live)
# Thresholds and tolerances
DUST_USDC = 0.51 # Minimum USDC to subtract for orders (NOTE(review): usage not visible in this part of the file)
MICRO_ADJUSTMENT_THRESHOLD = 0.5 # % - Skip adjustments smaller than this (reduced for small capital)
MINIMUM_ACCOUNT_RATIO = 0.5 # % - Skip positions whose scaled value is smaller than this % of our own account
# ROI and risk management
DISABLE_ROI_VALUE = 5000.0 # Used as the only minimal_roi entry; high enough to effectively disable ROI
STOP_LOSS = -0.95 # 95% stop loss
## freqtrade strategy class
class COPY_HL(IStrategy):
# NOTE(review): this `global` statement has no effect - the name is only read, never assigned, in the class body.
global ADDRESS_TO_TRACK_TOP
minimal_roi = {
"0": DISABLE_ROI_VALUE # Effectively disables ROI
}
stoploss = STOP_LOSS
timeframe = '5m'
# The bot iterates every process_throttle_secs because process_only_new_candles is set to False, so the timeframe is basically irrelevant for this strategy.
# In theory, you could even increase it to 15m, 1h, or more, and it should not affect the bot. It is set to 5m just in case.
startup_candle_count: int = 0
can_short: bool = False
process_only_new_candles: bool = False
position_adjustment_enable = True
# =============================================================================
# TUNABLE PARAMETERS - Reference global constants defined above
# =============================================================================
# Trading parameters
LEV = IntParameter(1, 40, default=6, space='buy', optimize=False) # Fallback leverage, used when the copied position's leverage is unavailable
change_threshold = CHANGE_THRESHOLD # in % - Minimum position change to copy
adjustment_threshold = ADJUSTMENT_THRESHOLD # in % - Threshold for position corrections
ADDRESS_TO_TRACK = ADDRESS_TO_TRACK_TOP # Wallet address whose positions are copied
# =============================================================================
# STATE VARIABLES (do not touch)
# =============================================================================
copied_account_position_changes = None # List of PositionChange from the latest loop
current_positions_to_copy = None # Dict coin -> PositionSnapshot of the copied wallet
my_open_positions = None # Open trades of this bot, refreshed every loop
nb_loop = 1 # Loop counter, used for logging only
# NOTE(review): the two attributes below are not referenced in the visible part of the file;
# GET_PERP_ACCOUNT_STATUS stores its cache under address-specific attribute names instead.
_cached_perp_data = None
_cache_timestamp = None
_cache_duration = API_CACHE_DURATION # Use the global constant
_is_cooldown_after_position_change = False
_cooldown_seconds_after_position_change = COOLDOWN_DRY_RUN # Default to dry run, overridden in bot_start
# With real money (an issue not seen in dry-run), it can take about 1 minute or more for the position size to be updated by Freqtrade,
# even when calling self.wallets.update(). Without waiting long enough after a position size change, this can cause an infinite loop of position increases and decreases.
# Any suggestion to improve this would be appreciated.
_time_of_change = None
_got_perp_data_account_state_successfully = False # Outcome of the most recent GET_PERP_ACCOUNT_STATUS call
matching_positions_check_output = None
_my_wallet_address = None # Cache for our own wallet address
# Optional order type mapping.
order_types = {
'entry': 'market',
'exit': 'market',
'stoploss': 'market',
'stoploss_on_exchange': False
}
# Optional order time in force.
order_time_in_force = {
'entry': 'gtc',
'exit': 'gtc'
}
def get_my_wallet_address(self) -> str:
    """Return our own wallet address, resolving it only on the first call.

    The address is read from ``config['exchange']['walletAddress']``. When
    that entry is missing or empty, a hardcoded address is used instead. The
    result is memoised in ``self._my_wallet_address``.
    """
    remembered = self._my_wallet_address
    if remembered is not None:
        return remembered

    exchange_section = self.config.get('exchange', {})
    address = exchange_section.get('walletAddress', '')
    if not address:
        address = "0x3026f2739C396413f7C97F1c70F81BDfd75C6D1C"
        logger.info("Using hardcoded wallet address")

    self._my_wallet_address = address
    return address
def get_stake_total(self) -> float:
    """Return the total account value, preferring HyperLiquid over FreqTrade.

    The value comes from ``marginSummary.accountValue`` of our own account
    state. If no wallet address is known, the account state is unavailable, or
    anything raises, the FreqTrade wallet total for the stake currency is
    returned instead.
    """
    def freqtrade_total() -> float:
        # Shared fallback: what FreqTrade itself believes the wallet holds.
        stake = self.config['stake_currency']  # e.g. "USDC"
        return self.wallets.get_total(stake)

    try:
        address = self.get_my_wallet_address()
        if not address:
            logger.warning("No wallet address available, falling back to FreqTrade wallets")
            return freqtrade_total()

        account_state = self.GET_PERP_ACCOUNT_STATUS(address)
        if not account_state or 'marginSummary' not in account_state:
            logger.warning("Could not get account value from HyperLiquid, falling back to FreqTrade wallets")
            return freqtrade_total()

        return float(account_state['marginSummary']['accountValue'])
    except Exception as e:
        logger.error(f"Error getting account value from HyperLiquid: {e}")
        # Fallback to FreqTrade
        return freqtrade_total()
def GET_PERP_ACCOUNT_STATUS(self, address):
    """Return the perpetual account state of ``address``, cached per address.

    A response younger than ``self._cache_duration`` seconds is served from
    the cache. On any failure the last cached response for that address is
    returned regardless of its age, or ``None`` if there is none.
    ``self._got_perp_data_account_state_successfully`` records whether this
    call succeeded.

    Args:
        address: Wallet address to query.

    Returns:
        The account-state dict from the API, possibly cached, or ``None``.
    """
    # The cache lives in instance attributes whose names embed the address.
    cache_key = f"_{address}_cached_perp_data"
    timestamp_key = f"_{address}_cache_timestamp"
    try:
        current_time = time.time()
        cached_data = getattr(self, cache_key, None)
        cached_timestamp = getattr(self, timestamp_key, None)
        if (cached_data is not None
                and cached_timestamp is not None
                and current_time - cached_timestamp < self._cache_duration):
            self._got_perp_data_account_state_successfully = True
            return cached_data

        # Reuse one API client instead of building a new one on every cache
        # miss; constructing it is not free (the SDK may fetch exchange
        # metadata in its constructor).
        info = getattr(self, '_hl_info_client', None)
        if info is None:
            from hyperliquid.info import Info
            from hyperliquid.utils import constants
            info = Info(constants.MAINNET_API_URL, skip_ws=True)
            self._hl_info_client = info

        perp_user_state = info.user_state(address)
        # Cache the result with address-specific keys
        setattr(self, cache_key, perp_user_state)
        setattr(self, timestamp_key, current_time)
        self._got_perp_data_account_state_successfully = True
        return perp_user_state
    except Exception as e:
        logger.error(f"Failed to get perp account status: {e}")
        self._got_perp_data_account_state_successfully = False
        # Drop the client so the next call starts from a fresh connection.
        self._hl_info_client = None
        # Return cached data if available, otherwise None
        cached_data = getattr(self, cache_key, None)
        return cached_data if cached_data else None
def is_symbol_whitelisted(self, symbol: str) -> bool:
    """Return True if ``symbol`` is in the current whitelist.

    ``symbol`` may be a full pair (``"BTC/USDC:USDC"``), a pair without the
    settle suffix (``"BTC/USDC"``) or a bare base currency (``"BTC"``). The
    comparison is exact: a plain substring test would wrongly accept ``"ETH"``
    for a whitelisted ``"ETHFI/USDC:USDC"``.

    Args:
        symbol: Pair or base currency to look up.

    Returns:
        False when no DataProvider is available or nothing matches.
    """
    if not self.dp:
        # If DataProvider isn't available (e.g., outside strategy context)
        return False
    for pair in self.dp.current_whitelist():
        without_settle = pair.split(':', 1)[0]
        base = pair.split('/', 1)[0]
        if symbol in (pair, without_settle, base):
            return True
    return False
def get_copied_position_leverage(self, coin_ticker: str) -> float:
    """Return the leverage the copied wallet uses for ``coin_ticker``.

    Falls back to the ``LEV`` parameter when the copied wallet holds no such
    position, or when the lookup raises (for instance before the positions
    have been loaded).
    """
    try:
        copied_positions = self.current_positions_to_copy
        if coin_ticker not in copied_positions:
            logger.info(f"No copied position found for {coin_ticker}, using LEV parameter")
            return self.LEV.value  # Fallback to parameter if not found

        copied_leverage = copied_positions[coin_ticker].leverage
        logger.info(f"Copied position leverage for {coin_ticker}: {copied_leverage}x")
        return copied_leverage
    except Exception as e:
        logger.error(f"Error getting copied position leverage for {coin_ticker}: {e}")
        return self.LEV.value  # Fallback to parameter on error
def _get_real_exchange_position(self, pair: str) -> Optional[float]:
    """Return our real position size for ``pair`` as reported by HyperLiquid.

    The size is the signed ``szi`` of the matching ``oneWay`` position in our
    own account state, or ``0.0`` when there is no such position. When no
    wallet address or account state is available, the position reported by
    the FreqTrade exchange object is used instead.

    Args:
        pair: Trading pair such as ``"BTC/USDC:USDC"``.

    Returns:
        The position size, or ``None`` when the lookup raised. Callers must
        treat ``None`` as "unknown", not as "flat".
    """
    def position_from_freqtrade() -> float:
        # Fallback shared by both "no HyperLiquid data" branches.
        exchange_positions = self.dp.exchange.fetch_positions([pair])
        if not exchange_positions:
            return 0.0
        position = exchange_positions[0]
        contracts = position.get('contracts')
        if contracts is not None:
            return float(contracts)
        return float(position.get('size', 0))

    try:
        # Get our own account data from HyperLiquid
        my_wallet_address = self.get_my_wallet_address()
        if not my_wallet_address:
            logger.warning("No wallet address found in config, falling back to FreqTrade exchange")
            return position_from_freqtrade()

        my_perp_data = self.GET_PERP_ACCOUNT_STATUS(my_wallet_address)
        if not my_perp_data or 'assetPositions' not in my_perp_data:
            logger.warning("Could not get position data from HyperLiquid, falling back to FreqTrade")
            return position_from_freqtrade()

        # Extract coin from pair (remove /USDC:USDC)
        coin = pair.replace("/USDC:USDC", "")
        for asset_pos in my_perp_data['assetPositions']:
            if asset_pos['type'] == 'oneWay' and 'position' in asset_pos:
                pos = asset_pos['position']
                if pos['coin'] == coin:
                    return float(pos['szi'])
        # Position not found, return 0
        return 0.0
    except Exception as e:
        logger.error(f"Error fetching real exchange position for {pair}: {e}")
        return None
def check_print_positions_summary(self):
    """Log a summary of copied vs. own positions and return the matches.

    The summary covers the account values and scale factor, the positions of
    the copied wallet, our own open positions, and (only when both sides hold
    positions) a comparison listing matching, missing and extra positions.

    Returns:
        list[dict]: For each matching position, a dict with:
            - 'coin': str
            - 'diff_pc': float  # % difference vs expected scaled value
            - 'my_value': float  # actual USD value of my position
        The list is empty when the copied account state is unavailable, its
        account value is zero, or an error occurred.
    """
    matching_positions_output = []
    pair_suffix = "/USDC:USDC"

    def real_position_and_value(pair):
        # Size from the exchange and its value at the last traded price;
        # an unknown size (None) is valued at 0.
        rate = self.dp.ticker(pair)['last']
        real_position = self._get_real_exchange_position(pair)
        value = (real_position * rate) if real_position is not None else 0
        return real_position, value

    def trade_for(coin):
        return next(t for t in self.my_open_positions if t.pair.replace(pair_suffix, "") == coin)

    # No need to update wallets since we get data directly from HyperLiquid
    try:
        logger.info("=" * 80)
        logger.info("POSITIONS SUMMARY")
        logger.info("=" * 80)

        # Account values and scale factor
        perp_data = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
        if not perp_data:
            logger.info("No cached perp data available")
            return matching_positions_output
        copied_account_value = float(perp_data['marginSummary']['accountValue'])
        my_account_value = float(self.get_stake_total())
        if copied_account_value == 0:
            logger.error("Copied account value is zero, cannot calculate scale factor")
            return matching_positions_output
        scale_factor = my_account_value / copied_account_value
        # An empty own account gives scale_factor == 0; report "inf" instead
        # of raising ZeroDivisionError and losing the whole summary.
        inverted_scale = 1.0 / scale_factor if scale_factor else float('inf')

        def pct_of_my_account(value):
            return (value / my_account_value * 100.0) if my_account_value > 0 else 0.0

        logger.info(f"Copied Account Value: ${copied_account_value:,.2f}")
        logger.info(f"My Account Value: ${my_account_value:,.2f}")
        logger.info(f"Scale Factor: {scale_factor:.6f}x (inverted {inverted_scale:.1f}x )")
        logger.info("-" * 50)

        # Current positions to copy
        logger.info("POSITIONS TO COPY:")
        if self.current_positions_to_copy:
            for coin, position in self.current_positions_to_copy.items():
                position_value = position.position_value
                size = float(position.size)
                ratio_pc = position_value / copied_account_value * 100.0
                position_type = "LONG" if size > 0 else "SHORT"
                scaled_value = position_value * scale_factor
                logger.info(f" {coin:>8} | {position_type:>5} | Size: {size:>12.4f} | "
                            f"Value: ${position_value:>10.2f} ({ratio_pc:>5.2f}%) | "
                            f"Scaled: ${scaled_value:>10.2f} | Leverage: {position.leverage}x")
        else:
            logger.info(" No positions to copy")
        logger.info("-" * 50)

        # My current open positions
        logger.info("MY OPEN POSITIONS:")
        if self.my_open_positions:
            for trade in self.my_open_positions:
                coin = trade.pair.replace(pair_suffix, "")
                _, position_value = real_position_and_value(trade.pair)
                stake_amount = trade.stake_amount
                ratio_pc = pct_of_my_account(position_value)
                logger.info(f" {coin:>8} | LONG | Stake: ${stake_amount:>10.2f} | "
                            f"Value: ${position_value:>10.2f} ({ratio_pc:>5.2f}%) | "
                            f"Leverage: {trade.leverage}x")
        else:
            logger.info(" No open positions")
        logger.info("-" * 50)

        # Position matching analysis
        logger.info("POSITION MATCHING ANALYSIS:")
        if self.current_positions_to_copy and self.my_open_positions:
            copied_coins = set(self.current_positions_to_copy.keys())
            my_coins = set(trade.pair.replace(pair_suffix, "") for trade in self.my_open_positions)

            # Positions that match
            matching = copied_coins.intersection(my_coins)
            if matching:
                logger.info(" Matching positions:")
                for coin in matching:
                    copied_pos = self.current_positions_to_copy[coin]
                    my_trade = trade_for(coin)
                    copied_value = copied_pos.position_value
                    real_position, my_value = real_position_and_value(my_trade.pair)
                    logger.info(f"Real exchange position of {my_trade.pair}: {real_position}")
                    expected_value = copied_value * scale_factor
                    diff_pc = ((my_value - expected_value) / expected_value * 100) if expected_value > 0 else 0.0
                    copied_leverage = copied_pos.leverage
                    my_leverage = my_trade.leverage
                    leverage_match = "✓" if my_leverage == copied_leverage else "✗"
                    logger.info(f" {coin:>8} | Copied: ${copied_value:>8.2f} -> Expected: ${expected_value:>8.2f} | "
                                f"Actual: ${my_value:>8.2f} | Diff: {diff_pc:>6.1f}% | "
                                f"Leverage: {my_leverage}x vs {copied_leverage}x {leverage_match}")
                    matching_positions_output.append({
                        "coin": coin,
                        "diff_pc": float(diff_pc),
                        "my_value": float(my_value)
                    })

            # Positions I should have but don't
            should_have = copied_coins - my_coins
            if should_have:
                logger.info(" Missing positions (should open if in whitelist and not Short, and significant size):")
                for coin in should_have:
                    pos = self.current_positions_to_copy[coin]
                    size = float(pos.size)
                    position_type = "LONG" if size > 0 else "SHORT"
                    # Skip if the scaled position is too small a share of my account
                    expected_value = pos.position_value * scale_factor
                    expected_ratio_pc_my = pct_of_my_account(expected_value)
                    if expected_ratio_pc_my < MINIMUM_ACCOUNT_RATIO:
                        continue
                    ratio_pc_copied = pos.position_value / copied_account_value * 100.0
                    significant = "✓" if ratio_pc_copied >= self.change_threshold else "✗"
                    if self.is_symbol_whitelisted(coin):
                        in_wl = ', in whitelist'
                    else:
                        in_wl = ', not in whitelist'
                    logger.info(
                        f" {coin:>8} | {position_type:>5} | Copied ${pos.position_value:>8.2f} "
                        f"({ratio_pc_copied:>5.2f}% of copied) | "
                        f"Expected scaled: ${expected_value:>8.2f} ({expected_ratio_pc_my:>5.2f}% of mine) "
                        f"{significant} {in_wl} | Leverage: {pos.leverage}x"
                    )

            # Positions I have but shouldn't
            shouldnt_have = my_coins - copied_coins
            if shouldnt_have:
                logger.info(" Extra positions (should close):")
                for coin in shouldnt_have:
                    _, my_value = real_position_and_value(trade_for(coin).pair)
                    logger.info(f" {coin:>8} | LONG | ${my_value:>8.2f}")
        logger.info("=" * 80)
        return matching_positions_output
    except Exception as e:
        logger.error(f"Error in print_positions_summary: {e}")
        return matching_positions_output
def bot_start(self, **kwargs) -> None:
    """
    Called only once after bot instantiation.

    Picks the cooldown (in seconds) applied after a position change according
    to the configured run mode. Run modes other than 'live' and 'dry_run'
    leave ``_cooldown_seconds_after_position_change`` untouched.

    :param **kwargs: Ensure to keep this here so updates to this won't break your strategy.
    """
    # With direct exchange position verification, we can use shorter cooldowns
    # Sync cooldown with cache duration for optimal reactivity
    runmode = self.config["runmode"].value
    # Use equality: ``x in ('live')`` is a substring test against the string
    # 'live' (parentheses alone do not build a tuple), so an empty or partial
    # value would have matched by accident.
    if runmode == 'live':
        self._cooldown_seconds_after_position_change = COOLDOWN_LIVE
    elif runmode == 'dry_run':
        self._cooldown_seconds_after_position_change = COOLDOWN_DRY_RUN
def bot_loop_start(self, current_time: datetime, **kwargs) -> None:
    """
    Per-iteration hook: refresh the copied account state and my own open trades.

    Fetches the tracked account's perp state, derives the position changes and
    the current positions to copy through a PositionTracker, reloads my open
    trades, and finally recomputes the positions summary.
    On any error the three state attributes are reset to empty containers and
    the "perp data fetched successfully" flag is cleared.

    :param current_time: datetime object, containing the current datetime
    :param **kwargs: Ensure to keep this here so updates to this won't break your strategy.
    """
    logger.info(f"Loop #{self.nb_loop}")
    self.nb_loop += 1
    try:
        # Tracker files live in a 'position_data' folder next to this file
        storage_dir = Path(__file__).resolve().parent / 'position_data'
        tracker = PositionTracker(data_dir=storage_dir)
        account_state = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
        if account_state is not None:
            self.copied_account_position_changes = tracker.track_positions(account_state)
            self.current_positions_to_copy = tracker._extract_positions(account_state)
        else:
            logger.error("Failed to get perp data, using empty position changes")
            self.copied_account_position_changes = []
            self.current_positions_to_copy = {}
        logger.info(f"Position changes: {self.copied_account_position_changes}")
        tracker.print_changes(self.copied_account_position_changes)
        self.my_open_positions = Trade.get_trades_proxy(is_open=True)
        for message in (
            "Current positions to copy:",
            self.current_positions_to_copy,
            "My current positions:",
            self.my_open_positions,
        ):
            logger.info(message)
    except Exception as e:
        logger.error(f"Error in bot_loop_start: {e}")
        # Fall back to an empty, harmless state
        self.copied_account_position_changes = []
        self.current_positions_to_copy = {}
        self.my_open_positions = []
        self._got_perp_data_account_state_successfully = False
    # Summary is rebuilt every loop, whatever happened above
    self.matching_positions_check_output = self.check_print_positions_summary()
def populate_indicators(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
    """
    Write the copy-trading decision for this pair into ``df['signal']``.

    Signal values: 1 = enter long, 0 = exit, 2 = do nothing (the default).

    When position changes were detected in the copied account this loop, the
    change concerning this pair's coin (exact ticker match) decides the signal:
    a short is ignored, a significant 'opened_long' yields an entry, and a
    'closed' change yields an exit. When no changes were detected at all, the
    missed entry/exit check runs instead. A final check exits a long that
    mirrors what is actually a short in the copied account.

    :param df: candle dataframe of the pair; modified in place and returned
    :param metadata: dict holding at least the 'pair' key
    :return: the dataframe with the 'signal' column filled in
    """
    coin_ticker = metadata['pair'].replace("/USDC:USDC", "")
    df['signal'] = 2  # Default: do nothing
    if not self._got_perp_data_account_state_successfully:
        # skip (do nothing) if API call to get perp data copied account state failed
        return df
    perp_data = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
    # Handle position changes
    if self.copied_account_position_changes:
        for chg in self.copied_account_position_changes:
            # Exact match only: a substring test would let "ETH" react to a
            # change on "ETHFI" (custom_stake_amount already compares with ==).
            if coin_ticker != chg.coin:
                continue
            if float(chg.new_size) < 0.0:  # check if short, just in case
                logger.info(f"Ignoring entry on {coin_ticker} because it is a Short. This code is LONG ONLY.")
                return df
            if 'opened_long' == chg.change_type:
                # The account value is only needed to size-check an entry, so
                # it is read here; a failed fetch must not raise out of the
                # indicator callback.
                try:
                    copied_account_value = float(perp_data['marginSummary']['accountValue'])
                except (TypeError, KeyError, ValueError):
                    copied_account_value = 0.0
                if copied_account_value <= 0:
                    logger.error(f"Cannot evaluate entry on {coin_ticker}: copied account value is unavailable or zero")
                    return df
                position_value_in_copied_account = float(chg.new_position_value)  # in USDC
                ratio_pc = position_value_in_copied_account / copied_account_value * 100.0
                # only enter if the size is significant
                if ratio_pc > self.change_threshold:
                    df['signal'] = 1
                    return df
                logger.info(f"Not opening position on {coin_ticker} because position size in copied account is too small compared to the copied account equity ({ratio_pc:.2f}%, less than {self.change_threshold}%)")
            elif 'closed' in chg.change_type:
                # An exit does not depend on the copied account value
                df['signal'] = 0
                return df
    else:  # Handle missed entries/exits when no changes detected
        df = self._check_missed_entry_or_exit(coin_ticker, df)
    # check if mistaken Long that is actually a Short -> send exit signal
    df = self.check_mistaken_short(df, coin_ticker)
    return df
def _check_missed_entry_or_exit(self, coin_ticker, df):
    """
    Reconcile my open trades with the copied positions for one coin.

    Sets ``df['signal']`` to 1 when the copied account holds a significant
    long that I do not hold, and to 0 when I hold a position that the copied
    account no longer holds (or holds only in an insignificant size).
    Any exception is logged and the dataframe is returned as it stands.

    :param coin_ticker: coin name without the "/USDC:USDC" suffix
    :param df: dataframe of the pair, modified in place
    :return: the same dataframe
    """
    try:
        held_tickers = {
            open_trade.pair.replace("/USDC:USDC", "")
            for open_trade in Trade.get_trades_proxy(is_open=True)
        }
        copied_positions = self.current_positions_to_copy
        is_copied = coin_ticker in copied_positions
        is_held = coin_ticker in held_tickers
        if is_copied and not is_held:
            # Missed entry: copied account has it, I do not
            is_short = float(copied_positions[coin_ticker].size) < 0.0
            if self._is_position_significant(coin_ticker) and not is_short:
                df['signal'] = 1
                logger.info(f"Missed entry detected for {coin_ticker}. Sending entry signal.")
        elif is_held and not is_copied:
            # Missed exit: I still hold something the copied account dropped
            df['signal'] = 0
            logger.info(f"Missed exit detected for {coin_ticker}. Sending exit signal.")
        elif is_held:
            # Held on both sides, but the copied size may be too small to count
            if not self._is_position_significant(coin_ticker):
                df['signal'] = 0
                logger.info(f"Missed exit detected for {coin_ticker}. Sending exit signal.")
    except Exception as e:
        logger.error(f"Error checking missed positions for {coin_ticker}: {e}")
    return df
def _is_position_significant(self, coin_ticker):
"""Check if position is significant enough to copy"""
try:
if not self.current_positions_to_copy or coin_ticker not in self.current_positions_to_copy:
return False
perp_data = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
if not perp_data or 'marginSummary' not in perp_data:
return False
copied_account_value = float(perp_data['marginSummary']['accountValue'])
min_threshold = copied_account_value * (self.change_threshold / 100.0) # Convert % to decimal
position_value = self.current_positions_to_copy[coin_ticker].position_value
is_significant = position_value > min_threshold
logger.info(f"Position significance check for {coin_ticker}: ${position_value:.2f} > ${min_threshold:.2f} = {is_significant}")
return is_significant
except Exception as e:
logger.error(f"Error checking position significance: {e}")
return False
def check_mistaken_short(self, df, coin_ticker):
    """
    Exit a long of mine that mirrors what is actually a short.

    If the coin is among the positions to copy, I have an open trade on it,
    and the copied position size is negative, ``df['signal']`` is set to 0.

    :param df: dataframe of the pair, modified in place
    :param coin_ticker: coin name without the "/USDC:USDC" suffix
    :return: the same dataframe
    """
    if coin_ticker not in self.current_positions_to_copy:
        return df
    held_tickers = [
        open_trade.pair.replace("/USDC:USDC", "")
        for open_trade in Trade.get_trades_proxy(is_open=True)
    ]
    if coin_ticker not in held_tickers:
        return df
    copied_size = float(self.current_positions_to_copy[coin_ticker].size)
    if copied_size < 0.0:
        df['signal'] = 0
        logger.info(f"There was a short mistaken as a long on {coin_ticker}. This code is LONG ONLY. Exiting immediatly.")
    return df
def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
    """Flag ``enter_long`` on every row whose ``signal`` equals 1."""
    wants_entry = dataframe['signal'] == 1
    dataframe.loc[wants_entry, 'enter_long'] = 1
    return dataframe
def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
    """Flag ``exit_long`` and ``exit_short`` on every row whose ``signal`` equals 0."""
    for exit_column in ('exit_long', 'exit_short'):
        dataframe.loc[dataframe['signal'] == 0, exit_column] = 1
    return dataframe
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: float | None, max_stake: float,
leverage: float, entry_tag: str | None, side: str,
**kwargs) -> float:
# Called before entering a trade, makes it possible to manage your position size when placing a new trade.
# Returning 0 or None will prevent trades from being placed -> ACTS ALSO LIKE AN ENTRY CONFIRMATION
# Freqtrade will fall back to the proposed_stake value should your code raise an exception. The exception itself will be logged.
# You do not have to ensure that min_stake <= returned_value <= max_stake. Trades will succeed as the returned value will be clamped to supported range and this action will be logged.
coin_ticker = pair.replace("/USDC:USDC", "")
# No need to update wallets since we get data directly from HyperLiquid
if not self._got_perp_data_account_state_successfully :
return None
try:
if not self.current_positions_to_copy:
logger.error("No current positions to copy available")
return None
perp_data = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
copied_account_value = float(perp_data['marginSummary']['accountValue'])
my_account_value = float(self.get_stake_total())
if copied_account_value == 0:
logger.error("Copied account value is zero, cannot calculate scale factor")
return None
scale_factor = my_account_value / copied_account_value
# Look in both position changes and current positions
position_value_in_copied_account = None
# First check position changes
for chg in self.copied_account_position_changes:
if coin_ticker == chg.coin:
position_value_in_copied_account = float(chg.new_position_value)
break
# If not found in changes, check current positions
if position_value_in_copied_account is None:
if coin_ticker in self.current_positions_to_copy:
position_value_in_copied_account = self.current_positions_to_copy[coin_ticker].position_value
if position_value_in_copied_account is None:
logger.warning(f"No position value found for {coin_ticker}")
return None
ratio_pc = position_value_in_copied_account/copied_account_value * 100.0
if ratio_pc<self.change_threshold:
logger.info(f"Not opening position on {pair} because position size in copied account is too small compared to the copied account equity({ratio_pc:.1f}% < 1%)")
return None
dust_USDC = DUST_USDC
returned_val = position_value_in_copied_account * scale_factor
returned_val = (returned_val / leverage) - dust_USDC
if returned_val < min_stake:
returned_val = min_stake
logger.info(f"Calculated stake for {pair}: {returned_val}")
return returned_val
except Exception as e:
logger.error(f"Error in custom_stake_amount: {e}")
return None
def adjust_trade_position(self, trade: Trade, current_time: datetime,
                          current_rate: float, current_profit: float,
                          min_stake: float | None, max_stake: float,
                          current_entry_rate: float, current_exit_rate: float,
                          current_entry_profit: float, current_exit_profit: float,
                          **kwargs
                          ) -> float | None | tuple[float | None, str | None]:
    """
    Resize an open trade so it keeps following the copied account.

    Two triggers are evaluated in order:
    1. a change detected this loop on the same coin in the copied account
       (scaled by my account value / copied account value);
    2. a deviation between my position and the expected scaled position,
       taken from ``self.matching_positions_check_output``, larger than
       ``self.adjustment_threshold`` percent.
    Every adjustment starts a cooldown during which no further adjustment is made.

    :param trade: the open trade being evaluated
    :param current_rate: current price, used to convert a stake into a position amount
    :return: stake amount to adjust the trade (positive increases, negative
             decreases), or None for no action. Any exception is logged and
             results in None.
    """
    # :return float: Stake amount to adjust your trade,
    # Positive values to increase position, Negative values to decrease position.
    # Return None for no action.
    # Optionally, return a tuple with a 2nd element with an order reason
    coin_ticker = trade.pair.replace("/USDC:USDC", "")
    # No need to update wallets since we get data directly from HyperLiquid
    dust_USDC = DUST_USDC
    if not self._got_perp_data_account_state_successfully :
        return None
    # Lift the cooldown once enough time has passed since the last adjustment
    if self._time_of_change is not None:
        if datetime.now() > self._time_of_change + timedelta(seconds=self._cooldown_seconds_after_position_change):
            self._is_cooldown_after_position_change = False
    if self._is_cooldown_after_position_change:
        logger.info(f"Not doing position size change because of the cooldown of {self._cooldown_seconds_after_position_change} seconds.")
        return None
    try:
        if not self.current_positions_to_copy:
            return None
        # Get real position from exchange for smarter comparisons
        real_exchange_position = self._get_real_exchange_position(trade.pair)
        perp_data = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
        if self.copied_account_position_changes:
            # A None perp_data raises here and is handled by the except below
            copied_account_value = float(perp_data['marginSummary']['accountValue'])
            my_account_value = float(self.get_stake_total())
            if copied_account_value == 0:
                logger.error("Copied account value is zero, cannot calculate scale factor")
                return None
            scale_factor = my_account_value / copied_account_value
            # for detected changes in the copied account
            for chg in self.copied_account_position_changes:
                if coin_ticker == chg.coin:
                    # Check if change is significant (at least self.change_threshold % of the copied account), otherwise skip doing adjustment by returning None
                    # NOTE(review): float(chg.old_position_value) raises TypeError when the old value is None;
                    # the except below then returns None, so the deviation check further down is skipped too.
                    change_ratio_pc = abs(float(chg.old_position_value) - float(chg.new_position_value)) / copied_account_value * 100.0
                    if change_ratio_pc < self.change_threshold:
                        logger.info(f"Not increasing or decreasing position on {trade.pair} because position change in copied account is too small compared to the copied account equity({change_ratio_pc:.1f}% < 1%)")
                        return None
                    # Size of the change in the copied account, scaled to my account (always >= 0)
                    delta_stake = abs(float(chg.old_position_value) - float(chg.new_position_value)) * scale_factor
                    # Use real exchange position for verification if available
                    if real_exchange_position is not None:
                        logger.info(f"Real exchange position for {trade.pair}: {real_exchange_position}")
                        # Calculate what the position should be after the adjustment
                        adjustment_amount = (delta_stake / trade.leverage) / current_rate
                        # NOTE(review): target_position is computed but not read again in this method
                        target_position = real_exchange_position
                        if 'increased' in chg.change_type:
                            target_position += adjustment_amount
                        elif 'decreased' in chg.change_type:
                            target_position -= adjustment_amount
                        # Check if adjustment is significant enough (avoid micro-adjustments)
                        relative_change = abs(adjustment_amount) / max(abs(real_exchange_position), adjustment_amount) * 100
                        if relative_change < MICRO_ADJUSTMENT_THRESHOLD: # Skip micro adjustments
                            logger.info(f"Adjustment too small ({relative_change:.2f}%), skipping")
                            return None
                    # NOTE(review): the cooldown starts here even when change_type is neither
                    # 'increased' nor 'decreased', i.e. when nothing is returned below
                    self._time_of_change = datetime.now()
                    self._is_cooldown_after_position_change = True
                    if 'increased' in chg.change_type:
                        return delta_stake / trade.leverage - dust_USDC
                    elif 'decreased' in chg.change_type:
                        # NOTE(review): dust is subtracted here as well, which makes the decrease
                        # larger in magnitude by dust_USDC - confirm this is intended
                        return -1.0 * delta_stake / trade.leverage - dust_USDC
        # for already opened positions, if difference with what it should be in copied account (and scaled) is too large (above self.adjustment_threshold %), adjust to match
        if self.matching_positions_check_output:
            for pos in self.matching_positions_check_output:
                logger.info(f"{pos['coin']} → Difference: {pos['diff_pc']:.2f}% (my total value: {pos['my_value']:.1f}) ; ||>{self.adjustment_threshold:.0f}% will trigger a size correction.")
                if pos['coin']==coin_ticker:
                    if abs(pos['diff_pc'])>self.adjustment_threshold:
                        # Calculate correction needed: negative diff_pc means position too small, positive means too large
                        # Formula: target_value = my_value / (1 + diff_pc/100), then delta = target_value - my_value
                        # Add protection against division by zero
                        denominator = 1.0 + pos['diff_pc']/100.0
                        if abs(denominator) < 1e-10:
                            logger.error(f"Division by zero in delta calculation for {coin_ticker}")
                            return None
                        target_value = pos['my_value'] / denominator
                        # Signed correction: positive to grow the position, negative to shrink it
                        delta_stake = target_value - pos['my_value']
                        logger.info(delta_stake)
                        logger.info(delta_stake / trade.leverage)
                        self._time_of_change = datetime.now()
                        self._is_cooldown_after_position_change = True
                        return delta_stake / trade.leverage - dust_USDC
        return None
    except Exception as e:
        logger.error(f"Error in adjust_trade_position: {e}")
        return None
def leverage(self, pair: str, current_time: datetime, current_rate: float,
             proposed_leverage: float, max_leverage: float, entry_tag: str | None, side: str,
             **kwargs) -> float:
    """
    Pick the leverage of a new trade from the copied position.

    The copied leverage is capped at ``max_leverage``. If it cannot be
    determined, the ``LEV`` parameter value (also capped) is used instead.

    :param pair: pair about to be entered, e.g. "ETH/USDC:USDC"
    :param max_leverage: highest leverage allowed on this pair
    :return: leverage to apply
    """
    coin_ticker = pair.replace("/USDC:USDC", "")
    try:
        copied_leverage = self.get_copied_position_leverage(coin_ticker)
        # Never go above what is allowed for the pair
        final_leverage = min(copied_leverage, max_leverage)
        logger.info(f"Setting leverage for {pair}: {final_leverage}x (copied: {copied_leverage}x, max: {max_leverage}x)")
    except Exception as e:
        logger.error(f"Error determining leverage for {pair}: {e}")
        # Fallback to LEV parameter
        return min(self.LEV.value, max_leverage)
    else:
        return final_leverage