app.backbone.services.backtest_service

  1import asyncio
  2from datetime import date
  3import itertools
  4import json
  5import os
  6from typing import AsyncGenerator, List
  7import yaml
  8from app.backbone.database.db_service import DbService
  9from app.backbone.entities.bot import Bot
 10from app.backbone.entities.bot_performance import BotPerformance
 11from app.backbone.entities.bot_trade_performance import BotTradePerformance
 12from app.backbone.entities.luck_test import LuckTest
 13from app.backbone.entities.metric_wharehouse import MetricWharehouse
 14from app.backbone.entities.montecarlo_test import MontecarloTest
 15from app.backbone.entities.random_test import RandomTest
 16from app.backbone.entities.strategy import Strategy
 17from app.backbone.entities.ticker import Ticker
 18from app.backbone.entities.timeframe import Timeframe
 19from app.backbone.entities.trade import Trade
 20from app.backbone.services.config_service import ConfigService
 21from app.backbone.services.operation_result import OperationResult
 22from app.backbone.services.bot_service import BotService
 23from app.backbone.services.utils import _performance_from_df_to_obj, trades_from_df_to_obj
 24from app.backbone.utils.get_data import get_data
 25from app.backbone.utils.general_purpose import load_function, profile_function, streaming_endpoint
 26from app.backbone.utils.wfo_utils import run_strategy_and_get_performances
 27import pandas as pd
 28from sqlalchemy.orm import joinedload
 29from sqlalchemy import delete, func, desc, select
 30from sqlalchemy.orm import aliased
 31
 32
 33async def log_message(queue: asyncio.Queue, ticker, timeframe, status, message, error=""):
 34    """Función para agregar logs a la cola sin detener el proceso."""
 35    await queue.put(json.dumps({
 36        "ticker": ticker,
 37        "timeframe": timeframe,
 38        "status": status,
 39        "message": message,
 40        "error": error
 41}))
 42
 43class BacktestService:
 44    """
 45    Handles the orchestration, execution, persistence, and retrieval of backtests and related performance metrics.
 46
 47    This service acts as the central component for running backtests on trading strategies, saving their results,
 48    and performing various queries on historical performance data. It supports streaming logs, asynchronous execution,
 49    and interaction with the database and configuration layers.
 50
 51    Main features include:
 52    - Running and saving individual or batch backtests across strategy/ticker/timeframe/risk combinations.
 53    - Loading strategies dynamically and executing them with proper capital, leverage, and configuration.
 54    - Persisting detailed performance metrics and trade history in the database.
 55    - Performing deletions and cleanup of historical test data (including files and DB records).
 56    - Providing filtering, querying, and tagging of backtests (e.g. by performance, robustness, favorites).
 57    - Identifying robust strategies via custom metrics and filters (e.g. return/drawdown ratio > 1).
 58    - Managing test dependencies such as Monte Carlo, Luck Test, and Random Test results.
 59
 60    Attributes:
 61        db_service (DbService): Handles database connections and CRUD operations.
 62        bot_service (BotService): Manages bot records used during backtesting.
 63        config_service (ConfigService): Provides access to configuration values (e.g. risk-free rate).
 64
 65    Notes:
 66        - Backtests can be persisted with reports or run temporarily.
 67        - Streaming logs (via queues) allow real-time feedback when executing long-running backtests.
 68        - This class assumes a full application context with all entities (Bot, BotPerformance, Trade, etc.) properly defined.
 69    """
 70    def __init__(self):
 71        self.db_service = DbService()
 72        self.bot_service = BotService()
 73        self.config_service = ConfigService()
 74    
 75    async def async_iterator(self, iterable):
 76        # Si iterable es una lista, la envolvemos en una iteración asíncrona
 77        for item in iterable:
 78            yield item
 79            await asyncio.sleep(0)
 80
 81    @streaming_endpoint
 82    async def run_backtest(
 83        self,
 84        initial_cash: float,
 85        strategy: Strategy,
 86        ticker: Ticker,
 87        timeframe: Timeframe,
 88        date_from: pd.Timestamp,
 89        date_to: pd.Timestamp,
 90        method: str,
 91        risk: float,
 92        save_bt_plot: str,
 93        queue: asyncio.Queue,
 94        ) -> AsyncGenerator[str, None]:
 95        """
 96        Executes an asynchronous backtest for a given trading strategy and returns performance metrics, 
 97        trade details, and statistics via a streaming endpoint.
 98
 99        This function loads the specified strategy, fetches historical price data, applies leverage rules,
100        and runs a backtest simulation. Results are streamed in real-time through an asyncio queue, 
101        and optional plots/reports are generated.
102
103        Steps performed:
104        - Loads the trading strategy dynamically from a module path.
105        - Retrieves leverage rules from a YAML config file.
106        - Fetches and prepares historical price data for the given ticker/timeframe.
107        - Computes margin requirements based on leverage.
108        - Executes the backtest using the strategy's logic and calculates performance metrics.
109        - Generates and saves backtest plots (temporary or persistent) if requested.
110        - Streams progress updates, logs, and final results through the provided queue.
111
112        Parameters:
113        - initial_cash (float): Starting capital for the backtest simulation.
114        - strategy (Strategy): Strategy object containing the module path and name.
115        - ticker (Ticker): Financial instrument to backtest on (e.g., currency pair, stock).
116        - timeframe (Timeframe): Timeframe for price data (e.g., '1H', '4H').
117        - date_from (pd.Timestamp): Start date for historical data.
118        - date_to (pd.Timestamp): End date for historical data.
119        - method (str): Reserved for future backtest variations (unused in current implementation).
120        - risk (float): Risk percentage per trade (e.g., 0.01 for 1% risk).
121        - save_bt_plot (str): Plot saving mode: 
122            - 'temp': Saves in a temporary directory (auto-cleaned later).
123            - 'persist': Saves in the main backtest_plots directory.
124            - Any other value skips plot generation.
125        - queue (asyncio.Queue): Async queue for real-time progress streaming.
126
127        Returns:
128        AsyncGenerator[str, None]: Yields three objects upon completion:
129        - performance (pd.DataFrame): Aggregated strategy performance metrics.
130        - trade_performance (pd.DataFrame): Detailed trade-by-trade results.
131        - stats (pd.DataFrame): Statistical summaries (e.g., Sharpe ratio, max drawdown).
132
133        Side effects:
134        - Reads leverage configurations from './app/configs/leverages.yml'.
135        - May create HTML plot files in either './app/templates/static/backtest_plots/temp' (temporary)
136        or './app/templates/static/backtest_plots' (persistent).
137        - Streams logs/results via the provided asyncio.Queue (e.g., for frontend progress updates).
138
139        Notes:
140        - The strategy module path must follow the convention: 'app.backbone.strategies.{strategy_name}'.
141        - Temporary plots include a timestamp and are automatically purged by a separate cleanup process.
142        - Risk-free rate is fetched from the application's config service (used for Sharpe ratio calculations).
143        - Margin is calculated as 1/leverage (e.g., 50x leverage → 2% margin requirement).
144        """
145        await log_message(queue, '', '', "log", f"Loading strategy {strategy.Name}")
146        
147        strategy_name = strategy.Name.split(".")[1]
148        bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'
149        
150        strategy_path = 'app.backbone.strategies.' + strategy.Name
151        strategy_func = load_function(strategy_path)
152        
153        await log_message(queue, '', '', "log", f"Strategy {strategy.Name} loaded succesfully")
154        await log_message(queue, '', '', "log", "Loading leverages")
155
156        with open("./app/configs/leverages.yml", "r") as file_name:
157            leverages = yaml.safe_load(file_name)
158            await log_message(queue, '', '', "log", "Leverages loaded")
159
160        leverage = leverages[ticker.Name]
161        margin = 1 / leverage
162        
163        await log_message(queue, ticker.Name, timeframe.Name, "log", "Getting data")
164
165        prices = get_data(ticker.Name, timeframe.MetaTraderNumber, date_from, date_to)
166        prices.index = pd.to_datetime(prices.index)
167
168        await log_message(queue, ticker.Name, timeframe.Name, "log", f"Data ready: {prices.head(1)}")
169
170        await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting backtest")
171
172        filename = f'{bot_name}_{date_from.strftime("%Y%m%d")}_{date_to.strftime("%Y%m%d")}'
173        plot_path = None
174
175        if save_bt_plot == 'temp':
176            plot_path = './app/templates/static/backtest_plots/temp'
177            await log_message(queue, ticker.Name, timeframe.Name, "link", os.path.join('/backtest_plots/temp', filename + '.html'))
178
179        elif save_bt_plot == 'persist':
180            plot_path = './app/templates/static/backtest_plots'
181
182        risk_free_rate = float(self.config_service.get_by_name('RiskFreeRate').Value)
183
184        performance, trade_performance, stats = run_strategy_and_get_performances(
185            strategy=strategy_func,
186            ticker=ticker,
187            timeframe=timeframe,
188            risk=risk,
189            prices=prices,
190            initial_cash=initial_cash,
191            risk_free_rate=risk_free_rate,
192            margin=margin,
193            plot_path=plot_path,
194            file_name = filename,
195            save_report= save_bt_plot == 'persist' # Solo genera el reporte si no es una corrida temporal
196        )
197
198        await log_message(queue, ticker.Name, timeframe.Name, "log", "The backtest is done :)")
199
200        await log_message(queue, ticker.Name, timeframe.Name, "completed", f"{stats.to_string()}")
201        
202        return performance, trade_performance, stats
203
204    @streaming_endpoint
205    async def run_backtests_and_save(
206        self,
207        initial_cash: float,
208        strategies: Strategy | List[Strategy],
209        tickers: Ticker | List[Ticker],
210        timeframes: List[Timeframe],
211        date_from: date,
212        date_to: date,
213        method: str,
214        risks: float | List[float],
215        save_bt_plot: str,
216        queue: asyncio.Queue,
217        ) -> AsyncGenerator[str, None]:
218        """
219        Executes multiple backtests in parallel for all combinations of strategies, tickers, timeframes and risks,
220        then saves results to the database while streaming progress updates.
221
222        This function handles the complete backtesting pipeline:
223        - Generates all possible combinations of input parameters
224        - Checks for existing backtest results to avoid duplicate work
225        - Runs new backtests when needed using run_backtest()
226        - Saves performance metrics, trade details, and statistics to the database
227        - Provides real-time feedback through an async queue
228
229        Steps performed:
230        1. Normalizes all input parameters to lists (single items → single-element lists)
231        2. Generates all combinations of strategies/tickers/timeframes/risks
232        3. For each combination:
233        - Checks if bot exists in database
234        - Verifies if backtest already exists for the date range
235        - Runs new backtest if needed
236        - Converts results to database objects
237        - Saves all data (bot, performance, trades) transactionally
238        4. Streams progress, warnings, and completion messages via queue
239
240        Parameters:
241        - initial_cash (float): Starting capital for all backtests
242        - strategies (Strategy|List[Strategy]): Single strategy or list to test
243        - tickers (Ticker|List[Ticker]): Financial instrument(s) to backtest
244        - timeframes (List[Timeframe]): Time intervals to test (e.g. ['1H', '4H'])
245        - date_from (date): Start date for historical data
246        - date_to (date): End date for historical data
247        - method (str): Backtesting methodology identifier
248        - risks (float|List[float]): Risk percentage(s) per trade (e.g. [0.01, 0.02])
249        - save_bt_plot (str): Plot saving mode:
250            - 'temp': Temporary storage
251            - 'persist': Permanent storage
252            - Other: Skip plot generation
253        - queue (asyncio.Queue): Async queue for progress streaming
254
255        Returns:
256        - AsyncGenerator[str, None]: Yields list of saved BotPerformance objects when complete
257
258        Side effects:
259        - Creates/updates database records for:
260            - Bot configurations
261            - Performance metrics
262            - Individual trades
263        - May generate plot files depending on save_bt_plot parameter
264        - Streams messages to provided queue including:
265            - Progress logs
266            - Warnings about duplicates
267            - Completion/failure notifications
268
269        Notes:
270        - Automatically skips existing backtests for the same parameters/date range
271        - Uses atomic database transactions to ensure data consistency
272        - New bots are created if they don't exist in the database
273        - Trade history is extracted from backtest stats and saved relationally
274        - All database operations use the service's db_service interface
275        - Failures for individual combinations don't stop overall execution
276        """
277        try:
278            backtests = []
279            combinations = None
280
281            strategies = [strategies] if type(strategies) != list else strategies
282            tickers = [tickers] if type(tickers) != list else tickers
283            timeframes = [timeframes] if type(timeframes) != list else timeframes
284            risks = [risks] if type(risks) != list else risks
285  
286            combinations = itertools.product(strategies, tickers, timeframes, risks)
287
288            async for combination in self.async_iterator(list(combinations)):
289                strategy, ticker, timeframe, risk = combination
290
291                strategy_name = strategy.Name.split(".")[1]
292
293                await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting")
294                
295                bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'
296                
297                await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking bot in the database")
298
299                bot = self.bot_service.get_bot(
300                    strategy_id=strategy.Id,
301                    ticker_id=ticker.Id,
302                    timeframe_id=timeframe.Id,
303                    risk=risk   
304                )
305                
306                bot_exists = False
307
308                if bot:
309                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot already exists")
310
311                    bot_exists = True
312
313                    await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking backtest in the database")
314                    result_performance = self.get_performances_by_bot_dates(
315                        bot_id=bot.Id, 
316                        date_from=date_from, 
317                        date_to=date_to
318                    )
319                    
320                    if result_performance:
321                        backtests.append(result_performance)
322                        await log_message(queue, ticker.Name, timeframe.Name, "warning", "The backtest already exists. Skipping...")
323                        continue
324
325                else:
326                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot is not in the database")
327                    bot = Bot(
328                        Name=bot_name,
329                        StrategyId=strategy.Id,
330                        TickerId=ticker.Id,
331                        TimeframeId=timeframe.Id,
332                        Risk=risk
333                    )
334
335                try:
336                    performance, trade_performance, stats = await self.run_backtest(
337                        initial_cash=initial_cash,
338                        strategy=strategy,
339                        ticker=ticker,
340                        timeframe=timeframe,
341                        date_from=date_from,
342                        date_to=date_to,
343                        method=method,
344                        risk=risk,
345                        save_bt_plot=save_bt_plot,
346                        queue=queue,
347                    )
348
349                    await log_message(queue, ticker.Name, timeframe.Name, "log", "Saving metrics in the database")
350
351                    bot_performance_for_db = _performance_from_df_to_obj(
352                        performance, 
353                        date_from, 
354                        date_to, 
355                        risk, 
356                        method, 
357                        bot,
358                        initial_cash,
359                    )
360                    
361                    trade_performance_for_db = [BotTradePerformance(**row) for _, row in trade_performance.iterrows()].pop()
362                    trade_performance_for_db.BotPerformance = bot_performance_for_db 
363                    
364                    with self.db_service.get_database() as db:
365                        if not bot_exists:
366                            self.db_service.create(db, bot)
367                        
368                        backtests.append(self.db_service.create(db, bot_performance_for_db))
369
370                        self.db_service.create(db, trade_performance_for_db)
371                        
372                        trade_history = trades_from_df_to_obj(stats._trades)
373
374                        for trade in trade_history:
375                            trade.BotPerformance = bot_performance_for_db
376                            self.db_service.create(db, trade)
377
378                    await log_message(queue, ticker.Name, timeframe.Name, "completed", "It's done buddy ;)")
379                
380                except Exception as e:
381                    await log_message(queue, '', '', "failed", f"{str(e)}")
382            
383            return backtests
384            
385        except Exception as e:
386            await log_message(queue, '', '', "failed", f"{str(e)}")
387
388    def get_performances_by_strategy_ticker(self, strategy_id, ticker_id) -> BotPerformance:
389        with self.db_service.get_database() as db:
390            bot_performances = (
391                db.query(BotPerformance)
392                .join(Bot, Bot.Id == BotPerformance.BotId)
393                .filter(Bot.TickerId == ticker_id)
394                .filter(Bot.StrategyId == strategy_id)
395                .order_by(desc(BotPerformance.StabilityWeightedRar))
396                .all()
397            )
398
399            return bot_performances
400
401    def get_performances_by_bot_dates(self, bot_id, date_from, date_to) -> BotPerformance:
402        with self.db_service.get_database() as db:
403            bot_performance = (
404                db.query(BotPerformance)
405                .options(
406                    joinedload(BotPerformance.RandomTest).joinedload(RandomTest.RandomTestPerformance),
407                    joinedload(BotPerformance.LuckTest).joinedload(LuckTest.LuckTestPerformance),
408                )
409                .filter(BotPerformance.BotId == bot_id)
410                .filter(BotPerformance.DateFrom == date_from)
411                .filter(BotPerformance.DateTo == date_to)
412                .first()
413            )
414            
415            return bot_performance
416     
417    def get_performance_by_bot(self, bot_id) -> BotPerformance: # cambiar bot_id por backtest_id
418        with self.db_service.get_database() as db:
419            bot_performance = self.db_service.get_many_by_filter(db, BotPerformance, BotId=bot_id)  
420            return bot_performance
421            
422    def get_bot_performance_by_id(self, bot_performance_id) -> BotPerformance: # cambiar bot_id por backtest_id
423        with self.db_service.get_database() as db:
424            bot_performance = self.db_service.get_by_id(db, BotPerformance, id=bot_performance_id)  
425
426            return bot_performance
427
428    def delete_from_strategy(self, strategy_id) -> OperationResult:
429        with self.db_service.get_database() as db:
430            bp_subq = (
431                select(BotPerformance.Id)
432                .join(Bot, BotPerformance.BotId == Bot.Id)
433                .where(Bot.StrategyId == strategy_id)
434            ).subquery()
435
436            db.execute(
437                delete(MetricWharehouse).where(
438                    MetricWharehouse.MontecarloTestId.in_(
439                        select(MontecarloTest.Id).where(MontecarloTest.BotPerformanceId.in_(bp_subq))
440                    )
441                )
442            )
443
444            db.execute(
445                delete(MontecarloTest).where(MontecarloTest.BotPerformanceId.in_(bp_subq))
446            )
447
448            db.execute(
449                delete(RandomTest).where(RandomTest.BotPerformanceId.in_(bp_subq))
450            )
451
452            lucktest_bp_subq = (
453                select(LuckTest.LuckTestPerformanceId)
454                .where(LuckTest.BotPerformanceId.in_(bp_subq))
455            ).subquery()
456
457            bot_performances = db.query(BotPerformance).where(BotPerformance.Id.in_(bp_subq)).all()
458
459            db.execute(delete(BotPerformance).where(BotPerformance.Id.in_(lucktest_bp_subq)))
460
461            db.execute(
462                delete(LuckTest).where(LuckTest.BotPerformanceId.in_(bp_subq))
463            )
464
465            db.execute(
466                delete(BotTradePerformance).where(BotTradePerformance.BotPerformanceId.in_(bp_subq))
467            )
468
469            db.execute(
470                delete(Trade).where(Trade.BotPerformanceId.in_(bp_subq))
471            )
472
473            db.execute(
474                delete(BotPerformance).where(BotPerformance.Id.in_(bp_subq))
475            )
476
477            db.execute(
478                delete(Bot).where(Bot.StrategyId == strategy_id)
479            )
480        
481        # Borrar archivos relacionados
482        for bot_performance in bot_performances:
483            str_date_from = str(bot_performance.DateFrom).replace('-', '')
484            str_date_to = str(bot_performance.DateTo).replace('-', '')
485            file_name = f'{bot_performance.Bot.Name}_{str_date_from}_{str_date_to}.html'
486            for folder in ['luck_test_plots', 'correlation_plots', 't_test_plots', 'backtest_plots', 'backtest_plots/reports']:
487                path = os.path.join('./app/templates/static/', folder, file_name)
488                if os.path.exists(path):
489                    os.remove(path)
490
491        return OperationResult(ok=True, message=None, item=None)
492
493
494    def delete(self, bot_performance) -> OperationResult:
495        with self.db_service.get_database() as db:
496            # Cargar el objeto con relaciones necesarias
497            bot_performance_id = bot_performance if isinstance(bot_performance, int) else bot_performance.Id
498
499            bot_performance = db.query(BotPerformance)\
500                .options(
501                    joinedload(BotPerformance.BotTradePerformance),
502                    joinedload(BotPerformance.Bot),
503                    joinedload(BotPerformance.LuckTest),
504                    joinedload(BotPerformance.RandomTest),
505                )\
506                .filter(BotPerformance.Id == bot_performance_id)\
507                .first()
508
509            if not bot_performance:
510                return OperationResult(ok=False, message="BotPerformance not found", item=None)
511
512            # Eliminar relaciones directas
513            if bot_performance.BotTradePerformance:
514                self.db_service.delete(db, BotTradePerformance, bot_performance.BotTradePerformance.Id)
515
516            # Eliminar Montecarlo y métricas asociadas
517            montecarlo_test = self.db_service.get_by_filter(db, MontecarloTest, BotPerformanceId=bot_performance_id)
518            if montecarlo_test:
519                self.db_service.delete_many_by_filter(db, MetricWharehouse, MontecarloTestId=montecarlo_test.Id)
520                self.db_service.delete(db, MontecarloTest, montecarlo_test.Id)
521
522            # Eliminar LuckTests y sus performances
523            luck_tests = self.db_service.get_many_by_filter(db, LuckTest, BotPerformanceId=bot_performance_id)
524            for lt in luck_tests:
525                self.db_service.delete(db, BotPerformance, lt.LuckTestPerformanceId)
526                self.db_service.delete(db, LuckTest, lt.Id)
527
528            # Borrar archivos relacionados
529            str_date_from = str(bot_performance.DateFrom).replace('-', '')
530            str_date_to = str(bot_performance.DateTo).replace('-', '')
531            file_name = f'{bot_performance.Bot.Name}_{str_date_from}_{str_date_to}.html'
532            for folder in ['luck_test_plots', 'correlation_plots', 't_test_plots', 'backtest_plots', 'backtest_plots/reports']:
533                path = os.path.join('./app/templates/static/', folder, file_name)
534                if os.path.exists(path):
535                    os.remove(path)
536
537            # Eliminar RandomTests y sus dependencias
538            if bot_performance.RandomTest:
539                rt = bot_performance.RandomTest
540                rt_perf = self.db_service.get_by_id(db, BotPerformance, rt.RandomTestPerformanceId)
541                if rt_perf:
542                    rt_trade_perf = self.db_service.get_by_filter(db, BotTradePerformance, BotPerformanceId=rt_perf.Id)
543                    if rt_trade_perf:
544                        self.db_service.delete(db, BotTradePerformance, rt_trade_perf.Id)
545                    self.db_service.delete(db, BotPerformance, rt.RandomTestPerformanceId)
546                self.db_service.delete(db, RandomTest, rt.Id)
547
548            # Borrar trades relacionados
549            self.db_service.delete_many_by_filter(db, Trade, BotPerformanceId=bot_performance_id)
550
551            # Si no hay más performance, borrar el Bot
552            if bot_performance.BotId:
553                rem = db.query(BotPerformance).filter(BotPerformance.BotId == bot_performance.BotId).count()
554                if rem == 1:
555                    self.db_service.delete(db, Bot, bot_performance.BotId)
556
557            self.db_service.delete(db, BotPerformance, bot_performance.Id)
558            self.db_service.save(db)
559
560        return OperationResult(ok=True, message=None, item=None)
561
562    
563    def update_favorite(self, performance_id) -> OperationResult:
564
565        with self.db_service.get_database() as db:
566            performance = self.db_service.get_by_id(db, BotPerformance, performance_id)
567            
568            if not performance:
569                return OperationResult(ok=False, message='Backtest not found', item=None)
570            
571            performance.Favorite = not performance.Favorite
572            updated_performance = self.db_service.update(db, BotPerformance, performance)   
573
574            if not updated_performance:
575                return OperationResult(ok=False, message='Update failed', item=None)
576
577            return OperationResult(ok=True, message=None, item=updated_performance.Favorite)
578
579    def get_robusts(self) -> List[BotPerformance]:
580        with self.db_service.get_database() as db:
581            # Subquery para filtrar estrategias robustas (RreturnDd promedio > 1)
582            subquery = (
583                db.query(
584                    Bot.StrategyId,
585                    Bot.TickerId,
586                )
587                .join(BotPerformance, Bot.Id == BotPerformance.BotId)
588                .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
589                .filter(
590                    BotPerformance.RreturnDd != "NaN",
591                    Timeframe.Selected == True
592                )
593                .group_by(Bot.StrategyId, Bot.TickerId)
594                .having(func.avg(BotPerformance.RreturnDd) > 1)
595                .subquery()
596            )
597
598            # Alias para evitar ambigüedad
599            bot_alias = aliased(Bot)
600            bp_alias = aliased(BotPerformance)
601
602            # Subquery para asignar un número de fila basado en StabilityWeightedRar
603            ranked_subquery = (
604                db.query(
605                    bp_alias.Id.label("bp_id"),
606                    func.row_number()
607                    .over(
608                        partition_by=[bot_alias.StrategyId, bot_alias.TickerId],
609                        order_by=bp_alias.StabilityWeightedRar.desc(),
610                    )
611                    .label("row_num"),
612                )
613                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
614                .join(
615                    subquery,
616                    (bot_alias.StrategyId == subquery.c.StrategyId)
617                    & (bot_alias.TickerId == subquery.c.TickerId),
618                )
619                .subquery()
620            )
621
622            # Query final seleccionando solo los mejores (row_num == 1)
623            data = (
624                db.query(bp_alias)
625                .join(ranked_subquery, bp_alias.Id == ranked_subquery.c.bp_id)
626                .filter(ranked_subquery.c.row_num == 1)
627                .all()
628            )
629
630            return data
631               
632    def get_robusts_by_strategy_id(self, strategy_id) -> List[BotPerformance]:
633        with self.db_service.get_database() as db:
634            # Subquery para calcular el promedio de RreturnDd por StrategyId y TickerId
635            subquery = (
636                db.query(
637                    Bot.StrategyId,
638                    Bot.TickerId,
639                )
640                .join(BotPerformance, Bot.Id == BotPerformance.BotId)
641                .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
642                .filter(
643                    Bot.StrategyId == strategy_id,
644                    BotPerformance.RreturnDd != "NaN",
645                    Timeframe.Selected == True,
646                )
647                .group_by(Bot.StrategyId, Bot.TickerId)
648                .having(func.avg(BotPerformance.RreturnDd) >= 1)  # >= en lugar de > para incluir exactamente 1
649                .subquery()
650            )
651
652            # Alias para evitar ambigüedad en las relaciones
653            bot_alias = aliased(Bot)
654            bp_alias = aliased(BotPerformance)
655
656            # Subquery para seleccionar la mejor temporalidad por StrategyId - TickerId
657            best_temporalities = (
658                db.query(
659                    bp_alias.BotId,
660                    bot_alias.StrategyId,
661                    bot_alias.TickerId,
662                    func.max(bp_alias.StabilityWeightedRar).label("max_custom_metric")
663                )
664                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
665                .join(subquery, (bot_alias.StrategyId == subquery.c.StrategyId) & (bot_alias.TickerId == subquery.c.TickerId))
666                .group_by(bot_alias.StrategyId, bot_alias.TickerId)
667                .subquery()
668            )
669
670            # Query final para traer solo los registros que corresponden a la mejor temporalidad
671            data = (
672                db.query(bp_alias)
673                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
674                .join(best_temporalities,
675                    (bot_alias.StrategyId == best_temporalities.c.StrategyId) &
676                    (bot_alias.TickerId == best_temporalities.c.TickerId) &
677                    (bp_alias.StabilityWeightedRar == best_temporalities.c.max_custom_metric))
678                .all()
679            )
680
681            return data
682
683    def get_favorites(self) -> List[BotPerformance]:
684        with self.db_service.get_database() as db:
685            favorites = self.db_service.get_many_by_filter(db, BotPerformance, Favorite=True)
686        
687        return favorites
688        
689    def get_trades(self, bot_performance_id:int) -> List[Trade]:
690        with self.db_service.get_database() as db:
691            bot_performance = self.db_service.get_by_id(db, BotPerformance, id=bot_performance_id)  
692            return bot_performance.TradeHistory
693
694    def get_by_filter(
695        self,
696        return_: float = None,
697        drawdown: float = None,
698        stability_ratio: float = None,
699        sharpe_ratio: float = None,
700        trades: float = None,
701        rreturn_dd: float = None,
702        custom_metric: float = None,
703        winrate: float = None,
704        strategy: str = None,
705        ticker: str = None
706    ) -> List[BotPerformance]:
707        with self.db_service.get_database() as db:
708            filters = []
709
710            # Base query y joins explícitos
711            query = db.query(BotPerformance)\
712                .join(Bot, Bot.Id == BotPerformance.BotId)\
713                .join(Strategy, Strategy.Id == Bot.StrategyId)\
714                .join(Ticker, Ticker.Id == Bot.TickerId)
715
716            # Filtros numéricos
717            if return_ is not None:
718                filters.append(BotPerformance.Return >= return_)
719            if drawdown is not None:
720                filters.append(BotPerformance.Drawdown <= drawdown)
721            if stability_ratio is not None:
722                filters.append(BotPerformance.StabilityRatio >= stability_ratio)
723            if sharpe_ratio is not None:
724                filters.append(BotPerformance.SharpeRatio >= sharpe_ratio)
725            if trades is not None:
726                filters.append(BotPerformance.Trades >= trades)
727            if rreturn_dd is not None:
728                filters.append(BotPerformance.RreturnDd >= rreturn_dd)
729            if custom_metric is not None:
730                filters.append(BotPerformance.StabilityWeightedRar >= custom_metric)
731            if winrate is not None:
732                filters.append(BotPerformance.WinRate >= winrate)
733
734            # Filtros por texto con ILIKE correctamente aplicados
735            if strategy:
736                filters.append(Strategy.Name.ilike(f"%{strategy}%"))
737            if ticker:
738                filters.append(Ticker.Name.ilike(f"%{ticker}%"))
739
740            # Query final que aplica filtros correctamente
741            final_query = (
742                query
743                .filter(*filters)
744                .options(
745                    joinedload(BotPerformance.Bot),
746                    joinedload(BotPerformance.BotTradePerformance),
747                    joinedload(BotPerformance.TradeHistory),
748                    joinedload(BotPerformance.MontecarloTest),
749                    joinedload(BotPerformance.LuckTest),
750                    joinedload(BotPerformance.RandomTest),
751                )
752                .order_by(BotPerformance.RreturnDd.desc())
753                .all()
754            )
755
756            return final_query
async def log_message( queue: asyncio.queues.Queue, ticker, timeframe, status, message, error=''):
async def log_message(queue: asyncio.Queue, ticker, timeframe, status, message, error=""):
    """Serialize a log entry to JSON and put it on the queue.

    The entry carries the ticker, timeframe, status, message and error fields,
    in that order.
    """
    entry = {
        "ticker": ticker,
        "timeframe": timeframe,
        "status": status,
        "message": message,
        "error": error,
    }
    await queue.put(json.dumps(entry))

