Timeframe
15m
Direction
Long Only
Stoploss
-85.0%
Trailing Stop
No
ROI
0m: -100.0%
Interface Version
3
Startup Candles
N/A
Indicators
0
# pragma pylint: disable=missing-docstring, invalid-name, pointless-string-statement
# flake8: noqa: F401
# isort: skip_file
# --- Do not remove these libs ---
from warnings import simplefilter
import numpy as np # noqa
import pandas as pd # noqa
import sys
import threading
from run_avellaneda_param_calculation import run_avellaneda_param_calculation
from pandas import DataFrame
from functools import reduce
import json
import logging
from pathlib import Path
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter,
IStrategy, IntParameter, stoploss_from_absolute, informative)
from freqtrade.exchange import timeframe_to_prev_date
from freqtrade.persistence import Trade, Order
from datetime import datetime, timezone
# --------------------------------
# Add your lib to import here
import math
from typing import Optional, Tuple
from dataclasses import dataclass
from pair_loader import get_active_pair, pair_to_ticker
logger = logging.getLogger(__name__)
# Setup dedicated logger for market making values
mm_logger = logging.getLogger('market_making_values')
mm_logger.setLevel(logging.INFO)
log_file_path = Path(__file__).parent / 'log_ave_mm.txt'
mm_handler = logging.FileHandler(log_file_path)
mm_formatter = logging.Formatter('%(asctime)s - %(message)s')
mm_handler.setFormatter(mm_formatter)
mm_logger.addHandler(mm_handler)
mm_logger.propagate = False
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)
pd.options.mode.chained_assignment = None
def calculate_optimal_spreads(mid_price, sigma, k_bid, k_ask, gamma, time_remaining, q_inventory_exposure, fee):
"""Compute reservation price and bid/ask quotes, logging the inputs/outputs."""
sigma_abs = sigma * mid_price
reservation_decay = gamma * sigma_abs**2.0 * time_remaining
risk_term = 0.5 * reservation_decay
half_spread_bid = risk_term + (1.0 / gamma) * np.log(1.0 + (gamma / k_bid))
half_spread_ask = risk_term + (1.0 / gamma) * np.log(1.0 + (gamma / k_ask))
r = mid_price - q_inventory_exposure * reservation_decay
r_b = r - half_spread_bid - mid_price * fee
r_a = r + half_spread_ask + mid_price * fee
delta_a_rel = r_a - mid_price
delta_b_rel = mid_price - r_b
delta_a_percent = (delta_a_rel / mid_price) * 100.0
delta_b_percent = (delta_b_rel / mid_price) * 100.0
mm_logger.info("=" * 65)
mm_logger.info("AVELLANEDA-STOIKOV MODEL PARAMETERS")
mm_logger.info("=" * 65)
mm_logger.info(f"Time Remaining Fraction: {time_remaining:>12.4f}")
mm_logger.info(f"Inventory Exposure: {q_inventory_exposure:>12.4f}")
mm_logger.info(f"Sigma (Volatility): {sigma:>12.6f}")
mm_logger.info(f"K Bid: {k_bid:>12.6f}")
mm_logger.info(f"K Ask: {k_ask:>12.6f}")
mm_logger.info(f"Maker fee: {fee:>12.6f}")
mm_logger.info(f"Gamma: {gamma:>12.6f}")
mm_logger.info(f"Mid-Price: {mid_price:>12.4f}")
mm_logger.info(f"Reservation Price: {r:>12.4f}")
mm_logger.info(f"Buy Spread (% of mid): {delta_b_percent:>12.4f}%")
mm_logger.info(f"Sell Spread (% of mid): {delta_a_percent:>12.4f}%")
mm_logger.info(f"Buy Limit Price: {r_b:>12.4f}")
mm_logger.info(f"Sell Limit Price: {r_a:>12.4f}")
mm_logger.info("=" * 65)
return r_b, r_a
def _fmt_optional(value: float | None, digits: int = 6) -> str:
return f"{value:.{digits}f}" if isinstance(value, (int, float)) else "n/a"
#---------------------------------------------------------- LOAD CONFIG ----------------------------------------------------------
def get_params_directory():
"""
Get the directory containing parameter JSON files.
Uses environment variable AVELLANEDA_PARAMS_DIR if set, otherwise searches for scripts/ directory.
Works consistently whether running locally, in Docker, or in a container.
Returns:
Path: Directory where parameter files are located
"""
import os
# First check environment variable
env_path = os.getenv('AVELLANEDA_PARAMS_DIR')
if env_path:
params_dir = Path(env_path).resolve()
logger.info(f"Using params directory from AVELLANEDA_PARAMS_DIR: {params_dir}")
return params_dir
# Fall back to scripts directory relative to project root
try:
current_file = Path(__file__).resolve()
current_dir = current_file.parent
except NameError: # e.g., interactive
current_dir = Path(sys.argv[0]).resolve().parent if sys.argv and sys.argv[0] else Path.cwd()
# Search for scripts directory
search_paths = [
current_dir / '../../scripts', # From user_data/strategies/
current_dir / '../scripts',
current_dir / 'scripts',
current_dir.parent.parent / 'scripts',
]
for path in search_paths:
resolved = path.resolve()
if resolved.exists() and resolved.is_dir():
logger.info(f"Using params directory: {resolved}")
return resolved
# If scripts/ not found, use current directory as fallback
logger.warning(f"scripts/ directory not found, using current directory: {current_dir}")
return current_dir
def find_upwards(filename: str, start: Path, max_up: int = 10) -> Path:
p = start.resolve()
for _ in range(max_up + 1):
candidate = p / filename
if candidate.exists():
return candidate
if p.parent == p:
break
p = p.parent
raise FileNotFoundError(f"Could not find {filename} from {start}")
def log_parameters_summary(params_file: Path, params: dict) -> None:
"""Log a concise summary of the loaded parameters and spreads."""
market_data = params.get('market_data', {})
optimal_params = params.get('optimal_parameters', {})
limit_orders = params.get('limit_orders', {})
calculated_values = params.get('calculated_values', {})
k_bid = market_data.get('k_bid', market_data.get('k'))
k_ask = market_data.get('k_ask', market_data.get('k'))
sigma = market_data.get('sigma')
gamma = optimal_params.get('gamma')
time_horizon_hours = optimal_params.get('time_horizon_hours')
mid_price = market_data.get('mid_price') or calculated_values.get('reservation_price')
delta_b = limit_orders.get('delta_b', calculated_values.get('half_spread_bid'))
delta_a = limit_orders.get('delta_a', calculated_values.get('half_spread_ask'))
delta_b_percent = limit_orders.get('delta_b_percent')
delta_a_percent = limit_orders.get('delta_a_percent')
bid_price = limit_orders.get('bid_price')
ask_price = limit_orders.get('ask_price')
logger.info(
"Parameter summary | source=%s | gamma=%s | sigma=%s | k_bid=%s | k_ask=%s | horizon_h=%s | mid=%s",
params_file,
_fmt_optional(gamma),
_fmt_optional(sigma),
_fmt_optional(k_bid),
_fmt_optional(k_ask),
_fmt_optional(time_horizon_hours, 4),
_fmt_optional(mid_price, 4),
)
logger.info(
"Spread snapshot | bid=%s (%s%%) | ask=%s (%s%%) | bid_px=%s | ask_px=%s",
_fmt_optional(delta_b),
_fmt_optional(delta_b_percent, 4),
_fmt_optional(delta_a),
_fmt_optional(delta_a_percent, 4),
_fmt_optional(bid_price, 4),
_fmt_optional(ask_price, 4),
)
def load_configs(start_dir: Path | None = None, max_up: int = 10):
"""
Load Avellaneda parameters from JSON file, searching in multiple locations.
Args:
start_dir: Starting directory for search (defaults to current file's directory)
max_up: Maximum levels to search upwards
Returns:
dict: Loaded parameters from JSON file
Raises:
FileNotFoundError: If the parameters file cannot be found
"""
params_dir = get_params_directory()
try:
active_pair = get_active_pair()
except Exception:
active_pair = None
ticker = pair_to_ticker(active_pair) or "PAXG"
param_file_name = f"avellaneda_parameters_{ticker}.json"
logger.info(f"Resolving parameters for pair '{active_pair or 'unknown'}' (ticker '{ticker}') using {params_dir}")
params_file = params_dir / param_file_name
if params_file.exists():
try:
params_MM = json.loads(params_file.read_text(encoding='utf-8'))
logger.info(f"Successfully loaded parameters from: {params_file}")
log_parameters_summary(params_file, params_MM)
return params_MM
except (json.JSONDecodeError, IOError) as e:
logger.error(f"Error reading {params_file}: {e}")
if start_dir is None:
try:
start_dir = Path(__file__).resolve().parent
except NameError: # e.g., interactive
start_dir = Path(sys.argv[0]).resolve().parent if sys.argv and sys.argv[0] else Path.cwd()
search_locations = [
f"scripts/{param_file_name}",
param_file_name,
f"user_data/strategies/{param_file_name}",
f"../scripts/{param_file_name}",
f"../../scripts/{param_file_name}"
]
for location in search_locations:
try:
params_file_found = find_upwards(location, start_dir, max_up)
params_MM = json.loads(params_file_found.read_text(encoding='utf-8'))
logger.info(f"Successfully loaded parameters from: {params_file_found}")
log_parameters_summary(params_file_found, params_MM)
return params_MM
except FileNotFoundError:
continue
except (json.JSONDecodeError, IOError) as e:
logger.error(f"Error reading {location}: {e}")
continue
raise FileNotFoundError(
f"Could not find '{param_file_name}' in any of these locations:\n"
f" - Primary: {params_file}\n"
f" - " + "\n - ".join(str(start_dir / loc) for loc in search_locations)
)
class avellaneda(IStrategy):
# Strategy interface version - allow new iterations of the strategy interface.
# Check the documentation or the Sample strategy to get the latest version.
INTERFACE_VERSION = 3
# Can this strategy go short?
can_short: bool = False
use_custom_stoploss: bool = False
process_only_new_candles: bool = False
position_adjustment_enable: bool = False
max_entry_position_adjustment = 0
startup_candle_count: int = 0
# Minimum number of data collection periods required before trading
min_data_periods: int = 3
minimal_roi = {
"0": -1
}
params_MM = None
gamma = None
k_bid = None
k_ask = None
sigma = None
time_horizon_hours = None
fees_HL_maker = 0.02/100.0
nb_loop = 0
stoploss = -0.85
trailing_stop = False
timeframe = '15m'
order_types = {
'entry': 'limit',
'exit': 'limit',
'stoploss': 'limit',
"emergency_exit": "limit",
'stoploss_on_exchange': False
}
order_time_in_force = {
'entry': 'gtc',
'exit': 'gtc'
}
def bot_start(self, **kwargs) -> None:
"""
Called only once after bot instantiation.
:param **kwargs: Ensure to keep this here so updates to this won't break your strategy.
"""
pairs = self.dp.current_whitelist()
if len(pairs)!=1:
sys.exit()
if not self.can_short:
logger.info('Running calculation of parameters')
symbol = pairs[0].replace("/USDC:USDC","")
logger.info(f"Current symbol: {symbol}")
run_avellaneda_param_calculation()
self.params_MM = load_configs()
self.gamma = self.params_MM['optimal_parameters']['gamma']
self.k_bid = self.params_MM['market_data'].get('k_bid', self.params_MM['market_data'].get('k'))
self.k_ask = self.params_MM['market_data'].get('k_ask', self.params_MM['market_data'].get('k'))
self.sigma = self.params_MM['market_data']['sigma']
# Default to 0.5 hours if not present in older JSONs
self.time_horizon_hours = self.params_MM['optimal_parameters'].get('time_horizon_hours', 0.5)
# Check if we have sufficient data periods
num_periods = self.params_MM.get('current_state', {}).get('num_data_periods', 0)
if num_periods < self.min_data_periods:
logger.warning(f"Insufficient data collected: {num_periods}/{self.min_data_periods} periods. Trading will be disabled until more data is collected.")
else:
logger.info(f"Sufficient data available: {num_periods} periods collected.")
def bot_loop_start(self, current_time: datetime, **kwargs) -> None:
"""
Called at the start of the bot iteration (one loop). For each loop, it will run populate_indicators on all pairs.
Might be used to perform pair-independent tasks
(e.g. gather some remote resource for comparison)
:param current_time: datetime object, containing the current datetime
:param **kwargs: Ensure to keep this here so updates to this won't break your strategy.
"""
if not self.can_short:
if self.nb_loop%10==0:
logger.info('Running calculation of parameters')
run_avellaneda_param_calculation()
self.nb_loop = self.nb_loop + 1
self.params_MM = load_configs()
self.gamma = self.params_MM['optimal_parameters']['gamma']
self.k_bid = self.params_MM['market_data'].get('k_bid', self.params_MM['market_data'].get('k'))
self.k_ask = self.params_MM['market_data'].get('k_ask', self.params_MM['market_data'].get('k'))
self.sigma = self.params_MM['market_data']['sigma']
self.time_horizon_hours = self.params_MM['optimal_parameters'].get('time_horizon_hours', 0.5)
def informative_pairs(self):
"""
"""
return []
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
"""
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
Only allow entry if we have sufficient data collected by the data collector.
"""
# Check if parameters are loaded and we have enough collected data periods
if self.params_MM is not None and self.sigma is not None:
# Check if we have enough data periods from the data collector
num_periods = self.params_MM.get('current_state', {}).get('num_data_periods', 0)
if num_periods >= self.min_data_periods:
dataframe.loc[:, 'enter_long'] = 1
else:
logger.warning(f"Insufficient data periods: {num_periods}/{self.min_data_periods}. Waiting for more data collection.")
dataframe.loc[:, 'enter_long'] = 0
else:
dataframe.loc[:, 'enter_long'] = 0
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
"""
dataframe.loc[:, 'exit_long'] = 0
return dataframe
def get_mid_price(self, pair: str, fallback_rate: float) -> float:
"""
Get effective mid price from orderbook based on $1000 depth.
Fallback to best mid or provided rate if orderbook unavailable or insufficient.
"""
# Request deeper orderbook to find $1000 depth
orderbook = self.dp.orderbook(pair, maximum=50)
if not orderbook or 'bids' not in orderbook or 'asks' not in orderbook:
return fallback_rate
THRESHOLD = 1000.0
# Naive Mid Calculation (Top of Book)
best_bid = orderbook['bids'][0][0] if len(orderbook['bids']) > 0 else None
best_ask = orderbook['asks'][0][0] if len(orderbook['asks']) > 0 else None
naive_mid = (best_bid + best_ask) / 2 if best_bid and best_ask else None
# Calculate Effective Bid
effective_bid = None
cum_val = 0.0
# Bids are sorted high to low
for price, amount in orderbook['bids']:
cum_val += price * amount
if cum_val >= THRESHOLD:
effective_bid = price
break
# Fallback to best bid if threshold not reached
if effective_bid is None and len(orderbook['bids']) > 0:
effective_bid = orderbook['bids'][0][0]
# Calculate Effective Ask
effective_ask = None
cum_val = 0.0
# Asks are sorted low to high
for price, amount in orderbook['asks']:
cum_val += price * amount
if cum_val >= THRESHOLD:
effective_ask = price
break
# Fallback to best ask if threshold not reached
if effective_ask is None and len(orderbook['asks']) > 0:
effective_ask = orderbook['asks'][0][0]
if effective_bid is not None and effective_ask is not None:
effective_mid = (effective_bid + effective_ask) / 2
# Log comparison
if naive_mid:
diff_pct = abs(effective_mid - naive_mid) / naive_mid * 100
logger.info(f"Price Check | Naive Mid: {naive_mid:.4f} | Effective Mid: {effective_mid:.4f} | Diff: {diff_pct:.4f}%")
return effective_mid
else:
return fallback_rate
def custom_entry_price(self, pair: str, current_time: datetime, proposed_rate: float,
entry_tag: str, side: str, **kwargs) -> float:
if self.sigma is None or self.gamma is None or self.params_MM is None:
return None
if side!="long":
return None
mid_price = self.get_mid_price(pair, proposed_rate)
symbol = pair.replace("/USDC:USDC","")
open_trades = Trade.get_open_trades()
total_quote_position = sum([float(trade.open_rate) * float(trade.amount) for trade in open_trades])
total_capital = self.wallets.get_total(self.config['stake_currency'])
# q_inventory_exposure = total_quote_position / total_capital if total_capital > 0 else 0
q_inventory_exposure = 0.0
r_buy, r_sell = calculate_optimal_spreads(mid_price,self.sigma,self.k_bid,self.k_ask,self.gamma,self.time_horizon_hours/24.0,q_inventory_exposure,self.fees_HL_maker)
return r_buy
def custom_exit_price(self, pair: str, trade: Trade,
current_time: datetime, proposed_rate: float,
current_profit: float, exit_tag: str, **kwargs) -> float:
if self.sigma is None or self.gamma is None or self.params_MM is None:
return None
if trade.is_short:
return None
mid_price = self.get_mid_price(pair, proposed_rate)
symbol = pair.replace("/USDC:USDC","")
open_trades = Trade.get_open_trades()
total_quote_position = sum([float(trade.open_rate) * float(trade.amount) for trade in open_trades])
total_capital = self.wallets.get_total(self.config['stake_currency'])
# q_inventory_exposure = total_quote_position / total_capital if total_capital > 0 else 0
q_inventory_exposure = 0.0
r_buy, r_sell = calculate_optimal_spreads(mid_price,self.sigma,self.k_bid,self.k_ask,self.gamma,self.time_horizon_hours/24.0,q_inventory_exposure,self.fees_HL_maker)
return r_sell
def adjust_entry_price(self, trade: Trade, order: Order, pair: str,
current_time: datetime, proposed_rate: float, current_order_rate: float,
entry_tag: str, side: str, **kwargs) -> float:
if self.sigma is None or self.gamma is None or self.params_MM is None:
return None
if trade.is_short:
return None
mid_price = self.get_mid_price(pair, proposed_rate)
symbol = pair.replace("/USDC:USDC","")
open_trades = Trade.get_open_trades()
total_quote_position = sum([float(trade.open_rate) * float(trade.amount) for trade in open_trades])
total_capital = self.wallets.get_total(self.config['stake_currency'])
# q_inventory_exposure = total_quote_position / total_capital if total_capital > 0 else 0
q_inventory_exposure = 0.0
r_buy, r_sell = calculate_optimal_spreads(mid_price,self.sigma,self.k_bid,self.k_ask,self.gamma,self.time_horizon_hours/24.0,q_inventory_exposure,self.fees_HL_maker)
return r_buy
def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float,
current_profit: float, **kwargs):
return "always_exit"
def leverage(self, pair: str, current_time: datetime, current_rate: float,
proposed_leverage: float, max_leverage: float, entry_tag: str | None, side: str,
**kwargs) -> float:
lev = 1
logger.info(f"Using leverage: {lev}. Should not be changed.")
return lev
# @property
# def protections(self):
# return [
# {
# "method": "MaxDrawdown",
# "lookback_period": 10080, # 1 week
# "trade_limit": 0, # Evaluate all trades since the bot started
# "stop_duration_candles": 10000000, # Stop trading indefinitely
# "max_allowed_drawdown": 0.05 # Maximum drawdown of 5% before stopping
# },
# ]