app.backbone.utils.general_purpose

  1import logging
  2import cProfile
  3import pstats
  4import functools
  5import importlib.util
  6import sys
  7from functools import wraps
  8import contextvars
  9from typing import List
 10
 11from app.backbone.entities.bot_performance import BotPerformance
 12
 13
# Module-level logger registered under the fixed name "general_purpose".
 14logger = logging.getLogger("general_purpose")
 15
 16def load_function(dotpath: str):
 17    """Carga una función desde un módulo, recargando el módulo si ya ha sido importado."""
 18    module_, func = dotpath.rsplit(".", maxsplit=1)
 19
 20    if module_ in sys.modules:
 21        # Si el módulo ya está en sys.modules, recargarlo
 22        spec = importlib.util.find_spec(module_)
 23        if spec is not None:
 24            m = importlib.util.module_from_spec(spec)
 25            sys.modules[module_] = m
 26            spec.loader.exec_module(m)
 27        else:
 28            raise ImportError(f"No se pudo encontrar el módulo: {module_}")
 29    else:
 30        # Importar normalmente si no ha sido cargado antes
 31        m = importlib.import_module(module_)
 32
 33    return getattr(m, func)
 34
 35screener_columns = [
 36    'industry',
 37    'sector',
 38    'trailingPE',
 39    'forwardPE',
 40    'pegRatio',
 41    'trailingPegRatio'
 42    'beta',
 43    'totalDebt',
 44    'quickRatio',
 45    'currentRatio',
 46    'totalRevenue',
 47    'debtToEquity',
 48    'revenuePerShare',
 49    'returnOnAssets',
 50    'returnOnEquity',
 51    'freeCashflow',
 52    'operatingCashflow',
 53    'earningsGrowth',
 54    'revenueGrowth',
 55    'bid',
 56    'ask',
 57    'marketCap',
 58    'twoHundredDayAverage',
 59    'recommendationKey',
 60    'numberOfAnalystOpinions',
 61    'symbol',
 62]
 63
 64
 65def transformar_a_uno(numero):
 66    # Inicializar contador de decimales
 67    decimales = 0
 68    while numero != int(numero):
 69        numero *= 10
 70        decimales += 1
 71
 72    return 1 / (10 ** decimales)
 73
 74def profile_function(output_file="code_performance_results.prof"):
 75    def decorator(func):
 76        @functools.wraps(func)
 77        def wrapper(*args, **kwargs):
 78            with cProfile.Profile() as profile:
 79                result = func(*args, **kwargs)
 80            stats = pstats.Stats(profile)
 81            stats.sort_stats(pstats.SortKey.TIME)
 82            stats.dump_stats(output_file)
 83            return result
 84        return wrapper
 85    return decorator
 86
 87
 88# Contexto para saber si ya estamos dentro de un flujo
 89in_streaming_context = contextvars.ContextVar("in_streaming_context", default=False)
 90
 91def streaming_endpoint(func):
 92    @wraps(func)
 93    async def wrapper(*args, **kwargs):
 94        is_root = not in_streaming_context.get()
 95        if is_root:
 96            in_streaming_context.set(True)
 97
 98        queue = kwargs.get("queue")
 99
100        try:
101            return await func(*args, **kwargs)
102        
103        finally:
104            if is_root and queue:
105                await queue.put("DONE")
106                in_streaming_context.set(False)  # restaurar estado
107                
108    return wrapper
109
def build_live_trading_config(used_backtests: "List[BotPerformance] | BotPerformance", risk_fn):
    """Build the nested live-trading configuration dict from backtest results.

    The result is keyed by strategy name. Each strategy entry holds the
    strategy's MetaTrader name, fixed ``opt_params`` / ``wfo_params`` defaults
    and an ``instruments_info`` dict keyed by ticker name with that ticker's
    ``risk`` and ``timeframe``. If several backtests share the same strategy
    and ticker, the last one wins.

    Args:
        used_backtests: A single backtest or a list/tuple of them. Each item
            is read through ``.Bot.Strategy``, ``.Bot.Ticker`` and
            ``.Bot.Timeframe``.
        risk_fn: Callable invoked once per backtest; its return value is
            stored under ``'risk'``.

    Returns:
        The configuration dict (empty when no backtests are given).
    """
    # isinstance also accepts tuples and list subclasses, which an exact
    # ``type(...) != list`` comparison would wrongly wrap as a single item.
    if not isinstance(used_backtests, (list, tuple)):
        used_backtests = [used_backtests]

    config_file = {}
    for bt in used_backtests:
        strategy_name = bt.Bot.Strategy.Name
        ticker_name = bt.Bot.Ticker.Name

        strategy_cfg = config_file.get(strategy_name)
        if strategy_cfg is None:
            strategy_cfg = config_file[strategy_name] = {
                'metatrader_name': bt.Bot.Strategy.MetaTraderName,
                'opt_params': None,
                'wfo_params': {
                    'use_wfo': False,
                    'look_back_bars': 200,
                    'warmup_bars': 200,
                },
                'instruments_info': {},
            }

        instrument_cfg = strategy_cfg['instruments_info'].setdefault(ticker_name, {})
        instrument_cfg['risk'] = risk_fn(bt)
        instrument_cfg['timeframe'] = bt.Bot.Timeframe.Name

    return config_file