Función para agregar logs a la cola sin detener el proceso.

class BacktestService:
 44class BacktestService:
 45    """
 46    Handles the orchestration, execution, persistence, and retrieval of backtests and related performance metrics.
 47
 48    This service acts as the central component for running backtests on trading strategies, saving their results,
 49    and performing various queries on historical performance data. It supports streaming logs, asynchronous execution,
 50    and interaction with the database and configuration layers.
 51
 52    Main features include:
 53    - Running and saving individual or batch backtests across strategy/ticker/timeframe/risk combinations.
 54    - Loading strategies dynamically and executing them with proper capital, leverage, and configuration.
 55    - Persisting detailed performance metrics and trade history in the database.
 56    - Performing deletions and cleanup of historical test data (including files and DB records).
 57    - Providing filtering, querying, and tagging of backtests (e.g. by performance, robustness, favorites).
 58    - Identifying robust strategies via custom metrics and filters (e.g. return/drawdown ratio > 1).
 59    - Managing test dependencies such as Monte Carlo, Luck Test, and Random Test results.
 60
 61    Attributes:
 62        db_service (DbService): Handles database connections and CRUD operations.
 63        bot_service (BotService): Manages bot records used during backtesting.
 64        config_service (ConfigService): Provides access to configuration values (e.g. risk-free rate).
 65
 66    Notes:
 67        - Backtests can be persisted with reports or run temporarily.
 68        - Streaming logs (via queues) allow real-time feedback when executing long-running backtests.
 69        - This class assumes a full application context with all entities (Bot, BotPerformance, Trade, etc.) properly defined.
 70    """
    def __init__(self):
        """Create the service with its own DbService, BotService and ConfigService instances."""
        self.db_service = DbService()
        self.bot_service = BotService()
        self.config_service = ConfigService()
 75    
 76    async def async_iterator(self, iterable):
 77        # Si iterable es una lista, la envolvemos en una iteración asíncrona
 78        for item in iterable:
 79            yield item
 80            await asyncio.sleep(0)
 81
 82    @streaming_endpoint
 83    async def run_backtest(
 84        self,
 85        initial_cash: float,
 86        strategy: Strategy,
 87        ticker: Ticker,
 88        timeframe: Timeframe,
 89        date_from: pd.Timestamp,
 90        date_to: pd.Timestamp,
 91        method: str,
 92        risk: float,
 93        save_bt_plot: str,
 94        queue: asyncio.Queue,
 95        ) -> AsyncGenerator[str, None]:
 96        """
 97        Executes an asynchronous backtest for a given trading strategy and returns performance metrics, 
 98        trade details, and statistics via a streaming endpoint.
 99
100        This function loads the specified strategy, fetches historical price data, applies leverage rules,
101        and runs a backtest simulation. Results are streamed in real-time through an asyncio queue, 
102        and optional plots/reports are generated.
103
104        Steps performed:
105        - Loads the trading strategy dynamically from a module path.
106        - Retrieves leverage rules from a YAML config file.
107        - Fetches and prepares historical price data for the given ticker/timeframe.
108        - Computes margin requirements based on leverage.
109        - Executes the backtest using the strategy's logic and calculates performance metrics.
110        - Generates and saves backtest plots (temporary or persistent) if requested.
111        - Streams progress updates, logs, and final results through the provided queue.
112
113        Parameters:
114        - initial_cash (float): Starting capital for the backtest simulation.
115        - strategy (Strategy): Strategy object containing the module path and name.
116        - ticker (Ticker): Financial instrument to backtest on (e.g., currency pair, stock).
117        - timeframe (Timeframe): Timeframe for price data (e.g., '1H', '4H').
118        - date_from (pd.Timestamp): Start date for historical data.
119        - date_to (pd.Timestamp): End date for historical data.
120        - method (str): Reserved for future backtest variations (unused in current implementation).
121        - risk (float): Risk percentage per trade (e.g., 0.01 for 1% risk).
122        - save_bt_plot (str): Plot saving mode: 
123            - 'temp': Saves in a temporary directory (auto-cleaned later).
124            - 'persist': Saves in the main backtest_plots directory.
125            - Any other value skips plot generation.
126        - queue (asyncio.Queue): Async queue for real-time progress streaming.
127
128        Returns:
129        AsyncGenerator[str, None]: Yields three objects upon completion:
130        - performance (pd.DataFrame): Aggregated strategy performance metrics.
131        - trade_performance (pd.DataFrame): Detailed trade-by-trade results.
132        - stats (pd.DataFrame): Statistical summaries (e.g., Sharpe ratio, max drawdown).
133
134        Side effects:
135        - Reads leverage configurations from './app/configs/leverages.yml'.
136        - May create HTML plot files in either './app/templates/static/backtest_plots/temp' (temporary)
137        or './app/templates/static/backtest_plots' (persistent).
138        - Streams logs/results via the provided asyncio.Queue (e.g., for frontend progress updates).
139
140        Notes:
141        - The strategy module path must follow the convention: 'app.backbone.strategies.{strategy_name}'.
142        - Temporary plots include a timestamp and are automatically purged by a separate cleanup process.
143        - Risk-free rate is fetched from the application's config service (used for Sharpe ratio calculations).
144        - Margin is calculated as 1/leverage (e.g., 50x leverage → 2% margin requirement).
145        """
146        await log_message(queue, '', '', "log", f"Loading strategy {strategy.Name}")
147        
148        strategy_name = strategy.Name.split(".")[1]
149        bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'
150        
151        strategy_path = 'app.backbone.strategies.' + strategy.Name
152        strategy_func = load_function(strategy_path)
153        
154        await log_message(queue, '', '', "log", f"Strategy {strategy.Name} loaded succesfully")
155        await log_message(queue, '', '', "log", "Loading leverages")
156
157        with open("./app/configs/leverages.yml", "r") as file_name:
158            leverages = yaml.safe_load(file_name)
159            await log_message(queue, '', '', "log", "Leverages loaded")
160
161        leverage = leverages[ticker.Name]
162        margin = 1 / leverage
163        
164        await log_message(queue, ticker.Name, timeframe.Name, "log", "Getting data")
165
166        prices = get_data(ticker.Name, timeframe.MetaTraderNumber, date_from, date_to)
167        prices.index = pd.to_datetime(prices.index)
168
169        await log_message(queue, ticker.Name, timeframe.Name, "log", f"Data ready: {prices.head(1)}")
170
171        await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting backtest")
172
173        filename = f'{bot_name}_{date_from.strftime("%Y%m%d")}_{date_to.strftime("%Y%m%d")}'
174        plot_path = None
175
176        if save_bt_plot == 'temp':
177            plot_path = './app/templates/static/backtest_plots/temp'
178            await log_message(queue, ticker.Name, timeframe.Name, "link", os.path.join('/backtest_plots/temp', filename + '.html'))
179
180        elif save_bt_plot == 'persist':
181            plot_path = './app/templates/static/backtest_plots'
182
183        risk_free_rate = float(self.config_service.get_by_name('RiskFreeRate').Value)
184
185        performance, trade_performance, stats = run_strategy_and_get_performances(
186            strategy=strategy_func,
187            ticker=ticker,
188            timeframe=timeframe,
189            risk=risk,
190            prices=prices,
191            initial_cash=initial_cash,
192            risk_free_rate=risk_free_rate,
193            margin=margin,
194            plot_path=plot_path,
195            file_name = filename,
196            save_report= save_bt_plot == 'persist' # Solo genera el reporte si no es una corrida temporal
197        )
198
199        await log_message(queue, ticker.Name, timeframe.Name, "log", "The backtest is done :)")
200
201        await log_message(queue, ticker.Name, timeframe.Name, "completed", f"{stats.to_string()}")
202        
203        return performance, trade_performance, stats
204
205    @streaming_endpoint
206    async def run_backtests_and_save(
207        self,
208        initial_cash: float,
209        strategies: Strategy | List[Strategy],
210        tickers: Ticker | List[Ticker],
211        timeframes: List[Timeframe],
212        date_from: date,
213        date_to: date,
214        method: str,
215        risks: float | List[float],
216        save_bt_plot: str,
217        queue: asyncio.Queue,
218        ) -> AsyncGenerator[str, None]:
219        """
220        Executes multiple backtests in parallel for all combinations of strategies, tickers, timeframes and risks,
221        then saves results to the database while streaming progress updates.
222
223        This function handles the complete backtesting pipeline:
224        - Generates all possible combinations of input parameters
225        - Checks for existing backtest results to avoid duplicate work
226        - Runs new backtests when needed using run_backtest()
227        - Saves performance metrics, trade details, and statistics to the database
228        - Provides real-time feedback through an async queue
229
230        Steps performed:
231        1. Normalizes all input parameters to lists (single items → single-element lists)
232        2. Generates all combinations of strategies/tickers/timeframes/risks
233        3. For each combination:
234        - Checks if bot exists in database
235        - Verifies if backtest already exists for the date range
236        - Runs new backtest if needed
237        - Converts results to database objects
238        - Saves all data (bot, performance, trades) transactionally
239        4. Streams progress, warnings, and completion messages via queue
240
241        Parameters:
242        - initial_cash (float): Starting capital for all backtests
243        - strategies (Strategy|List[Strategy]): Single strategy or list to test
244        - tickers (Ticker|List[Ticker]): Financial instrument(s) to backtest
245        - timeframes (List[Timeframe]): Time intervals to test (e.g. ['1H', '4H'])
246        - date_from (date): Start date for historical data
247        - date_to (date): End date for historical data
248        - method (str): Backtesting methodology identifier
249        - risks (float|List[float]): Risk percentage(s) per trade (e.g. [0.01, 0.02])
250        - save_bt_plot (str): Plot saving mode:
251            - 'temp': Temporary storage
252            - 'persist': Permanent storage
253            - Other: Skip plot generation
254        - queue (asyncio.Queue): Async queue for progress streaming
255
256        Returns:
257        - AsyncGenerator[str, None]: Yields list of saved BotPerformance objects when complete
258
259        Side effects:
260        - Creates/updates database records for:
261            - Bot configurations
262            - Performance metrics
263            - Individual trades
264        - May generate plot files depending on save_bt_plot parameter
265        - Streams messages to provided queue including:
266            - Progress logs
267            - Warnings about duplicates
268            - Completion/failure notifications
269
270        Notes:
271        - Automatically skips existing backtests for the same parameters/date range
272        - Uses atomic database transactions to ensure data consistency
273        - New bots are created if they don't exist in the database
274        - Trade history is extracted from backtest stats and saved relationally
275        - All database operations use the service's db_service interface
276        - Failures for individual combinations don't stop overall execution
277        """
278        try:
279            backtests = []
280            combinations = None
281
282            strategies = [strategies] if type(strategies) != list else strategies
283            tickers = [tickers] if type(tickers) != list else tickers
284            timeframes = [timeframes] if type(timeframes) != list else timeframes
285            risks = [risks] if type(risks) != list else risks
286  
287            combinations = itertools.product(strategies, tickers, timeframes, risks)
288
289            async for combination in self.async_iterator(list(combinations)):
290                strategy, ticker, timeframe, risk = combination
291
292                strategy_name = strategy.Name.split(".")[1]
293
294                await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting")
295                
296                bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'
297                
298                await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking bot in the database")
299
300                bot = self.bot_service.get_bot(
301                    strategy_id=strategy.Id,
302                    ticker_id=ticker.Id,
303                    timeframe_id=timeframe.Id,
304                    risk=risk   
305                )
306                
307                bot_exists = False
308
309                if bot:
310                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot already exists")
311
312                    bot_exists = True
313
314                    await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking backtest in the database")
315                    result_performance = self.get_performances_by_bot_dates(
316                        bot_id=bot.Id, 
317                        date_from=date_from, 
318                        date_to=date_to
319                    )
320                    
321                    if result_performance:
322                        backtests.append(result_performance)
323                        await log_message(queue, ticker.Name, timeframe.Name, "warning", "The backtest already exists. Skipping...")
324                        continue
325
326                else:
327                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot is not in the database")
328                    bot = Bot(
329                        Name=bot_name,
330                        StrategyId=strategy.Id,
331                        TickerId=ticker.Id,
332                        TimeframeId=timeframe.Id,
333                        Risk=risk
334                    )
335
336                try:
337                    performance, trade_performance, stats = await self.run_backtest(
338                        initial_cash=initial_cash,
339                        strategy=strategy,
340                        ticker=ticker,
341                        timeframe=timeframe,
342                        date_from=date_from,
343                        date_to=date_to,
344                        method=method,
345                        risk=risk,
346                        save_bt_plot=save_bt_plot,
347                        queue=queue,
348                    )
349
350                    await log_message(queue, ticker.Name, timeframe.Name, "log", "Saving metrics in the database")
351
352                    bot_performance_for_db = _performance_from_df_to_obj(
353                        performance, 
354                        date_from, 
355                        date_to, 
356                        risk, 
357                        method, 
358                        bot,
359                        initial_cash,
360                    )
361                    
362                    trade_performance_for_db = [BotTradePerformance(**row) for _, row in trade_performance.iterrows()].pop()
363                    trade_performance_for_db.BotPerformance = bot_performance_for_db 
364                    
365                    with self.db_service.get_database() as db:
366                        if not bot_exists:
367                            self.db_service.create(db, bot)
368                        
369                        backtests.append(self.db_service.create(db, bot_performance_for_db))
370
371                        self.db_service.create(db, trade_performance_for_db)
372                        
373                        trade_history = trades_from_df_to_obj(stats._trades)
374
375                        for trade in trade_history:
376                            trade.BotPerformance = bot_performance_for_db
377                            self.db_service.create(db, trade)
378
379                    await log_message(queue, ticker.Name, timeframe.Name, "completed", "It's done buddy ;)")
380                
381                except Exception as e:
382                    await log_message(queue, '', '', "failed", f"{str(e)}")
383            
384            return backtests
385            
386        except Exception as e:
387            await log_message(queue, '', '', "failed", f"{str(e)}")
388
389    def get_performances_by_strategy_ticker(self, strategy_id, ticker_id) -> BotPerformance:
390        with self.db_service.get_database() as db:
391            bot_performances = (
392                db.query(BotPerformance)
393                .join(Bot, Bot.Id == BotPerformance.BotId)
394                .filter(Bot.TickerId == ticker_id)
395                .filter(Bot.StrategyId == strategy_id)
396                .order_by(desc(BotPerformance.StabilityWeightedRar))
397                .all()
398            )
399
400            return bot_performances
401
402    def get_performances_by_bot_dates(self, bot_id, date_from, date_to) -> BotPerformance:
403        with self.db_service.get_database() as db:
404            bot_performance = (
405                db.query(BotPerformance)
406                .options(
407                    joinedload(BotPerformance.RandomTest).joinedload(RandomTest.RandomTestPerformance),
408                    joinedload(BotPerformance.LuckTest).joinedload(LuckTest.LuckTestPerformance),
409                )
410                .filter(BotPerformance.BotId == bot_id)
411                .filter(BotPerformance.DateFrom == date_from)
412                .filter(BotPerformance.DateTo == date_to)
413                .first()
414            )
415            
416            return bot_performance
417     
418    def get_performance_by_bot(self, bot_id) -> BotPerformance: # cambiar bot_id por backtest_id
419        with self.db_service.get_database() as db:
420            bot_performance = self.db_service.get_many_by_filter(db, BotPerformance, BotId=bot_id)  
421            return bot_performance
422            
423    def get_bot_performance_by_id(self, bot_performance_id) -> BotPerformance: # cambiar bot_id por backtest_id
424        with self.db_service.get_database() as db:
425            bot_performance = self.db_service.get_by_id(db, BotPerformance, id=bot_performance_id)  
426
427            return bot_performance
428
429    def delete_from_strategy(self, strategy_id) -> OperationResult:
430        with self.db_service.get_database() as db:
431            bp_subq = (
432                select(BotPerformance.Id)
433                .join(Bot, BotPerformance.BotId == Bot.Id)
434                .where(Bot.StrategyId == strategy_id)
435            ).subquery()
436
437            db.execute(
438                delete(MetricWharehouse).where(
439                    MetricWharehouse.MontecarloTestId.in_(
440                        select(MontecarloTest.Id).where(MontecarloTest.BotPerformanceId.in_(bp_subq))
441                    )
442                )
443            )
444
445            db.execute(
446                delete(MontecarloTest).where(MontecarloTest.BotPerformanceId.in_(bp_subq))
447            )
448
449            db.execute(
450                delete(RandomTest).where(RandomTest.BotPerformanceId.in_(bp_subq))
451            )
452
453            lucktest_bp_subq = (
454                select(LuckTest.LuckTestPerformanceId)
455                .where(LuckTest.BotPerformanceId.in_(bp_subq))
456            ).subquery()
457
458            bot_performances = db.query(BotPerformance).where(BotPerformance.Id.in_(bp_subq)).all()
459
460            db.execute(delete(BotPerformance).where(BotPerformance.Id.in_(lucktest_bp_subq)))
461
462            db.execute(
463                delete(LuckTest).where(LuckTest.BotPerformanceId.in_(bp_subq))
464            )
465
466            db.execute(
467                delete(BotTradePerformance).where(BotTradePerformance.BotPerformanceId.in_(bp_subq))
468            )
469
470            db.execute(
471                delete(Trade).where(Trade.BotPerformanceId.in_(bp_subq))
472            )
473
474            db.execute(
475                delete(BotPerformance).where(BotPerformance.Id.in_(bp_subq))
476            )
477
478            db.execute(
479                delete(Bot).where(Bot.StrategyId == strategy_id)
480            )
481        
482        # Borrar archivos relacionados
483        for bot_performance in bot_performances:
484            str_date_from = str(bot_performance.DateFrom).replace('-', '')
485            str_date_to = str(bot_performance.DateTo).replace('-', '')
486            file_name = f'{bot_performance.Bot.Name}_{str_date_from}_{str_date_to}.html'
487            for folder in ['luck_test_plots', 'correlation_plots', 't_test_plots', 'backtest_plots', 'backtest_plots/reports']:
488                path = os.path.join('./app/templates/static/', folder, file_name)
489                if os.path.exists(path):
490                    os.remove(path)
491
492        return OperationResult(ok=True, message=None, item=None)
493
494
    def delete(self, bot_performance) -> OperationResult:
        """
        Delete one backtest together with its dependent records and plot files.

        Parameters:
        - bot_performance: Either the integer id of the BotPerformance or an object
          exposing ``Id``.

        Returns:
        - OperationResult: ``ok=False`` with "BotPerformance not found" when the id
          does not exist; ``ok=True`` once everything has been deleted and saved.

        Notes:
        - The statement order below is significant; dependent rows are removed
          before the BotPerformance itself.
        - The plot/report files are removed before the final database deletes and
          the save, so they are not restored if a later step fails.
        """
        with self.db_service.get_database() as db:
            # Load the object with the relationships needed below
            bot_performance_id = bot_performance if isinstance(bot_performance, int) else bot_performance.Id

            bot_performance = db.query(BotPerformance)\
                .options(
                    joinedload(BotPerformance.BotTradePerformance),
                    joinedload(BotPerformance.Bot),
                    joinedload(BotPerformance.LuckTest),
                    joinedload(BotPerformance.RandomTest),
                )\
                .filter(BotPerformance.Id == bot_performance_id)\
                .first()

            if not bot_performance:
                return OperationResult(ok=False, message="BotPerformance not found", item=None)

            # Delete direct relations
            if bot_performance.BotTradePerformance:
                self.db_service.delete(db, BotTradePerformance, bot_performance.BotTradePerformance.Id)

            # Delete the Monte Carlo test and its associated metrics
            montecarlo_test = self.db_service.get_by_filter(db, MontecarloTest, BotPerformanceId=bot_performance_id)
            if montecarlo_test:
                self.db_service.delete_many_by_filter(db, MetricWharehouse, MontecarloTestId=montecarlo_test.Id)
                self.db_service.delete(db, MontecarloTest, montecarlo_test.Id)

            # Delete the luck tests and the performance record each one produced
            luck_tests = self.db_service.get_many_by_filter(db, LuckTest, BotPerformanceId=bot_performance_id)
            for lt in luck_tests:
                self.db_service.delete(db, BotPerformance, lt.LuckTestPerformanceId)
                self.db_service.delete(db, LuckTest, lt.Id)

            # Remove related files: '<bot name>_<date from>_<date to>.html' in every plot folder
            str_date_from = str(bot_performance.DateFrom).replace('-', '')
            str_date_to = str(bot_performance.DateTo).replace('-', '')
            file_name = f'{bot_performance.Bot.Name}_{str_date_from}_{str_date_to}.html'
            for folder in ['luck_test_plots', 'correlation_plots', 't_test_plots', 'backtest_plots', 'backtest_plots/reports']:
                path = os.path.join('./app/templates/static/', folder, file_name)
                if os.path.exists(path):
                    os.remove(path)

            # Delete the random test and its dependencies (its performance record
            # and that record's trade performance)
            if bot_performance.RandomTest:
                rt = bot_performance.RandomTest
                rt_perf = self.db_service.get_by_id(db, BotPerformance, rt.RandomTestPerformanceId)
                if rt_perf:
                    rt_trade_perf = self.db_service.get_by_filter(db, BotTradePerformance, BotPerformanceId=rt_perf.Id)
                    if rt_trade_perf:
                        self.db_service.delete(db, BotTradePerformance, rt_trade_perf.Id)
                    self.db_service.delete(db, BotPerformance, rt.RandomTestPerformanceId)
                self.db_service.delete(db, RandomTest, rt.Id)

            # Delete related trades
            self.db_service.delete_many_by_filter(db, Trade, BotPerformanceId=bot_performance_id)

            # If this is the bot's only remaining performance, delete the Bot as well.
            # The count is taken before this performance is deleted below, hence == 1.
            if bot_performance.BotId:
                rem = db.query(BotPerformance).filter(BotPerformance.BotId == bot_performance.BotId).count()
                if rem == 1:
                    self.db_service.delete(db, Bot, bot_performance.BotId)

            self.db_service.delete(db, BotPerformance, bot_performance.Id)
            self.db_service.save(db)

        return OperationResult(ok=True, message=None, item=None)
