app.backbone.services.backtest_service
import asyncio
import itertools
import json
import os
from datetime import date
from typing import AsyncGenerator, List

import pandas as pd
import yaml
from sqlalchemy import delete, desc, func, select
from sqlalchemy.orm import aliased, joinedload

from app.backbone.database.db_service import DbService
from app.backbone.entities.bot import Bot
from app.backbone.entities.bot_performance import BotPerformance
from app.backbone.entities.bot_trade_performance import BotTradePerformance
from app.backbone.entities.luck_test import LuckTest
from app.backbone.entities.metric_wharehouse import MetricWharehouse
from app.backbone.entities.montecarlo_test import MontecarloTest
from app.backbone.entities.random_test import RandomTest
from app.backbone.entities.strategy import Strategy
from app.backbone.entities.ticker import Ticker
from app.backbone.entities.timeframe import Timeframe
from app.backbone.entities.trade import Trade
from app.backbone.services.bot_service import BotService
from app.backbone.services.config_service import ConfigService
from app.backbone.services.operation_result import OperationResult
from app.backbone.services.utils import _performance_from_df_to_obj, trades_from_df_to_obj
from app.backbone.utils.general_purpose import load_function, profile_function, streaming_endpoint
from app.backbone.utils.get_data import get_data
from app.backbone.utils.wfo_utils import run_strategy_and_get_performances

# Location of the leverage table read by BacktestService.run_backtest.
_LEVERAGES_PATH = "./app/configs/leverages.yml"

# Root folder and sub-folders that may hold an HTML artefact for a backtest.
_STATIC_ROOT = './app/templates/static/'
_PLOT_FOLDERS = (
    'luck_test_plots',
    'correlation_plots',
    't_test_plots',
    'backtest_plots',
    'backtest_plots/reports',
)


def _as_list(value):
    """Return *value* unchanged when it is already a list, otherwise wrap it in one."""
    return value if isinstance(value, list) else [value]


async def log_message(queue: asyncio.Queue, ticker, timeframe, status, message, error=""):
    """Put a JSON-encoded log entry on the queue so the consumer can stream it."""
    await queue.put(json.dumps({
        "ticker": ticker,
        "timeframe": timeframe,
        "status": status,
        "message": message,
        "error": error,
    }))


