app.backbone.utils.general_purpose
"""General-purpose helpers: dynamic function loading, profiling, streaming
endpoint bookkeeping and live-trading configuration builders."""
import logging
import cProfile
import pstats
import functools
import importlib.util
import sys
from decimal import Decimal
from functools import wraps
import contextvars
from typing import List

from app.backbone.entities.bot_performance import BotPerformance


logger = logging.getLogger("general_purpose")


def load_function(dotpath: str):
    """Load a function from a module, re-executing the module if it was already imported.

    Args:
        dotpath: Dotted path of the form ``"package.module.function"``.

    Returns:
        The attribute named by the last path component, taken from a freshly
        executed copy of the module.

    Raises:
        ValueError: If ``dotpath`` contains no dot.
        ImportError: If the module spec (or its loader) cannot be found.
        AttributeError: If the module has no attribute with the requested name.
    """
    module_, separator, func = dotpath.rpartition(".")
    if not separator:
        raise ValueError(
            f"Expected a dotted path like 'package.module.function', got: {dotpath!r}"
        )

    if module_ in sys.modules:
        # The module was imported before: build a new module object from its
        # spec and execute it again so source changes are picked up.
        previous = sys.modules[module_]
        spec = importlib.util.find_spec(module_)
        if spec is None:
            raise ImportError(f"No se pudo encontrar el módulo: {module_}")
        if spec.loader is None:
            raise ImportError(f"Module has no loader and cannot be re-executed: {module_}")

        m = importlib.util.module_from_spec(spec)
        sys.modules[module_] = m
        try:
            spec.loader.exec_module(m)
        except BaseException:
            # Do not leave a half-initialised module registered; put the
            # previously working one back before propagating the error.
            sys.modules[module_] = previous
            raise
    else:
        # First import: the regular import machinery is enough.
        m = importlib.import_module(module_)

    return getattr(m, func)


# Column names selected for the screener.
screener_columns = [
    'industry',
    'sector',
    'trailingPE',
    'forwardPE',
    'pegRatio',
    'trailingPegRatio',  # the comma was missing here, which fused this entry with 'beta'
    'beta',
    'totalDebt',
    'quickRatio',
    'currentRatio',
    'totalRevenue',
    'debtToEquity',
    'revenuePerShare',
    'returnOnAssets',
    'returnOnEquity',
    'freeCashflow',
    'operatingCashflow',
    'earningsGrowth',
    'revenueGrowth',
    'bid',
    'ask',
    'marketCap',
    'twoHundredDayAverage',
    'recommendationKey',
    'numberOfAnalystOpinions',
    'symbol',
]


def transformar_a_uno(numero):
    """Return ``1 / 10**d`` where ``d`` is the number of decimal places of ``numero``.

    Examples: ``1.25 -> 0.01``, ``0.5 -> 0.1``, ``3 -> 1.0``.

    The decimal places are counted on the decimal text of the number instead
    of repeatedly multiplying a float by 10, which accumulates rounding error
    (``1.1 * 10 == 11.000000000000002``) and over-counts the digits.

    Args:
        numero: A finite real number whose ``str()`` is a decimal literal
            (``int``, ``float``, ``Decimal``).

    Raises:
        ValueError: If ``numero`` is NaN or infinite.
    """
    valor = Decimal(str(numero))
    if not valor.is_finite():
        raise ValueError(f"Cannot count the decimals of a non-finite number: {numero!r}")

    # normalize() strips trailing zeros, so 100.0 and 1.50 count 0 and 1 decimals.
    exponente = valor.normalize().as_tuple().exponent
    decimales = max(0, -exponente)

    return 1 / (10 ** decimales)