562
563    
564    def update_favorite(self, performance_id) -> OperationResult:
565
566        with self.db_service.get_database() as db:
567            performance = self.db_service.get_by_id(db, BotPerformance, performance_id)
568            
569            if not performance:
570                return OperationResult(ok=False, message='Backtest not found', item=None)
571            
572            performance.Favorite = not performance.Favorite
573            updated_performance = self.db_service.update(db, BotPerformance, performance)   
574
575            if not updated_performance:
576                return OperationResult(ok=False, message='Update failed', item=None)
577
578            return OperationResult(ok=True, message=None, item=updated_performance.Favorite)
579
580    def get_robusts(self) -> List[BotPerformance]:
581        with self.db_service.get_database() as db:
582            # Subquery para filtrar estrategias robustas (RreturnDd promedio > 1)
583            subquery = (
584                db.query(
585                    Bot.StrategyId,
586                    Bot.TickerId,
587                )
588                .join(BotPerformance, Bot.Id == BotPerformance.BotId)
589                .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
590                .filter(
591                    BotPerformance.RreturnDd != "NaN",
592                    Timeframe.Selected == True
593                )
594                .group_by(Bot.StrategyId, Bot.TickerId)
595                .having(func.avg(BotPerformance.RreturnDd) > 1)
596                .subquery()
597            )
598
599            # Alias para evitar ambigüedad
600            bot_alias = aliased(Bot)
601            bp_alias = aliased(BotPerformance)
602
603            # Subquery para asignar un número de fila basado en StabilityWeightedRar
604            ranked_subquery = (
605                db.query(
606                    bp_alias.Id.label("bp_id"),
607                    func.row_number()
608                    .over(
609                        partition_by=[bot_alias.StrategyId, bot_alias.TickerId],
610                        order_by=bp_alias.StabilityWeightedRar.desc(),
611                    )
612                    .label("row_num"),
613                )
614                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
615                .join(
616                    subquery,
617                    (bot_alias.StrategyId == subquery.c.StrategyId)
618                    & (bot_alias.TickerId == subquery.c.TickerId),
619                )
620                .subquery()
621            )
622
623            # Query final seleccionando solo los mejores (row_num == 1)
624            data = (
625                db.query(bp_alias)
626                .join(ranked_subquery, bp_alias.Id == ranked_subquery.c.bp_id)
627                .filter(ranked_subquery.c.row_num == 1)
628                .all()
629            )
630
631            return data
632               
633    def get_robusts_by_strategy_id(self, strategy_id) -> List[BotPerformance]:
634        with self.db_service.get_database() as db:
635            # Subquery para calcular el promedio de RreturnDd por StrategyId y TickerId
636            subquery = (
637                db.query(
638                    Bot.StrategyId,
639                    Bot.TickerId,
640                )
641                .join(BotPerformance, Bot.Id == BotPerformance.BotId)
642                .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
643                .filter(
644                    Bot.StrategyId == strategy_id,
645                    BotPerformance.RreturnDd != "NaN",
646                    Timeframe.Selected == True,
647                )
648                .group_by(Bot.StrategyId, Bot.TickerId)
649                .having(func.avg(BotPerformance.RreturnDd) >= 1)  # >= en lugar de > para incluir exactamente 1
650                .subquery()
651            )
652
653            # Alias para evitar ambigüedad en las relaciones
654            bot_alias = aliased(Bot)
655            bp_alias = aliased(BotPerformance)
656
657            # Subquery para seleccionar la mejor temporalidad por StrategyId - TickerId
658            best_temporalities = (
659                db.query(
660                    bp_alias.BotId,
661                    bot_alias.StrategyId,
662                    bot_alias.TickerId,
663                    func.max(bp_alias.StabilityWeightedRar).label("max_custom_metric")
664                )
665                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
666                .join(subquery, (bot_alias.StrategyId == subquery.c.StrategyId) & (bot_alias.TickerId == subquery.c.TickerId))
667                .group_by(bot_alias.StrategyId, bot_alias.TickerId)
668                .subquery()
669            )
670
671            # Query final para traer solo los registros que corresponden a la mejor temporalidad
672            data = (
673                db.query(bp_alias)
674                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
675                .join(best_temporalities,
676                    (bot_alias.StrategyId == best_temporalities.c.StrategyId) &
677                    (bot_alias.TickerId == best_temporalities.c.TickerId) &
678                    (bp_alias.StabilityWeightedRar == best_temporalities.c.max_custom_metric))
679                .all()
680            )
681
682            return data
683
684    def get_favorites(self) -> List[BotPerformance]:
685        with self.db_service.get_database() as db:
686            favorites = self.db_service.get_many_by_filter(db, BotPerformance, Favorite=True)
687        
688        return favorites
689        
690    def get_trades(self, bot_performance_id:int) -> List[Trade]:
691        with self.db_service.get_database() as db:
692            bot_performance = self.db_service.get_by_id(db, BotPerformance, id=bot_performance_id)  
693            return bot_performance.TradeHistory
694
695    def get_by_filter(
696        self,
697        return_: float = None,
698        drawdown: float = None,
699        stability_ratio: float = None,
700        sharpe_ratio: float = None,
701        trades: float = None,
702        rreturn_dd: float = None,
703        custom_metric: float = None,
704        winrate: float = None,
705        strategy: str = None,
706        ticker: str = None
707    ) -> List[BotPerformance]:
708        with self.db_service.get_database() as db:
709            filters = []
710
711            # Base query y joins explícitos
712            query = db.query(BotPerformance)\
713                .join(Bot, Bot.Id == BotPerformance.BotId)\
714                .join(Strategy, Strategy.Id == Bot.StrategyId)\
715                .join(Ticker, Ticker.Id == Bot.TickerId)
716
717            # Filtros numéricos
718            if return_ is not None:
719                filters.append(BotPerformance.Return >= return_)
720            if drawdown is not None:
721                filters.append(BotPerformance.Drawdown <= drawdown)
722            if stability_ratio is not None:
723                filters.append(BotPerformance.StabilityRatio >= stability_ratio)
724            if sharpe_ratio is not None:
725                filters.append(BotPerformance.SharpeRatio >= sharpe_ratio)
726            if trades is not None:
727                filters.append(BotPerformance.Trades >= trades)
728            if rreturn_dd is not None:
729                filters.append(BotPerformance.RreturnDd >= rreturn_dd)
730            if custom_metric is not None:
731                filters.append(BotPerformance.StabilityWeightedRar >= custom_metric)
732            if winrate is not None:
733                filters.append(BotPerformance.WinRate >= winrate)
734
735            # Filtros por texto con ILIKE correctamente aplicados
736            if strategy:
737                filters.append(Strategy.Name.ilike(f"%{strategy}%"))
738            if ticker:
739                filters.append(Ticker.Name.ilike(f"%{ticker}%"))
740
741            # Query final que aplica filtros correctamente
742            final_query = (
743                query
744                .filter(*filters)
745                .options(
746                    joinedload(BotPerformance.Bot),
747                    joinedload(BotPerformance.BotTradePerformance),
748                    joinedload(BotPerformance.TradeHistory),
749                    joinedload(BotPerformance.MontecarloTest),
750                    joinedload(BotPerformance.LuckTest),
751                    joinedload(BotPerformance.RandomTest),
752                )
753                .order_by(BotPerformance.RreturnDd.desc())
754                .all()
755            )
756
757            return final_query