class BacktestService:
    """
    Handles the orchestration, execution, persistence, and retrieval of backtests and related performance metrics.

    This service acts as the central component for running backtests on trading strategies, saving their results,
    and performing various queries on historical performance data. It supports streaming logs, asynchronous execution,
    and interaction with the database and configuration layers.

    Main features include:
    - Running and saving individual or batch backtests across strategy/ticker/timeframe/risk combinations.
    - Loading strategies dynamically and executing them with proper capital, leverage, and configuration.
    - Persisting detailed performance metrics and trade history in the database.
    - Performing deletions and cleanup of historical test data (including files and DB records).
    - Providing filtering, querying, and tagging of backtests (e.g. by performance, robustness, favorites).
    - Identifying robust strategies via custom metrics and filters (e.g. return/drawdown ratio > 1).
    - Managing test dependencies such as Monte Carlo, Luck Test, and Random Test results.

    Attributes:
        db_service (DbService): Handles database connections and CRUD operations.
        bot_service (BotService): Manages bot records used during backtesting.
        config_service (ConfigService): Provides access to configuration values (e.g. risk-free rate).

    Notes:
    - Backtests can be persisted with reports or run temporarily.
    - Streaming logs (via queues) allow real-time feedback when executing long-running backtests.
    - This class assumes a full application context with all entities (Bot, BotPerformance, Trade, etc.) properly defined.
    """

    def __init__(self):
        self.db_service = DbService()
        self.bot_service = BotService()
        self.config_service = ConfigService()

    async def async_iterator(self, iterable):
        """Iterate a plain iterable asynchronously, yielding control to the event loop after each item."""
        for item in iterable:
            yield item
            await asyncio.sleep(0)

    @staticmethod
    def _plot_file_name(bot_performance) -> str:
        """Build the HTML file name used for the plots/reports of a backtest.

        Reads ``bot_performance.Bot.Name``, so it must be called while the related
        Bot is still loadable (i.e. before the rows are deleted).
        """
        str_date_from = str(bot_performance.DateFrom).replace('-', '')
        str_date_to = str(bot_performance.DateTo).replace('-', '')
        return f'{bot_performance.Bot.Name}_{str_date_from}_{str_date_to}.html'

    @staticmethod
    def _remove_plot_files(file_name: str) -> None:
        """Delete *file_name* from every known plot/report folder where it exists."""
        for folder in _PLOT_FOLDERS:
            path = os.path.join(_STATIC_ROOT, folder, file_name)
            if os.path.exists(path):
                os.remove(path)

    @streaming_endpoint
    async def run_backtest(
        self,
        initial_cash: float,
        strategy: Strategy,
        ticker: Ticker,
        timeframe: Timeframe,
        date_from: pd.Timestamp,
        date_to: pd.Timestamp,
        method: str,
        risk: float,
        save_bt_plot: str,
        queue: asyncio.Queue,
    ) -> AsyncGenerator[str, None]:
        """
        Executes a backtest for a given trading strategy and returns performance metrics,
        trade details, and statistics, while streaming progress through a queue.

        Steps performed:
        - Loads the trading strategy dynamically from a module path.
        - Retrieves leverage rules from a YAML config file.
        - Fetches and prepares historical price data for the given ticker/timeframe.
        - Computes margin requirements based on leverage.
        - Executes the backtest using the strategy's logic and calculates performance metrics.
        - Generates and saves backtest plots (temporary or persistent) if requested.
        - Streams progress updates, logs, and final results through the provided queue.

        Parameters:
        - initial_cash (float): Starting capital for the backtest simulation.
        - strategy (Strategy): Strategy object; ``strategy.Name`` is appended to
          'app.backbone.strategies.' to locate the strategy.
        - ticker (Ticker): Financial instrument to backtest on.
        - timeframe (Timeframe): Timeframe for price data.
        - date_from (pd.Timestamp): Start date for historical data.
        - date_to (pd.Timestamp): End date for historical data.
        - method (str): Not used by this method.
        - risk (float): Risk per trade, forwarded to the backtest runner.
        - save_bt_plot (str): Plot saving mode:
            - 'temp': Saves in the temporary plots directory.
            - 'persist': Saves in the main backtest_plots directory and generates the report.
            - Any other value leaves the plot path unset.
        - queue (asyncio.Queue): Async queue for real-time progress streaming.

        Returns:
            The tuple ``(performance, trade_performance, stats)`` produced by
            ``run_strategy_and_get_performances``.
            NOTE(review): the ``AsyncGenerator`` annotation is kept as in the original;
            presumably it describes what ``@streaming_endpoint`` exposes - confirm.

        Raises:
            KeyError: If the leverage file has no entry for ``ticker.Name``.

        Side effects:
        - Reads leverage configurations from './app/configs/leverages.yml'.
        - May create HTML plot files in either './app/templates/static/backtest_plots/temp' (temporary)
          or './app/templates/static/backtest_plots' (persistent).
        - Streams logs/results via the provided asyncio.Queue.

        Notes:
        - The plot file name is built from the bot name and the date range.
        - Risk-free rate is fetched from the application's config service ('RiskFreeRate').
        - Margin is calculated as 1/leverage (e.g., 50x leverage -> 2% margin requirement).
        """
        await log_message(queue, '', '', "log", f"Loading strategy {strategy.Name}")

        strategy_name = strategy.Name.split(".")[1]
        bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'

        strategy_path = 'app.backbone.strategies.' + strategy.Name
        strategy_func = load_function(strategy_path)

        await log_message(queue, '', '', "log", f"Strategy {strategy.Name} loaded succesfully")
        await log_message(queue, '', '', "log", "Loading leverages")

        with open(_LEVERAGES_PATH, "r", encoding="utf-8") as leverages_file:
            leverages = yaml.safe_load(leverages_file)
        await log_message(queue, '', '', "log", "Leverages loaded")

        try:
            leverage = leverages[ticker.Name]
        except KeyError:
            # A bare KeyError only carries the ticker name, which is useless once
            # it is streamed to the client as the failure message.
            raise KeyError(
                f"No leverage configured for ticker '{ticker.Name}' in {_LEVERAGES_PATH}"
            ) from None
        margin = 1 / leverage

        await log_message(queue, ticker.Name, timeframe.Name, "log", "Getting data")

        prices = get_data(ticker.Name, timeframe.MetaTraderNumber, date_from, date_to)
        prices.index = pd.to_datetime(prices.index)

        await log_message(queue, ticker.Name, timeframe.Name, "log", f"Data ready: {prices.head(1)}")

        await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting backtest")

        filename = f'{bot_name}_{date_from.strftime("%Y%m%d")}_{date_to.strftime("%Y%m%d")}'
        plot_path = None

        if save_bt_plot == 'temp':
            plot_path = './app/templates/static/backtest_plots/temp'
            await log_message(
                queue,
                ticker.Name,
                timeframe.Name,
                "link",
                os.path.join('/backtest_plots/temp', filename + '.html'),
            )

        elif save_bt_plot == 'persist':
            plot_path = './app/templates/static/backtest_plots'

        risk_free_rate = float(self.config_service.get_by_name('RiskFreeRate').Value)

        performance, trade_performance, stats = run_strategy_and_get_performances(
            strategy=strategy_func,
            ticker=ticker,
            timeframe=timeframe,
            risk=risk,
            prices=prices,
            initial_cash=initial_cash,
            risk_free_rate=risk_free_rate,
            margin=margin,
            plot_path=plot_path,
            file_name=filename,
            # The report is only generated for persistent (non-temporary) runs.
            save_report=save_bt_plot == 'persist',
        )

        await log_message(queue, ticker.Name, timeframe.Name, "log", "The backtest is done :)")

        await log_message(queue, ticker.Name, timeframe.Name, "completed", f"{stats.to_string()}")

        return performance, trade_performance, stats

    @streaming_endpoint
    async def run_backtests_and_save(
        self,
        initial_cash: float,
        strategies: Strategy | List[Strategy],
        tickers: Ticker | List[Ticker],
        timeframes: List[Timeframe],
        date_from: date,
        date_to: date,
        method: str,
        risks: float | List[float],
        save_bt_plot: str,
        queue: asyncio.Queue,
    ) -> AsyncGenerator[str, None]:
        """
        Runs a backtest for every combination of strategies, tickers, timeframes and risks
        (one after another), saves the results to the database and streams progress updates.

        Steps performed:
        1. Normalizes all input parameters to lists (single items -> single-element lists)
        2. Generates all combinations of strategies/tickers/timeframes/risks
        3. For each combination:
            - Checks if the bot exists in the database
            - Skips the combination if a backtest already exists for the date range
            - Runs a new backtest via run_backtest()
            - Converts results to database objects
            - Saves bot, performance, trade performance and trades
        4. Streams progress, warnings, and completion messages via queue

        Parameters:
        - initial_cash (float): Starting capital for all backtests
        - strategies (Strategy|List[Strategy]): Single strategy or list to test
        - tickers (Ticker|List[Ticker]): Financial instrument(s) to backtest
        - timeframes (List[Timeframe]): Timeframes to test
        - date_from (date): Start date for historical data
        - date_to (date): End date for historical data
        - method (str): Backtesting methodology identifier (stored with the performance)
        - risks (float|List[float]): Risk value(s) per trade
        - save_bt_plot (str): Plot saving mode ('temp', 'persist', or anything else to skip)
        - queue (asyncio.Queue): Async queue for progress streaming

        Returns:
            The list of BotPerformance objects (already existing or newly created), or
            ``None`` when an unexpected error aborts the whole run (a "failed" message is
            streamed in that case).

        Notes:
        - Existing backtests for the same bot and date range are reused, not re-run.
        - New bots are created if they don't exist in the database.
        - Trade history is extracted from ``stats._trades``.
        - A failure in one combination is streamed as "failed" and does not stop the others.
        """
        try:
            backtests = []

            strategies = _as_list(strategies)
            tickers = _as_list(tickers)
            timeframes = _as_list(timeframes)
            risks = _as_list(risks)

            combinations = itertools.product(strategies, tickers, timeframes, risks)

            async for strategy, ticker, timeframe, risk in self.async_iterator(list(combinations)):
                strategy_name = strategy.Name.split(".")[1]

                await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting")

                bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'

                await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking bot in the database")

                bot = self.bot_service.get_bot(
                    strategy_id=strategy.Id,
                    ticker_id=ticker.Id,
                    timeframe_id=timeframe.Id,
                    risk=risk,
                )

                bot_exists = False

                if bot:
                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot already exists")

                    bot_exists = True

                    await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking backtest in the database")
                    result_performance = self.get_performances_by_bot_dates(
                        bot_id=bot.Id,
                        date_from=date_from,
                        date_to=date_to,
                    )

                    if result_performance:
                        backtests.append(result_performance)
                        await log_message(
                            queue,
                            ticker.Name,
                            timeframe.Name,
                            "warning",
                            "The backtest already exists. Skipping...",
                        )
                        continue

                else:
                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot is not in the database")
                    bot = Bot(
                        Name=bot_name,
                        StrategyId=strategy.Id,
                        TickerId=ticker.Id,
                        TimeframeId=timeframe.Id,
                        Risk=risk,
                    )

                try:
                    performance, trade_performance, stats = await self.run_backtest(
                        initial_cash=initial_cash,
                        strategy=strategy,
                        ticker=ticker,
                        timeframe=timeframe,
                        date_from=date_from,
                        date_to=date_to,
                        method=method,
                        risk=risk,
                        save_bt_plot=save_bt_plot,
                        queue=queue,
                    )

                    await log_message(queue, ticker.Name, timeframe.Name, "log", "Saving metrics in the database")

                    bot_performance_for_db = _performance_from_df_to_obj(
                        performance,
                        date_from,
                        date_to,
                        risk,
                        method,
                        bot,
                        initial_cash,
                    )

                    # Only the last row is persisted (same as the original `.pop()`),
                    # but an empty frame now fails with an explicit message instead of
                    # "pop from empty list".
                    trade_rows = [row for _, row in trade_performance.iterrows()]
                    if not trade_rows:
                        raise ValueError("The backtest produced no trade performance rows")
                    trade_performance_for_db = BotTradePerformance(**trade_rows[-1])
                    trade_performance_for_db.BotPerformance = bot_performance_for_db

                    with self.db_service.get_database() as db:
                        if not bot_exists:
                            self.db_service.create(db, bot)

                        backtests.append(self.db_service.create(db, bot_performance_for_db))

                        self.db_service.create(db, trade_performance_for_db)

                        trade_history = trades_from_df_to_obj(stats._trades)

                        for trade in trade_history:
                            trade.BotPerformance = bot_performance_for_db
                            self.db_service.create(db, trade)

                    await log_message(queue, ticker.Name, timeframe.Name, "completed", "It's done buddy ;)")

                except Exception as e:
                    # Best effort per combination: report the failure and move on.
                    await log_message(queue, '', '', "failed", f"{str(e)}")

            return backtests

        except Exception as e:
            await log_message(queue, '', '', "failed", f"{str(e)}")

    def get_performances_by_strategy_ticker(self, strategy_id, ticker_id) -> List[BotPerformance]:
        """Return all performances for a strategy/ticker pair, best StabilityWeightedRar first."""
        with self.db_service.get_database() as db:
            bot_performances = (
                db.query(BotPerformance)
                .join(Bot, Bot.Id == BotPerformance.BotId)
                .filter(Bot.TickerId == ticker_id)
                .filter(Bot.StrategyId == strategy_id)
                .order_by(desc(BotPerformance.StabilityWeightedRar))
                .all()
            )

            return bot_performances

    def get_performances_by_bot_dates(self, bot_id, date_from, date_to) -> BotPerformance | None:
        """Return the first performance of a bot for the exact date range, or None.

        RandomTest and LuckTest (with their performances) are eagerly loaded.
        """
        with self.db_service.get_database() as db:
            bot_performance = (
                db.query(BotPerformance)
                .options(
                    joinedload(BotPerformance.RandomTest).joinedload(RandomTest.RandomTestPerformance),
                    joinedload(BotPerformance.LuckTest).joinedload(LuckTest.LuckTestPerformance),
                )
                .filter(BotPerformance.BotId == bot_id)
                .filter(BotPerformance.DateFrom == date_from)
                .filter(BotPerformance.DateTo == date_to)
                .first()
            )

            return bot_performance

    def get_performance_by_bot(self, bot_id) -> List[BotPerformance]:  # TODO: rename bot_id to backtest_id
        """Return the performances whose BotId equals *bot_id*."""
        with self.db_service.get_database() as db:
            bot_performance = self.db_service.get_many_by_filter(db, BotPerformance, BotId=bot_id)
            return bot_performance

    def get_bot_performance_by_id(self, bot_performance_id) -> BotPerformance | None:  # TODO: rename to backtest_id
        """Return the performance with the given id, as provided by ``db_service.get_by_id``."""
        with self.db_service.get_database() as db:
            bot_performance = self.db_service.get_by_id(db, BotPerformance, id=bot_performance_id)

            return bot_performance

    def delete_from_strategy(self, strategy_id) -> OperationResult:
        """Delete every bot, backtest and dependent test record of a strategy, plus their plot files.

        Rows are removed children-first: Monte Carlo metrics and tests, random tests,
        luck-test performances and luck tests, trade performances, trades, the
        performances themselves and finally the bots.
        """
        with self.db_service.get_database() as db:
            bp_subq = (
                select(BotPerformance.Id)
                .join(Bot, BotPerformance.BotId == Bot.Id)
                .where(Bot.StrategyId == strategy_id)
            ).subquery()

            # Resolve the plot file names *before* deleting anything: they need
            # Bot.Name, which can no longer be loaded once the rows are gone.
            bot_performances = (
                db.query(BotPerformance)
                .options(joinedload(BotPerformance.Bot))
                .filter(BotPerformance.Id.in_(bp_subq))
                .all()
            )
            file_names = [self._plot_file_name(bp) for bp in bot_performances]

            db.execute(
                delete(MetricWharehouse).where(
                    MetricWharehouse.MontecarloTestId.in_(
                        select(MontecarloTest.Id).where(MontecarloTest.BotPerformanceId.in_(bp_subq))
                    )
                )
            )

            db.execute(
                delete(MontecarloTest).where(MontecarloTest.BotPerformanceId.in_(bp_subq))
            )

            db.execute(
                delete(RandomTest).where(RandomTest.BotPerformanceId.in_(bp_subq))
            )

            lucktest_bp_subq = (
                select(LuckTest.LuckTestPerformanceId)
                .where(LuckTest.BotPerformanceId.in_(bp_subq))
            ).subquery()

            # Performances generated by the luck tests; must run while the LuckTest
            # rows still exist because the subquery reads them.
            db.execute(delete(BotPerformance).where(BotPerformance.Id.in_(lucktest_bp_subq)))

            db.execute(
                delete(LuckTest).where(LuckTest.BotPerformanceId.in_(bp_subq))
            )

            db.execute(
                delete(BotTradePerformance).where(BotTradePerformance.BotPerformanceId.in_(bp_subq))
            )

            db.execute(
                delete(Trade).where(Trade.BotPerformanceId.in_(bp_subq))
            )

            db.execute(
                delete(BotPerformance).where(BotPerformance.Id.in_(bp_subq))
            )

            db.execute(
                delete(Bot).where(Bot.StrategyId == strategy_id)
            )

            # Same explicit save that `delete` performs after its deletions.
            self.db_service.save(db)

        # Remove related files only after the database work is finished.
        for file_name in file_names:
            self._remove_plot_files(file_name)

        return OperationResult(ok=True, message=None, item=None)

    def delete(self, bot_performance) -> OperationResult:
        """Delete one backtest and everything that hangs from it.

        Parameters:
        - bot_performance: Either the BotPerformance id (int) or an object exposing ``Id``.

        Returns:
            OperationResult with ``ok=False`` when the backtest does not exist,
            ``ok=True`` otherwise.

        The owning Bot is deleted as well when this was its only performance.
        Plot/report files are removed after the database changes are saved.
        """
        with self.db_service.get_database() as db:
            # Load the object together with the relationships needed below.
            bot_performance_id = bot_performance if isinstance(bot_performance, int) else bot_performance.Id

            bot_performance = (
                db.query(BotPerformance)
                .options(
                    joinedload(BotPerformance.BotTradePerformance),
                    joinedload(BotPerformance.Bot),
                    joinedload(BotPerformance.LuckTest),
                    joinedload(BotPerformance.RandomTest),
                )
                .filter(BotPerformance.Id == bot_performance_id)
                .first()
            )

            if not bot_performance:
                return OperationResult(ok=False, message="BotPerformance not found", item=None)

            # Needs Bot.Name, so compute it while the Bot is still loaded.
            file_name = self._plot_file_name(bot_performance)

            # Direct relationships.
            if bot_performance.BotTradePerformance:
                self.db_service.delete(db, BotTradePerformance, bot_performance.BotTradePerformance.Id)

            # Monte Carlo test and its metrics.
            montecarlo_test = self.db_service.get_by_filter(db, MontecarloTest, BotPerformanceId=bot_performance_id)
            if montecarlo_test:
                self.db_service.delete_many_by_filter(db, MetricWharehouse, MontecarloTestId=montecarlo_test.Id)
                self.db_service.delete(db, MontecarloTest, montecarlo_test.Id)

            # Luck tests and the performances they produced.
            luck_tests = self.db_service.get_many_by_filter(db, LuckTest, BotPerformanceId=bot_performance_id)
            for lt in luck_tests:
                self.db_service.delete(db, BotPerformance, lt.LuckTestPerformanceId)
                self.db_service.delete(db, LuckTest, lt.Id)

            # Random test and its dependencies.
            if bot_performance.RandomTest:
                rt = bot_performance.RandomTest
                rt_perf = self.db_service.get_by_id(db, BotPerformance, rt.RandomTestPerformanceId)
                if rt_perf:
                    rt_trade_perf = self.db_service.get_by_filter(db, BotTradePerformance, BotPerformanceId=rt_perf.Id)
                    if rt_trade_perf:
                        self.db_service.delete(db, BotTradePerformance, rt_trade_perf.Id)
                    self.db_service.delete(db, BotPerformance, rt.RandomTestPerformanceId)
                self.db_service.delete(db, RandomTest, rt.Id)

            # Related trades.
            self.db_service.delete_many_by_filter(db, Trade, BotPerformanceId=bot_performance_id)

            # If this is the bot's only performance, delete the Bot too.
            if bot_performance.BotId:
                rem = db.query(BotPerformance).filter(BotPerformance.BotId == bot_performance.BotId).count()
                if rem == 1:
                    self.db_service.delete(db, Bot, bot_performance.BotId)

            self.db_service.delete(db, BotPerformance, bot_performance.Id)
            self.db_service.save(db)

        # Files go last so a failed save does not leave records without their plots.
        self._remove_plot_files(file_name)

        return OperationResult(ok=True, message=None, item=None)

    def update_favorite(self, performance_id) -> OperationResult:
        """Toggle the Favorite flag of a backtest and return the new value in ``item``."""
        with self.db_service.get_database() as db:
            performance = self.db_service.get_by_id(db, BotPerformance, performance_id)

            if not performance:
                return OperationResult(ok=False, message='Backtest not found', item=None)

            performance.Favorite = not performance.Favorite
            updated_performance = self.db_service.update(db, BotPerformance, performance)

            if not updated_performance:
                return OperationResult(ok=False, message='Update failed', item=None)

            return OperationResult(ok=True, message=None, item=updated_performance.Favorite)

    def get_robusts(self) -> List[BotPerformance]:
        """Return the best performance of every robust strategy/ticker pair.

        A pair is robust when the average RreturnDd over its selected timeframes is
        greater than 1; the best performance is the one with the highest
        StabilityWeightedRar.
        """
        with self.db_service.get_database() as db:
            # Strategy/ticker pairs whose average RreturnDd is > 1.
            subquery = (
                db.query(
                    Bot.StrategyId,
                    Bot.TickerId,
                )
                .join(BotPerformance, Bot.Id == BotPerformance.BotId)
                .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
                .filter(
                    BotPerformance.RreturnDd != "NaN",
                    Timeframe.Selected == True,  # noqa: E712 - SQL expression, not a Python comparison
                )
                .group_by(Bot.StrategyId, Bot.TickerId)
                .having(func.avg(BotPerformance.RreturnDd) > 1)
                .subquery()
            )

            # Aliases to avoid ambiguity with the entities used in the subquery.
            bot_alias = aliased(Bot)
            bp_alias = aliased(BotPerformance)

            # Rank the performances of each pair by StabilityWeightedRar.
            ranked_subquery = (
                db.query(
                    bp_alias.Id.label("bp_id"),
                    func.row_number()
                    .over(
                        partition_by=[bot_alias.StrategyId, bot_alias.TickerId],
                        order_by=bp_alias.StabilityWeightedRar.desc(),
                    )
                    .label("row_num"),
                )
                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
                .join(
                    subquery,
                    (bot_alias.StrategyId == subquery.c.StrategyId)
                    & (bot_alias.TickerId == subquery.c.TickerId),
                )
                .subquery()
            )

            # Keep only the top-ranked row of each pair.
            data = (
                db.query(bp_alias)
                .join(ranked_subquery, bp_alias.Id == ranked_subquery.c.bp_id)
                .filter(ranked_subquery.c.row_num == 1)
                .all()
            )

            return data

    def get_robusts_by_strategy_id(self, strategy_id) -> List[BotPerformance]:
        """Return, for one strategy, the best performance(s) of each robust ticker.

        A ticker is robust when the average RreturnDd over its selected timeframes is
        at least 1. Every performance matching the maximum StabilityWeightedRar of
        its strategy/ticker pair is returned.
        """
        with self.db_service.get_database() as db:
            # Strategy/ticker pairs whose average RreturnDd is >= 1.
            subquery = (
                db.query(
                    Bot.StrategyId,
                    Bot.TickerId,
                )
                .join(BotPerformance, Bot.Id == BotPerformance.BotId)
                .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
                .filter(
                    Bot.StrategyId == strategy_id,
                    BotPerformance.RreturnDd != "NaN",
                    Timeframe.Selected == True,  # noqa: E712 - SQL expression, not a Python comparison
                )
                .group_by(Bot.StrategyId, Bot.TickerId)
                .having(func.avg(BotPerformance.RreturnDd) >= 1)  # >= so that exactly 1 is included
                .subquery()
            )

            # Aliases to avoid ambiguity with the entities used in the subquery.
            bot_alias = aliased(Bot)
            bp_alias = aliased(BotPerformance)

            # Highest StabilityWeightedRar per strategy/ticker pair. Only grouped or
            # aggregated columns are selected; the previously selected BotId was
            # neither (and unused), which strict SQL dialects reject.
            best_temporalities = (
                db.query(
                    bot_alias.StrategyId,
                    bot_alias.TickerId,
                    func.max(bp_alias.StabilityWeightedRar).label("max_custom_metric"),
                )
                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
                .join(
                    subquery,
                    (bot_alias.StrategyId == subquery.c.StrategyId)
                    & (bot_alias.TickerId == subquery.c.TickerId),
                )
                .group_by(bot_alias.StrategyId, bot_alias.TickerId)
                .subquery()
            )

            # Fetch the performances that reach that maximum.
            data = (
                db.query(bp_alias)
                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
                .join(
                    best_temporalities,
                    (bot_alias.StrategyId == best_temporalities.c.StrategyId)
                    & (bot_alias.TickerId == best_temporalities.c.TickerId)
                    & (bp_alias.StabilityWeightedRar == best_temporalities.c.max_custom_metric),
                )
                .all()
            )

            return data

    def get_favorites(self) -> List[BotPerformance]:
        """Return the performances flagged as Favorite."""
        with self.db_service.get_database() as db:
            favorites = self.db_service.get_many_by_filter(db, BotPerformance, Favorite=True)

            return favorites

    def get_trades(self, bot_performance_id: int) -> List[Trade]:
        """Return the trade history of a backtest, or an empty list if it does not exist."""
        with self.db_service.get_database() as db:
            bot_performance = self.db_service.get_by_id(db, BotPerformance, id=bot_performance_id)
            if not bot_performance:
                return []
            return bot_performance.TradeHistory

    def get_by_filter(
        self,
        return_: float | None = None,
        drawdown: float | None = None,
        stability_ratio: float | None = None,
        sharpe_ratio: float | None = None,
        trades: float | None = None,
        rreturn_dd: float | None = None,
        custom_metric: float | None = None,
        winrate: float | None = None,
        strategy: str | None = None,
        ticker: str | None = None,
    ) -> List[BotPerformance]:
        """Return the performances matching every supplied filter, highest RreturnDd first.

        Numeric arguments are lower bounds (``>=``), except ``drawdown`` which is an
        upper bound (``<=``). ``strategy`` and ``ticker`` are case-insensitive
        substring matches on the respective names. Arguments left as None (or empty
        strings for the text filters) are ignored.
        """
        with self.db_service.get_database() as db:
            filters = []

            # Base query with explicit joins.
            query = (
                db.query(BotPerformance)
                .join(Bot, Bot.Id == BotPerformance.BotId)
                .join(Strategy, Strategy.Id == Bot.StrategyId)
                .join(Ticker, Ticker.Id == Bot.TickerId)
            )

            # Numeric filters.
            if return_ is not None:
                filters.append(BotPerformance.Return >= return_)
            if drawdown is not None:
                filters.append(BotPerformance.Drawdown <= drawdown)
            if stability_ratio is not None:
                filters.append(BotPerformance.StabilityRatio >= stability_ratio)
            if sharpe_ratio is not None:
                filters.append(BotPerformance.SharpeRatio >= sharpe_ratio)
            if trades is not None:
                filters.append(BotPerformance.Trades >= trades)
            if rreturn_dd is not None:
                filters.append(BotPerformance.RreturnDd >= rreturn_dd)
            if custom_metric is not None:
                filters.append(BotPerformance.StabilityWeightedRar >= custom_metric)
            if winrate is not None:
                filters.append(BotPerformance.WinRate >= winrate)

            # Text filters (ILIKE).
            if strategy:
                filters.append(Strategy.Name.ilike(f"%{strategy}%"))
            if ticker:
                filters.append(Ticker.Name.ilike(f"%{ticker}%"))

            final_query = (
                query
                .filter(*filters)
                .options(
                    joinedload(BotPerformance.Bot),
                    joinedload(BotPerformance.BotTradePerformance),
                    joinedload(BotPerformance.TradeHistory),
                    joinedload(BotPerformance.MontecarloTest),
                    joinedload(BotPerformance.LuckTest),
                    joinedload(BotPerformance.RandomTest),
                )
                .order_by(BotPerformance.RreturnDd.desc())
                .all()
            )

            return final_query
