app.backbone.services.ticker_service

  1from typing import List
  2from app.backbone.entities.bot import Bot
  3from app.backbone.entities.timeframe import Timeframe
  4from app.backbone.database.db_service import DbService
  5from app.backbone.services.operation_result import OperationResult
  6from app.backbone.entities.ticker import Ticker
  7from app.backbone.entities.category import Category
  8import MetaTrader5 as mt5
  9
 10TIMEFRAMES = {
 11    "M1": mt5.TIMEFRAME_M1,
 12    "M2": mt5.TIMEFRAME_M2,
 13    "M3": mt5.TIMEFRAME_M3,
 14    "M4": mt5.TIMEFRAME_M4,
 15    "M5": mt5.TIMEFRAME_M5,
 16    "M6": mt5.TIMEFRAME_M6,
 17    "M10": mt5.TIMEFRAME_M10,
 18    "M12": mt5.TIMEFRAME_M12,
 19    "M15": mt5.TIMEFRAME_M15,
 20    "M20": mt5.TIMEFRAME_M20,
 21    "M30": mt5.TIMEFRAME_M30,
 22    "H1": mt5.TIMEFRAME_H1,
 23    "H2": mt5.TIMEFRAME_H2,
 24    "H3": mt5.TIMEFRAME_H3,
 25    "H4": mt5.TIMEFRAME_H4,
 26    "H6": mt5.TIMEFRAME_H6,
 27    "H8": mt5.TIMEFRAME_H8,
 28    "H12": mt5.TIMEFRAME_H12,
 29    "D1": mt5.TIMEFRAME_D1,
 30    "W1": mt5.TIMEFRAME_W1,
 31    "MN1": mt5.TIMEFRAME_MN1,
 32}
 33
 34class TickerService:
    def __init__(self):
        """Create the service with its own ``DbService`` instance."""
        self.db_service = DbService()
 37
 38
 39    def create_update_timeframes(self) -> OperationResult:
 40        if not mt5.initialize():
 41            print("initialize() failed, error code =", mt5.last_error())
 42            quit()
 43
 44        with self.db_service.get_database() as db:
 45            # Guardar en la base de datos si no existe
 46            for name, number in TIMEFRAMES.items():
 47                result_timeframe = self.db_service.get_by_filter(db, Timeframe, Name=name)
 48
 49                if not result_timeframe:
 50                    timeframe = Timeframe(
 51                        Name=name,
 52                        MetaTraderNumber=number,
 53                        Selected=False,
 54                    )
 55
 56                    self.db_service.create(db, timeframe)
 57                
 58                else:
 59                    timeframe = result_timeframe
 60                    timeframe.Selected = False if not timeframe.Selected else True
 61                    self.db_service.update(db, Timeframe, timeframe)
 62
 63
 64            # Confirmar los cambios en la base de datos
 65            self.db_service.save(db)
 66
 67        return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)
 68
 69    def update_timeframe(self, timeframe: Timeframe, selected:bool):
 70        with self.db_service.get_database() as db:
 71
 72            timeframe.Selected = selected
 73
 74            self.db_service.update(db, Timeframe, timeframe)
 75            self.db_service.save(db)
 76        
 77    def create(self) -> OperationResult:
 78        if not mt5.initialize():
 79            print("initialize() failed, error code =", mt5.last_error())
 80            quit()
 81
 82        symbols = mt5.symbols_get()
 83
 84        categories_tickers = {}
 85        for symbol in symbols:
 86            category_name = symbol.path.split("\\")[0]
 87            ticker_name = symbol.path.split("\\")[1]
 88
 89            if category_name not in categories_tickers.keys():
 90                categories_tickers[category_name] = []
 91
 92            categories_tickers[category_name].append(ticker_name)
 93
 94        with self.db_service.get_database() as db:
 95            for category_name, tickers in categories_tickers.items():
 96                # Buscar la categoría en la base de datos
 97                category = self.db_service.get_by_filter(db, Category, Name=category_name)
 98                
 99                # Si la categoría no existe, crearla