Handles the orchestration, execution, persistence, and retrieval of backtests and related performance metrics.

This service acts as the central component for running backtests on trading strategies, saving their results, and performing various queries on historical performance data. It supports streaming logs, asynchronous execution, and interaction with the database and configuration layers.

Main features include:

  • Running and saving individual or batch backtests across strategy/ticker/timeframe/risk combinations.
  • Loading strategies dynamically and executing them with proper capital, leverage, and configuration.
  • Persisting detailed performance metrics and trade history in the database.
  • Performing deletions and cleanup of historical test data (including files and DB records).
  • Providing filtering, querying, and tagging of backtests (e.g. by performance, robustness, favorites).
  • Identifying robust strategies via custom metrics and filters (e.g. return/drawdown ratio > 1).
  • Managing test dependencies such as Monte Carlo, Luck Test, and Random Test results.

Attributes: db_service (DbService): Handles database connections and CRUD operations. bot_service (BotService): Manages bot records used during backtesting. config_service (ConfigService): Provides access to configuration values (e.g. risk-free rate).

Notes: - Backtests can be persisted with reports or run temporarily. - Streaming logs (via queues) allow real-time feedback when executing long-running backtests. - This class assumes a full application context with all entities (Bot, BotPerformance, Trade, etc.) properly defined.

