app.backbone.services.ticker_service
"""Service layer that mirrors MetaTrader 5 timeframes, categories and tickers into the database."""
import time
from typing import List

import MetaTrader5 as mt5

from app.backbone.database.db_service import DbService
from app.backbone.entities.bot import Bot
from app.backbone.entities.category import Category
from app.backbone.entities.ticker import Ticker
from app.backbone.entities.timeframe import Timeframe
from app.backbone.services.operation_result import OperationResult

# Timeframe label -> MetaTrader 5 timeframe constant.
TIMEFRAMES = {
    "M1": mt5.TIMEFRAME_M1,
    "M2": mt5.TIMEFRAME_M2,
    "M3": mt5.TIMEFRAME_M3,
    "M4": mt5.TIMEFRAME_M4,
    "M5": mt5.TIMEFRAME_M5,
    "M6": mt5.TIMEFRAME_M6,
    "M10": mt5.TIMEFRAME_M10,
    "M12": mt5.TIMEFRAME_M12,
    "M15": mt5.TIMEFRAME_M15,
    "M20": mt5.TIMEFRAME_M20,
    "M30": mt5.TIMEFRAME_M30,
    "H1": mt5.TIMEFRAME_H1,
    "H2": mt5.TIMEFRAME_H2,
    "H3": mt5.TIMEFRAME_H3,
    "H4": mt5.TIMEFRAME_H4,
    "H6": mt5.TIMEFRAME_H6,
    "H8": mt5.TIMEFRAME_H8,
    "H12": mt5.TIMEFRAME_H12,
    "D1": mt5.TIMEFRAME_D1,
    "W1": mt5.TIMEFRAME_W1,
    "MN1": mt5.TIMEFRAME_MN1,
}