138
def save_ticker_timeframes(config_file):
    """Collect, per ticker, the sorted list of timeframes used across all strategies.

    Despite the name, nothing is written anywhere: the mapping is only built
    and returned.

    Args:
        config_file: Dict keyed by strategy; each value may contain an
            ``"instruments_info"`` dict mapping ticker -> info dict with an
            optional ``"timeframe"`` entry.

    Returns:
        Dict mapping each ticker to a sorted list of its distinct timeframes.
        A ticker whose entries carry no timeframe maps to an empty list.
    """
    ticker_timeframes = {}

    for strategy_data in config_file.values():
        instruments_info = strategy_data.get("instruments_info", {})
        for ticker, info in instruments_info.items():
            timeframes = ticker_timeframes.setdefault(ticker, set())
            tf = info.get("timeframe")
            # Skip missing timeframes: a None mixed with strings cannot be
            # sorted and would raise TypeError below.
            if tf is not None:
                timeframes.add(tf)

    # Sets are turned into sorted lists so the result is serializable and
    # has a stable order.
    return {
        ticker: sorted(timeframes)
        for ticker, timeframes in ticker_timeframes.items()
    }
logger = <Logger general_purpose (INFO)>
def load_function(dotpath: str):
def load_function(dotpath: str):
    """Load an attribute from a module, re-executing the module if already imported.

    Args:
        dotpath: Dotted path of the form ``"package.module.attribute"``. The
            part after the last dot is the attribute name, the rest is the
            module name.

    Returns:
        The attribute looked up on the (re)loaded module.

    Raises:
        ValueError: If ``dotpath`` contains no dot.
        ImportError: If the module cannot be located or has no loader.
        AttributeError: If the module does not define the attribute.
    """
    module_name, separator, attr_name = dotpath.rpartition(".")
    if not separator:
        raise ValueError(
            f"Expected a dotted path like 'package.module.attribute', got: {dotpath!r}"
        )

    previous = sys.modules.get(module_name)
    if previous is None:
        # Not loaded yet: a regular import is enough.
        module = importlib.import_module(module_name)
    else:
        # Already imported: build a fresh module object from its spec and
        # execute it again so source changes are picked up.
        spec = importlib.util.find_spec(module_name)
        if spec is None or spec.loader is None:
            raise ImportError(f"No se pudo encontrar el módulo: {module_name}")

        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            # Do not leave a half-initialised module registered: put the
            # previously working module back before propagating the error.
            sys.modules[module_name] = previous
            raise

    return getattr(module, attr_name)

Carga una función desde un módulo, recargando el módulo si ya ha sido importado.

# Field names selected for the screener. 'trailingPegRatio' and 'beta' are two
# separate entries; every entry ends with a comma so that adjacent string
# literals can never be silently concatenated.
screener_columns = [
    'industry',
    'sector',
    'trailingPE',
    'forwardPE',
    'pegRatio',
    'trailingPegRatio',
    'beta',
    'totalDebt',
    'quickRatio',
    'currentRatio',
    'totalRevenue',
    'debtToEquity',
    'revenuePerShare',
    'returnOnAssets',
    'returnOnEquity',
    'freeCashflow',
    'operatingCashflow',
    'earningsGrowth',
    'revenueGrowth',
    'bid',
    'ask',
    'marketCap',
    'twoHundredDayAverage',
    'recommendationKey',
    'numberOfAnalystOpinions',
    'symbol',
]
def transformar_a_uno(numero):
def transformar_a_uno(numero):
    """Return the unit step that has as many decimal places as ``numero``.

    Examples: ``0.07 -> 0.01``, ``1.5 -> 0.1``, ``3 -> 1.0``.

    The decimal places are counted on the number's decimal text form rather
    than by repeatedly multiplying a float by 10, because that multiplication
    accumulates binary rounding error (``0.07 * 10 == 0.7000000000000001``)
    and over-counts the digits.

    Args:
        numero: A finite real number.

    Returns:
        ``1 / 10 ** d`` as a float, where ``d`` is the number of decimal
        places of ``numero`` (``1.0`` for integral values).

    Raises:
        OverflowError, ValueError: Propagated from ``int(numero)`` when
            ``numero`` is infinite or NaN.
    """
    # Local import keeps this block self-contained.
    from decimal import Decimal

    # Integral values (ints, 2.0, 100.0, ...) have no decimal places.
    if numero == int(numero):
        return 1 / (10 ** 0)

    # normalize() strips trailing zeros, so the negated exponent is exactly
    # the count of significant decimal places.
    exponente = Decimal(str(numero)).normalize().as_tuple().exponent
    decimales = max(0, -exponente)

    return 1 / (10 ** decimales)