db_service
bot_service
config_service
async def async_iterator(self, iterable):
76    async def async_iterator(self, iterable):
77        # Si iterable es una lista, la envolvemos en una iteración asíncrona
78        for item in iterable:
79            yield item
80            await asyncio.sleep(0)
@streaming_endpoint
async def run_backtest( self, initial_cash: float, strategy: app.backbone.entities.strategy.Strategy, ticker: app.backbone.entities.ticker.Ticker, timeframe: app.backbone.entities.timeframe.Timeframe, date_from: pandas._libs.tslibs.timestamps.Timestamp, date_to: pandas._libs.tslibs.timestamps.Timestamp, method: str, risk: float, save_bt_plot: str, queue: asyncio.queues.Queue) -> AsyncGenerator[str, NoneType]:
 82    @streaming_endpoint
 83    async def run_backtest(
 84        self,
 85        initial_cash: float,
 86        strategy: Strategy,
 87        ticker: Ticker,
 88        timeframe: Timeframe,
 89        date_from: pd.Timestamp,
 90        date_to: pd.Timestamp,
 91        method: str,
 92        risk: float,
 93        save_bt_plot: str,
 94        queue: asyncio.Queue,
 95        ) -> AsyncGenerator[str, None]:
 96        """
 97        Executes an asynchronous backtest for a given trading strategy and returns performance metrics, 
 98        trade details, and statistics via a streaming endpoint.
 99
100        This function loads the specified strategy, fetches historical price data, applies leverage rules,
101        and runs a backtest simulation. Results are streamed in real-time through an asyncio queue, 
102        and optional plots/reports are generated.
103
104        Steps performed:
105        - Loads the trading strategy dynamically from a module path.
106        - Retrieves leverage rules from a YAML config file.
107        - Fetches and prepares historical price data for the given ticker/timeframe.
108        - Computes margin requirements based on leverage.
109        - Executes the backtest using the strategy's logic and calculates performance metrics.
110        - Generates and saves backtest plots (temporary or persistent) if requested.
111        - Streams progress updates, logs, and final results through the provided queue.
112
113        Parameters:
114        - initial_cash (float): Starting capital for the backtest simulation.
115        - strategy (Strategy): Strategy object containing the module path and name.
116        - ticker (Ticker): Financial instrument to backtest on (e.g., currency pair, stock).
117        - timeframe (Timeframe): Timeframe for price data (e.g., '1H', '4H').
118        - date_from (pd.Timestamp): Start date for historical data.
119        - date_to (pd.Timestamp): End date for historical data.
120        - method (str): Reserved for future backtest variations (unused in current implementation).
121        - risk (float): Risk percentage per trade (e.g., 0.01 for 1% risk).
122        - save_bt_plot (str): Plot saving mode: 
123            - 'temp': Saves in a temporary directory (auto-cleaned later).
124            - 'persist': Saves in the main backtest_plots directory.
125            - Any other value skips plot generation.
126        - queue (asyncio.Queue): Async queue for real-time progress streaming.
127
128        Returns:
129        AsyncGenerator[str, None]: Yields three objects upon completion:
130        - performance (pd.DataFrame): Aggregated strategy performance metrics.
131        - trade_performance (pd.DataFrame): Detailed trade-by-trade results.
132        - stats (pd.DataFrame): Statistical summaries (e.g., Sharpe ratio, max drawdown).
133
134        Side effects:
135        - Reads leverage configurations from './app/configs/leverages.yml'.
136        - May create HTML plot files in either './app/templates/static/backtest_plots/temp' (temporary)
137        or './app/templates/static/backtest_plots' (persistent).
138        - Streams logs/results via the provided asyncio.Queue (e.g., for frontend progress updates).
139
140        Notes:
141        - The strategy module path must follow the convention: 'app.backbone.strategies.{strategy_name}'.
142        - Temporary plots include a timestamp and are automatically purged by a separate cleanup process.
143        - Risk-free rate is fetched from the application's config service (used for Sharpe ratio calculations).
144        - Margin is calculated as 1/leverage (e.g., 50x leverage → 2% margin requirement).
145        """
146        await log_message(queue, '', '', "log", f"Loading strategy {strategy.Name}")
147        
148        strategy_name = strategy.Name.split(".")[1]
149        bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'
150        
151        strategy_path = 'app.backbone.strategies.' + strategy.Name
152        strategy_func = load_function(strategy_path)
153        
154        await log_message(queue, '', '', "log", f"Strategy {strategy.Name} loaded succesfully")
155        await log_message(queue, '', '', "log", "Loading leverages")
156
157        with open("./app/configs/leverages.yml", "r") as file_name:
158            leverages = yaml.safe_load(file_name)
159            await log_message(queue, '', '', "log", "Leverages loaded")
160
161        leverage = leverages[ticker.Name]
162        margin = 1 / leverage
163        
164        await log_message(queue, ticker.Name, timeframe.Name, "log", "Getting data")
165
166        prices = get_data(ticker.Name, timeframe.MetaTraderNumber, date_from, date_to)
167        prices.index = pd.to_datetime(prices.index)
168
169        await log_message(queue, ticker.Name, timeframe.Name, "log", f"Data ready: {prices.head(1)}")
170
171        await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting backtest")
172
173        filename = f'{bot_name}_{date_from.strftime("%Y%m%d")}_{date_to.strftime("%Y%m%d")}'
174        plot_path = None
175
176        if save_bt_plot == 'temp':
177            plot_path = './app/templates/static/backtest_plots/temp'
178            await log_message(queue, ticker.Name, timeframe.Name, "link", os.path.join('/backtest_plots/temp', filename + '.html'))
179
180        elif save_bt_plot == 'persist':
181            plot_path = './app/templates/static/backtest_plots'
182
183        risk_free_rate = float(self.config_service.get_by_name('RiskFreeRate').Value)
184
185        performance, trade_performance, stats = run_strategy_and_get_performances(
186            strategy=strategy_func,
187            ticker=ticker,
188            timeframe=timeframe,
189            risk=risk,
190            prices=prices,
191            initial_cash=initial_cash,
192            risk_free_rate=risk_free_rate,
193            margin=margin,
194            plot_path=plot_path,
195            file_name = filename,
196            save_report= save_bt_plot == 'persist' # Solo genera el reporte si no es una corrida temporal
197        )
198
199        await log_message(queue, ticker.Name, timeframe.Name, "log", "The backtest is done :)")
200
201        await log_message(queue, ticker.Name, timeframe.Name, "completed", f"{stats.to_string()}")
202        
203        return performance, trade_performance, stats