class TickerService:
    """Read/update helpers for timeframes, categories and tickers.

    All persistence goes through ``DbService``; market data comes from the
    MetaTrader 5 terminal via the ``MetaTrader5`` package.
    """

    def __init__(self):
        """Create the service with its own ``DbService`` instance."""
        self.db_service = DbService()

    def create_update_timeframes(self) -> OperationResult:
        """Ensure every entry of ``TIMEFRAMES`` has a row in the database.

        Missing timeframes are created with ``Selected=False``; existing ones
        keep their selection, normalized to a strict ``bool``.

        Returns:
            ``OperationResult`` with ``ok=True`` on success, or ``ok=False``
            when the MetaTrader 5 terminal could not be initialized.
        """
        if not mt5.initialize():
            error = mt5.last_error()
            print("initialize() failed, error code =", error)
            # Report the failure to the caller instead of killing the
            # interpreter with quit().
            return OperationResult(
                ok=False,
                message=f"MetaTrader5 initialize() failed: {error}",
                item=None,
            )

        with self.db_service.get_database() as db:
            for name, number in TIMEFRAMES.items():
                timeframe = self.db_service.get_by_filter(db, Timeframe, Name=name)

                if not timeframe:
                    # Store the timeframe only if it does not exist yet.
                    timeframe = Timeframe(
                        Name=name,
                        MetaTraderNumber=number,
                        Selected=False,
                    )
                    self.db_service.create(db, timeframe)
                else:
                    timeframe.Selected = bool(timeframe.Selected)
                    self.db_service.update(db, Timeframe, timeframe)

            # Commit the changes to the database.
            self.db_service.save(db)

        # NOTE: the message mentions categories and tickers although this
        # method handles timeframes; kept verbatim since callers may show it.
        return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)

    def update_timeframe(self, timeframe: Timeframe, selected: bool) -> None:
        """Set ``timeframe.Selected`` to ``selected`` and persist the change."""
        with self.db_service.get_database() as db:
            timeframe.Selected = selected
            self.db_service.update(db, Timeframe, timeframe)
            self.db_service.save(db)

    def create(self, max_quote_retries: int = 20, retry_delay: float = 0.05) -> OperationResult:
        """Import the terminal's symbols as categories and tickers.

        Each symbol path is split on backslashes: the first component is the
        category name and the last one the ticker name. For every ticker the
        relative spread ``spread * point / ask`` is stored.

        Args:
            max_quote_retries: How many times to re-read a symbol whose ask
                price is still 0 before skipping it.
            retry_delay: Seconds to sleep between those re-reads.

        Returns:
            ``OperationResult`` with ``ok=True`` on success, or ``ok=False``
            when the terminal could not be initialized or returned no symbols.
        """
        if not mt5.initialize():
            error = mt5.last_error()
            print("initialize() failed, error code =", error)
            return OperationResult(
                ok=False,
                message=f"MetaTrader5 initialize() failed: {error}",
                item=None,
            )

        symbols = mt5.symbols_get()
        if symbols is None:
            # symbols_get() signals an error with None; iterating it would raise.
            return OperationResult(
                ok=False,
                message=f"MetaTrader5 symbols_get() failed: {mt5.last_error()}",
                item=None,
            )

        categories_tickers = {}
        for symbol in symbols:
            parts = symbol.path.split("\\")
            if len(parts) < 2:
                # No category component in the path: nothing to group under.
                continue
            # Last component, so nested groups do not yield a sub-group name.
            categories_tickers.setdefault(parts[0], []).append(parts[-1])

        with self.db_service.get_database() as db:
            for category_name, tickers in categories_tickers.items():
                # Look the category up; create it when missing.
                category = self.db_service.get_by_filter(db, Category, Name=category_name)
                if not category:
                    category = Category(Name=category_name, Commission=0)
                    self.db_service.create(db, category)

                # Process the tickers that belong to this category.
                for ticker_name in tickers:
                    print(ticker_name)

                    # Result is discarded; presumably this makes the terminal
                    # load the symbol so that a quote becomes available.
                    mt5.copy_rates_from_pos(ticker_name, mt5.TIMEFRAME_H1, 0, 3)

                    symbol_info = mt5.symbol_info(ticker_name)
                    if symbol_info is not None:
                        print(symbol_info)

                    # Bounded wait for a quote instead of spinning forever.
                    attempts = 0
                    while (
                        symbol_info is not None
                        and symbol_info.ask == 0
                        and attempts < max_quote_retries
                    ):
                        time.sleep(retry_delay)
                        symbol_info = mt5.symbol_info(ticker_name)
                        attempts += 1

                    if symbol_info is None or symbol_info.ask == 0:
                        # Unknown symbol or no quote: the spread cannot be computed.
                        print(f"{ticker_name}: no quote available, skipped")
                        continue

                    spread = (symbol_info.spread * symbol_info.point) / symbol_info.ask

                    # Look the ticker up; create it or refresh its spread.
                    ticker = self.db_service.get_by_filter(
                        db, Ticker, Name=ticker_name, CategoryId=category.Id
                    )
                    if not ticker:
                        ticker = Ticker(Name=ticker_name, Category=category, Spread=spread)
                        self.db_service.create(db, ticker)
                    else:
                        ticker.Spread = spread
                        self.db_service.update(db, Ticker, ticker)

            self.db_service.save(db)

        return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)

    def get_tickers_by_category(self, category_id: int) -> List[Ticker]:
        """Return the tickers whose ``CategoryId`` equals ``category_id``."""
        with self.db_service.get_database() as db:
            return self.db_service.get_many_by_filter(db, Ticker, CategoryId=category_id)

    def get_by_name(self, name: str) -> Ticker:
        """Return the ticker found by filtering on ``Name=name``."""
        with self.db_service.get_database() as db:
            return self.db_service.get_by_filter(db, Ticker, Name=name)

    def get_all_categories(self) -> List[Category]:
        """Return every stored category."""
        with self.db_service.get_database() as db:
            return self.db_service.get_all(db, Category)

    def get_ticker_by_id(self, id) -> Ticker:
        """Return the ticker looked up by its identifier."""
        with self.db_service.get_database() as db:
            return self.db_service.get_by_id(db, Ticker, id)

    def get_all_timeframes(self) -> List[Timeframe]:
        """Return every stored timeframe."""
        with self.db_service.get_database() as db:
            return self.db_service.get_all(db, Timeframe)

    def get_timeframe_by_id(self, id) -> Timeframe:
        """Return the timeframe looked up by its identifier."""
        with self.db_service.get_database() as db:
            return self.db_service.get_by_id(db, Timeframe, id)

    def get_timeframe_by_name(self, name: str) -> Timeframe:
        """Return the timeframe found by filtering on ``Name=name``."""
        with self.db_service.get_database() as db:
            return self.db_service.get_by_filter(db, Timeframe, Name=name)

    def get_all_tickers(self) -> List[Ticker]:
        """Return every stored ticker."""
        with self.db_service.get_database() as db:
            return self.db_service.get_all(db, Ticker)

    def get_tickers_by_strategy(self, strategy_id) -> OperationResult:
        """Return the tickers joined to bots whose ``StrategyId`` matches.

        The query is executed while the database context is still open, so
        ``item`` holds a list of ``Ticker`` rows rather than a lazy query.
        """
        with self.db_service.get_database() as db:
            tickers = (
                db.query(Ticker)
                .join(Bot, Bot.TickerId == Ticker.Id)
                .filter(Bot.StrategyId == strategy_id)
                .all()
            )

        return OperationResult(ok=True, message=None, item=tickers)

    def update_categories_commissions(
        self,
        category_id: int | List[int],
        commission: float | List[float],
    ) -> OperationResult:
        """Set the commission of one or several categories.

        Args:
            category_id: A single category id or a list/tuple of ids.
            commission: A single commission or a list/tuple of commissions,
                paired with ``category_id`` by position.

        Returns:
            ``OperationResult`` with ``ok=False`` when the two inputs differ
            in length or a category id is not found, ``ok=True`` otherwise.
        """
        category_ids = list(category_id) if isinstance(category_id, (list, tuple)) else [category_id]
        commissions = list(commission) if isinstance(commission, (list, tuple)) else [commission]

        if len(commissions) != len(category_ids):
            return OperationResult(ok=False, message='The length of the lists must be the same')

        with self.db_service.get_database() as db:
            for cat_id, com in zip(category_ids, commissions):
                category = self.db_service.get_by_id(db, Category, id=cat_id)
                if category is None:
                    # Bail out before save so nothing is committed half-way.
                    return OperationResult(ok=False, message=f'Category with id {cat_id} not found')

                category.Commission = com
                self.db_service.update(db, Category, category)

            # Same update-then-save sequence as the other update methods here.
            self.db_service.save(db)

        return OperationResult(ok=True, message=None, item=None)
