12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- package application
- import (
- "context"
- "log"
- "sync"
- "git.ali33.ru/fcg-xvii/rest"
- )
- type StreamGroup struct {
- clients map[any]*StreamStore
- mu sync.RWMutex
- ctx context.Context
- cancel context.CancelFunc
- }
- func NewStreamGroup(ctx context.Context) *StreamGroup {
- groupCtx, cancel := context.WithCancel(ctx)
- return &StreamGroup{
- clients: make(map[any]*StreamStore),
- ctx: groupCtx,
- cancel: cancel,
- }
- }
- // Создание нового хранилища или добавление потока в существующее хранилище.
- func (s *StreamGroup) Store(clientID any, stream rest.IStream) *StreamStore {
- s.mu.Lock()
- defer s.mu.Unlock()
- // Если хранилище для клиента уже существует, добавляем поток
- if store, ok := s.clients[clientID]; ok {
- store.Add(stream)
- return store
- }
- // Если хранилище не существует, создаем новое
- newStore := NewStreamStore(stream, s.ctx)
- s.clients[clientID] = newStore
- // Запускаем горутину для мониторинга состояния хранилища
- go func(clID any, store *StreamStore) {
- <-store.ctx.Done() // Ждем завершения хранилища
- s.remove(clID)
- }(clientID, newStore)
- return newStore
- }
- // Удаление хранилища
- func (s *StreamGroup) remove(clientID any) {
- log.Println("remove clientID", clientID)
- s.mu.Lock()
- defer s.mu.Unlock()
- // Получаем хранилище, если оно существует
- if store, ok := s.clients[clientID]; ok {
- // Закрываем хранилище, чтобы очистить ресурсы
- store.Close()
- delete(s.clients, clientID)
- }
- }
- // Закрытие всех хранилищ и остановка группы
- func (s *StreamGroup) Close() {
- s.mu.Lock()
- defer s.mu.Unlock()
- // Закрываем каждое хранилище
- for _, store := range s.clients {
- store.Close()
- }
- s.clients = nil // Очищаем карту
- s.cancel() // Завершаем общий контекст
- }
- // Рассылает сообщение определенной группе
- func (s *StreamGroup) EventSend(clientID any, event rest.IRequestOut) {
- s.mu.RLock()
- defer s.mu.RUnlock()
- if store, ok := s.clients[clientID]; ok {
- store.SendEvent(event)
- }
- }
- // Рассылает сообщение всем потокам в группе
- func (s *StreamGroup) EventBroadcast(event rest.IRequestOut) {
- s.mu.RLock()
- defer s.mu.RUnlock()
- for _, store := range s.clients {
- store.SendEvent(event)
- }
- }
|