Executes an asynchronous backtest for a given trading strategy and returns performance metrics, trade details, and statistics via a streaming endpoint.

This function loads the specified strategy, fetches historical price data, applies leverage rules, and runs a backtest simulation. Results are streamed in real-time through an asyncio queue, and optional plots/reports are generated.

Steps performed:

  • Loads the trading strategy dynamically from a module path.
  • Retrieves leverage rules from a YAML config file.
  • Fetches and prepares historical price data for the given ticker/timeframe.
  • Computes margin requirements based on leverage.
  • Executes the backtest using the strategy's logic and calculates performance metrics.
  • Generates and saves backtest plots (temporary or persistent) if requested.
  • Streams progress updates, logs, and final results through the provided queue.

Parameters:

  • initial_cash (float): Starting capital for the backtest simulation.
  • strategy (Strategy): Strategy object containing the module path and name.
  • ticker (Ticker): Financial instrument to backtest on (e.g., currency pair, stock).
  • timeframe (Timeframe): Timeframe for price data (e.g., '1H', '4H').
  • date_from (pd.Timestamp): Start date for historical data.
  • date_to (pd.Timestamp): End date for historical data.
  • method (str): Reserved for future backtest variations (unused in current implementation).
  • risk (float): Risk percentage per trade (e.g., 0.01 for 1% risk).
  • save_bt_plot (str): Plot saving mode:
    • 'temp': Saves in a temporary directory (auto-cleaned later).
    • 'persist': Saves in the main backtest_plots directory.
    • Any other value skips plot generation.
  • queue (asyncio.Queue): Async queue for real-time progress streaming.

Returns: Annotated as AsyncGenerator[str, None]; the method body itself returns a tuple of three objects upon completion:

  • performance (pd.DataFrame): Aggregated strategy performance metrics.
  • trade_performance (pd.DataFrame): Detailed trade-by-trade results.
  • stats (pd.DataFrame): Statistical summaries (e.g., Sharpe ratio, max drawdown).

Side effects:

  • Reads leverage configurations from './app/configs/leverages.yml'.
  • May create HTML plot files in either './app/templates/static/backtest_plots/temp' (temporary) or './app/templates/static/backtest_plots' (persistent).
  • Streams logs/results via the provided asyncio.Queue (e.g., for frontend progress updates).

Notes:

  • The strategy module path must follow the convention: 'app.backbone.strategies.{strategy_name}'.
  • Temporary plots include a timestamp and are automatically purged by a separate cleanup process.
  • Risk-free rate is fetched from the application's config service (used for Sharpe ratio calculations).
  • Margin is calculated as 1/leverage (e.g., 50x leverage → 2% margin requirement).
