package application import ( "context" "log" "sync" "git.ali33.ru/fcg-xvii/rest" ) type StreamGroup struct { clients map[any]*StreamStore mu sync.RWMutex ctx context.Context cancel context.CancelFunc } func NewStreamGroup(ctx context.Context) *StreamGroup { groupCtx, cancel := context.WithCancel(ctx) return &StreamGroup{ clients: make(map[any]*StreamStore), ctx: groupCtx, cancel: cancel, } } // Создание нового хранилища или добавление потока в существующее хранилище. func (s *StreamGroup) Store(clientID any, stream rest.IStream) *StreamStore { s.mu.Lock() defer s.mu.Unlock() // Если хранилище для клиента уже существует, добавляем поток if store, ok := s.clients[clientID]; ok { store.Add(stream) return store } // Если хранилище не существует, создаем новое newStore := NewStreamStore(stream, s.ctx) s.clients[clientID] = newStore // Запускаем горутину для мониторинга состояния хранилища go func(clID any, store *StreamStore) { <-store.ctx.Done() // Ждем завершения хранилища s.remove(clID) }(clientID, newStore) return newStore } // Удаление хранилища func (s *StreamGroup) remove(clientID any) { log.Println("remove clientID", clientID) s.mu.Lock() defer s.mu.Unlock() // Получаем хранилище, если оно существует if store, ok := s.clients[clientID]; ok { // Закрываем хранилище, чтобы очистить ресурсы store.Close() delete(s.clients, clientID) } } // Закрытие всех хранилищ и остановка группы func (s *StreamGroup) Close() { s.mu.Lock() defer s.mu.Unlock() // Закрываем каждое хранилище for _, store := range s.clients { store.Close() } s.clients = nil // Очищаем карту s.cancel() // Завершаем общий контекст } // Рассылает сообщение определенной группе func (s *StreamGroup) EventSend(clientID any, event rest.IRequestOut) { s.mu.RLock() defer s.mu.RUnlock() if store, ok := s.clients[clientID]; ok { store.SendEvent(event) } } // Рассылает сообщение всем потокам в группе func (s *StreamGroup) EventBroadcast(event rest.IRequestOut) { s.mu.RLock() defer s.mu.RUnlock() for _, store := range s.clients { store.SendEvent(event) } }