100                if not category:
101                    category = Category(Name=category_name, Commission=0)
102                    self.db_service.create(db, category)
103
104                # Procesar los tickers asociados a esta categoría
105                for ticker_name in tickers:
106                    print(ticker_name)
107
108                    _ = mt5.copy_rates_from_pos(
109                        ticker_name, 16385, 0, 3
110                    )
111
112                    symbol_info = mt5.symbol_info(ticker_name)
113
114                    if symbol_info is not None:
115                        print(symbol_info)
116
117                        while symbol_info.ask == 0:
118                            symbol_info = mt5.symbol_info(ticker_name)
119
120                        spread = (symbol_info.spread * symbol_info.point) / symbol_info.ask
121
122                        # Buscar el ticker en la base de datos
123                        ticker = self.db_service.get_by_filter(db, Ticker, Name=ticker_name, CategoryId=category.Id)
124                        
125                        # Si el ticker no existe, crearlo
126                        if not ticker:
127                            ticker = Ticker(Name=ticker_name, Category=category, Spread=spread)
128                            self.db_service.create(db, ticker)
129                        else:
130                            # Si el ticker existe, actualizar su información
131                            ticker.Spread = spread
132                            self.db_service.update(db, Ticker, ticker)
133
134            self.db_service.save(db)
135
136        return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)
137
138    def get_tickers_by_category(self, category_id:int) -> List[Ticker]:
139        with self.db_service.get_database() as db:
140            tickers = self.db_service.get_many_by_filter(db, Ticker, CategoryId=category_id)
141            
142            return tickers
143        
144    def get_by_name(self, name:str) -> Ticker:
145        with self.db_service.get_database() as db:
146            ticker = self.db_service.get_by_filter(db, Ticker, Name=name)
147            
148        return ticker
149
150    def get_all_categories(self) -> List[Category]:
151        with self.db_service.get_database() as db:
152            categories = self.db_service.get_all(db, Category)
153            return categories
154            
155    def get_ticker_by_id(self, id) -> Ticker:
156        with self.db_service.get_database() as db:
157            ticker = self.db_service.get_by_id(db, Ticker, id)
158            return ticker
159            
160    def get_all_timeframes(self) -> List[Timeframe]:
161        with self.db_service.get_database() as db:
162            categories = self.db_service.get_all(db, Timeframe)
163            return categories
164
165    def get_timeframe_by_id(self, id) -> Timeframe:
166        with self.db_service.get_database() as db:
167            timeframe = self.db_service.get_by_id(db, Timeframe, id)
168            return timeframe
169        
170    def get_timeframe_by_name(self, name:str) -> Timeframe:
171        with self.db_service.get_database() as db:
172            timeframe = self.db_service.get_by_filter(db, Timeframe, Name=name)
173            return timeframe
174                        
175    def get_all_tickers(self) -> List[Ticker]:
176        with self.db_service.get_database() as db:
177            tickers = self.db_service.get_all(db, Ticker)
178            return tickers
179         
180    def get_tickers_by_strategy(self, strategy_id) -> OperationResult:
181        with self.db_service.get_database() as db:
182        
183            strategies = (
184                db.query(Ticker)
185                    .join(Bot, Bot.TickerId == Ticker.Id)
186                    .filter(Bot.StrategyId == strategy_id)
187            )
188            
189            result = OperationResult(ok=True, message=None, item=strategies)
190            return result
191
192    def update_categories_commissions(self, category_id: int | List[int], commission:float | List[float]):
193
194        if type(category_id) != list:
195            category_id = [category_id]
196
197        if type(commission) != list:
198            commission = [commission]
199
200        if len(commission) != len(category_id):
201            return OperationResult(ok=False, message='The length of the lists must be the same')
202
203        with self.db_service.get_database() as db:
204            for cat_id, com in zip(category_id, commission):
205                old_category = self.db_service.get_by_id(db, Category, id=cat_id)
206                new_category = old_category
207                new_category.Commission = com
208
209                self.db_service.update(db, Category, new_category)
210
211            return OperationResult(ok=True, message=None, item=None)
212
213        
# Timeframe label -> integer code, listed from the shortest to the longest
# period.
TIMEFRAMES = {
    "M1": 1,
    "M2": 2,
    "M3": 3,
    "M4": 4,
    "M5": 5,
    "M6": 6,
    "M10": 10,
    "M12": 12,
    "M15": 15,
    "M20": 20,
    "M30": 30,
    "H1": 16385,
    "H2": 16386,
    "H3": 16387,
    "H4": 16388,
    "H6": 16390,
    "H8": 16392,
    "H12": 16396,
    "D1": 16408,
    "W1": 32769,
    "MN1": 49153,
}
class TickerService:
 35class TickerService:
 36    def __init__(self):
 37        self.db_service = DbService()
 38
 39
 40    def create_update_timeframes(self) -> OperationResult:
 41        if not mt5.initialize():
 42            print("initialize() failed, error code =", mt5.last_error())
 43            quit()
 44
 45        with self.db_service.get_database() as db:
 46            # Guardar en la base de datos si no existe
 47            for name, number in TIMEFRAMES.items():
 48                result_timeframe = self.db_service.get_by_filter(db, Timeframe, Name=name)
 49
 50                if not result_timeframe:
 51                    timeframe = Timeframe(
 52                        Name=name,
 53                        MetaTraderNumber=number,
 54                        Selected=False,
 55                    )
 56
 57                    self.db_service.create(db, timeframe)
 58                
 59                else:
 60                    timeframe = result_timeframe
 61                    timeframe.Selected = False if not timeframe.Selected else True
 62                    self.db_service.update(db, Timeframe, timeframe)
 63
 64
 65            # Confirmar los cambios en la base de datos
 66            self.db_service.save(db)
 67
 68        return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)
 69
 70    def update_timeframe(self, timeframe: Timeframe, selected:bool):
 71        with self.db_service.get_database() as db:
 72
 73            timeframe.Selected = selected
 74
 75            self.db_service.update(db, Timeframe, timeframe)
 76            self.db_service.save(db)
 77        
 78    def create(self) -> OperationResult:
 79        if not mt5.initialize():
 80            print("initialize() failed, error code =", mt5.last_error())
 81            quit()
 82
 83        symbols = mt5.symbols_get()
 84
 85        categories_tickers = {}
 86        for symbol in symbols:
 87            category_name = symbol.path.split("\\")[0]
 88            ticker_name = symbol.path.split("\\")[1]
 89
 90            if category_name not in categories_tickers.keys():
 91                categories_tickers[category_name] = []
 92
 93            categories_tickers[category_name].append(ticker_name)
 94
 95        with self.db_service.get_database() as db:
 96            for category_name, tickers in categories_tickers.items():
 97                # Buscar la categoría en la base de datos
 98                category = self.db_service.get_by_filter(db, Category, Name=category_name)
 99                