def profile_function(output_file="code_performance_results.prof"):
    """Create a decorator that profiles each call of the wrapped function with cProfile.

    Args:
        output_file: Path where the profiling stats are dumped after every
            successful call (overwritten each time).

    Returns:
        A decorator; the decorated function returns whatever the original
        returns. If the wrapped call raises, nothing is dumped.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with cProfile.Profile() as profile:
                result = func(*args, **kwargs)
            stats = pstats.Stats(profile)
            stats.sort_stats(pstats.SortKey.TIME)
            stats.dump_stats(output_file)
            return result
        return wrapper
    return decorator


# Context flag telling whether we are already inside a streaming flow.
in_streaming_context = contextvars.ContextVar("in_streaming_context", default=False)


def streaming_endpoint(func):
    """Decorate an async callable so the outermost call signals completion on its queue.

    The outermost (root) decorated call marks the context as "streaming"; any
    decorated call made while that flag is set is treated as nested. When the
    root call finishes (normally or with an exception) the string ``"DONE"``
    is put on the ``queue`` keyword argument, if one was given, and the flag
    is restored.

    Only a queue passed as the keyword argument ``queue`` is detected.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        is_root = not in_streaming_context.get()
        # Keep the token so the previous value can be restored exactly.
        token = in_streaming_context.set(True) if is_root else None

        queue = kwargs.get("queue")

        try:
            return await func(*args, **kwargs)
        finally:
            if is_root:
                try:
                    if queue is not None:
                        await queue.put("DONE")
                finally:
                    # Always restore the flag, even when there is no queue or
                    # the put fails; otherwise later root calls in this
                    # context would be mistaken for nested ones.
                    in_streaming_context.reset(token)

    return wrapper


def build_live_trading_config(used_backtests: List[BotPerformance] | BotPerformance, risk_fn):
    """Build the live-trading configuration dictionary from backtest results.

    Args:
        used_backtests: One backtest or a list of them. Each item is read
            through ``bt.Bot.Strategy.Name``, ``bt.Bot.Strategy.MetaTraderName``,
            ``bt.Bot.Ticker.Name`` and ``bt.Bot.Timeframe.Name``.
        risk_fn: Callable invoked as ``risk_fn(bt)``; its result is stored as
            the instrument's ``'risk'``.

    Returns:
        A dict keyed by strategy name. Each value holds ``'metatrader_name'``,
        ``'opt_params'``, ``'wfo_params'`` and ``'instruments_info'`` (a dict
        keyed by ticker name with ``'risk'`` and ``'timeframe'``). A later
        backtest for the same strategy and ticker overwrites the earlier one.
    """
    # isinstance (not an exact type check) so list subclasses are iterated
    # instead of being wrapped as if they were a single backtest.
    if not isinstance(used_backtests, list):
        used_backtests = [used_backtests]

    config_file = {}
    for bt in used_backtests:
        strategy_name = bt.Bot.Strategy.Name
        ticker_name = bt.Bot.Ticker.Name

        strategy_cfg = config_file.get(strategy_name)
        if strategy_cfg is None:
            strategy_cfg = config_file[strategy_name] = {
                'metatrader_name': bt.Bot.Strategy.MetaTraderName,
                'opt_params': None,
                'wfo_params': {
                    'use_wfo': False,
                    'look_back_bars': 200,
                    'warmup_bars': 200,
                },
                'instruments_info': {},
            }

        instrument = strategy_cfg['instruments_info'].setdefault(ticker_name, {})
        instrument['risk'] = risk_fn(bt)
        instrument['timeframe'] = bt.Bot.Timeframe.Name

    return config_file


def save_ticker_timeframes(config_file):
    """Collect, per ticker, the sorted list of timeframes used across all strategies.

    Despite its name, this function does not write anything; it only builds
    and returns the mapping.

    Args:
        config_file: Dict keyed by strategy whose values may contain an
            ``"instruments_info"`` dict of ``ticker -> {"timeframe": ...}``.

    Returns:
        Dict mapping each ticker to a sorted list of its distinct timeframes.
    """
    ticker_timeframes = {}

    for strategy_data in config_file.values():
        instruments_info = strategy_data.get("instruments_info", {})
        for ticker, info in instruments_info.items():
            ticker_timeframes.setdefault(ticker, set()).add(info.get("timeframe"))

    # Sets are turned into sorted lists so the result is serializable.
    return {
        ticker: sorted(timeframes)
        for ticker, timeframes in ticker_timeframes.items()
    }