async def log_message(queue: asyncio.Queue, ticker, timeframe, status, message, error=""):
    """Put a JSON-encoded log entry on the queue so the consumer can stream it."""
    entry = {
        "ticker": ticker,
        "timeframe": timeframe,
        "status": status,
        "message": message,
        "error": error,
    }
    serialized = json.dumps(entry)
    await queue.put(serialized)
Función para agregar logs a la cola sin detener el proceso.
class BacktestService:
    """
    Handles the orchestration, execution, persistence, and retrieval of backtests and related performance metrics.

    This service acts as the central component for running backtests on trading strategies, saving their results,
    and performing various queries on historical performance data. It supports streaming logs, asynchronous execution,
    and interaction with the database and configuration layers.

    Main features include:
    - Running and saving individual or batch backtests across strategy/ticker/timeframe/risk combinations.
    - Loading strategies dynamically and executing them with proper capital, leverage, and configuration.
    - Persisting detailed performance metrics and trade history in the database.
    - Performing deletions and cleanup of historical test data (including files and DB records).
    - Providing filtering, querying, and tagging of backtests (e.g. by performance, robustness, favorites).
    - Identifying robust strategies via custom metrics and filters (e.g. return/drawdown ratio > 1).
    - Managing test dependencies such as Monte Carlo, Luck Test, and Random Test results.

    Attributes:
        db_service (DbService): Handles database connections and CRUD operations.
        bot_service (BotService): Manages bot records used during backtesting.
        config_service (ConfigService): Provides access to configuration values (e.g. risk-free rate).

    Notes:
        - Backtests can be persisted with reports or run temporarily.
        - Streaming logs (via queues) allow real-time feedback when executing long-running backtests.
        - This class assumes a full application context with all entities (Bot, BotPerformance, Trade, etc.) properly defined.
    """

    # Static sub-folders that may hold an HTML artefact named after a backtest.
    _PLOT_FOLDERS = (
        'luck_test_plots',
        'correlation_plots',
        't_test_plots',
        'backtest_plots',
        'backtest_plots/reports',
    )
    _STATIC_ROOT = './app/templates/static/'

    def __init__(self):
        """Instantiate the database, bot and configuration services this service delegates to."""
        self.db_service = DbService()
        self.bot_service = BotService()
        self.config_service = ConfigService()

    @staticmethod
    def _as_list(value) -> list:
        """Return ``value`` as a list: lists/tuples are copied, anything else is wrapped in a one-item list."""
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    @staticmethod
    def _plot_file_name(bot_name, date_from, date_to) -> str:
        """Build the HTML file name used for a backtest: ``<bot>_<from>_<to>.html`` with dashes stripped from the dates."""
        str_date_from = str(date_from).replace('-', '')
        str_date_to = str(date_to).replace('-', '')
        return f'{bot_name}_{str_date_from}_{str_date_to}.html'

    @classmethod
    def _remove_plot_files(cls, file_name) -> None:
        """Delete ``file_name`` from every known plot folder where it exists."""
        for folder in cls._PLOT_FOLDERS:
            path = os.path.join(cls._STATIC_ROOT, folder, file_name)
            if os.path.exists(path):
                os.remove(path)

    async def async_iterator(self, iterable):
        """Expose a regular iterable as an async generator, yielding to the event loop after every item."""
        for item in iterable:
            yield item
            # sleep(0) hands control back to the event loop so other pending tasks can run.
            await asyncio.sleep(0)

    @streaming_endpoint
    async def run_backtest(
        self,
        initial_cash: float,
        strategy: Strategy,
        ticker: Ticker,
        timeframe: Timeframe,
        date_from: pd.Timestamp,
        date_to: pd.Timestamp,
        method: str,
        risk: float,
        save_bt_plot: str,
        queue: asyncio.Queue,
    ) -> AsyncGenerator[str, None]:
        """
        Run one backtest for a strategy/ticker/timeframe/risk combination and report progress on ``queue``.

        Steps performed:
        - Loads the strategy callable from 'app.backbone.strategies.' + strategy.Name.
        - Reads the per-ticker leverage from './app/configs/leverages.yml' and derives margin = 1 / leverage.
        - Fetches price data with get_data() and converts its index to datetimes.
        - Chooses the plot directory from ``save_bt_plot`` and, for 'temp', publishes a "link" message.
        - Delegates the simulation to run_strategy_and_get_performances().
        - Publishes "log" messages along the way and a final "completed" message with ``stats.to_string()``.

        Parameters:
        - initial_cash (float): Starting capital for the simulation.
        - strategy (Strategy): Strategy entity; ``Name`` must contain a dot, the part after the first dot
          is used in the bot name.
        - ticker (Ticker): Instrument; ``Name`` is the key looked up in the leverages file.
        - timeframe (Timeframe): Timeframe; ``MetaTraderNumber`` is passed to get_data().
        - date_from / date_to (pd.Timestamp): Data range; must support ``strftime``.
        - method (str): Not used by this method.
        - risk (float): Risk value forwarded to the strategy runner and embedded in the bot name.
        - save_bt_plot (str): 'temp' -> './app/templates/static/backtest_plots/temp',
          'persist' -> './app/templates/static/backtest_plots' (also enables the report),
          anything else -> no plot path.
        - queue (asyncio.Queue): Receives JSON-encoded progress messages.

        Returns:
            The tuple ``(performance, trade_performance, stats)`` produced by
            run_strategy_and_get_performances().
            NOTE(review): the signature is annotated AsyncGenerator[str, None]; what callers actually
            receive depends on the @streaming_endpoint decorator - confirm.

        Raises:
            KeyError: If the ticker has no entry in the leverages file.
        """
        await log_message(queue, '', '', "log", f"Loading strategy {strategy.Name}")

        strategy_name = strategy.Name.split(".")[1]
        bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'

        strategy_path = 'app.backbone.strategies.' + strategy.Name
        strategy_func = load_function(strategy_path)

        await log_message(queue, '', '', "log", f"Strategy {strategy.Name} loaded succesfully")
        await log_message(queue, '', '', "log", "Loading leverages")

        leverages_path = "./app/configs/leverages.yml"
        with open(leverages_path, "r", encoding="utf-8") as leverages_file:
            # safe_load returns None for an empty document; fall back to an empty mapping
            # so the lookup below fails with a meaningful message.
            leverages = yaml.safe_load(leverages_file) or {}
        await log_message(queue, '', '', "log", "Leverages loaded")

        try:
            leverage = leverages[ticker.Name]
        except KeyError:
            raise KeyError(
                f"No leverage configured for ticker {ticker.Name} in {leverages_path}"
            ) from None
        margin = 1 / leverage

        await log_message(queue, ticker.Name, timeframe.Name, "log", "Getting data")

        prices = get_data(ticker.Name, timeframe.MetaTraderNumber, date_from, date_to)
        prices.index = pd.to_datetime(prices.index)

        await log_message(queue, ticker.Name, timeframe.Name, "log", f"Data ready: {prices.head(1)}")

        await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting backtest")

        filename = f'{bot_name}_{date_from.strftime("%Y%m%d")}_{date_to.strftime("%Y%m%d")}'
        plot_path = None

        if save_bt_plot == 'temp':
            plot_path = './app/templates/static/backtest_plots/temp'
            await log_message(queue, ticker.Name, timeframe.Name, "link", os.path.join('/backtest_plots/temp', filename + '.html'))

        elif save_bt_plot == 'persist':
            plot_path = './app/templates/static/backtest_plots'

        risk_free_rate = float(self.config_service.get_by_name('RiskFreeRate').Value)

        performance, trade_performance, stats = run_strategy_and_get_performances(
            strategy=strategy_func,
            ticker=ticker,
            timeframe=timeframe,
            risk=risk,
            prices=prices,
            initial_cash=initial_cash,
            risk_free_rate=risk_free_rate,
            margin=margin,
            plot_path=plot_path,
            file_name=filename,
            save_report=save_bt_plot == 'persist',  # The report is only generated for persisted runs
        )

        await log_message(queue, ticker.Name, timeframe.Name, "log", "The backtest is done :)")

        await log_message(queue, ticker.Name, timeframe.Name, "completed", f"{stats.to_string()}")

        return performance, trade_performance, stats

    @streaming_endpoint
    async def run_backtests_and_save(
        self,
        initial_cash: float,
        strategies: Strategy | List[Strategy],
        tickers: Ticker | List[Ticker],
        timeframes: List[Timeframe],
        date_from: date,
        date_to: date,
        method: str,
        risks: float | List[float],
        save_bt_plot: str,
        queue: asyncio.Queue,
    ) -> AsyncGenerator[str, None]:
        """
        Run and persist a backtest for every strategy/ticker/timeframe/risk combination, one after another.

        Steps performed:
        1. Normalizes strategies, tickers, timeframes and risks to lists (a single value becomes a
           one-item list; tuples are accepted too).
        2. Iterates the cartesian product of the four lists.
        3. For each combination:
           - Looks the bot up through bot_service.get_bot().
           - If the bot exists and already has a performance for (date_from, date_to), that record is
             appended to the result and the combination is skipped with a "warning" message.
           - Otherwise runs run_backtest(), converts the results to entities and stores the bot (when
             new), the performance, the trade performance and every trade through db_service.create().
        4. Publishes progress on ``queue``.

        Parameters:
        - initial_cash (float): Starting capital for all backtests.
        - strategies (Strategy | List[Strategy]): Strategy or strategies to test.
        - tickers (Ticker | List[Ticker]): Instrument(s) to test.
        - timeframes (List[Timeframe]): Timeframe(s) to test.
        - date_from / date_to (date): Data range, also used to detect already-stored backtests.
        - method (str): Forwarded to run_backtest() and stored with the performance.
        - risks (float | List[float]): Risk value(s) to test.
        - save_bt_plot (str): Forwarded to run_backtest() ('temp', 'persist' or anything else).
        - queue (asyncio.Queue): Receives JSON-encoded progress messages.

        Returns:
            The list of BotPerformance records (pre-existing or newly created), or None when an error
            outside the per-combination handling occurs.
            NOTE(review): the signature is annotated AsyncGenerator[str, None]; what callers actually
            receive depends on the @streaming_endpoint decorator - confirm.

        Notes:
        - A failure in one combination is reported as a "failed" message and does not stop the others.
        - Any other failure is also reported as "failed" and swallowed.
        """
        try:
            backtests = []

            combinations = itertools.product(
                self._as_list(strategies),
                self._as_list(tickers),
                self._as_list(timeframes),
                self._as_list(risks),
            )

            async for strategy, ticker, timeframe, risk in self.async_iterator(combinations):
                strategy_name = strategy.Name.split(".")[1]

                await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting")

                bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'

                await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking bot in the database")

                bot = self.bot_service.get_bot(
                    strategy_id=strategy.Id,
                    ticker_id=ticker.Id,
                    timeframe_id=timeframe.Id,
                    risk=risk
                )

                bot_exists = False

                if bot:
                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot already exists")

                    bot_exists = True

                    await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking backtest in the database")
                    result_performance = self.get_performances_by_bot_dates(
                        bot_id=bot.Id,
                        date_from=date_from,
                        date_to=date_to
                    )

                    if result_performance:
                        backtests.append(result_performance)
                        await log_message(queue, ticker.Name, timeframe.Name, "warning", "The backtest already exists. Skipping...")
                        continue

                else:
                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot is not in the database")
                    bot = Bot(
                        Name=bot_name,
                        StrategyId=strategy.Id,
                        TickerId=ticker.Id,
                        TimeframeId=timeframe.Id,
                        Risk=risk
                    )

                try:
                    performance, trade_performance, stats = await self.run_backtest(
                        initial_cash=initial_cash,
                        strategy=strategy,
                        ticker=ticker,
                        timeframe=timeframe,
                        date_from=date_from,
                        date_to=date_to,
                        method=method,
                        risk=risk,
                        save_bt_plot=save_bt_plot,
                        queue=queue,
                    )

                    await log_message(queue, ticker.Name, timeframe.Name, "log", "Saving metrics in the database")

                    bot_performance_for_db = _performance_from_df_to_obj(
                        performance,
                        date_from,
                        date_to,
                        risk,
                        method,
                        bot,
                        initial_cash,
                    )

                    # Only the entity built from the last row of trade_performance is persisted.
                    trade_performance_for_db = [BotTradePerformance(**row) for _, row in trade_performance.iterrows()].pop()
                    trade_performance_for_db.BotPerformance = bot_performance_for_db

                    with self.db_service.get_database() as db:
                        if not bot_exists:
                            self.db_service.create(db, bot)

                        backtests.append(self.db_service.create(db, bot_performance_for_db))

                        self.db_service.create(db, trade_performance_for_db)

                        trade_history = trades_from_df_to_obj(stats._trades)

                        for trade in trade_history:
                            trade.BotPerformance = bot_performance_for_db
                            self.db_service.create(db, trade)

                    await log_message(queue, ticker.Name, timeframe.Name, "completed", "It's done buddy ;)")

                except Exception as e:
                    # Best effort by design: report the failure and move on to the next combination.
                    await log_message(queue, '', '', "failed", f"{str(e)}")

            return backtests

        except Exception as e:
            # Top-level boundary: the error is reported on the queue and the method returns None.
            await log_message(queue, '', '', "failed", f"{str(e)}")

    def get_performances_by_strategy_ticker(self, strategy_id, ticker_id) -> List[BotPerformance]:
        """Return every performance of the bots for a strategy/ticker pair, best StabilityWeightedRar first."""
        with self.db_service.get_database() as db:
            bot_performances = (
                db.query(BotPerformance)
                .join(Bot, Bot.Id == BotPerformance.BotId)
                .filter(Bot.TickerId == ticker_id)
                .filter(Bot.StrategyId == strategy_id)
                .order_by(desc(BotPerformance.StabilityWeightedRar))
                .all()
            )

            return bot_performances

    def get_performances_by_bot_dates(self, bot_id, date_from, date_to) -> BotPerformance | None:
        """
        Return the first performance of ``bot_id`` whose DateFrom/DateTo match exactly, or None.

        RandomTest and LuckTest (with their nested performances) are eagerly loaded.
        """
        with self.db_service.get_database() as db:
            bot_performance = (
                db.query(BotPerformance)
                .options(
                    joinedload(BotPerformance.RandomTest).joinedload(RandomTest.RandomTestPerformance),
                    joinedload(BotPerformance.LuckTest).joinedload(LuckTest.LuckTestPerformance),
                )
                .filter(BotPerformance.BotId == bot_id)
                .filter(BotPerformance.DateFrom == date_from)
                .filter(BotPerformance.DateTo == date_to)
                .first()
            )

            return bot_performance

    def get_performance_by_bot(self, bot_id) -> List[BotPerformance]:  # TODO: rename bot_id to backtest_id
        """Return the performances whose BotId equals ``bot_id`` (whatever db_service.get_many_by_filter yields)."""
        with self.db_service.get_database() as db:
            bot_performance = self.db_service.get_many_by_filter(db, BotPerformance, BotId=bot_id)
            return bot_performance

    def get_bot_performance_by_id(self, bot_performance_id) -> BotPerformance:  # TODO: rename bot_id to backtest_id
        """Return the performance with the given primary key, as resolved by db_service.get_by_id."""
        with self.db_service.get_database() as db:
            bot_performance = self.db_service.get_by_id(db, BotPerformance, id=bot_performance_id)

            return bot_performance

    def delete_from_strategy(self, strategy_id) -> OperationResult:
        """
        Delete every bot of a strategy together with its performances, tests, trades and plot files.

        The plot file names are collected before anything is deleted, because they are built from
        the Bot name and the performance dates. Rows are then removed children-first, the session is
        saved, and only afterwards are the files removed from disk.

        NOTE(review): unlike ``delete``, the performances referenced by RandomTest rows are not
        removed here - confirm whether that is intended.
        """
        with self.db_service.get_database() as db:
            # Plain select (not a Subquery) so it can be passed to in_() directly.
            bp_ids = (
                select(BotPerformance.Id)
                .join(Bot, BotPerformance.BotId == Bot.Id)
                .where(Bot.StrategyId == strategy_id)
            )

            # Collect what is needed for the file names while the rows still exist.
            plot_rows = (
                db.query(Bot.Name, BotPerformance.DateFrom, BotPerformance.DateTo)
                .select_from(Bot)
                .join(BotPerformance, BotPerformance.BotId == Bot.Id)
                .filter(Bot.StrategyId == strategy_id)
                .all()
            )
            file_names = [
                self._plot_file_name(bot_name, date_from, date_to)
                for bot_name, date_from, date_to in plot_rows
            ]

            # Materialize the luck-test performance ids now: the LuckTest rows that
            # reference them are deleted before the performances themselves.
            luck_performance_ids = [
                performance_id
                for performance_id in db.execute(
                    select(LuckTest.LuckTestPerformanceId)
                    .where(LuckTest.BotPerformanceId.in_(bp_ids))
                ).scalars()
                if performance_id is not None
            ]

            db.execute(
                delete(MetricWharehouse).where(
                    MetricWharehouse.MontecarloTestId.in_(
                        select(MontecarloTest.Id).where(MontecarloTest.BotPerformanceId.in_(bp_ids))
                    )
                )
            )

            db.execute(
                delete(MontecarloTest).where(MontecarloTest.BotPerformanceId.in_(bp_ids))
            )

            db.execute(
                delete(RandomTest).where(RandomTest.BotPerformanceId.in_(bp_ids))
            )

            db.execute(
                delete(LuckTest).where(LuckTest.BotPerformanceId.in_(bp_ids))
            )

            db.execute(
                delete(BotTradePerformance).where(BotTradePerformance.BotPerformanceId.in_(bp_ids))
            )

            db.execute(
                delete(Trade).where(Trade.BotPerformanceId.in_(bp_ids))
            )

            if luck_performance_ids:
                db.execute(
                    delete(BotPerformance).where(BotPerformance.Id.in_(luck_performance_ids))
                )

            db.execute(
                delete(BotPerformance).where(BotPerformance.Id.in_(bp_ids))
            )

            db.execute(
                delete(Bot).where(Bot.StrategyId == strategy_id)
            )

            # Same explicit save that ``delete`` performs at the end of its work.
            self.db_service.save(db)

            # Remove the related files once the database work has been saved.
            for file_name in file_names:
                self._remove_plot_files(file_name)

        return OperationResult(ok=True, message=None, item=None)

    def delete(self, bot_performance) -> OperationResult:
        """
        Delete one backtest (BotPerformance) with its dependent records and plot files.

        Parameters:
        - bot_performance: Either the integer id or an object exposing ``Id``.

        Returns:
            OperationResult with ok=False and "BotPerformance not found" when the id does not exist,
            ok=True otherwise.

        The owning Bot is deleted as well when this is its only remaining performance.
        """
        with self.db_service.get_database() as db:
            bot_performance_id = bot_performance if isinstance(bot_performance, int) else bot_performance.Id

            # Load the object together with the relationships used below.
            bot_performance = (
                db.query(BotPerformance)
                .options(
                    joinedload(BotPerformance.BotTradePerformance),
                    joinedload(BotPerformance.Bot),
                    joinedload(BotPerformance.LuckTest),
                    joinedload(BotPerformance.RandomTest),
                )
                .filter(BotPerformance.Id == bot_performance_id)
                .first()
            )

            if not bot_performance:
                return OperationResult(ok=False, message="BotPerformance not found", item=None)

            # Resolve the file name up front, while Bot and the dates are still loaded.
            file_name = self._plot_file_name(
                bot_performance.Bot.Name,
                bot_performance.DateFrom,
                bot_performance.DateTo,
            )

            # Direct relationships
            if bot_performance.BotTradePerformance:
                self.db_service.delete(db, BotTradePerformance, bot_performance.BotTradePerformance.Id)

            # Monte Carlo test and its stored metrics
            montecarlo_test = self.db_service.get_by_filter(db, MontecarloTest, BotPerformanceId=bot_performance_id)
            if montecarlo_test:
                self.db_service.delete_many_by_filter(db, MetricWharehouse, MontecarloTestId=montecarlo_test.Id)
                self.db_service.delete(db, MontecarloTest, montecarlo_test.Id)

            # Luck tests and their performances
            luck_tests = self.db_service.get_many_by_filter(db, LuckTest, BotPerformanceId=bot_performance_id)
            for lt in luck_tests:
                self.db_service.delete(db, BotPerformance, lt.LuckTestPerformanceId)
                self.db_service.delete(db, LuckTest, lt.Id)

            # Random test and its dependencies
            if bot_performance.RandomTest:
                rt = bot_performance.RandomTest
                rt_perf = self.db_service.get_by_id(db, BotPerformance, rt.RandomTestPerformanceId)
                if rt_perf:
                    rt_trade_perf = self.db_service.get_by_filter(db, BotTradePerformance, BotPerformanceId=rt_perf.Id)
                    if rt_trade_perf:
                        self.db_service.delete(db, BotTradePerformance, rt_trade_perf.Id)
                    self.db_service.delete(db, BotPerformance, rt.RandomTestPerformanceId)
                self.db_service.delete(db, RandomTest, rt.Id)

            # Related trades
            self.db_service.delete_many_by_filter(db, Trade, BotPerformanceId=bot_performance_id)

            # If this is the bot's only performance, delete the Bot too
            if bot_performance.BotId:
                rem = db.query(BotPerformance).filter(BotPerformance.BotId == bot_performance.BotId).count()
                if rem == 1:
                    self.db_service.delete(db, Bot, bot_performance.BotId)

            self.db_service.delete(db, BotPerformance, bot_performance.Id)
            self.db_service.save(db)

            # Remove the related files only after the database work has been saved.
            self._remove_plot_files(file_name)

        return OperationResult(ok=True, message=None, item=None)

    def update_favorite(self, performance_id) -> OperationResult:
        """
        Toggle the ``Favorite`` flag of a backtest.

        Returns:
            OperationResult whose ``item`` is the new flag value on success; ok=False with
            'Backtest not found' or 'Update failed' otherwise.
        """
        with self.db_service.get_database() as db:
            performance = self.db_service.get_by_id(db, BotPerformance, performance_id)

            if not performance:
                return OperationResult(ok=False, message='Backtest not found', item=None)

            performance.Favorite = not performance.Favorite
            updated_performance = self.db_service.update(db, BotPerformance, performance)

            if not updated_performance:
                return OperationResult(ok=False, message='Update failed', item=None)

            return OperationResult(ok=True, message=None, item=updated_performance.Favorite)

    def get_robusts(self) -> List[BotPerformance]:
        """
        Return, for each robust (strategy, ticker) pair, the performance ranked first by StabilityWeightedRar.

        A pair is robust when the average RreturnDd over its performances on selected timeframes
        (ignoring "NaN" values) is greater than 1.
        """
        with self.db_service.get_database() as db:
            # Robust (StrategyId, TickerId) pairs: average RreturnDd > 1
            subquery = (
                db.query(
                    Bot.StrategyId,
                    Bot.TickerId,
                )
                .join(BotPerformance, Bot.Id == BotPerformance.BotId)
                .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
                .filter(
                    BotPerformance.RreturnDd != "NaN",
                    Timeframe.Selected == True  # noqa: E712 - SQL expression, not a Python comparison
                )
                .group_by(Bot.StrategyId, Bot.TickerId)
                .having(func.avg(BotPerformance.RreturnDd) > 1)
                .subquery()
            )

            # Aliases to avoid ambiguity with the entities used in the subquery
            bot_alias = aliased(Bot)
            bp_alias = aliased(BotPerformance)

            # Row number per (strategy, ticker), ordered by StabilityWeightedRar descending
            ranked_subquery = (
                db.query(
                    bp_alias.Id.label("bp_id"),
                    func.row_number()
                    .over(
                        partition_by=[bot_alias.StrategyId, bot_alias.TickerId],
                        order_by=bp_alias.StabilityWeightedRar.desc(),
                    )
                    .label("row_num"),
                )
                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
                .join(
                    subquery,
                    (bot_alias.StrategyId == subquery.c.StrategyId)
                    & (bot_alias.TickerId == subquery.c.TickerId),
                )
                .subquery()
            )

            # Keep only the best row of each partition (row_num == 1)
            data = (
                db.query(bp_alias)
                .join(ranked_subquery, bp_alias.Id == ranked_subquery.c.bp_id)
                .filter(ranked_subquery.c.row_num == 1)
                .all()
            )

            return data

    def get_robusts_by_strategy_id(self, strategy_id) -> List[BotPerformance]:
        """
        Return the best performance(s) of every robust ticker of one strategy.

        A ticker is robust when the average RreturnDd over the strategy's performances on selected
        timeframes (ignoring "NaN" values) is at least 1. For each robust pair, the performances
        whose StabilityWeightedRar equals the pair's maximum are returned (ties yield several rows).
        """
        with self.db_service.get_database() as db:
            # Robust (StrategyId, TickerId) pairs for this strategy
            subquery = (
                db.query(
                    Bot.StrategyId,
                    Bot.TickerId,
                )
                .join(BotPerformance, Bot.Id == BotPerformance.BotId)
                .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
                .filter(
                    Bot.StrategyId == strategy_id,
                    BotPerformance.RreturnDd != "NaN",
                    Timeframe.Selected == True,  # noqa: E712 - SQL expression, not a Python comparison
                )
                .group_by(Bot.StrategyId, Bot.TickerId)
                .having(func.avg(BotPerformance.RreturnDd) >= 1)  # >= rather than > so that exactly 1 is included
                .subquery()
            )

            # Aliases to avoid ambiguity with the entities used in the subquery
            bot_alias = aliased(Bot)
            bp_alias = aliased(BotPerformance)

            # Maximum StabilityWeightedRar per (StrategyId, TickerId). Only grouped columns and
            # the aggregate are selected so the statement is valid under strict GROUP BY rules.
            best_temporalities = (
                db.query(
                    bot_alias.StrategyId,
                    bot_alias.TickerId,
                    func.max(bp_alias.StabilityWeightedRar).label("max_custom_metric")
                )
                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
                .join(subquery, (bot_alias.StrategyId == subquery.c.StrategyId) & (bot_alias.TickerId == subquery.c.TickerId))
                .group_by(bot_alias.StrategyId, bot_alias.TickerId)
                .subquery()
            )

            # Performances that reach the maximum of their pair
            data = (
                db.query(bp_alias)
                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
                .join(best_temporalities,
                      (bot_alias.StrategyId == best_temporalities.c.StrategyId) &
                      (bot_alias.TickerId == best_temporalities.c.TickerId) &
                      (bp_alias.StabilityWeightedRar == best_temporalities.c.max_custom_metric))
                .all()
            )

            return data

    def get_favorites(self) -> List[BotPerformance]:
        """Return the performances flagged as Favorite."""
        with self.db_service.get_database() as db:
            favorites = self.db_service.get_many_by_filter(db, BotPerformance, Favorite=True)

            return favorites

    def get_trades(self, bot_performance_id: int) -> List[Trade]:
        """Return the trade history of a backtest, or an empty list when the backtest does not exist."""
        with self.db_service.get_database() as db:
            bot_performance = self.db_service.get_by_id(db, BotPerformance, id=bot_performance_id)
            if bot_performance is None:
                return []
            return bot_performance.TradeHistory

    def get_by_filter(
        self,
        return_: float | None = None,
        drawdown: float | None = None,
        stability_ratio: float | None = None,
        sharpe_ratio: float | None = None,
        trades: float | None = None,
        rreturn_dd: float | None = None,
        custom_metric: float | None = None,
        winrate: float | None = None,
        strategy: str | None = None,
        ticker: str | None = None
    ) -> List[BotPerformance]:
        """
        Return the performances matching every supplied filter, ordered by RreturnDd descending.

        Numeric arguments are lower bounds (>=), except ``drawdown`` which is an upper bound (<=).
        ``strategy`` and ``ticker`` are case-insensitive substring matches on the respective names.
        Arguments left as None (or empty strings for the text filters) are ignored.
        """
        with self.db_service.get_database() as db:
            filters = []

            # Base query with explicit joins
            query = (
                db.query(BotPerformance)
                .join(Bot, Bot.Id == BotPerformance.BotId)
                .join(Strategy, Strategy.Id == Bot.StrategyId)
                .join(Ticker, Ticker.Id == Bot.TickerId)
            )

            # Numeric filters
            if return_ is not None:
                filters.append(BotPerformance.Return >= return_)
            if drawdown is not None:
                filters.append(BotPerformance.Drawdown <= drawdown)
            if stability_ratio is not None:
                filters.append(BotPerformance.StabilityRatio >= stability_ratio)
            if sharpe_ratio is not None:
                filters.append(BotPerformance.SharpeRatio >= sharpe_ratio)
            if trades is not None:
                filters.append(BotPerformance.Trades >= trades)
            if rreturn_dd is not None:
                filters.append(BotPerformance.RreturnDd >= rreturn_dd)
            if custom_metric is not None:
                filters.append(BotPerformance.StabilityWeightedRar >= custom_metric)
            if winrate is not None:
                filters.append(BotPerformance.WinRate >= winrate)

            # Text filters (ILIKE)
            if strategy:
                filters.append(Strategy.Name.ilike(f"%{strategy}%"))
            if ticker:
                filters.append(Ticker.Name.ilike(f"%{ticker}%"))

            # Final query with the filters and eager loads applied
            final_query = (
                query
                .filter(*filters)
                .options(
                    joinedload(BotPerformance.Bot),
                    joinedload(BotPerformance.BotTradePerformance),
                    joinedload(BotPerformance.TradeHistory),
                    joinedload(BotPerformance.MontecarloTest),
                    joinedload(BotPerformance.LuckTest),
                    joinedload(BotPerformance.RandomTest),
                )
                .order_by(BotPerformance.RreturnDd.desc())
                .all()
            )

            return final_query