TIMEFRAMES =
{'M1': 1, 'M2': 2, 'M3': 3, 'M4': 4, 'M5': 5, 'M6': 6, 'M10': 10, 'M12': 12, 'M15': 15, 'M20': 20, 'M30': 30, 'H1': 16385, 'H2': 16386, 'H3': 16387, 'H4': 16388, 'H6': 16390, 'H8': 16392, 'H12': 16396, 'D1': 16408, 'W1': 32769, 'MN1': 49153}
class
TickerService:
class TickerService:
    """Read/update helpers for timeframes, categories and tickers.

    All persistence goes through ``DbService``; market data comes from the
    MetaTrader 5 terminal via the ``MetaTrader5`` package.
    """

    def __init__(self):
        """Create the service with its own ``DbService`` instance."""
        self.db_service = DbService()

    def create_update_timeframes(self) -> OperationResult:
        """Ensure every entry of ``TIMEFRAMES`` has a row in the database.

        Missing timeframes are created with ``Selected=False``; existing ones
        keep their selection, normalized to a strict ``bool``.

        Returns:
            ``OperationResult`` with ``ok=True`` on success, or ``ok=False``
            when the MetaTrader 5 terminal could not be initialized.
        """
        if not mt5.initialize():
            error = mt5.last_error()
            print("initialize() failed, error code =", error)
            # Report the failure to the caller instead of killing the
            # interpreter with quit().
            return OperationResult(
                ok=False,
                message=f"MetaTrader5 initialize() failed: {error}",
                item=None,
            )

        with self.db_service.get_database() as db:
            for name, number in TIMEFRAMES.items():
                timeframe = self.db_service.get_by_filter(db, Timeframe, Name=name)

                if not timeframe:
                    # Store the timeframe only if it does not exist yet.
                    timeframe = Timeframe(
                        Name=name,
                        MetaTraderNumber=number,
                        Selected=False,
                    )
                    self.db_service.create(db, timeframe)
                else:
                    timeframe.Selected = bool(timeframe.Selected)
                    self.db_service.update(db, Timeframe, timeframe)

            # Commit the changes to the database.
            self.db_service.save(db)

        # NOTE: the message mentions categories and tickers although this
        # method handles timeframes; kept verbatim since callers may show it.
        return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)

    def update_timeframe(self, timeframe: Timeframe, selected: bool) -> None:
        """Set ``timeframe.Selected`` to ``selected`` and persist the change."""
        with self.db_service.get_database() as db:
            timeframe.Selected = selected
            self.db_service.update(db, Timeframe, timeframe)
            self.db_service.save(db)

    def create(self, max_quote_retries: int = 20, retry_delay: float = 0.05) -> OperationResult:
        """Import the terminal's symbols as categories and tickers.

        Each symbol path is split on backslashes: the first component is the
        category name and the last one the ticker name. For every ticker the
        relative spread ``spread * point / ask`` is stored.

        Args:
            max_quote_retries: How many times to re-read a symbol whose ask
                price is still 0 before skipping it.
            retry_delay: Seconds to sleep between those re-reads.

        Returns:
            ``OperationResult`` with ``ok=True`` on success, or ``ok=False``
            when the terminal could not be initialized or returned no symbols.
        """
        # Function-scope import: keeps this class usable without touching the
        # module's import block.
        import time

        if not mt5.initialize():
            error = mt5.last_error()
            print("initialize() failed, error code =", error)
            return OperationResult(
                ok=False,
                message=f"MetaTrader5 initialize() failed: {error}",
                item=None,
            )

        symbols = mt5.symbols_get()
        if symbols is None:
            # symbols_get() signals an error with None; iterating it would raise.
            return OperationResult(
                ok=False,
                message=f"MetaTrader5 symbols_get() failed: {mt5.last_error()}",
                item=None,
            )

        categories_tickers = {}
        for symbol in symbols:
            parts = symbol.path.split("\\")
            if len(parts) < 2:
                # No category component in the path: nothing to group under.
                continue
            # Last component, so nested groups do not yield a sub-group name.
            categories_tickers.setdefault(parts[0], []).append(parts[-1])

        with self.db_service.get_database() as db:
            for category_name, tickers in categories_tickers.items():
                # Look the category up; create it when missing.
                category = self.db_service.get_by_filter(db, Category, Name=category_name)
                if not category:
                    category = Category(Name=category_name, Commission=0)
                    self.db_service.create(db, category)

                # Process the tickers that belong to this category.
                for ticker_name in tickers:
                    print(ticker_name)

                    # Result is discarded; presumably this makes the terminal
                    # load the symbol so that a quote becomes available.
                    mt5.copy_rates_from_pos(ticker_name, mt5.TIMEFRAME_H1, 0, 3)

                    symbol_info = mt5.symbol_info(ticker_name)
                    if symbol_info is not None:
                        print(symbol_info)

                    # Bounded wait for a quote instead of spinning forever.
                    attempts = 0
                    while (
                        symbol_info is not None
                        and symbol_info.ask == 0
                        and attempts < max_quote_retries
                    ):
                        time.sleep(retry_delay)
                        symbol_info = mt5.symbol_info(ticker_name)
                        attempts += 1

                    if symbol_info is None or symbol_info.ask == 0:
                        # Unknown symbol or no quote: the spread cannot be computed.
                        print(f"{ticker_name}: no quote available, skipped")
                        continue

                    spread = (symbol_info.spread * symbol_info.point) / symbol_info.ask

                    # Look the ticker up; create it or refresh its spread.
                    ticker = self.db_service.get_by_filter(
                        db, Ticker, Name=ticker_name, CategoryId=category.Id
                    )
                    if not ticker:
                        ticker = Ticker(Name=ticker_name, Category=category, Spread=spread)
                        self.db_service.create(db, ticker)
                    else:
                        ticker.Spread = spread
                        self.db_service.update(db, Ticker, ticker)

            self.db_service.save(db)

        return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)

    def get_tickers_by_category(self, category_id: int) -> List[Ticker]:
        """Return the tickers whose ``CategoryId`` equals ``category_id``."""
        with self.db_service.get_database() as db:
            return self.db_service.get_many_by_filter(db, Ticker, CategoryId=category_id)

    def get_by_name(self, name: str) -> Ticker:
        """Return the ticker found by filtering on ``Name=name``."""
        with self.db_service.get_database() as db:
            return self.db_service.get_by_filter(db, Ticker, Name=name)

    def get_all_categories(self) -> List[Category]:
        """Return every stored category."""
        with self.db_service.get_database() as db:
            return self.db_service.get_all(db, Category)

    def get_ticker_by_id(self, id) -> Ticker:
        """Return the ticker looked up by its identifier."""
        with self.db_service.get_database() as db:
            return self.db_service.get_by_id(db, Ticker, id)

    def get_all_timeframes(self) -> List[Timeframe]:
        """Return every stored timeframe."""
        with self.db_service.get_database() as db:
            return self.db_service.get_all(db, Timeframe)

    def get_timeframe_by_id(self, id) -> Timeframe:
        """Return the timeframe looked up by its identifier."""
        with self.db_service.get_database() as db:
            return self.db_service.get_by_id(db, Timeframe, id)

    def get_timeframe_by_name(self, name: str) -> Timeframe:
        """Return the timeframe found by filtering on ``Name=name``."""
        with self.db_service.get_database() as db:
            return self.db_service.get_by_filter(db, Timeframe, Name=name)

    def get_all_tickers(self) -> List[Ticker]:
        """Return every stored ticker."""
        with self.db_service.get_database() as db:
            return self.db_service.get_all(db, Ticker)

    def get_tickers_by_strategy(self, strategy_id) -> OperationResult:
        """Return the tickers joined to bots whose ``StrategyId`` matches.

        The query is executed while the database context is still open, so
        ``item`` holds a list of ``Ticker`` rows rather than a lazy query.
        """
        with self.db_service.get_database() as db:
            tickers = (
                db.query(Ticker)
                .join(Bot, Bot.TickerId == Ticker.Id)
                .filter(Bot.StrategyId == strategy_id)
                .all()
            )

        return OperationResult(ok=True, message=None, item=tickers)

    def update_categories_commissions(
        self,
        category_id: int | List[int],
        commission: float | List[float],
    ) -> OperationResult:
        """Set the commission of one or several categories.

        Args:
            category_id: A single category id or a list/tuple of ids.
            commission: A single commission or a list/tuple of commissions,
                paired with ``category_id`` by position.

        Returns:
            ``OperationResult`` with ``ok=False`` when the two inputs differ
            in length or a category id is not found, ``ok=True`` otherwise.
        """
        category_ids = list(category_id) if isinstance(category_id, (list, tuple)) else [category_id]
        commissions = list(commission) if isinstance(commission, (list, tuple)) else [commission]

        if len(commissions) != len(category_ids):
            return OperationResult(ok=False, message='The length of the lists must be the same')

        with self.db_service.get_database() as db:
            for cat_id, com in zip(category_ids, commissions):
                category = self.db_service.get_by_id(db, Category, id=cat_id)
                if category is None:
                    # Bail out before save so nothing is committed half-way.
                    return OperationResult(ok=False, message=f'Category with id {cat_id} not found')

                category.Commission = com
                self.db_service.update(db, Category, category)

            # Same update-then-save sequence as the other update methods here.
            self.db_service.save(db)

        return OperationResult(ok=True, message=None, item=None)