logger =
<Logger general_purpose (INFO)>
def
load_function(dotpath: str):
def load_function(dotpath: str):
    """Load a function from a module, re-executing the module if it was already imported.

    Args:
        dotpath: Dotted path of the form ``"package.module.function"``.

    Returns:
        The attribute named by the last path component, taken from a freshly
        executed copy of the module.

    Raises:
        ValueError: If ``dotpath`` contains no dot.
        ImportError: If the module spec (or its loader) cannot be found.
        AttributeError: If the module has no attribute with the requested name.
    """
    module_, separator, func = dotpath.rpartition(".")
    if not separator:
        raise ValueError(
            f"Expected a dotted path like 'package.module.function', got: {dotpath!r}"
        )

    if module_ in sys.modules:
        # The module was imported before: build a new module object from its
        # spec and execute it again so source changes are picked up.
        previous = sys.modules[module_]
        spec = importlib.util.find_spec(module_)
        if spec is None:
            raise ImportError(f"No se pudo encontrar el módulo: {module_}")
        if spec.loader is None:
            raise ImportError(f"Module has no loader and cannot be re-executed: {module_}")

        m = importlib.util.module_from_spec(spec)
        sys.modules[module_] = m
        try:
            spec.loader.exec_module(m)
        except BaseException:
            # Do not leave a half-initialised module registered; put the
            # previously working one back before propagating the error.
            sys.modules[module_] = previous
            raise
    else:
        # First import: the regular import machinery is enough.
        m = importlib.import_module(module_)

    return getattr(m, func)
Carga una función desde un módulo, recargando el módulo si ya ha sido importado.
# Column names selected for the screener.
screener_columns = [
    'industry',
    'sector',
    'trailingPE',
    'forwardPE',
    'pegRatio',
    'trailingPegRatio',  # the comma was missing here, which fused this entry with 'beta'
    'beta',
    'totalDebt',
    'quickRatio',
    'currentRatio',
    'totalRevenue',
    'debtToEquity',
    'revenuePerShare',
    'returnOnAssets',
    'returnOnEquity',
    'freeCashflow',
    'operatingCashflow',
    'earningsGrowth',
    'revenueGrowth',
    'bid',
    'ask',
    'marketCap',
    'twoHundredDayAverage',
    'recommendationKey',
    'numberOfAnalystOpinions',
    'symbol',
]
def
transformar_a_uno(numero):
def
profile_function(output_file='code_performance_results.prof'):
def profile_function(output_file="code_performance_results.prof"):
    """Create a decorator that profiles each call of the wrapped function with cProfile.

    Args:
        output_file: Path where the profiling stats are dumped after every
            successful call (overwritten each time).

    Returns:
        A decorator; the decorated function returns whatever the original
        returns. If the wrapped call raises, the exception propagates and
        nothing is dumped.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            profiler = cProfile.Profile()
            profiler.enable()
            try:
                outcome = func(*args, **kwargs)
            finally:
                # Stop collecting whether or not the call succeeded.
                profiler.disable()

            # sort_stats returns the Stats object itself, so the calls chain.
            pstats.Stats(profiler).sort_stats(pstats.SortKey.TIME).dump_stats(output_file)
            return outcome
        return wrapper
    return decorator
in_streaming_context =
<ContextVar name='in_streaming_context' default=False>
def
streaming_endpoint(func):
def streaming_endpoint(func):
    """Decorate an async callable so the outermost call signals completion on its queue.

    The outermost (root) decorated call marks the context as "streaming"; any
    decorated call made while that flag is set is treated as nested. When the
    root call finishes (normally or with an exception) the string ``"DONE"``
    is put on the ``queue`` keyword argument, if one was given, and the flag
    is restored.

    Only a queue passed as the keyword argument ``queue`` is detected.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        is_root = not in_streaming_context.get()
        # Keep the token so the previous value can be restored exactly.
        token = in_streaming_context.set(True) if is_root else None

        queue = kwargs.get("queue")

        try:
            return await func(*args, **kwargs)
        finally:
            if is_root:
                try:
                    if queue is not None:
                        await queue.put("DONE")
                finally:
                    # Always restore the flag, even when there is no queue or
                    # the put fails; otherwise later root calls in this
                    # context would be mistaken for nested ones.
                    in_streaming_context.reset(token)

    return wrapper