Handles the orchestration, execution, persistence, and retrieval of backtests and related performance metrics.
This service acts as the central component for running backtests on trading strategies, saving their results, and performing various queries on historical performance data. It supports streaming logs, asynchronous execution, and interaction with the database and configuration layers.
Main features include:
- Running and saving individual or batch backtests across strategy/ticker/timeframe/risk combinations.
- Loading strategies dynamically and executing them with proper capital, leverage, and configuration.
- Persisting detailed performance metrics and trade history in the database.
- Performing deletions and cleanup of historical test data (including files and DB records).
- Providing filtering, querying, and tagging of backtests (e.g. by performance, robustness, favorites).
- Identifying robust strategies via custom metrics and filters (e.g. return/drawdown ratio > 1).
- Managing test dependencies such as Monte Carlo, Luck Test, and Random Test results.
Attributes:
- db_service (DbService): Handles database connections and CRUD operations.
- bot_service (BotService): Manages bot records used during backtesting.
- config_service (ConfigService): Provides access to configuration values (e.g. risk-free rate).
Notes:
- Backtests can be persisted with reports or run temporarily.
- Streaming logs (via queues) allow real-time feedback when executing long-running backtests.
- This class assumes a full application context with all entities (Bot, BotPerformance, Trade, etc.) properly defined.
@streaming_endpoint
async def run_backtest(
    self,
    initial_cash: float,
    strategy: Strategy,
    ticker: Ticker,
    timeframe: Timeframe,
    date_from: pd.Timestamp,
    date_to: pd.Timestamp,
    method: str,
    risk: float,
    save_bt_plot: str,
    queue: asyncio.Queue,
) -> AsyncGenerator[str, None]:
    """
    Run one backtest for a strategy/ticker/timeframe/risk combination and report progress on ``queue``.

    Steps performed:
    - Loads the strategy callable from 'app.backbone.strategies.' + strategy.Name.
    - Reads the per-ticker leverage from './app/configs/leverages.yml' and derives margin = 1 / leverage.
    - Fetches price data with get_data() and converts its index to datetimes.
    - Chooses the plot directory from ``save_bt_plot`` and, for 'temp', publishes a "link" message.
    - Delegates the simulation to run_strategy_and_get_performances().
    - Publishes "log" messages along the way and a final "completed" message with ``stats.to_string()``.

    Parameters:
    - initial_cash (float): Starting capital for the simulation.
    - strategy (Strategy): Strategy entity; ``Name`` must contain a dot, the part after the first dot
      is used in the bot name.
    - ticker (Ticker): Instrument; ``Name`` is the key looked up in the leverages file.
    - timeframe (Timeframe): Timeframe; ``MetaTraderNumber`` is passed to get_data().
    - date_from / date_to (pd.Timestamp): Data range; must support ``strftime``.
    - method (str): Not used by this method.
    - risk (float): Risk value forwarded to the strategy runner and embedded in the bot name.
    - save_bt_plot (str): 'temp' -> './app/templates/static/backtest_plots/temp',
      'persist' -> './app/templates/static/backtest_plots' (also enables the report),
      anything else -> no plot path.
    - queue (asyncio.Queue): Receives JSON-encoded progress messages.

    Returns:
        The tuple ``(performance, trade_performance, stats)`` produced by
        run_strategy_and_get_performances().
        NOTE(review): the signature is annotated AsyncGenerator[str, None]; what callers actually
        receive depends on the @streaming_endpoint decorator - confirm.

    Raises:
        KeyError: If the ticker has no entry in the leverages file.
    """
    await log_message(queue, '', '', "log", f"Loading strategy {strategy.Name}")

    strategy_name = strategy.Name.split(".")[1]
    bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'

    strategy_path = 'app.backbone.strategies.' + strategy.Name
    strategy_func = load_function(strategy_path)

    await log_message(queue, '', '', "log", f"Strategy {strategy.Name} loaded succesfully")
    await log_message(queue, '', '', "log", "Loading leverages")

    leverages_path = "./app/configs/leverages.yml"
    with open(leverages_path, "r", encoding="utf-8") as leverages_file:
        # safe_load returns None for an empty document; fall back to an empty mapping
        # so the lookup below fails with a meaningful message.
        leverages = yaml.safe_load(leverages_file) or {}
    await log_message(queue, '', '', "log", "Leverages loaded")

    try:
        leverage = leverages[ticker.Name]
    except KeyError:
        raise KeyError(
            f"No leverage configured for ticker {ticker.Name} in {leverages_path}"
        ) from None
    margin = 1 / leverage

    await log_message(queue, ticker.Name, timeframe.Name, "log", "Getting data")

    prices = get_data(ticker.Name, timeframe.MetaTraderNumber, date_from, date_to)
    prices.index = pd.to_datetime(prices.index)

    await log_message(queue, ticker.Name, timeframe.Name, "log", f"Data ready: {prices.head(1)}")

    await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting backtest")

    filename = f'{bot_name}_{date_from.strftime("%Y%m%d")}_{date_to.strftime("%Y%m%d")}'
    plot_path = None

    if save_bt_plot == 'temp':
        plot_path = './app/templates/static/backtest_plots/temp'
        await log_message(queue, ticker.Name, timeframe.Name, "link", os.path.join('/backtest_plots/temp', filename + '.html'))

    elif save_bt_plot == 'persist':
        plot_path = './app/templates/static/backtest_plots'

    risk_free_rate = float(self.config_service.get_by_name('RiskFreeRate').Value)

    performance, trade_performance, stats = run_strategy_and_get_performances(
        strategy=strategy_func,
        ticker=ticker,
        timeframe=timeframe,
        risk=risk,
        prices=prices,
        initial_cash=initial_cash,
        risk_free_rate=risk_free_rate,
        margin=margin,
        plot_path=plot_path,
        file_name=filename,
        save_report=save_bt_plot == 'persist',  # The report is only generated for persisted runs
    )

    await log_message(queue, ticker.Name, timeframe.Name, "log", "The backtest is done :)")

    await log_message(queue, ticker.Name, timeframe.Name, "completed", f"{stats.to_string()}")

    return performance, trade_performance, stats