def create_update_timeframes(self) -> OperationResult:
    """Ensure every entry of ``TIMEFRAMES`` has a row in the database.

    Missing timeframes are created with ``Selected=False``; existing ones
    keep their selection, normalized to a strict ``bool``.

    Returns:
        ``OperationResult`` with ``ok=True`` on success, or ``ok=False``
        when the MetaTrader 5 terminal could not be initialized.
    """
    if not mt5.initialize():
        error = mt5.last_error()
        print("initialize() failed, error code =", error)
        # Report the failure to the caller instead of killing the
        # interpreter with quit().
        return OperationResult(
            ok=False,
            message=f"MetaTrader5 initialize() failed: {error}",
            item=None,
        )

    with self.db_service.get_database() as db:
        for name, number in TIMEFRAMES.items():
            timeframe = self.db_service.get_by_filter(db, Timeframe, Name=name)

            if not timeframe:
                # Store the timeframe only if it does not exist yet.
                timeframe = Timeframe(
                    Name=name,
                    MetaTraderNumber=number,
                    Selected=False,
                )
                self.db_service.create(db, timeframe)
            else:
                timeframe.Selected = bool(timeframe.Selected)
                self.db_service.update(db, Timeframe, timeframe)

        # Commit the changes to the database.
        self.db_service.save(db)

    # NOTE: the message mentions categories and tickers although this
    # method handles timeframes; kept verbatim since callers may show it.
    return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)