def
build_live_trading_config( used_backtests: Union[List[app.backbone.entities.bot_performance.BotPerformance], app.backbone.entities.bot_performance.BotPerformance], risk_fn):
def build_live_trading_config(used_backtests: List[BotPerformance] | BotPerformance, risk_fn):
    """Build the live-trading configuration dictionary from backtest results.

    Args:
        used_backtests: One backtest or a list of them. Each item is read
            through ``bt.Bot.Strategy.Name``, ``bt.Bot.Strategy.MetaTraderName``,
            ``bt.Bot.Ticker.Name`` and ``bt.Bot.Timeframe.Name``.
        risk_fn: Callable invoked as ``risk_fn(bt)``; its result is stored as
            the instrument's ``'risk'``.

    Returns:
        A dict keyed by strategy name. Each value holds ``'metatrader_name'``,
        ``'opt_params'``, ``'wfo_params'`` and ``'instruments_info'`` (a dict
        keyed by ticker name with ``'risk'`` and ``'timeframe'``). A later
        backtest for the same strategy and ticker overwrites the earlier one.
    """
    # isinstance (not an exact type check) so list subclasses are iterated
    # instead of being wrapped as if they were a single backtest.
    if not isinstance(used_backtests, list):
        used_backtests = [used_backtests]

    config_file = {}
    for bt in used_backtests:
        strategy_name = bt.Bot.Strategy.Name
        ticker_name = bt.Bot.Ticker.Name

        strategy_cfg = config_file.get(strategy_name)
        if strategy_cfg is None:
            strategy_cfg = config_file[strategy_name] = {
                'metatrader_name': bt.Bot.Strategy.MetaTraderName,
                'opt_params': None,
                'wfo_params': {
                    'use_wfo': False,
                    'look_back_bars': 200,
                    'warmup_bars': 200,
                },
                'instruments_info': {},
            }

        instrument = strategy_cfg['instruments_info'].setdefault(ticker_name, {})
        instrument['risk'] = risk_fn(bt)
        instrument['timeframe'] = bt.Bot.Timeframe.Name

    return config_file
def
save_ticker_timeframes(config_file):
def save_ticker_timeframes(config_file):
    """Collect, per ticker, the sorted list of timeframes used across all strategies.

    Despite its name, this function does not write anything; it only builds
    and returns the mapping.

    Args:
        config_file: Dict keyed by strategy whose values may contain an
            ``"instruments_info"`` dict of ``ticker -> {"timeframe": ...}``.

    Returns:
        Dict mapping each ticker to a sorted list of its distinct timeframes.
    """
    seen_by_ticker = {}

    for strategy_cfg in config_file.values():
        instruments = strategy_cfg.get("instruments_info", {})
        for symbol, details in instruments.items():
            # One set per ticker so repeated timeframes collapse.
            seen_by_ticker.setdefault(symbol, set()).add(details.get("timeframe"))

    # Sets are turned into sorted lists so the result is serializable.
    return {symbol: sorted(frames) for symbol, frames in seen_by_ticker.items()}