Executes an asynchronous backtest for a given trading strategy and returns performance metrics, trade details, and statistics via a streaming endpoint.
This function loads the specified strategy, fetches historical price data, applies leverage rules, and runs a backtest simulation. Results are streamed in real-time through an asyncio queue, and optional plots/reports are generated.
Steps performed:
- Loads the trading strategy dynamically from a module path.
- Retrieves leverage rules from a YAML config file.
- Fetches and prepares historical price data for the given ticker/timeframe.
- Computes margin requirements based on leverage.
- Executes the backtest using the strategy's logic and calculates performance metrics.
- Generates and saves backtest plots (temporary or persistent) if requested.
- Streams progress updates, logs, and final results through the provided queue.
Parameters:
- initial_cash (float): Starting capital for the backtest simulation.
- strategy (Strategy): Strategy object containing the module path and name.
- ticker (Ticker): Financial instrument to backtest on (e.g., currency pair, stock).
- timeframe (Timeframe): Timeframe for price data (e.g., '1H', '4H').
- date_from (pd.Timestamp): Start date for historical data.
- date_to (pd.Timestamp): End date for historical data.
- method (str): Reserved for future backtest variations (unused in current implementation).
- risk (float): Risk percentage per trade (e.g., 0.01 for 1% risk).
- save_bt_plot (str): Plot saving mode:
- 'temp': Saves in a temporary directory (auto-cleaned later).
- 'persist': Saves in the main backtest_plots directory.
- Any other value skips plot generation.
- queue (asyncio.Queue): Async queue for real-time progress streaming.
Returns: AsyncGenerator[str, None]: Yields three objects upon completion:
- performance (pd.DataFrame): Aggregated strategy performance metrics.
- trade_performance (pd.DataFrame): Detailed trade-by-trade results.
- stats (pd.DataFrame): Statistical summaries (e.g., Sharpe ratio, max drawdown).
Side effects:
- Reads leverage configurations from './app/configs/leverages.yml'.
- May create HTML plot files in either './app/templates/static/backtest_plots/temp' (temporary) or './app/templates/static/backtest_plots' (persistent).
- Streams logs/results via the provided asyncio.Queue (e.g., for frontend progress updates).
Notes:
- The strategy module path must follow the convention: 'app.backbone.strategies.{strategy_name}'.
- Temporary plots include a timestamp and are automatically purged by a separate cleanup process.
- Risk-free rate is fetched from the application's config service (used for Sharpe ratio calculations).
- Margin is calculated as 1/leverage (e.g., 50x leverage → 2% margin requirement).
@streaming_endpoint
async def run_backtests_and_save(
    self,
    initial_cash: float,
    strategies: Strategy | List[Strategy],
    tickers: Ticker | List[Ticker],
    timeframes: List[Timeframe],
    date_from: date,
    date_to: date,
    method: str,
    risks: float | List[float],
    save_bt_plot: str,
    queue: asyncio.Queue,
) -> AsyncGenerator[str, None]:
    """
    Run one backtest per (strategy, ticker, timeframe, risk) combination and persist the results.

    Combinations are processed one after another. For each combination:
    1. The bot is looked up through ``self.bot_service.get_bot``.
    2. If the bot exists and a performance row already exists for the exact
       ``date_from``/``date_to`` pair, that row is collected and the
       combination is skipped with a "warning" message.
    3. Otherwise ``self.run_backtest`` is awaited, its results are converted
       to entities and stored through ``self.db_service.create``: the bot
       (only when it is new), the performance, the trade performance and one
       row per trade found in ``stats._trades``.

    Progress is reported through ``queue`` with ``log_message``.

    Parameters:
    - initial_cash (float): Starting capital forwarded to every backtest
    - strategies (Strategy|List[Strategy]): Single strategy or list to test
    - tickers (Ticker|List[Ticker]): Single ticker or list to test
    - timeframes (List[Timeframe]): Timeframes to test (a single item is accepted too)
    - date_from (date): Start date of the backtest window
    - date_to (date): End date of the backtest window
    - method (str): Forwarded to run_backtest and stored with the performance
    - risks (float|List[float]): Single risk value or list to test
    - save_bt_plot (str): Forwarded unchanged to run_backtest
    - queue (asyncio.Queue): Queue that receives the JSON progress messages

    Returns:
    - The list of BotPerformance objects that were found or created. If the
      set-up phase itself fails, a "failed" message is queued and None is
      returned implicitly.

    Notes:
    - A failure while processing one combination (lookup, backtest or save)
      is reported as "failed" and the loop moves on to the next combination.
    - NOTE(review): how the function's return value is consumed depends on
      the ``streaming_endpoint`` decorator, which is not visible here.
    """
    try:
        backtests = []

        # isinstance() instead of an exact type comparison, so list subclasses
        # are iterated as-is rather than being wrapped in a second list.
        strategies = strategies if isinstance(strategies, list) else [strategies]
        tickers = tickers if isinstance(tickers, list) else [tickers]
        timeframes = timeframes if isinstance(timeframes, list) else [timeframes]
        risks = risks if isinstance(risks, list) else [risks]

        combinations = list(itertools.product(strategies, tickers, timeframes, risks))

        async for strategy, ticker, timeframe, risk in self.async_iterator(combinations):
            # Everything that can fail for a single combination lives inside
            # this try, so one bad combination never aborts the others.
            try:
                strategy_name = strategy.Name.split(".")[1]

                await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting")

                bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'

                await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking bot in the database")

                bot = self.bot_service.get_bot(
                    strategy_id=strategy.Id,
                    ticker_id=ticker.Id,
                    timeframe_id=timeframe.Id,
                    risk=risk
                )

                bot_exists = False

                if bot:
                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot already exists")

                    bot_exists = True

                    await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking backtest in the database")
                    result_performance = self.get_performances_by_bot_dates(
                        bot_id=bot.Id,
                        date_from=date_from,
                        date_to=date_to
                    )

                    if result_performance:
                        backtests.append(result_performance)
                        await log_message(queue, ticker.Name, timeframe.Name, "warning", "The backtest already exists. Skipping...")
                        continue

                else:
                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot is not in the database")
                    bot = Bot(
                        Name=bot_name,
                        StrategyId=strategy.Id,
                        TickerId=ticker.Id,
                        TimeframeId=timeframe.Id,
                        Risk=risk
                    )

                performance, trade_performance, stats = await self.run_backtest(
                    initial_cash=initial_cash,
                    strategy=strategy,
                    ticker=ticker,
                    timeframe=timeframe,
                    date_from=date_from,
                    date_to=date_to,
                    method=method,
                    risk=risk,
                    save_bt_plot=save_bt_plot,
                    queue=queue,
                )

                await log_message(queue, ticker.Name, timeframe.Name, "log", "Saving metrics in the database")

                bot_performance_for_db = _performance_from_df_to_obj(
                    performance,
                    date_from,
                    date_to,
                    risk,
                    method,
                    bot,
                    initial_cash,
                )

                # One entity is built per row and the last one is kept; an empty
                # frame raises IndexError, which is reported as a failure below.
                trade_performance_for_db = [BotTradePerformance(**row) for _, row in trade_performance.iterrows()].pop()
                trade_performance_for_db.BotPerformance = bot_performance_for_db

                with self.db_service.get_database() as db:
                    if not bot_exists:
                        self.db_service.create(db, bot)

                    backtests.append(self.db_service.create(db, bot_performance_for_db))

                    self.db_service.create(db, trade_performance_for_db)

                    trade_history = trades_from_df_to_obj(stats._trades)

                    for trade in trade_history:
                        trade.BotPerformance = bot_performance_for_db
                        self.db_service.create(db, trade)

                await log_message(queue, ticker.Name, timeframe.Name, "completed", "It's done buddy ;)")

            except Exception as e:
                await log_message(queue, '', '', "failed", str(e))

        return backtests

    except Exception as e:
        await log_message(queue, '', '', "failed", str(e))