def create(self, max_quote_retries: int = 20, retry_delay: float = 0.05) -> OperationResult:
    """Import the terminal's symbols as categories and tickers.

    Each symbol path is split on backslashes: the first component is the
    category name and the last one the ticker name. For every ticker the
    relative spread ``spread * point / ask`` is stored.

    Args:
        max_quote_retries: How many times to re-read a symbol whose ask
            price is still 0 before skipping it.
        retry_delay: Seconds to sleep between those re-reads.

    Returns:
        ``OperationResult`` with ``ok=True`` on success, or ``ok=False``
        when the terminal could not be initialized or returned no symbols.
    """
    # Function-scope import: keeps this method usable without touching the
    # module's import block.
    import time

    if not mt5.initialize():
        error = mt5.last_error()
        print("initialize() failed, error code =", error)
        # Report the failure to the caller instead of killing the
        # interpreter with quit().
        return OperationResult(
            ok=False,
            message=f"MetaTrader5 initialize() failed: {error}",
            item=None,
        )

    symbols = mt5.symbols_get()
    if symbols is None:
        # symbols_get() signals an error with None; iterating it would raise.
        return OperationResult(
            ok=False,
            message=f"MetaTrader5 symbols_get() failed: {mt5.last_error()}",
            item=None,
        )

    categories_tickers = {}
    for symbol in symbols:
        parts = symbol.path.split("\\")
        if len(parts) < 2:
            # No category component in the path: nothing to group under.
            continue
        # Last component, so nested groups do not yield a sub-group name.
        categories_tickers.setdefault(parts[0], []).append(parts[-1])

    with self.db_service.get_database() as db:
        for category_name, tickers in categories_tickers.items():
            # Look the category up; create it when missing.
            category = self.db_service.get_by_filter(db, Category, Name=category_name)
            if not category:
                category = Category(Name=category_name, Commission=0)
                self.db_service.create(db, category)

            # Process the tickers that belong to this category.
            for ticker_name in tickers:
                print(ticker_name)

                # Result is discarded; presumably this makes the terminal
                # load the symbol so that a quote becomes available.
                mt5.copy_rates_from_pos(ticker_name, mt5.TIMEFRAME_H1, 0, 3)

                symbol_info = mt5.symbol_info(ticker_name)
                if symbol_info is not None:
                    print(symbol_info)

                # Bounded wait for a quote instead of spinning forever.
                attempts = 0
                while (
                    symbol_info is not None
                    and symbol_info.ask == 0
                    and attempts < max_quote_retries
                ):
                    time.sleep(retry_delay)
                    symbol_info = mt5.symbol_info(ticker_name)
                    attempts += 1

                if symbol_info is None or symbol_info.ask == 0:
                    # Unknown symbol or no quote: the spread cannot be computed.
                    print(f"{ticker_name}: no quote available, skipped")
                    continue

                spread = (symbol_info.spread * symbol_info.point) / symbol_info.ask

                # Look the ticker up; create it or refresh its spread.
                ticker = self.db_service.get_by_filter(
                    db, Ticker, Name=ticker_name, CategoryId=category.Id
                )
                if not ticker:
                    ticker = Ticker(Name=ticker_name, Category=category, Spread=spread)
                    self.db_service.create(db, ticker)
                else:
                    ticker.Spread = spread
                    self.db_service.update(db, Ticker, ticker)

        self.db_service.save(db)

    return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)