@streaming_endpoint
async def run_backtests_and_save( self, initial_cash: float, strategies: Union[app.backbone.entities.strategy.Strategy, List[app.backbone.entities.strategy.Strategy]], tickers: Union[app.backbone.entities.ticker.Ticker, List[app.backbone.entities.ticker.Ticker]], timeframes: List[app.backbone.entities.timeframe.Timeframe], date_from: datetime.date, date_to: datetime.date, method: str, risks: Union[float, List[float]], save_bt_plot: str, queue: asyncio.queues.Queue) -> AsyncGenerator[str, NoneType]:
205    @streaming_endpoint
206    async def run_backtests_and_save(
207        self,
208        initial_cash: float,
209        strategies: Strategy | List[Strategy],
210        tickers: Ticker | List[Ticker],
211        timeframes: List[Timeframe],
212        date_from: date,
213        date_to: date,
214        method: str,
215        risks: float | List[float],
216        save_bt_plot: str,
217        queue: asyncio.Queue,
218        ) -> AsyncGenerator[str, None]:
219        """
220        Executes multiple backtests in parallel for all combinations of strategies, tickers, timeframes and risks,
221        then saves results to the database while streaming progress updates.
222
223        This function handles the complete backtesting pipeline:
224        - Generates all possible combinations of input parameters
225        - Checks for existing backtest results to avoid duplicate work
226        - Runs new backtests when needed using run_backtest()
227        - Saves performance metrics, trade details, and statistics to the database
228        - Provides real-time feedback through an async queue
229
230        Steps performed:
231        1. Normalizes all input parameters to lists (single items → single-element lists)
232        2. Generates all combinations of strategies/tickers/timeframes/risks
233        3. For each combination:
234        - Checks if bot exists in database
235        - Verifies if backtest already exists for the date range
236        - Runs new backtest if needed
237        - Converts results to database objects
238        - Saves all data (bot, performance, trades) transactionally
239        4. Streams progress, warnings, and completion messages via queue
240
241        Parameters:
242        - initial_cash (float): Starting capital for all backtests
243        - strategies (Strategy|List[Strategy]): Single strategy or list to test
244        - tickers (Ticker|List[Ticker]): Financial instrument(s) to backtest
245        - timeframes (List[Timeframe]): Time intervals to test (e.g. ['1H', '4H'])
246        - date_from (date): Start date for historical data
247        - date_to (date): End date for historical data
248        - method (str): Backtesting methodology identifier
249        - risks (float|List[float]): Risk percentage(s) per trade (e.g. [0.01, 0.02])
250        - save_bt_plot (str): Plot saving mode:
251            - 'temp': Temporary storage
252            - 'persist': Permanent storage
253            - Other: Skip plot generation
254        - queue (asyncio.Queue): Async queue for progress streaming
255
256        Returns:
257        - AsyncGenerator[str, None]: Yields list of saved BotPerformance objects when complete
258
259        Side effects:
260        - Creates/updates database records for:
261            - Bot configurations
262            - Performance metrics
263            - Individual trades
264        - May generate plot files depending on save_bt_plot parameter
265        - Streams messages to provided queue including:
266            - Progress logs
267            - Warnings about duplicates
268            - Completion/failure notifications
269
270        Notes:
271        - Automatically skips existing backtests for the same parameters/date range
272        - Uses atomic database transactions to ensure data consistency
273        - New bots are created if they don't exist in the database
274        - Trade history is extracted from backtest stats and saved relationally
275        - All database operations use the service's db_service interface
276        - Failures for individual combinations don't stop overall execution
277        """
278        try:
279            backtests = []
280            combinations = None
281
282            strategies = [strategies] if type(strategies) != list else strategies
283            tickers = [tickers] if type(tickers) != list else tickers
284            timeframes = [timeframes] if type(timeframes) != list else timeframes
285            risks = [risks] if type(risks) != list else risks
286  
287            combinations = itertools.product(strategies, tickers, timeframes, risks)
288
289            async for combination in self.async_iterator(list(combinations)):
290                strategy, ticker, timeframe, risk = combination
291
292                strategy_name = strategy.Name.split(".")[1]
293
294                await log_message(queue, ticker.Name, timeframe.Name, "log", "Starting")
295                
296                bot_name = f'{strategy_name}_{ticker.Name}_{timeframe.Name}_{risk}'
297                
298                await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking bot in the database")
299
300                bot = self.bot_service.get_bot(
301                    strategy_id=strategy.Id,
302                    ticker_id=ticker.Id,
303                    timeframe_id=timeframe.Id,
304                    risk=risk   
305                )
306                
307                bot_exists = False
308
309                if bot:
310                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot already exists")
311
312                    bot_exists = True
313
314                    await log_message(queue, ticker.Name, timeframe.Name, "log", "Checking backtest in the database")
315                    result_performance = self.get_performances_by_bot_dates(
316                        bot_id=bot.Id, 
317                        date_from=date_from, 
318                        date_to=date_to
319                    )
320                    
321                    if result_performance:
322                        backtests.append(result_performance)
323                        await log_message(queue, ticker.Name, timeframe.Name, "warning", "The backtest already exists. Skipping...")
324                        continue
325
326                else:
327                    await log_message(queue, ticker.Name, timeframe.Name, "log", "The bot is not in the database")
328                    bot = Bot(
329                        Name=bot_name,
330                        StrategyId=strategy.Id,
331                        TickerId=ticker.Id,
332                        TimeframeId=timeframe.Id,
333                        Risk=risk
334                    )
335
336                try:
337                    performance, trade_performance, stats = await self.run_backtest(
338                        initial_cash=initial_cash,
339                        strategy=strategy,
340                        ticker=ticker,
341                        timeframe=timeframe,
342                        date_from=date_from,
343                        date_to=date_to,
344                        method=method,
345                        risk=risk,
346                        save_bt_plot=save_bt_plot,
347                        queue=queue,
348                    )
349
350                    await log_message(queue, ticker.Name, timeframe.Name, "log", "Saving metrics in the database")
351
352                    bot_performance_for_db = _performance_from_df_to_obj(
353                        performance, 
354                        date_from, 
355                        date_to, 
356                        risk, 
357                        method, 
358                        bot,
359                        initial_cash,
360                    )
361                    
362                    trade_performance_for_db = [BotTradePerformance(**row) for _, row in trade_performance.iterrows()].pop()
363                    trade_performance_for_db.BotPerformance = bot_performance_for_db 
364                    
365                    with self.db_service.get_database() as db:
366                        if not bot_exists:
367                            self.db_service.create(db, bot)
368                        
369                        backtests.append(self.db_service.create(db, bot_performance_for_db))
370
371                        self.db_service.create(db, trade_performance_for_db)
372                        
373                        trade_history = trades_from_df_to_obj(stats._trades)
374
375                        for trade in trade_history:
376                            trade.BotPerformance = bot_performance_for_db
377                            self.db_service.create(db, trade)
378
379                    await log_message(queue, ticker.Name, timeframe.Name, "completed", "It's done buddy ;)")
380                
381                except Exception as e:
382                    await log_message(queue, '', '', "failed", f"{str(e)}")
383            
384            return backtests
385            
386        except Exception as e:
387            await log_message(queue, '', '', "failed", f"{str(e)}")

Executes backtests, one after another, for all combinations of strategies, tickers, timeframes and risks, then saves results to the database while streaming progress updates.

This function handles the complete backtesting pipeline:

  • Generates all possible combinations of input parameters
  • Checks for existing backtest results to avoid duplicate work
  • Runs new backtests when needed using run_backtest()
  • Saves performance metrics, trade details, and statistics to the database
  • Provides real-time feedback through an async queue

Steps performed:

  1. Normalizes all input parameters to lists (single items → single-element lists)
  2. Generates all combinations of strategies/tickers/timeframes/risks
  3. For each combination:
  • Checks if bot exists in database
  • Verifies if backtest already exists for the date range
  • Runs new backtest if needed
  • Converts results to database objects
  • Saves all data (bot, performance, trades) transactionally
  4. Streams progress, warnings, and completion messages via queue

Parameters:

  • initial_cash (float): Starting capital for all backtests
  • strategies (Strategy|List[Strategy]): Single strategy or list to test
  • tickers (Ticker|List[Ticker]): Financial instrument(s) to backtest
  • timeframes (List[Timeframe]): Time intervals to test (e.g. ['1H', '4H'])
  • date_from (date): Start date for historical data
  • date_to (date): End date for historical data
  • method (str): Backtesting methodology identifier
  • risks (float|List[float]): Risk percentage(s) per trade (e.g. [0.01, 0.02])
  • save_bt_plot (str): Plot saving mode:
    • 'temp': Temporary storage
    • 'persist': Permanent storage
    • Other: Skip plot generation
  • queue (asyncio.Queue): Async queue for progress streaming

Returns:

  • AsyncGenerator[str, None]: Yields list of saved BotPerformance objects when complete

Side effects:

  • Creates/updates database records for:
    • Bot configurations
    • Performance metrics
    • Individual trades
  • May generate plot files depending on save_bt_plot parameter
  • Streams messages to provided queue including:
    • Progress logs
    • Warnings about duplicates
    • Completion/failure notifications

Notes:

  • Automatically skips existing backtests for the same parameters/date range
  • Uses atomic database transactions to ensure data consistency
  • New bots are created if they don't exist in the database
  • Trade history is extracted from backtest stats and saved relationally
  • All database operations use the service's db_service interface
  • Failures for individual combinations don't stop overall execution