Executes one backtest for every combination of strategies, tickers, timeframes and risks (combinations are processed one after another, each one awaited before the next starts), then saves results to the database while streaming progress updates.
This function handles the complete backtesting pipeline:
- Generates all possible combinations of input parameters
- Checks for existing backtest results to avoid duplicate work
- Runs new backtests when needed using run_backtest()
- Saves performance metrics, trade details, and statistics to the database
- Provides real-time feedback through an async queue
Steps performed:
- Normalizes all input parameters to lists (single items → single-element lists)
- Generates all combinations of strategies/tickers/timeframes/risks
- For each combination:
- Checks if bot exists in database
- Verifies if backtest already exists for the date range
- Runs new backtest if needed
- Converts results to database objects
- Saves all data (bot, performance, trades) transactionally
- Streams progress, warnings, and completion messages via queue
Parameters:
- initial_cash (float): Starting capital for all backtests
- strategies (Strategy|List[Strategy]): Single strategy or list to test
- tickers (Ticker|List[Ticker]): Financial instrument(s) to backtest
- timeframes (List[Timeframe]): Time intervals to test (e.g. ['1H', '4H'])
- date_from (date): Start date for historical data
- date_to (date): End date for historical data
- method (str): Backtesting methodology identifier
- risks (float|List[float]): Risk percentage(s) per trade (e.g. [0.01, 0.02])
- save_bt_plot (str): Plot saving mode:
- 'temp': Temporary storage
- 'persist': Permanent storage
- Other: Skip plot generation
- queue (asyncio.Queue): Async queue for progress streaming
Returns:
- AsyncGenerator[str, None]: Yields list of saved BotPerformance objects when complete
Side effects:
- Creates/updates database records for:
- Bot configurations
- Performance metrics
- Individual trades
- May generate plot files depending on save_bt_plot parameter
- Streams messages to provided queue including:
- Progress logs
- Warnings about duplicates
- Completion/failure notifications
Notes:
- Automatically skips existing backtests for the same parameters/date range
- Uses atomic database transactions to ensure data consistency
- New bots are created if they don't exist in the database
- Trade history is extracted from backtest stats and saved relationally
- All database operations use the service's db_service interface
- Failures for individual combinations don't stop overall execution
def get_performances_by_strategy_ticker(self, strategy_id, ticker_id) -> List[BotPerformance]:
    """
    Return every backtest performance recorded for one strategy/ticker pair.

    Parameters:
    - strategy_id: Value matched against Bot.StrategyId
    - ticker_id: Value matched against Bot.TickerId

    Returns:
    - List[BotPerformance]: All matching rows ordered by StabilityWeightedRar,
      highest first. The list is empty when nothing matches.
    """
    with self.db_service.get_database() as db:
        return (
            db.query(BotPerformance)
            .join(Bot, Bot.Id == BotPerformance.BotId)
            .filter(Bot.TickerId == ticker_id)
            .filter(Bot.StrategyId == strategy_id)
            .order_by(desc(BotPerformance.StabilityWeightedRar))
            .all()
        )