100                # Si la categoría no existe, crearla
101                if not category:
102                    category = Category(Name=category_name, Commission=0)
103                    self.db_service.create(db, category)
104
105                # Procesar los tickers asociados a esta categoría
106                for ticker_name in tickers:
107                    print(ticker_name)
108
109                    _ = mt5.copy_rates_from_pos(
110                        ticker_name, 16385, 0, 3
111                    )
112
113                    symbol_info = mt5.symbol_info(ticker_name)
114
115                    if symbol_info is not None:
116                        print(symbol_info)
117
118                        while symbol_info.ask == 0:
119                            symbol_info = mt5.symbol_info(ticker_name)
120
121                        spread = (symbol_info.spread * symbol_info.point) / symbol_info.ask
122
123                        # Buscar el ticker en la base de datos
124                        ticker = self.db_service.get_by_filter(db, Ticker, Name=ticker_name, CategoryId=category.Id)
125                        
126                        # Si el ticker no existe, crearlo
127                        if not ticker:
128                            ticker = Ticker(Name=ticker_name, Category=category, Spread=spread)
129                            self.db_service.create(db, ticker)
130                        else:
131                            # Si el ticker existe, actualizar su información
132                            ticker.Spread = spread
133                            self.db_service.update(db, Ticker, ticker)
134
135            self.db_service.save(db)
136
137        return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)
138
139    def get_tickers_by_category(self, category_id:int) -> List[Ticker]:
140        with self.db_service.get_database() as db:
141            tickers = self.db_service.get_many_by_filter(db, Ticker, CategoryId=category_id)
142            
143            return tickers
144        
145    def get_by_name(self, name:str) -> Ticker:
146        with self.db_service.get_database() as db:
147            ticker = self.db_service.get_by_filter(db, Ticker, Name=name)
148            
149        return ticker
150
151    def get_all_categories(self) -> List[Category]:
152        with self.db_service.get_database() as db:
153            categories = self.db_service.get_all(db, Category)
154            return categories
155            
156    def get_ticker_by_id(self, id) -> Ticker:
157        with self.db_service.get_database() as db:
158            ticker = self.db_service.get_by_id(db, Ticker, id)
159            return ticker
160            
161    def get_all_timeframes(self) -> List[Timeframe]:
162        with self.db_service.get_database() as db:
163            categories = self.db_service.get_all(db, Timeframe)
164            return categories
165
166    def get_timeframe_by_id(self, id) -> Timeframe:
167        with self.db_service.get_database() as db:
168            timeframe = self.db_service.get_by_id(db, Timeframe, id)
169            return timeframe
170        
171    def get_timeframe_by_name(self, name:str) -> Timeframe:
172        with self.db_service.get_database() as db:
173            timeframe = self.db_service.get_by_filter(db, Timeframe, Name=name)
174            return timeframe
175                        
176    def get_all_tickers(self) -> List[Ticker]:
177        with self.db_service.get_database() as db:
178            tickers = self.db_service.get_all(db, Ticker)
179            return tickers
180         
181    def get_tickers_by_strategy(self, strategy_id) -> OperationResult:
182        with self.db_service.get_database() as db:
183        
184            strategies = (
185                db.query(Ticker)
186                    .join(Bot, Bot.TickerId == Ticker.Id)
187                    .filter(Bot.StrategyId == strategy_id)
188            )
189            
190            result = OperationResult(ok=True, message=None, item=strategies)
191            return result
192
193    def update_categories_commissions(self, category_id: int | List[int], commission:float | List[float]):
194
195        if type(category_id) != list:
196            category_id = [category_id]
197
198        if type(commission) != list:
199            commission = [commission]
200
201        if len(commission) != len(category_id):
202            return OperationResult(ok=False, message='The length of the lists must be the same')
203
204        with self.db_service.get_database() as db:
205            for cat_id, com in zip(category_id, commission):
206                old_category = self.db_service.get_by_id(db, Category, id=cat_id)
207                new_category = old_category
208                new_category.Commission = com
209
210                self.db_service.update(db, Category, new_category)
211
212            return OperationResult(ok=True, message=None, item=None)
db_service
def create_update_timeframes(self) -> app.backbone.services.operation_result.OperationResult:
40    def create_update_timeframes(self) -> OperationResult:
41        if not mt5.initialize():
42            print("initialize() failed, error code =", mt5.last_error())
43            quit()
44
45        with self.db_service.get_database() as db:
46            # Guardar en la base de datos si no existe
47            for name, number in TIMEFRAMES.items():
48                result_timeframe = self.db_service.get_by_filter(db, Timeframe, Name=name)
49
50                if not result_timeframe:
51                    timeframe = Timeframe(
52                        Name=name,
53                        MetaTraderNumber=number,
54                        Selected=False,
55                    )
56
57                    self.db_service.create(db, timeframe)
58                
59                else:
60                    timeframe = result_timeframe
61                    timeframe.Selected = False if not timeframe.Selected else True
62                    self.db_service.update(db, Timeframe, timeframe)
63
64
65            # Confirmar los cambios en la base de datos
66            self.db_service.save(db)
67
68        return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)
def update_timeframe( self, timeframe: app.backbone.entities.timeframe.Timeframe, selected: bool):
70    def update_timeframe(self, timeframe: Timeframe, selected:bool):
71        with self.db_service.get_database() as db:
72
73            timeframe.Selected = selected
74
75            self.db_service.update(db, Timeframe, timeframe)
76            self.db_service.save(db)
 78    def create(self) -> OperationResult:
 79        if not mt5.initialize():
 80            print("initialize() failed, error code =", mt5.last_error())
 81            quit()
 82
 83        symbols = mt5.symbols_get()
 84
 85        categories_tickers = {}
 86        for symbol in symbols:
 87            category_name = symbol.path.split("\\")[0]
 88            ticker_name = symbol.path.split("\\")[1]
 89
 90            if category_name not in categories_tickers.keys():
 91                categories_tickers[category_name] = []
 92
 93            categories_tickers[category_name].append(ticker_name)
 94
 95        with self.db_service.get_database() as db:
 96            for category_name, tickers in categories_tickers.items():
 97                # Buscar la categoría en la base de datos
 98                category = self.db_service.get_by_filter(db, Category, Name=category_name)
 99                