def get_performances_by_strategy_ticker( self, strategy_id, ticker_id) -> app.backbone.entities.bot_performance.BotPerformance:
389    def get_performances_by_strategy_ticker(self, strategy_id, ticker_id) -> BotPerformance:
390        with self.db_service.get_database() as db:
391            bot_performances = (
392                db.query(BotPerformance)
393                .join(Bot, Bot.Id == BotPerformance.BotId)
394                .filter(Bot.TickerId == ticker_id)
395                .filter(Bot.StrategyId == strategy_id)
396                .order_by(desc(BotPerformance.StabilityWeightedRar))
397                .all()
398            )
399
400            return bot_performances
def get_performances_by_bot_dates( self, bot_id, date_from, date_to) -> app.backbone.entities.bot_performance.BotPerformance:
402    def get_performances_by_bot_dates(self, bot_id, date_from, date_to) -> BotPerformance:
403        with self.db_service.get_database() as db:
404            bot_performance = (
405                db.query(BotPerformance)
406                .options(
407                    joinedload(BotPerformance.RandomTest).joinedload(RandomTest.RandomTestPerformance),
408                    joinedload(BotPerformance.LuckTest).joinedload(LuckTest.LuckTestPerformance),
409                )
410                .filter(BotPerformance.BotId == bot_id)
411                .filter(BotPerformance.DateFrom == date_from)
412                .filter(BotPerformance.DateTo == date_to)
413                .first()
414            )
415            
416            return bot_performance
def get_performance_by_bot(self, bot_id) -> app.backbone.entities.bot_performance.BotPerformance:
418    def get_performance_by_bot(self, bot_id) -> BotPerformance: # cambiar bot_id por backtest_id
419        with self.db_service.get_database() as db:
420            bot_performance = self.db_service.get_many_by_filter(db, BotPerformance, BotId=bot_id)  
421            return bot_performance
def get_bot_performance_by_id( self, bot_performance_id) -> app.backbone.entities.bot_performance.BotPerformance:
423    def get_bot_performance_by_id(self, bot_performance_id) -> BotPerformance: # cambiar bot_id por backtest_id
424        with self.db_service.get_database() as db:
425            bot_performance = self.db_service.get_by_id(db, BotPerformance, id=bot_performance_id)  
426
427            return bot_performance
def delete_from_strategy( self, strategy_id) -> app.backbone.services.operation_result.OperationResult:
429    def delete_from_strategy(self, strategy_id) -> OperationResult:
430        with self.db_service.get_database() as db:
431            bp_subq = (
432                select(BotPerformance.Id)
433                .join(Bot, BotPerformance.BotId == Bot.Id)
434                .where(Bot.StrategyId == strategy_id)
435            ).subquery()
436
437            db.execute(
438                delete(MetricWharehouse).where(
439                    MetricWharehouse.MontecarloTestId.in_(
440                        select(MontecarloTest.Id).where(MontecarloTest.BotPerformanceId.in_(bp_subq))
441                    )
442                )
443            )
444
445            db.execute(
446                delete(MontecarloTest).where(MontecarloTest.BotPerformanceId.in_(bp_subq))
447            )
448
449            db.execute(
450                delete(RandomTest).where(RandomTest.BotPerformanceId.in_(bp_subq))
451            )
452
453            lucktest_bp_subq = (
454                select(LuckTest.LuckTestPerformanceId)
455                .where(LuckTest.BotPerformanceId.in_(bp_subq))
456            ).subquery()
457
458            bot_performances = db.query(BotPerformance).where(BotPerformance.Id.in_(bp_subq)).all()
459
460            db.execute(delete(BotPerformance).where(BotPerformance.Id.in_(lucktest_bp_subq)))
461
462            db.execute(
463                delete(LuckTest).where(LuckTest.BotPerformanceId.in_(bp_subq))
464            )
465
466            db.execute(
467                delete(BotTradePerformance).where(BotTradePerformance.BotPerformanceId.in_(bp_subq))
468            )
469
470            db.execute(
471                delete(Trade).where(Trade.BotPerformanceId.in_(bp_subq))
472            )
473
474            db.execute(
475                delete(BotPerformance).where(BotPerformance.Id.in_(bp_subq))
476            )
477
478            db.execute(
479                delete(Bot).where(Bot.StrategyId == strategy_id)
480            )
481        
482        # Borrar archivos relacionados
483        for bot_performance in bot_performances:
484            str_date_from = str(bot_performance.DateFrom).replace('-', '')
485            str_date_to = str(bot_performance.DateTo).replace('-', '')
486            file_name = f'{bot_performance.Bot.Name}_{str_date_from}_{str_date_to}.html'
487            for folder in ['luck_test_plots', 'correlation_plots', 't_test_plots', 'backtest_plots', 'backtest_plots/reports']:
488                path = os.path.join('./app/templates/static/', folder, file_name)
489                if os.path.exists(path):
490                    os.remove(path)
491
492        return OperationResult(ok=True, message=None, item=None)
def delete( self, bot_performance) -> app.backbone.services.operation_result.OperationResult:
495    def delete(self, bot_performance) -> OperationResult:
496        with self.db_service.get_database() as db:
497            # Cargar el objeto con relaciones necesarias
498            bot_performance_id = bot_performance if isinstance(bot_performance, int) else bot_performance.Id
499
500            bot_performance = db.query(BotPerformance)\
501                .options(
502                    joinedload(BotPerformance.BotTradePerformance),
503                    joinedload(BotPerformance.Bot),
504                    joinedload(BotPerformance.LuckTest),
505                    joinedload(BotPerformance.RandomTest),
506                )\
507                .filter(BotPerformance.Id == bot_performance_id)\
508                .first()
509
510            if not bot_performance:
511                return OperationResult(ok=False, message="BotPerformance not found", item=None)
512
513            # Eliminar relaciones directas
514            if bot_performance.BotTradePerformance:
515                self.db_service.delete(db, BotTradePerformance, bot_performance.BotTradePerformance.Id)
516
517            # Eliminar Montecarlo y métricas asociadas
518            montecarlo_test = self.db_service.get_by_filter(db, MontecarloTest, BotPerformanceId=bot_performance_id)
519            if montecarlo_test:
520                self.db_service.delete_many_by_filter(db, MetricWharehouse, MontecarloTestId=montecarlo_test.Id)
521                self.db_service.delete(db, MontecarloTest, montecarlo_test.Id)
522
523            # Eliminar LuckTests y sus performances
524            luck_tests = self.db_service.get_many_by_filter(db, LuckTest, BotPerformanceId=bot_performance_id)
525            for lt in luck_tests:
526                self.db_service.delete(db, BotPerformance, lt.LuckTestPerformanceId)
527                self.db_service.delete(db, LuckTest, lt.Id)
528
529            # Borrar archivos relacionados
530            str_date_from = str(bot_performance.DateFrom).replace('-', '')
531            str_date_to = str(bot_performance.DateTo).replace('-', '')
532            file_name = f'{bot_performance.Bot.Name}_{str_date_from}_{str_date_to}.html'
533            for folder in ['luck_test_plots', 'correlation_plots', 't_test_plots', 'backtest_plots', 'backtest_plots/reports']:
534                path = os.path.join('./app/templates/static/', folder, file_name)
535                if os.path.exists(path):
536                    os.remove(path)
537
538            # Eliminar RandomTests y sus dependencias
539            if bot_performance.RandomTest:
540                rt = bot_performance.RandomTest
541                rt_perf = self.db_service.get_by_id(db, BotPerformance, rt.RandomTestPerformanceId)
542                if rt_perf:
543                    rt_trade_perf = self.db_service.get_by_filter(db, BotTradePerformance, BotPerformanceId=rt_perf.Id)
544                    if rt_trade_perf:
545                        self.db_service.delete(db, BotTradePerformance, rt_trade_perf.Id)
546                    self.db_service.delete(db, BotPerformance, rt.RandomTestPerformanceId)
547                self.db_service.delete(db, RandomTest, rt.Id)
548
549            # Borrar trades relacionados
550            self.db_service.delete_many_by_filter(db, Trade, BotPerformanceId=bot_performance_id)
551
552            # Si no hay más performance, borrar el Bot
553            if bot_performance.BotId:
554                rem = db.query(BotPerformance).filter(BotPerformance.BotId == bot_performance.BotId).count()
555                if rem == 1:
556                    self.db_service.delete(db, Bot, bot_performance.BotId)
557
558            self.db_service.delete(db, BotPerformance, bot_performance.Id)
559            self.db_service.save(db)
560
561        return OperationResult(ok=True, message=None, item=None)
def update_favorite( self, performance_id) -> app.backbone.services.operation_result.OperationResult:
564    def update_favorite(self, performance_id) -> OperationResult:
565
566        with self.db_service.get_database() as db:
567            performance = self.db_service.get_by_id(db, BotPerformance, performance_id)
568            
569            if not performance:
570                return OperationResult(ok=False, message='Backtest not found', item=None)
571            
572            performance.Favorite = not performance.Favorite
573            updated_performance = self.db_service.update(db, BotPerformance, performance)   
574
575            if not updated_performance:
576                return OperationResult(ok=False, message='Update failed', item=None)
577
578            return OperationResult(ok=True, message=None, item=updated_performance.Favorite)
def get_robusts(self) -> List[app.backbone.entities.bot_performance.BotPerformance]:
580    def get_robusts(self) -> List[BotPerformance]:
581        with self.db_service.get_database() as db:
582            # Subquery para filtrar estrategias robustas (RreturnDd promedio > 1)
583            subquery = (
584                db.query(
585                    Bot.StrategyId,
586                    Bot.TickerId,
587                )
588                .join(BotPerformance, Bot.Id == BotPerformance.BotId)
589                .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
590                .filter(
591                    BotPerformance.RreturnDd != "NaN",
592                    Timeframe.Selected == True
593                )
594                .group_by(Bot.StrategyId, Bot.TickerId)
595                .having(func.avg(BotPerformance.RreturnDd) > 1)
596                .subquery()
597            )
598
599            # Alias para evitar ambigüedad
600            bot_alias = aliased(Bot)
601            bp_alias = aliased(BotPerformance)
602
603            # Subquery para asignar un número de fila basado en StabilityWeightedRar
604            ranked_subquery = (
605                db.query(
606                    bp_alias.Id.label("bp_id"),
607                    func.row_number()
608                    .over(
609                        partition_by=[bot_alias.StrategyId, bot_alias.TickerId],
610                        order_by=bp_alias.StabilityWeightedRar.desc(),
611                    )
612                    .label("row_num"),
613                )
614                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
615                .join(
616                    subquery,
617                    (bot_alias.StrategyId == subquery.c.StrategyId)
618                    & (bot_alias.TickerId == subquery.c.TickerId),
619                )
620                .subquery()
621            )
622
623            # Query final seleccionando solo los mejores (row_num == 1)
624            data = (
625                db.query(bp_alias)
626                .join(ranked_subquery, bp_alias.Id == ranked_subquery.c.bp_id)
627                .filter(ranked_subquery.c.row_num == 1)
628                .all()
629            )
630
631            return data
def get_robusts_by_strategy_id( self, strategy_id) -> List[app.backbone.entities.bot_performance.BotPerformance]:
633    def get_robusts_by_strategy_id(self, strategy_id) -> List[BotPerformance]:
634        with self.db_service.get_database() as db:
635            # Subquery para calcular el promedio de RreturnDd por StrategyId y TickerId
636            subquery = (
637                db.query(
638                    Bot.StrategyId,
639                    Bot.TickerId,
640                )
641                .join(BotPerformance, Bot.Id == BotPerformance.BotId)
642                .join(Timeframe, Bot.TimeframeId == Timeframe.Id)
643                .filter(
644                    Bot.StrategyId == strategy_id,
645                    BotPerformance.RreturnDd != "NaN",
646                    Timeframe.Selected == True,
647                )
648                .group_by(Bot.StrategyId, Bot.TickerId)
649                .having(func.avg(BotPerformance.RreturnDd) >= 1)  # >= en lugar de > para incluir exactamente 1
650                .subquery()
651            )
652
653            # Alias para evitar ambigüedad en las relaciones
654            bot_alias = aliased(Bot)
655            bp_alias = aliased(BotPerformance)
656
657            # Subquery para seleccionar la mejor temporalidad por StrategyId - TickerId
658            best_temporalities = (
659                db.query(
660                    bp_alias.BotId,
661                    bot_alias.StrategyId,
662                    bot_alias.TickerId,
663                    func.max(bp_alias.StabilityWeightedRar).label("max_custom_metric")
664                )
665                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
666                .join(subquery, (bot_alias.StrategyId == subquery.c.StrategyId) & (bot_alias.TickerId == subquery.c.TickerId))
667                .group_by(bot_alias.StrategyId, bot_alias.TickerId)
668                .subquery()
669            )
670
671            # Query final para traer solo los registros que corresponden a la mejor temporalidad
672            data = (
673                db.query(bp_alias)
674                .join(bot_alias, bot_alias.Id == bp_alias.BotId)
675                .join(best_temporalities,
676                    (bot_alias.StrategyId == best_temporalities.c.StrategyId) &
677                    (bot_alias.TickerId == best_temporalities.c.TickerId) &
678                    (bp_alias.StabilityWeightedRar == best_temporalities.c.max_custom_metric))
679                .all()
680            )
681
682            return data
def get_favorites(self) -> List[app.backbone.entities.bot_performance.BotPerformance]:
684    def get_favorites(self) -> List[BotPerformance]:
685        with self.db_service.get_database() as db:
686            favorites = self.db_service.get_many_by_filter(db, BotPerformance, Favorite=True)
687        
688        return favorites
def get_trades(self, bot_performance_id: int) -> List[app.backbone.entities.trade.Trade]:
690    def get_trades(self, bot_performance_id:int) -> List[Trade]:
691        with self.db_service.get_database() as db:
692            bot_performance = self.db_service.get_by_id(db, BotPerformance, id=bot_performance_id)  
693            return bot_performance.TradeHistory
def get_by_filter( self, return_: float = None, drawdown: float = None, stability_ratio: float = None, sharpe_ratio: float = None, trades: float = None, rreturn_dd: float = None, custom_metric: float = None, winrate: float = None, strategy: str = None, ticker: str = None) -> List[app.backbone.entities.bot_performance.BotPerformance]:
695    def get_by_filter(
696        self,
697        return_: float = None,
698        drawdown: float = None,
699        stability_ratio: float = None,
700        sharpe_ratio: float = None,
701        trades: float = None,
702        rreturn_dd: float = None,
703        custom_metric: float = None,
704        winrate: float = None,
705        strategy: str = None,
706        ticker: str = None
707    ) -> List[BotPerformance]:
708        with self.db_service.get_database() as db:
709            filters = []
710
711            # Base query y joins explícitos
712            query = db.query(BotPerformance)\
713                .join(Bot, Bot.Id == BotPerformance.BotId)\
714                .join(Strategy, Strategy.Id == Bot.StrategyId)\
715                .join(Ticker, Ticker.Id == Bot.TickerId)
716
717            # Filtros numéricos
718            if return_ is not None:
719                filters.append(BotPerformance.Return >= return_)
720            if drawdown is not None:
721                filters.append(BotPerformance.Drawdown <= drawdown)
722            if stability_ratio is not None:
723                filters.append(BotPerformance.StabilityRatio >= stability_ratio)
724            if sharpe_ratio is not None:
725                filters.append(BotPerformance.SharpeRatio >= sharpe_ratio)
726            if trades is not None:
727                filters.append(BotPerformance.Trades >= trades)
728            if rreturn_dd is not None:
729                filters.append(BotPerformance.RreturnDd >= rreturn_dd)
730            if custom_metric is not None:
731                filters.append(BotPerformance.StabilityWeightedRar >= custom_metric)
732            if winrate is not None:
733                filters.append(BotPerformance.WinRate >= winrate)
734
735            # Filtros por texto con ILIKE correctamente aplicados
736            if strategy:
737                filters.append(Strategy.Name.ilike(f"%{strategy}%"))
738            if ticker:
739                filters.append(Ticker.Name.ilike(f"%{ticker}%"))
740
741            # Query final que aplica filtros correctamente
742            final_query = (
743                query
744                .filter(*filters)
745                .options(
746                    joinedload(BotPerformance.Bot),
747                    joinedload(BotPerformance.BotTradePerformance),
748                    joinedload(BotPerformance.TradeHistory),
749                    joinedload(BotPerformance.MontecarloTest),
750                    joinedload(BotPerformance.LuckTest),
751                    joinedload(BotPerformance.RandomTest),
752                )
753                .order_by(BotPerformance.RreturnDd.desc())
754                .all()
755            )
756
757            return final_query