def get_performances_by_bot_dates(self, bot_id, date_from, date_to) -> BotPerformance:
    """
    Look up the backtest performance of a bot for an exact date window.

    The RandomTest and LuckTest relations (and their nested performance
    rows) are loaded eagerly in the same query.

    Parameters:
    - bot_id: Value matched against BotPerformance.BotId
    - date_from: Value matched against BotPerformance.DateFrom
    - date_to: Value matched against BotPerformance.DateTo

    Returns:
    - The first matching BotPerformance, or None when there is no match.
    """
    with self.db_service.get_database() as db:
        eager_tests = (
            joinedload(BotPerformance.RandomTest).joinedload(RandomTest.RandomTestPerformance),
            joinedload(BotPerformance.LuckTest).joinedload(LuckTest.LuckTestPerformance),
        )
        lookup = db.query(BotPerformance).options(*eager_tests)
        lookup = lookup.filter(
            BotPerformance.BotId == bot_id,
            BotPerformance.DateFrom == date_from,
            BotPerformance.DateTo == date_to,
        )
        match = lookup.first()

    return match
def delete_from_strategy(self, strategy_id) -> OperationResult:
    """
    Delete every bot of a strategy together with its backtests, tests, trades and plot files.

    The plot file names are collected before any row is deleted, because
    they are built from the bot name and the performance dates, which are
    no longer reliably readable once the rows are gone.

    Parameters:
    - strategy_id: Value matched against Bot.StrategyId

    Returns:
    - OperationResult: Always ok=True when no exception is raised.

    Side effects:
    - Issues bulk DELETE statements for MetricWharehouse, MontecarloTest,
      RandomTest, LuckTest (and the performances they point to),
      BotTradePerformance, Trade, BotPerformance and Bot.
    - Removes '<bot name>_<date from>_<date to>.html' from the plot folders
      under './app/templates/static/' when the file exists.

    Notes:
    - NOTE(review): no explicit commit is issued here; this presumably relies
      on the get_database() context manager. Confirm.
    - NOTE(review): performances referenced by LuckTest are deleted before
      the LuckTest rows themselves, because the id subquery would be empty
      otherwise. Confirm this order is compatible with the foreign keys.
    """
    plot_folders = ['luck_test_plots', 'correlation_plots', 't_test_plots', 'backtest_plots', 'backtest_plots/reports']

    with self.db_service.get_database() as db:
        bp_subq = (
            select(BotPerformance.Id)
            .join(Bot, BotPerformance.BotId == Bot.Id)
            .where(Bot.StrategyId == strategy_id)
        ).subquery()

        # Resolve the plot file names up front, with the Bot loaded eagerly,
        # while the rows still exist.
        bot_performances = (
            db.query(BotPerformance)
            .options(joinedload(BotPerformance.Bot))
            .where(BotPerformance.Id.in_(bp_subq))
            .all()
        )
        plot_files = []
        for bot_performance in bot_performances:
            str_date_from = str(bot_performance.DateFrom).replace('-', '')
            str_date_to = str(bot_performance.DateTo).replace('-', '')
            plot_files.append(f'{bot_performance.Bot.Name}_{str_date_from}_{str_date_to}.html')

        db.execute(
            delete(MetricWharehouse).where(
                MetricWharehouse.MontecarloTestId.in_(
                    select(MontecarloTest.Id).where(MontecarloTest.BotPerformanceId.in_(bp_subq))
                )
            )
        )

        db.execute(
            delete(MontecarloTest).where(MontecarloTest.BotPerformanceId.in_(bp_subq))
        )

        db.execute(
            delete(RandomTest).where(RandomTest.BotPerformanceId.in_(bp_subq))
        )

        lucktest_bp_subq = (
            select(LuckTest.LuckTestPerformanceId)
            .where(LuckTest.BotPerformanceId.in_(bp_subq))
        ).subquery()

        db.execute(delete(BotPerformance).where(BotPerformance.Id.in_(lucktest_bp_subq)))

        db.execute(
            delete(LuckTest).where(LuckTest.BotPerformanceId.in_(bp_subq))
        )

        db.execute(
            delete(BotTradePerformance).where(BotTradePerformance.BotPerformanceId.in_(bp_subq))
        )

        db.execute(
            delete(Trade).where(Trade.BotPerformanceId.in_(bp_subq))
        )

        db.execute(
            delete(BotPerformance).where(BotPerformance.Id.in_(bp_subq))
        )

        db.execute(
            delete(Bot).where(Bot.StrategyId == strategy_id)
        )

        # Remove the related plot files
        for file_name in plot_files:
            for folder in plot_folders:
                path = os.path.join('./app/templates/static/', folder, file_name)
                if os.path.exists(path):
                    os.remove(path)

    return OperationResult(ok=True, message=None, item=None)