def profile_function(output_file='code_performance_results.prof'):
def profile_function(output_file="code_performance_results.prof"):
    """Build a decorator that profiles each call with cProfile and dumps the stats.

    Args:
        output_file: Path the profiling stats are written to. The file is
            rewritten on every call of the decorated function.

    Returns:
        A decorator. The decorated function returns whatever the original
        returns and propagates whatever it raises; the stats file is written
        in both cases.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            profile = cProfile.Profile()
            profile.enable()
            try:
                return func(*args, **kwargs)
            finally:
                # Runs on success and on failure, so a crashing call still
                # leaves its profile behind for inspection.
                profile.disable()
                stats = pstats.Stats(profile)
                stats.sort_stats(pstats.SortKey.TIME)
                stats.dump_stats(output_file)
        return wrapper
    return decorator
in_streaming_context = <ContextVar name='in_streaming_context' default=False>
def streaming_endpoint(func):
 92def streaming_endpoint(func):
 93    @wraps(func)
 94    async def wrapper(*args, **kwargs):
 95        is_root = not in_streaming_context.get()
 96        if is_root:
 97            in_streaming_context.set(True)
 98
 99        queue = kwargs.get("queue")
100
101        try:
102            return await func(*args, **kwargs)
103        
104        finally:
105            if is_root and queue:
106                await queue.put("DONE")
107                in_streaming_context.set(False)  # restaurar estado
108                
109    return wrapper
def build_live_trading_config( used_backtests: Union[List[app.backbone.entities.bot_performance.BotPerformance], app.backbone.entities.bot_performance.BotPerformance], risk_fn):
def build_live_trading_config(used_backtests: "List[BotPerformance] | BotPerformance", risk_fn):
    """Build the nested live-trading configuration dict from backtest results.

    The result is keyed by strategy name. Each strategy entry holds the
    strategy's MetaTrader name, fixed ``opt_params`` / ``wfo_params`` defaults
    and an ``instruments_info`` dict keyed by ticker name with that ticker's
    ``risk`` and ``timeframe``. If several backtests share the same strategy
    and ticker, the last one wins.

    Args:
        used_backtests: A single backtest or a list/tuple of them. Each item
            is read through ``.Bot.Strategy``, ``.Bot.Ticker`` and
            ``.Bot.Timeframe``.
        risk_fn: Callable invoked once per backtest; its return value is
            stored under ``'risk'``.

    Returns:
        The configuration dict (empty when no backtests are given).
    """
    # isinstance also accepts tuples and list subclasses, which an exact
    # ``type(...) != list`` comparison would wrongly wrap as a single item.
    if not isinstance(used_backtests, (list, tuple)):
        used_backtests = [used_backtests]

    config_file = {}
    for bt in used_backtests:
        strategy_name = bt.Bot.Strategy.Name
        ticker_name = bt.Bot.Ticker.Name

        strategy_cfg = config_file.get(strategy_name)
        if strategy_cfg is None:
            strategy_cfg = config_file[strategy_name] = {
                'metatrader_name': bt.Bot.Strategy.MetaTraderName,
                'opt_params': None,
                'wfo_params': {
                    'use_wfo': False,
                    'look_back_bars': 200,
                    'warmup_bars': 200,
                },
                'instruments_info': {},
            }

        instrument_cfg = strategy_cfg['instruments_info'].setdefault(ticker_name, {})
        instrument_cfg['risk'] = risk_fn(bt)
        instrument_cfg['timeframe'] = bt.Bot.Timeframe.Name

    return config_file
def save_ticker_timeframes(config_file):
def save_ticker_timeframes(config_file):
    """Collect, per ticker, the sorted list of timeframes used across all strategies.

    Despite the name, nothing is written anywhere: the mapping is only built
    and returned.

    Args:
        config_file: Dict keyed by strategy; each value may contain an
            ``"instruments_info"`` dict mapping ticker -> info dict with an
            optional ``"timeframe"`` entry.

    Returns:
        Dict mapping each ticker to a sorted list of its distinct timeframes.
        A ticker whose entries carry no timeframe maps to an empty list.
    """
    ticker_timeframes = {}

    for strategy_data in config_file.values():
        instruments_info = strategy_data.get("instruments_info", {})
        for ticker, info in instruments_info.items():
            timeframes = ticker_timeframes.setdefault(ticker, set())
            tf = info.get("timeframe")
            # Skip missing timeframes: a None mixed with strings cannot be
            # sorted and would raise TypeError below.
            if tf is not None:
                timeframes.add(tf)

    # Sets are turned into sorted lists so the result is serializable and
    # has a stable order.
    return {
        ticker: sorted(timeframes)
        for ticker, timeframes in ticker_timeframes.items()
    }