def
get_tickers_by_strategy( self, strategy_id) -> app.backbone.services.operation_result.OperationResult:
def get_tickers_by_strategy(self, strategy_id) -> OperationResult:
    """Return the tickers joined to bots whose ``StrategyId`` matches.

    The query is executed while the database context is still open, so
    ``item`` holds a list of ``Ticker`` rows rather than a lazy query that
    would only run after the context has exited.

    Args:
        strategy_id: Value compared against ``Bot.StrategyId``.

    Returns:
        ``OperationResult`` with ``ok=True`` and the matching tickers in
        ``item``.
    """
    with self.db_service.get_database() as db:
        tickers = (
            db.query(Ticker)
            .join(Bot, Bot.TickerId == Ticker.Id)
            .filter(Bot.StrategyId == strategy_id)
            .all()
        )

    return OperationResult(ok=True, message=None, item=tickers)
def
update_categories_commissions( self, category_id: Union[int, List[int]], commission: Union[float, List[float]]):
def update_categories_commissions(
    self,
    category_id: int | List[int],
    commission: float | List[float],
) -> OperationResult:
    """Set the commission of one or several categories.

    Args:
        category_id: A single category id or a list/tuple of ids.
        commission: A single commission or a list/tuple of commissions,
            paired with ``category_id`` by position.

    Returns:
        ``OperationResult`` with ``ok=False`` when the two inputs differ in
        length or a category id is not found, ``ok=True`` otherwise.
    """
    category_ids = list(category_id) if isinstance(category_id, (list, tuple)) else [category_id]
    commissions = list(commission) if isinstance(commission, (list, tuple)) else [commission]

    if len(commissions) != len(category_ids):
        return OperationResult(ok=False, message='The length of the lists must be the same')

    with self.db_service.get_database() as db:
        for cat_id, com in zip(category_ids, commissions):
            category = self.db_service.get_by_id(db, Category, id=cat_id)
            if category is None:
                # Bail out before save so nothing is committed half-way.
                return OperationResult(ok=False, message=f'Category with id {cat_id} not found')

            category.Commission = com
            self.db_service.update(db, Category, category)

        # Same update-then-save sequence as the other update methods of
        # this service (update_timeframe, create).
        self.db_service.save(db)

    return OperationResult(ok=True, message=None, item=None)