package application import ( "context" "sync" "git.ali33.ru/fcg-xvii/rest" ) type StreamStore struct { items sync.Map wg sync.WaitGroup ctx context.Context cancel context.CancelFunc } func NewStreamStore(stream rest.IStream, ctx context.Context) *StreamStore { res := &StreamStore{} res.ctx, res.cancel = context.WithCancel(ctx) res.Add(stream) return res } func (s *StreamStore) Add(item rest.IStream) { s.wg.Add(1) // добавляем горутину в WaitGroup s.items.Store(item, struct{}{}) // используем sync.Map go func() { defer s.wg.Done() <-item.Context().Done() if s.remove(item) { s.cancel() // завершаем контекст, если потоков больше нет } }() } func (s *StreamStore) remove(item rest.IStream) bool { s.items.Delete(item) // Проверяем, остались ли еще элементы var hasItems bool s.items.Range(func(key, value any) bool { hasItems = true return false // останавливаем Range }) return !hasItems } func (s *StreamStore) SendEvent(message rest.IRequestOut) { s.items.Range(func(key, value any) bool { key.(rest.IStream).SendMessage(message) return true }) } func (s *StreamStore) Close() { s.items.Range(func(key, value any) bool { key.(rest.IStream).Close() return true }) s.wg.Wait() // ждем завершения всех горутин s.cancel() } func (s *StreamStore) Context() context.Context { return s.ctx }