Timeframe
5m
Direction
Long Only
Stoploss
-95.0%
Trailing Stop
No
ROI
0m: 500000.0%
Interface Version
N/A
Startup Candles
N/A
Indicators
0
freqtrade/freqtrade-strategies
author@: lenik
import pandas as pd
from freqtrade.strategy import (IStrategy, IntParameter)
from freqtrade.persistence import Trade
import logging
import os
import json
import time
from pathlib import Path
import json
import os
import csv
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from copy import deepcopy
logger = logging.getLogger(__name__)
ADDRESS_TO_TRACK_TOP = "0x4b66f4048a0a90fd5ff44abbe5d68332656b78b8"
# also 0x95b8b411653328db32f59b143c6d45f8501e2b35 is pretty good
#####################################################################################################################################################################################################
# Classes used to manage the copied wallet position tracking
#####################################################################################################################################################################################################
@dataclass
class PositionSnapshot:
coin: str
size: float
entry_price: float
position_value: float
unrealized_pnl: float
leverage: float
margin_used: float
timestamp: int
@dataclass
class PositionChange:
coin: str
change_type: str
old_size: Optional[float]
new_size: float
old_position_value: Optional[float]
new_position_value: float
timestamp: int
human_time: str
class PositionTracker:
def __init__(self, data_dir: str = "position_data"):
self.data_dir = data_dir
self.positions_file = os.path.join(data_dir, "positions_history.csv")
self.changes_file = os.path.join(data_dir, "changes_log.csv")
self.last_positions_file = os.path.join(data_dir, "last_positions.csv")
self.position_history: Dict[str, List[PositionSnapshot]] = {}
self.last_positions: Dict[str, PositionSnapshot] = {}
self.changes_log: List[PositionChange] = []
# Create data directory if it doesn't exist
os.makedirs(data_dir, exist_ok=True)
# Load existing data if available
self._load_data()
def _save_positions_history(self) -> None:
"""Save position history to CSV"""
try:
with open(self.positions_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([
'coin', 'size', 'entry_price', 'position_value',
'unrealized_pnl', 'leverage', 'margin_used', 'timestamp', 'human_time'
])
for coin, positions in self.position_history.items():
for pos in positions:
writer.writerow([
pos.coin, pos.size, pos.entry_price, pos.position_value,
pos.unrealized_pnl, pos.leverage, pos.margin_used,
pos.timestamp, self._timestamp_to_human(pos.timestamp)
])
except Exception as e:
logger.info(f"Warning: Failed to save positions history: {e}")
def _save_last_positions(self) -> None:
"""Save last positions to CSV"""
try:
with open(self.last_positions_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([
'coin', 'size', 'entry_price', 'position_value',
'unrealized_pnl', 'leverage', 'margin_used', 'timestamp', 'human_time'
])
for pos in self.last_positions.values():
writer.writerow([
pos.coin, pos.size, pos.entry_price, pos.position_value,
pos.unrealized_pnl, pos.leverage, pos.margin_used,
pos.timestamp, self._timestamp_to_human(pos.timestamp)
])
except Exception as e:
logger.info(f"Warning: Failed to save last positions: {e}")
def _save_changes_log(self) -> None:
"""Save changes log to CSV"""
try:
with open(self.changes_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([
'coin', 'change_type', 'old_size', 'new_size',
'old_position_value', 'new_position_value', 'timestamp', 'human_time'
])
for change in self.changes_log:
writer.writerow([
change.coin, change.change_type, change.old_size or '', change.new_size,
change.old_position_value or '', change.new_position_value,
change.timestamp, change.human_time
])
except Exception as e:
logger.info(f"Warning: Failed to save changes log: {e}")
def _save_data(self) -> None:
"""Save all tracking data to CSV files"""
self._save_positions_history()
self._save_last_positions()
self._save_changes_log()
def _load_positions_history(self) -> None:
"""Load position history from CSV"""
if not os.path.exists(self.positions_file):
return
try:
with open(self.positions_file, 'r', newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
self.position_history = {}
for row in reader:
coin = row['coin']
if coin not in self.position_history:
self.position_history[coin] = []
pos = PositionSnapshot(
coin=coin,
size=float(row['size']),
entry_price=float(row['entry_price']),
position_value=float(row['position_value']),
unrealized_pnl=float(row['unrealized_pnl']),
leverage=float(row['leverage']),
margin_used=float(row['margin_used']),
timestamp=int(row['timestamp'])
)
self.position_history[coin].append(pos)
except Exception as e:
logger.info(f"Warning: Failed to load position history: {e}")
def _load_last_positions(self) -> None:
"""Load last positions from CSV"""
if not os.path.exists(self.last_positions_file):
return
try:
with open(self.last_positions_file, 'r', newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
self.last_positions = {}
for row in reader:
coin = row['coin']
pos = PositionSnapshot(
coin=coin,
size=float(row['size']),
entry_price=float(row['entry_price']),
position_value=float(row['position_value']),
unrealized_pnl=float(row['unrealized_pnl']),
leverage=float(row['leverage']),
margin_used=float(row['margin_used']),
timestamp=int(row['timestamp'])
)
self.last_positions[coin] = pos
except Exception as e:
logger.info(f"Warning: Failed to load last positions: {e}")
def _load_changes_log(self) -> None:
"""Load changes log from CSV"""
if not os.path.exists(self.changes_file):
return
try:
with open(self.changes_file, 'r', newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
self.changes_log = []
for row in reader:
old_size = float(row['old_size']) if row['old_size'] else None
old_pos_value = float(row['old_position_value']) if row['old_position_value'] else None
change = PositionChange(
coin=row['coin'],
change_type=row['change_type'],
old_size=old_size,
new_size=float(row['new_size']),
old_position_value=old_pos_value,
new_position_value=float(row['new_position_value']),
timestamp=int(row['timestamp']),
human_time=row['human_time']
)
self.changes_log.append(change)
except Exception as e:
logger.info(f"Warning: Failed to load changes log: {e}")
def _load_data(self) -> None:
"""Load all tracking data from CSV files"""
self._load_positions_history()
self._load_last_positions()
self._load_changes_log()
if self.last_positions or self.changes_log:
logger.info(f"Loaded tracking data: {len(self.last_positions)} current positions, "
f"{len(self.changes_log)} historical changes from {self.data_dir}/")
else:
logger.info(f"No existing data found. Starting with fresh tracking data in {self.data_dir}/")
def export_to_json(self, filename: str = None) -> str:
"""Export tracking data to JSON format for easy viewing"""
if filename is None:
filename = os.path.join(self.data_dir, f"export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
# Convert dataclasses to dictionaries for JSON serialization
export_data = {
'position_history': {
coin: [asdict(pos) for pos in positions]
for coin, positions in self.position_history.items()
},
'last_positions': {
coin: asdict(pos) for coin, pos in self.last_positions.items()
},
'changes_log': [asdict(change) for change in self.changes_log],
'export_timestamp': datetime.now().isoformat(),
'total_tracked_coins': len(self.position_history),
'total_changes': len(self.changes_log)
}
try:
with open(filename, 'w') as f:
json.dump(export_data, f, indent=2, default=str)
logger.info(f"Data exported to {filename}")
return filename
except Exception as e:
logger.info(f"Failed to export data: {e}")
return ""
def _timestamp_to_human(self, timestamp: int) -> str:
"""Convert timestamp to human readable format"""
return datetime.fromtimestamp(timestamp / 1000).strftime('%Y-%m-%d %H:%M:%S')
def _extract_positions(self, data: Dict[str, Any]) -> Dict[str, PositionSnapshot]:
"""Extract position data from the JSON response"""
positions = {}
timestamp = data.get('time', 0)
for asset_pos in data.get('assetPositions', []):
if asset_pos['type'] == 'oneWay' and 'position' in asset_pos:
pos = asset_pos['position']
coin = pos['coin']
# Convert size to float, handle both string and numeric values
size = float(pos['szi'])
# Skip positions with zero size
if size == 0:
continue
leverage_value = pos['leverage']['value'] if isinstance(pos['leverage'], dict) else pos['leverage']
snapshot = PositionSnapshot(
coin=coin,
size=size,
entry_price=float(pos['entryPx']),
position_value=float(pos['positionValue']),
unrealized_pnl=float(pos['unrealizedPnl']),
leverage=float(leverage_value),
margin_used=float(pos['marginUsed']),
timestamp=timestamp
)
positions[coin] = snapshot
return positions
def _detect_changes(self, current_positions: Dict[str, PositionSnapshot]) -> List[PositionChange]:
"""Detect changes between current and last positions"""
changes = []
timestamp = list(current_positions.values())[0].timestamp if current_positions else int(datetime.now().timestamp() * 1000)
human_time = self._timestamp_to_human(timestamp)
# Check for closed positions
for coin in self.last_positions:
if coin not in current_positions:
old_pos = self.last_positions[coin]
change = PositionChange(
coin=coin,
change_type='closed',
old_size=old_pos.size,
new_size=0.0,
old_position_value=old_pos.position_value,
new_position_value=0.0,
timestamp=timestamp,
human_time=human_time
)
changes.append(change)
# Check for new, modified, increased, or decreased positions
for coin, current_pos in current_positions.items():
if coin not in self.last_positions:
# New position opened
position_type = "long" if current_pos.size > 0 else "short"
change = PositionChange(
coin=coin,
change_type=f'opened_{position_type}',
old_size=None,
new_size=current_pos.size,
old_position_value=None,
new_position_value=current_pos.position_value,
timestamp=timestamp,
human_time=human_time
)
changes.append(change)
else:
old_pos = self.last_positions[coin]
# Check for size changes (significant changes only)
if abs(current_pos.size - old_pos.size) > 1e-8:
change_type = self._determine_change_type(old_pos.size, current_pos.size)
change = PositionChange(
coin=coin,
change_type=change_type,
old_size=old_pos.size,
new_size=current_pos.size,
old_position_value=old_pos.position_value,
new_position_value=current_pos.position_value,
timestamp=timestamp,
human_time=human_time
)
changes.append(change)
# Check for significant modifications (leverage, entry price changes)
elif (abs(current_pos.leverage - old_pos.leverage) > 1e-8 or
abs(current_pos.entry_price - old_pos.entry_price) > 1e-6): # Higher threshold for entry price
change = PositionChange(
coin=coin,
change_type='modified',
old_size=old_pos.size,
new_size=current_pos.size,
old_position_value=old_pos.position_value,
new_position_value=current_pos.position_value,
timestamp=timestamp,
human_time=human_time
)
changes.append(change)
# Ignore pure P&L changes (position_value, unrealized_pnl, margin_used changes
# without size/leverage/entry_price changes are just market movements)
return changes
def _determine_change_type(self, old_size: float, new_size: float) -> str:
"""Determine the type of change considering long/short positions"""
# Check for direction flip (long to short or short to long)
if (old_size > 0 and new_size < 0) or (old_size < 0 and new_size > 0):
return 'flipped'
# Same direction changes
if old_size > 0 and new_size > 0: # Both long
return 'increased' if new_size > old_size else 'decreased'
elif old_size < 0 and new_size < 0: # Both short
# For shorts: more negative = larger short position
return 'increased' if abs(new_size) > abs(old_size) else 'decreased'
return 'modified'
def track_positions(self, position_data: Dict[str, Any]) -> List[PositionChange]:
"""
Main function to track positions and detect changes
Args:
position_data: JSON data containing position information
Returns:
List of detected changes
"""
# Extract current positions
current_positions = self._extract_positions(position_data)
# Detect changes
changes = self._detect_changes(current_positions)
# Only update history if there are actual position changes (not just P&L updates)
if changes:
timestamp = position_data.get('time', int(datetime.now().timestamp() * 1000))
for coin, position in current_positions.items():
if coin not in self.position_history:
self.position_history[coin] = []
# Only add to history if this represents a significant change
# (new position, size change, leverage change, etc.)
should_add_to_history = any(
change.coin == coin and change.change_type in [
'opened_long', 'opened_short', 'closed', 'increased',
'decreased', 'flipped', 'modified'
] for change in changes
)
if should_add_to_history:
self.position_history[coin].append(position)
# Always log changes (even if empty for completeness)
self.changes_log.extend(changes)
# Always update last positions (for tracking future changes)
self.last_positions = deepcopy(current_positions)
# Save data to file after each update (but only if there were changes)
if changes:
self._save_data()
else:
# Still need to save last_positions for change detection, but not full history
self._save_last_positions()
return changes
def print_changes(self, changes: List[PositionChange]) -> None:
"""Print detected changes in a readable format"""
if not changes:
logger.info("No position changes detected.")
return
logger.info(f"\n=== Position Changes Detected ({len(changes)} changes) ===")
for change in changes:
position_info = self._get_position_info(change.new_size if change.new_size != 0 else change.old_size)
logger.info(f"\n[{change.human_time}] {change.coin} - {change.change_type.upper()}")
if change.change_type.startswith('opened'):
direction = "LONG" if change.new_size > 0 else "SHORT"
logger.info(f" New {direction} position: {abs(change.new_size):,.4f} (${change.new_position_value:,.2f})")
elif change.change_type == 'closed':
old_direction = "LONG" if change.old_size > 0 else "SHORT"
logger.info(f" Closed {old_direction} position: {abs(change.old_size):,.4f} (was ${change.old_position_value:,.2f})")
elif change.change_type == 'flipped':
old_direction = "LONG" if change.old_size > 0 else "SHORT"
new_direction = "LONG" if change.new_size > 0 else "SHORT"
logger.info(f" Position flipped from {old_direction} to {new_direction}")
logger.info(f" Size: {change.old_size:,.4f} → {change.new_size:,.4f}")
logger.info(f" Value: ${change.old_position_value:,.2f} → ${change.new_position_value:,.2f}")
elif change.change_type in ['increased', 'decreased']:
direction = "LONG" if change.new_size > 0 else "SHORT"
size_diff = change.new_size - change.old_size
value_diff = change.new_position_value - change.old_position_value
# For display purposes, show absolute values but indicate direction
logger.info(f" {direction} position {change.change_type}")
logger.info(f" Size: {change.old_size:,.4f} → {change.new_size:,.4f} ({size_diff:+,.4f})")
logger.info(f" Value: ${change.old_position_value:,.2f} → ${change.new_position_value:,.2f} ({value_diff:+,.2f})")
elif change.change_type == 'modified':
direction = "LONG" if change.new_size > 0 else "SHORT"
logger.info(f" {direction} position modified (same size: {change.new_size:,.4f})")
logger.info(f" Value: ${change.old_position_value:,.2f} → ${change.new_position_value:,.2f}")
def _get_position_info(self, size: float) -> str:
"""Get position direction info"""
if size > 0:
return "LONG"
elif size < 0:
return "SHORT"
else:
return "CLOSED"
def get_current_positions(self) -> Dict[str, PositionSnapshot]:
"""Get current positions"""
return self.last_positions.copy()
def get_position_history(self, coin: Optional[str] = None) -> Dict[str, List[PositionSnapshot]]:
"""Get position history for a specific coin or all coins"""
if coin:
return {coin: self.position_history.get(coin, [])}
return self.position_history.copy()
def clear_data(self, confirm: bool = False) -> None:
"""Clear all tracking data (use with caution)"""
if not confirm:
logger.info("Use clear_data(confirm=True) to actually clear the data.")
return
self.position_history = {}
self.last_positions = {}
self.changes_log = []
# Remove CSV files
files_to_remove = [self.positions_file, self.changes_file, self.last_positions_file]
for file_path in files_to_remove:
if os.path.exists(file_path):
os.remove(file_path)
logger.info(f"All tracking data cleared from {self.data_dir}/")
def get_stats(self) -> Dict[str, Any]:
"""Get statistics about tracked data"""
stats = {
'total_coins_tracked': len(self.position_history),
'current_active_positions': len(self.last_positions),
'total_changes': len(self.changes_log),
'data_directory': self.data_dir,
'csv_files': {
'positions_history': os.path.exists(self.positions_file),
'changes_log': os.path.exists(self.changes_file),
'last_positions': os.path.exists(self.last_positions_file)
}
}
if self.changes_log:
stats['first_change'] = self._timestamp_to_human(self.changes_log[0].timestamp)
stats['last_change'] = self._timestamp_to_human(self.changes_log[-1].timestamp)
return stats
############################################################################################################################################################################################################
# End of classes usesd to manage the copied wallet position tracking
############################################################################################################################################################################################################
## freqtrade strategy class
class COPY_HL_LS(IStrategy):
global ADDRESS_TO_TRACK_TOP
minimal_roi = {
"0": 5000.0 # Effectively disables ROI
}
stoploss = -0.95
timeframe = '5m'
startup_candle_count: int = 0
can_short: bool = True
process_only_new_candles: bool = False
position_adjustment_enable = True
# Tunable parameters
LEV = IntParameter(1, 6, default=6, space='buy', optimize=False) # Leverage to use
change_threshold = 0.5 # in %
adjustement_threshold = 10.0 # in %
ADDRESS_TO_TRACK = ADDRESS_TO_TRACK_TOP
# State variables (do not touch)
copied_account_position_changes = None
current_positions_to_copy = None
my_open_positions = None
nb_loop = 1
_cached_perp_data = None
_cache_timestamp = None
_cache_duration = 5 # seconds
_is_cooldown_after_position_change = False
_cooldown_seconds_after_position_change = 100 # seconds
_time_of_change = None
_got_perp_data_account_state_successfully = False
matching_positions_check_output = None
# Optional order type mapping.
order_types = {
'entry': 'market',
'exit': 'market',
'stoploss': 'market',
'stoploss_on_exchange': False
}
# Optional order time in force.
order_time_in_force = {
'entry': 'gtc',
'exit': 'gtc'
}
def get_stake_total(self) -> float:
stake = self.config['stake_currency'] # e.g. "USDC"
return self.wallets.get_total(stake)
def GET_PERP_ACCOUNT_STATUS(self, address):
"""Get account status with caching and error handling"""
try:
# Use cached data if recent
current_time = time.time()
if (self._cached_perp_data is not None and
self._cache_timestamp is not None and
current_time - self._cache_timestamp < self._cache_duration):
self._got_perp_data_account_state_successfully = True
return self._cached_perp_data
from hyperliquid.info import Info
from hyperliquid.utils import constants
info = Info(constants.MAINNET_API_URL, skip_ws=True)
perp_user_state = info.user_state(address)
# Cache the result
self._cached_perp_data = perp_user_state
self._cache_timestamp = current_time
self._got_perp_data_account_state_successfully = True
return perp_user_state
except Exception as e:
logger.error(f"Failed to get perp account status: {e}")
self._got_perp_data_account_state_successfully = False
# Return cached data if available, otherwise None
return self._cached_perp_data if self._cached_perp_data else None
def is_symbol_whitelisted(self, symbol: str) -> bool:
"""
Returns True if the given trading pair symbol is currently in the whitelist.
"""
if not self.dp:
# If DataProvider isn't available (e.g., outside strategy context)
return False
# Retrieve current whitelist from DataProvider
current_list = self.dp.current_whitelist()
return any(symbol in s for s in current_list)
def check_print_positions_summary(self):
"""
Print a nicely formatted summary of current positions, scale factor, and comparisons.
Returns:
list[dict]: For each matching position, a dict with:
- 'coin': str
- 'diff_pc': float # % difference vs expected scaled value
- 'my_value': float # actual USD value of my position
"""
matching_positions_output = []
self.wallets.update()
try:
logger.info("=" * 80)
logger.info("POSITIONS SUMMARY")
logger.info("=" * 80)
# Account values and scale factor
perp_data = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
if perp_data:
copied_account_value = float(perp_data['marginSummary']['accountValue'])
my_account_value = float(self.get_stake_total())
scale_factor = my_account_value / copied_account_value
logger.info(f"Copied Account Value: ${copied_account_value:,.2f}")
logger.info(f"My Account Value: ${my_account_value:,.2f}")
logger.info(f"Scale Factor: {scale_factor:.6f}x (inverted {1.0/scale_factor:.1f}x )")
logger.info("-" * 50)
else:
logger.info("No cached perp data available")
return matching_positions_output
# Current positions to copy
logger.info("POSITIONS TO COPY:")
if self.current_positions_to_copy:
for coin, position in self.current_positions_to_copy.items():
position_value = position.position_value
size = float(position.size)
ratio_pc = position_value / copied_account_value * 100.0
position_type = "LONG" if size > 0 else "SHORT"
scaled_value = position_value * scale_factor
logger.info(f" {coin:>8} | {position_type:>5} | Size: {size:>12.4f} | "
f"Value: ${position_value:>10.2f} ({ratio_pc:>5.2f}%) | "
f"Scaled: ${scaled_value:>10.2f}")
else:
logger.info(" No positions to copy")
logger.info("-" * 50)
# My current open positions
logger.info("MY OPEN POSITIONS:")
if self.my_open_positions:
for trade in self.my_open_positions:
coin = trade.pair.replace("/USDC:USDC", "")
ticker = self.dp.ticker(trade.pair)
rate = ticker['last']
position_value = trade.amount * rate
stake_amount = trade.stake_amount
ratio_pc = position_value / my_account_value * 100.0
position_type = "LONG" if trade.is_short == False else "SHORT"
logger.info(f" {coin:>8} | {position_type:>5} | Stake: ${stake_amount:>10.2f} | "
f"Value: ${position_value:>10.2f} ({ratio_pc:>5.2f}%) | "
f"Leverage: {trade.leverage}x")
else:
logger.info(" No open positions")
logger.info("-" * 50)
# Position matching analysis
logger.info("POSITION MATCHING ANALYSIS:")
if self.current_positions_to_copy and self.my_open_positions:
copied_coins = set(self.current_positions_to_copy.keys())
my_coins = set(trade.pair.replace("/USDC:USDC", "") for trade in self.my_open_positions)
# Positions that match
matching = copied_coins.intersection(my_coins)
if matching:
logger.info(" Matching positions:")
for coin in matching:
copied_pos = self.current_positions_to_copy[coin]
my_trade = next(t for t in self.my_open_positions if t.pair.replace("/USDC:USDC", "") == coin)
copied_value = copied_pos.position_value
ticker = self.dp.ticker(my_trade.pair)
rate = ticker['last']
logger.info(f"amount of {my_trade.pair}: {my_trade.amount}")
my_value = my_trade.amount * rate
expected_value = copied_value * scale_factor
diff_pc = ((my_value - expected_value) / expected_value * 100) if expected_value > 0 else 0.0
logger.info(f" {coin:>8} | Copied: ${copied_value:>8.2f} -> Expected: ${expected_value:>8.2f} | "
f"Actual: ${my_value:>8.2f} | Diff: {diff_pc:>6.1f}%")
matching_positions_output.append({
"coin": coin,
"diff_pc": float(diff_pc),
"my_value": float(my_value)
})
# Positions I should have but don't
should_have = copied_coins - my_coins
if should_have:
logger.info(" Missing positions (should open if in whitelist and significant size):")
for coin in should_have:
pos = self.current_positions_to_copy[coin]
size = float(pos.size)
position_type = "LONG" if size > 0 else "SHORT"
# Skip if scaled position < 0.5% of my account
expected_value = pos.position_value * scale_factor
expected_ratio_pc_my = (expected_value / my_account_value * 100.0) if my_account_value > 0 else 0.0
if expected_ratio_pc_my < 0.5:
continue
ratio_pc_copied = pos.position_value / copied_account_value * 100.0
significant = "✓" if ratio_pc_copied >= self.change_threshold else "✗"
if self.is_symbol_whitelisted(coin):
in_wl = ', in whitelist'
else:
in_wl = ', not in whitelist'
logger.info(
f" {coin:>8} | {position_type:>5} | Copied ${pos.position_value:>8.2f} "
f"({ratio_pc_copied:>5.2f}% of copied) | "
f"Expected scaled: ${expected_value:>8.2f} ({expected_ratio_pc_my:>5.2f}% of mine) {significant} {in_wl}"
)
# Positions I have but shouldn't
shouldnt_have = my_coins - copied_coins
if shouldnt_have:
logger.info(" Extra positions (should close):")
for coin in shouldnt_have:
my_trade = next(t for t in self.my_open_positions if t.pair.replace("/USDC:USDC", "") == coin)
ticker = self.dp.ticker(my_trade.pair)
rate = ticker['last']
my_value = trade.amount * rate
position_type = "LONG" if my_trade.is_short == False else "SHORT"
logger.info(f" {coin:>8} | {position_type:>5} | ${my_value:>8.2f}")
logger.info("=" * 80)
return matching_positions_output
except Exception as e:
logger.error(f"Error in print_positions_summary: {e}")
return matching_positions_output
def bot_start(self, **kwargs) -> None:
"""
Called only once after bot instantiation.
:param **kwargs: Ensure to keep this here so updates to this won't break your strategy.
"""
# because in Live (real money, real account) the value returned by self.dp.ticker(trade.pair) or trade.amount takes some time (> 1 minute) to be refreshed, even if we call self.wallets.update()
if self.config["runmode"].value in ('live'):
self._cooldown_seconds_after_position_change = 120
elif self.config["runmode"].value in ('dry_run'):
self._cooldown_seconds_after_position_change = 5
def bot_loop_start(self, current_time: datetime, **kwargs) -> None:
"""
Called at the start of the bot iteration (one loop). For each loop, it will run populate_indicators on all pairs.
Might be used to perform pair-independent tasks
(e.g. gather some remote resource for comparison)
:param current_time: datetime object, containing the current datetime
:param **kwargs: Ensure to keep this here so updates to this won't break your strategy.
"""
logger.info(f"Loop #{self.nb_loop}")
self.nb_loop += 1
try:
# Initialize tracker
here = Path(__file__).resolve().parent / 'position_data'
tracker = PositionTracker(data_dir=here)
perp_data = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
if perp_data is None:
logger.error("Failed to get perp data, using empty position changes")
self.copied_account_position_changes = []
self.current_positions_to_copy = {}
else:
self.copied_account_position_changes = tracker.track_positions(perp_data)
self.current_positions_to_copy = tracker._extract_positions(perp_data)
logger.info(f"Position changes: {self.copied_account_position_changes}")
tracker.print_changes(self.copied_account_position_changes)
self.my_open_positions = Trade.get_trades_proxy(is_open=True)
logger.info("Current positions to copy:")
logger.info(self.current_positions_to_copy)
logger.info("My current positions:")
logger.info(self.my_open_positions)
except Exception as e:
logger.error(f"Error in bot_loop_start: {e}")
# Initialize with safe defaults
self.copied_account_position_changes = []
self.current_positions_to_copy = {}
self.my_open_positions = []
self._got_perp_data_account_state_successfully = False
self.matching_positions_check_output = self.check_print_positions_summary()
def populate_indicators(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
coin_ticker = metadata['pair'].replace("/USDC:USDC", "")
df['signal'] = 2 # Default: do nothing
if not self._got_perp_data_account_state_successfully: # skip (do nothing) if API call to get perp data copied account state failed
return df
perp_data = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
# Handle position changes
if self.copied_account_position_changes:
for chg in self.copied_account_position_changes:
if coin_ticker in chg.coin:
copied_account_value = float(perp_data['marginSummary']['accountValue'])
#logger.info(f"copied account value: {copied_account_value}")
position_value_in_copied_account = float(chg.new_position_value) # in USDC
ratio_pc = position_value_in_copied_account/copied_account_value*100.0
# Handle both long and short opens
if 'opened_long' == chg.change_type:
if ratio_pc>self.change_threshold:
df['signal'] = 1 # Long entry signal
return df
else:
logger.info(f"Not opening long position on {coin_ticker} because position size in copied account is too small compared to the copied account equity ({ratio_pc:.2f} , less than {self.change_threshold}%)")
elif 'opened_short' == chg.change_type:
if ratio_pc>self.change_threshold:
df['signal'] = -1 # Short entry signal
return df
else:
logger.info(f"Not opening short position on {coin_ticker} because position size in copied account is too small compared to the copied account equity ({ratio_pc:.2f} , less than {self.change_threshold}%)")
elif 'closed' in chg.change_type:
df['signal'] = 0 # Exit signal for both long and short
return df
else: # Handle missed entries/exits when no changes detected
df = self._check_missed_entry_or_exit(coin_ticker, df)
# check if position direction mismatch -> send exit signal
df = self.check_position_direction_mismatch(df, coin_ticker)
return df
def _check_missed_entry_or_exit(self, coin_ticker, df):
"""Helper method to check for missed positions"""
try:
my_trades = Trade.get_trades_proxy(is_open=True)
my_current_opened_tickers = [tr.pair.replace("/USDC:USDC", "") for tr in my_trades]
# Check for missed entries (both long and short)
if coin_ticker in self.current_positions_to_copy:
if coin_ticker not in my_current_opened_tickers:
size = float(self.current_positions_to_copy[coin_ticker].size)
is_short = size < 0.0
if self._is_position_significant(coin_ticker):
if is_short:
df['signal'] = -1 # Short entry signal
logger.info(f"Missed short entry detected for {coin_ticker}. Sending short entry signal.")
else:
df['signal'] = 1 # Long entry signal
logger.info(f"Missed long entry detected for {coin_ticker}. Sending long entry signal.")
# Check for missed exits
# not in current positions to copy, but somehow in my current position
if coin_ticker not in self.current_positions_to_copy and coin_ticker in my_current_opened_tickers:
df['signal'] = 0
logger.info(f"Missed exit detected for {coin_ticker}. Sending exit signal.")
# in current positions to copy but not really because very small amount, but somehow in my current position
if coin_ticker in self.current_positions_to_copy and coin_ticker in my_current_opened_tickers:
if not self._is_position_significant(coin_ticker) :
df['signal'] = 0
logger.info(f"Missed exit detected for {coin_ticker}. Sending exit signal.")
except Exception as e:
logger.error(f"Error checking missed positions for {coin_ticker}: {e}")
return df
def _is_position_significant(self, coin_ticker):
"""Check if position is significant enough to copy"""
try:
if not self._cached_perp_data:
return None
perp_data = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
copied_account_value = float(perp_data['marginSummary']['accountValue'])
#logger.info(f"copied account value: {copied_account_value}")
min_threshold = copied_account_value / (100.0/self.change_threshold) # 1% threshold
position_value = self.current_positions_to_copy[coin_ticker].position_value
return position_value > min_threshold
except Exception as e:
logger.error(f"Error checking position significance: {e}")
return None
def check_position_direction_mismatch(self, df, coin_ticker):
"""
Check if my position direction doesn't match the copied position direction
"""
if coin_ticker in self.current_positions_to_copy:
my_trades = Trade.get_trades_proxy(is_open=True)
for trade in my_trades:
if trade.pair.replace("/USDC:USDC", "") == coin_ticker:
copied_size = float(self.current_positions_to_copy[coin_ticker].size)
copied_is_short = copied_size < 0.0
my_is_short = trade.is_short
# If directions don't match, exit the position
if copied_is_short != my_is_short:
df['signal'] = 0
logger.info(f"Position direction mismatch on {coin_ticker}. Copied: {'SHORT' if copied_is_short else 'LONG'}, Mine: {'SHORT' if my_is_short else 'LONG'}. Exiting to realign.")
return df
return df
def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
dataframe.loc[dataframe['signal'] == 1, 'enter_long'] = 1
dataframe.loc[dataframe['signal'] == -1, 'enter_short'] = 1
return dataframe
def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
dataframe.loc[dataframe['signal'] == 0, 'exit_long'] = 1
dataframe.loc[dataframe['signal'] == 0, 'exit_short'] = 1
return dataframe
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: float | None, max_stake: float,
leverage: float, entry_tag: str | None, side: str,
**kwargs) -> float:
# Called before entering a trade, makes it possible to manage your position size when placing a new trade.
# Returning 0 or None will prevent trades from being placed -> ACTS ALSO LIKE AN ENTRY CONFIRMATION
# Freqtrade will fall back to the proposed_stake value should your code raise an exception. The exception itself will be logged.
# You do not have to ensure that min_stake <= returned_value <= max_stake. Trades will succeed as the returned value will be clamped to supported range and this action will be logged.
coin_ticker = pair.replace("/USDC:USDC", "")
self.wallets.update()
if not self._got_perp_data_account_state_successfully :
return None
try:
if not self._cached_perp_data:
logger.error("No cached perp data available")
return None
perp_data = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
copied_account_value = float(perp_data['marginSummary']['accountValue'])
my_account_value = float(self.get_stake_total())
scale_factor = my_account_value / copied_account_value
# Look in both position changes and current positions
position_value_in_copied_account = None
copied_position_size = None
# First check position changes
for chg in self.copied_account_position_changes:
if coin_ticker == chg.coin:
position_value_in_copied_account = float(chg.new_position_value)
copied_position_size = float(chg.new_size)
break
# If not found in changes, check current positions
if position_value_in_copied_account is None:
if coin_ticker in self.current_positions_to_copy:
position_value_in_copied_account = self.current_positions_to_copy[coin_ticker].position_value
copied_position_size = float(self.current_positions_to_copy[coin_ticker].size)
if position_value_in_copied_account is None or copied_position_size is None:
logger.warning(f"No position data found for {coin_ticker}")
return None
# Determine position direction
is_short_position = copied_position_size < 0.0
position_type = "SHORT" if is_short_position else "LONG"
# Validate that the side matches the position direction
if (side == "short" and not is_short_position) or (side == "long" and is_short_position):
logger.warning(f"Position direction mismatch for {coin_ticker}: side={side}, copied_position={'SHORT' if is_short_position else 'LONG'}")
return None
ratio_pc = position_value_in_copied_account/copied_account_value * 100.0
if ratio_pc < self.change_threshold:
logger.info(f"Not opening {position_type} position on {pair} because position size in copied account is too small ({ratio_pc:.1f}% < {self.change_threshold}%)")
return None
dust_USDC = 0.51
returned_val = position_value_in_copied_account * scale_factor
returned_val = (returned_val / leverage) - dust_USDC
if returned_val < min_stake:
returned_val = min_stake
logger.info(f"Calculated stake for {position_type} {pair}: {returned_val} (copied value: ${position_value_in_copied_account:.2f}, scale: {scale_factor:.4f})")
return returned_val
except Exception as e:
logger.error(f"Error in custom_stake_amount: {e}")
return None
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: float | None, max_stake: float,
current_entry_rate: float, current_exit_rate: float,
current_entry_profit: float, current_exit_profit: float,
**kwargs
) -> float | None | tuple[float | None, str | None]:
# :return float: Stake amount to adjust your trade,
# Positive values to increase position, Negative values to decrease position.
# Return None for no action.
# Optionally, return a tuple with a 2nd element with an order reason
coin_ticker = trade.pair.replace("/USDC:USDC", "")
self.wallets.update()
dust_USDC = 0.51
if not self._got_perp_data_account_state_successfully :
return None
if self._time_of_change is not None:
if datetime.now() > self._time_of_change + timedelta(seconds=self._cooldown_seconds_after_position_change):
self._is_cooldown_after_position_change = False
if self._is_cooldown_after_position_change:
logger.info(f"Not doing position size change because of the cooldown of {self._cooldown_seconds_after_position_change} seconds.")
return None
try:
if not self._cached_perp_data:
return None
perp_data = self.GET_PERP_ACCOUNT_STATUS(self.ADDRESS_TO_TRACK)
if self.copied_account_position_changes:
copied_account_value = float(perp_data['marginSummary']['accountValue'])
my_account_value = float(self.get_stake_total())
scale_factor = my_account_value / copied_account_value
# for detected changes in the copied account
for chg in self.copied_account_position_changes:
if coin_ticker == chg.coin:
# Check if change is significant (>0.5% of account), otherwise skip doing adjustment by returning None
change_ratio_pc = abs(float(chg.old_position_value) - float(chg.new_position_value)) / copied_account_value * 100.0
if change_ratio_pc < self.change_threshold:
logger.info(f"Not adjusting position on {trade.pair} because position change in copied account is too small ({change_ratio_pc:.1f}% < {self.change_threshold}%)")
return None
# Determine position direction for logging
position_type = "SHORT" if trade.is_short else "LONG"
old_size = float(chg.old_size) if chg.old_size is not None else 0.0
new_size = float(chg.new_size)
delta_stake = abs(float(chg.old_position_value) - float(chg.new_position_value)) * scale_factor
adjustment_amount = delta_stake / trade.leverage - dust_USDC
self._time_of_change = datetime.now()
self._is_cooldown_after_position_change = True
if 'increased' in chg.change_type:
logger.info(f"Increasing {position_type} position on {trade.pair}: +${adjustment_amount:.2f} (copied size: {old_size:.4f} -> {new_size:.4f})")
return adjustment_amount
elif 'decreased' in chg.change_type:
logger.info(f"Decreasing {position_type} position on {trade.pair}: -${adjustment_amount:.2f} (copied size: {old_size:.4f} -> {new_size:.4f})")
return -1.0 * adjustment_amount
# for already opened positions, if difference with what it should be in copied account (and scaled) is too large (>10%), adjust to match
if self.matching_positions_check_output:
for pos in self.matching_positions_check_output:
logger.info(f"{pos['coin']} → Difference: {pos['diff_pc']:.2f}% (my total value: {pos['my_value']:.1f}) ; ||>{self.adjustement_threshold:.0f}% will trigger a size correction.")
if pos['coin']==coin_ticker:
if abs(pos['diff_pc'])>self.adjustement_threshold:
position_type = "SHORT" if trade.is_short else "LONG"
delta_stake = pos['my_value']/(1.0 + pos['diff_pc']/100.0)-pos['my_value']
adjustment_amount = delta_stake / trade.leverage - dust_USDC
action = "Increasing" if delta_stake > 0 else "Decreasing"
logger.info(f"{action} {position_type} position on {trade.pair} to correct size mismatch: ${adjustment_amount:.2f} (diff: {pos['diff_pc']:.1f}%)")
self._time_of_change = datetime.now()
self._is_cooldown_after_position_change = True
return adjustment_amount
return None
except Exception as e:
logger.error(f"Error in adjust_trade_position: {e}")
return None
def leverage(self, pair: str, current_time: datetime, current_rate: float,
proposed_leverage: float, max_leverage: float, entry_tag: str | None, side: str,
**kwargs) -> float:
lev = min(self.LEV.value, max_leverage)
return lev