100                # Si la categoría no existe, crearla
101                if not category:
102                    category = Category(Name=category_name, Commission=0)
103                    self.db_service.create(db, category)
104
105                # Procesar los tickers asociados a esta categoría
106                for ticker_name in tickers:
107                    print(ticker_name)
108
109                    _ = mt5.copy_rates_from_pos(
110                        ticker_name, 16385, 0, 3
111                    )
112
113                    symbol_info = mt5.symbol_info(ticker_name)
114
115                    if symbol_info is not None:
116                        print(symbol_info)
117
118                        while symbol_info.ask == 0:
119                            symbol_info = mt5.symbol_info(ticker_name)
120
121                        spread = (symbol_info.spread * symbol_info.point) / symbol_info.ask
122
123                        # Buscar el ticker en la base de datos
124                        ticker = self.db_service.get_by_filter(db, Ticker, Name=ticker_name, CategoryId=category.Id)
125                        
126                        # Si el ticker no existe, crearlo
127                        if not ticker:
128                            ticker = Ticker(Name=ticker_name, Category=category, Spread=spread)
129                            self.db_service.create(db, ticker)
130                        else:
131                            # Si el ticker existe, actualizar su información
132                            ticker.Spread = spread
133                            self.db_service.update(db, Ticker, ticker)
134
135            self.db_service.save(db)
136
137        return OperationResult(ok=True, message="Categorías y tickers procesados correctamente", item=None)
def get_tickers_by_category(self, category_id: int) -> List[app.backbone.entities.ticker.Ticker]:
139    def get_tickers_by_category(self, category_id:int) -> List[Ticker]:
140        with self.db_service.get_database() as db:
141            tickers = self.db_service.get_many_by_filter(db, Ticker, CategoryId=category_id)
142            
143            return tickers
def get_by_name(self, name: str) -> app.backbone.entities.ticker.Ticker:
145    def get_by_name(self, name:str) -> Ticker:
146        with self.db_service.get_database() as db:
147            ticker = self.db_service.get_by_filter(db, Ticker, Name=name)
148            
149        return ticker
def get_all_categories(self) -> List[app.backbone.entities.category.Category]:
151    def get_all_categories(self) -> List[Category]:
152        with self.db_service.get_database() as db:
153            categories = self.db_service.get_all(db, Category)
154            return categories
def get_ticker_by_id(self, id) -> app.backbone.entities.ticker.Ticker:
156    def get_ticker_by_id(self, id) -> Ticker:
157        with self.db_service.get_database() as db:
158            ticker = self.db_service.get_by_id(db, Ticker, id)
159            return ticker
def get_all_timeframes(self) -> List[app.backbone.entities.timeframe.Timeframe]:
161    def get_all_timeframes(self) -> List[Timeframe]:
162        with self.db_service.get_database() as db:
163            categories = self.db_service.get_all(db, Timeframe)
164            return categories
def get_timeframe_by_id(self, id) -> app.backbone.entities.timeframe.Timeframe:
166    def get_timeframe_by_id(self, id) -> Timeframe:
167        with self.db_service.get_database() as db:
168            timeframe = self.db_service.get_by_id(db, Timeframe, id)
169            return timeframe
def get_timeframe_by_name(self, name: str) -> app.backbone.entities.timeframe.Timeframe:
171    def get_timeframe_by_name(self, name:str) -> Timeframe:
172        with self.db_service.get_database() as db:
173            timeframe = self.db_service.get_by_filter(db, Timeframe, Name=name)
174            return timeframe
def get_all_tickers(self) -> List[app.backbone.entities.ticker.Ticker]:
176    def get_all_tickers(self) -> List[Ticker]:
177        with self.db_service.get_database() as db:
178            tickers = self.db_service.get_all(db, Ticker)
179            return tickers
def get_tickers_by_strategy( self, strategy_id) -> app.backbone.services.operation_result.OperationResult:
181    def get_tickers_by_strategy(self, strategy_id) -> OperationResult:
182        with self.db_service.get_database() as db:
183        
184            strategies = (
185                db.query(Ticker)
186                    .join(Bot, Bot.TickerId == Ticker.Id)
187                    .filter(Bot.StrategyId == strategy_id)
188            )
189            
190            result = OperationResult(ok=True, message=None, item=strategies)
191            return result
def update_categories_commissions( self, category_id: Union[int, List[int]], commission: Union[float, List[float]]):
193    def update_categories_commissions(self, category_id: int | List[int], commission:float | List[float]):
194
195        if type(category_id) != list:
196            category_id = [category_id]
197
198        if type(commission) != list:
199            commission = [commission]
200
201        if len(commission) != len(category_id):
202            return OperationResult(ok=False, message='The length of the lists must be the same')
203
204        with self.db_service.get_database() as db:
205            for cat_id, com in zip(category_id, commission):
206                old_category = self.db_service.get_by_id(db, Category, id=cat_id)
207                new_category = old_category
208                new_category.Commission = com
209
210                self.db_service.update(db, Category, new_category)
211
212            return OperationResult(ok=True, message=None, item=None)