def delete(self, bot_performance) -> OperationResult:
    """
    Delete one backtest performance with its dependent rows and plot files.

    Parameters:
    - bot_performance: Either the integer id of the BotPerformance or an
      object exposing it as ``.Id``.

    Returns:
    - OperationResult: ok=False with "BotPerformance not found" when the id
      does not exist, ok=True otherwise.

    Side effects:
    - Deletes the related BotTradePerformance, MontecarloTest (with its
      MetricWharehouse rows), LuckTest rows and the performances they point
      to, RandomTest (with its performance and that performance's trade
      performance) and Trade rows.
    - Deletes the Bot too when this is the only performance left for it.
    - After the database changes are saved, removes
      '<bot name>_<date from>_<date to>.html' from the plot folders under
      './app/templates/static/' when the file exists.

    Notes:
    - Plot files are removed only after ``db_service.save`` succeeds, so a
      failing database operation does not leave rows without their plots.
    - NOTE(review): the Bot is deleted before its last BotPerformance; this
      presumably relies on db_service deferring the work until save(). Confirm.
    """
    with self.db_service.get_database() as db:
        # Load the object together with the relations used below
        bot_performance_id = bot_performance if isinstance(bot_performance, int) else bot_performance.Id

        bot_performance = (
            db.query(BotPerformance)
            .options(
                joinedload(BotPerformance.BotTradePerformance),
                joinedload(BotPerformance.Bot),
                joinedload(BotPerformance.LuckTest),
                joinedload(BotPerformance.RandomTest),
            )
            .filter(BotPerformance.Id == bot_performance_id)
            .first()
        )

        if not bot_performance:
            return OperationResult(ok=False, message="BotPerformance not found", item=None)

        # Build the plot file name now, while the row and its Bot are loaded
        str_date_from = str(bot_performance.DateFrom).replace('-', '')
        str_date_to = str(bot_performance.DateTo).replace('-', '')
        file_name = f'{bot_performance.Bot.Name}_{str_date_from}_{str_date_to}.html'

        # Delete direct relations
        if bot_performance.BotTradePerformance:
            self.db_service.delete(db, BotTradePerformance, bot_performance.BotTradePerformance.Id)

        # Delete the Montecarlo test and its metrics
        montecarlo_test = self.db_service.get_by_filter(db, MontecarloTest, BotPerformanceId=bot_performance_id)
        if montecarlo_test:
            self.db_service.delete_many_by_filter(db, MetricWharehouse, MontecarloTestId=montecarlo_test.Id)
            self.db_service.delete(db, MontecarloTest, montecarlo_test.Id)

        # Delete the luck tests and their performances
        luck_tests = self.db_service.get_many_by_filter(db, LuckTest, BotPerformanceId=bot_performance_id)
        for lt in luck_tests:
            self.db_service.delete(db, BotPerformance, lt.LuckTestPerformanceId)
            self.db_service.delete(db, LuckTest, lt.Id)

        # Delete the random test and its dependencies
        if bot_performance.RandomTest:
            rt = bot_performance.RandomTest
            rt_perf = self.db_service.get_by_id(db, BotPerformance, rt.RandomTestPerformanceId)
            if rt_perf:
                rt_trade_perf = self.db_service.get_by_filter(db, BotTradePerformance, BotPerformanceId=rt_perf.Id)
                if rt_trade_perf:
                    self.db_service.delete(db, BotTradePerformance, rt_trade_perf.Id)
                self.db_service.delete(db, BotPerformance, rt.RandomTestPerformanceId)
            self.db_service.delete(db, RandomTest, rt.Id)

        # Delete the related trades
        self.db_service.delete_many_by_filter(db, Trade, BotPerformanceId=bot_performance_id)

        # If no other performance remains, delete the Bot as well
        if bot_performance.BotId:
            rem = db.query(BotPerformance).filter(BotPerformance.BotId == bot_performance.BotId).count()
            if rem == 1:
                self.db_service.delete(db, Bot, bot_performance.BotId)

        self.db_service.delete(db, BotPerformance, bot_performance.Id)
        self.db_service.save(db)

        # Remove the related plot files only once the database work is saved
        for folder in ['luck_test_plots', 'correlation_plots', 't_test_plots', 'backtest_plots', 'backtest_plots/reports']:
            path = os.path.join('./app/templates/static/', folder, file_name)
            if os.path.exists(path):
                os.remove(path)

    return OperationResult(ok=True, message=None, item=None)
def update_favorite(self, performance_id) -> OperationResult:
    """
    Flip the Favorite flag of a backtest performance.

    Parameters:
    - performance_id: Id of the BotPerformance to toggle

    Returns:
    - OperationResult: ok=True with the new Favorite value as ``item``;
      ok=False with 'Backtest not found' when the id does not exist, or
      'Update failed' when db_service.update returns a falsy value.
    """
    with self.db_service.get_database() as db:
        backtest = self.db_service.get_by_id(db, BotPerformance, performance_id)
        if not backtest:
            return OperationResult(ok=False, message='Backtest not found', item=None)

        backtest.Favorite = not backtest.Favorite
        saved = self.db_service.update(db, BotPerformance, backtest)

        if saved:
            return OperationResult(ok=True, message=None, item=saved.Favorite)

        return OperationResult(ok=False, message='Update failed', item=None)
def get_robusts(self) -> List[BotPerformance]:
    """
    Return the top-ranked performance of every robust strategy/ticker pair.

    A strategy/ticker pair qualifies when the average RreturnDd of its
    performances is greater than 1, counting only timeframes flagged as
    Selected and skipping rows whose RreturnDd compares equal to "NaN".
    For each qualifying pair the performance with the highest
    StabilityWeightedRar is returned.

    Returns:
    - List[BotPerformance]: One row per qualifying pair.
    """
    with self.db_service.get_database() as db:
        # Strategy/ticker pairs whose average RreturnDd is above 1
        robust_pairs = (
            db.query(Bot.StrategyId, Bot.TickerId)
            .join(BotPerformance, Bot.Id == BotPerformance.BotId)
            .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
            .filter(BotPerformance.RreturnDd != "NaN", Timeframe.Selected == True)
            .group_by(Bot.StrategyId, Bot.TickerId)
            .having(func.avg(BotPerformance.RreturnDd) > 1)
            .subquery()
        )

        # Aliases keep the ranking query unambiguous
        bot_alias = aliased(Bot)
        bp_alias = aliased(BotPerformance)

        same_pair = (
            (bot_alias.StrategyId == robust_pairs.c.StrategyId)
            & (bot_alias.TickerId == robust_pairs.c.TickerId)
        )

        # Position of each performance inside its pair, best StabilityWeightedRar first
        position = func.row_number().over(
            partition_by=[bot_alias.StrategyId, bot_alias.TickerId],
            order_by=bp_alias.StabilityWeightedRar.desc(),
        )

        ranked = (
            db.query(bp_alias.Id.label("bp_id"), position.label("row_num"))
            .join(bot_alias, bot_alias.Id == bp_alias.BotId)
            .join(robust_pairs, same_pair)
            .subquery()
        )

        # Keep only the first-ranked performance of each pair
        winners = (
            db.query(bp_alias)
            .join(ranked, bp_alias.Id == ranked.c.bp_id)
            .filter(ranked.c.row_num == 1)
            .all()
        )

    return winners
def get_robusts_by_strategy_id(self, strategy_id) -> List[BotPerformance]:
    """
    Return the best performance(s) of every robust ticker of one strategy.

    A ticker qualifies when the average RreturnDd of the strategy's
    performances on it is at least 1, counting only timeframes flagged as
    Selected and skipping rows whose RreturnDd compares equal to "NaN".
    For each qualifying ticker, every performance whose StabilityWeightedRar
    equals the maximum for that strategy/ticker pair is returned, so ties
    yield more than one row.

    Parameters:
    - strategy_id: Value matched against Bot.StrategyId

    Returns:
    - List[BotPerformance]: The best-scoring performances per qualifying ticker.
    """
    with self.db_service.get_database() as db:
        # Strategy/ticker pairs whose average RreturnDd is at least 1
        subquery = (
            db.query(
                Bot.StrategyId,
                Bot.TickerId,
            )
            .join(BotPerformance, Bot.Id == BotPerformance.BotId)
            .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
            .filter(
                Bot.StrategyId == strategy_id,
                BotPerformance.RreturnDd != "NaN",
                Timeframe.Selected == True,
            )
            .group_by(Bot.StrategyId, Bot.TickerId)
            .having(func.avg(BotPerformance.RreturnDd) >= 1)  # >= rather than > so that exactly 1 is included
            .subquery()
        )

        # Aliases avoid ambiguity between the subqueries and the final query
        bot_alias = aliased(Bot)
        bp_alias = aliased(BotPerformance)

        # Highest StabilityWeightedRar per strategy/ticker pair. Only grouped
        # columns and the aggregate are selected: a bare BotId here would be
        # neither grouped nor aggregated, which strict SQL engines reject,
        # and it was never read by the final query.
        best_temporalities = (
            db.query(
                bot_alias.StrategyId,
                bot_alias.TickerId,
                func.max(bp_alias.StabilityWeightedRar).label("max_custom_metric")
            )
            .join(bot_alias, bot_alias.Id == bp_alias.BotId)
            .join(subquery, (bot_alias.StrategyId == subquery.c.StrategyId) & (bot_alias.TickerId == subquery.c.TickerId))
            .group_by(bot_alias.StrategyId, bot_alias.TickerId)
            .subquery()
        )

        # Final query: only the performances that reach the pair's maximum
        data = (
            db.query(bp_alias)
            .join(bot_alias, bot_alias.Id == bp_alias.BotId)
            .join(best_temporalities,
                  (bot_alias.StrategyId == best_temporalities.c.StrategyId) &
                  (bot_alias.TickerId == best_temporalities.c.TickerId) &
                  (bp_alias.StabilityWeightedRar == best_temporalities.c.max_custom_metric))
            .all()
        )

    return data
def get_by_filter(
    self,
    return_: float = None,
    drawdown: float = None,
    stability_ratio: float = None,
    sharpe_ratio: float = None,
    trades: float = None,
    rreturn_dd: float = None,
    custom_metric: float = None,
    winrate: float = None,
    strategy: str = None,
    ticker: str = None
) -> List[BotPerformance]:
    """
    Search backtest performances by metric thresholds and name fragments.

    Every argument is optional; an argument left as None (or, for the two
    text arguments, empty) adds no condition. All supplied conditions must
    hold at the same time.

    Parameters:
    - return_ (float): Minimum BotPerformance.Return
    - drawdown (float): Maximum BotPerformance.Drawdown
    - stability_ratio (float): Minimum BotPerformance.StabilityRatio
    - sharpe_ratio (float): Minimum BotPerformance.SharpeRatio
    - trades (float): Minimum BotPerformance.Trades
    - rreturn_dd (float): Minimum BotPerformance.RreturnDd
    - custom_metric (float): Minimum BotPerformance.StabilityWeightedRar
    - winrate (float): Minimum BotPerformance.WinRate
    - strategy (str): Case-insensitive fragment of Strategy.Name
    - ticker (str): Case-insensitive fragment of Ticker.Name

    Returns:
    - List[BotPerformance]: Matching rows ordered by RreturnDd, highest
      first, with Bot, BotTradePerformance, TradeHistory, MontecarloTest,
      LuckTest and RandomTest loaded eagerly.
    """
    # (supplied value, condition builder) pairs, in the order they are applied
    numeric_rules = (
        (return_, lambda bound: BotPerformance.Return >= bound),
        (drawdown, lambda bound: BotPerformance.Drawdown <= bound),
        (stability_ratio, lambda bound: BotPerformance.StabilityRatio >= bound),
        (sharpe_ratio, lambda bound: BotPerformance.SharpeRatio >= bound),
        (trades, lambda bound: BotPerformance.Trades >= bound),
        (rreturn_dd, lambda bound: BotPerformance.RreturnDd >= bound),
        (custom_metric, lambda bound: BotPerformance.StabilityWeightedRar >= bound),
        (winrate, lambda bound: BotPerformance.WinRate >= bound),
    )

    with self.db_service.get_database() as db:
        # Numeric thresholds: only the ones the caller actually supplied
        criteria = [build(bound) for bound, build in numeric_rules if bound is not None]

        # Text filters: case-insensitive "contains" match
        if strategy:
            criteria.append(Strategy.Name.ilike(f"%{strategy}%"))
        if ticker:
            criteria.append(Ticker.Name.ilike(f"%{ticker}%"))

        # Base query with explicit joins
        base = db.query(BotPerformance)
        base = base.join(Bot, Bot.Id == BotPerformance.BotId)
        base = base.join(Strategy, Strategy.Id == Bot.StrategyId)
        base = base.join(Ticker, Ticker.Id == Bot.TickerId)

        eager_relations = (
            joinedload(BotPerformance.Bot),
            joinedload(BotPerformance.BotTradePerformance),
            joinedload(BotPerformance.TradeHistory),
            joinedload(BotPerformance.MontecarloTest),
            joinedload(BotPerformance.LuckTest),
            joinedload(BotPerformance.RandomTest),
        )

        matches = (
            base
            .filter(*criteria)
            .options(*eager_relations)
            .order_by(BotPerformance.RreturnDd.desc())
            .all()
        )

    return matches