1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- package application
- import (
- "context"
- "sync"
- "git.ali33.ru/fcg-xvii/rest"
- )
- type StreamStore struct {
- items sync.Map
- wg sync.WaitGroup
- ctx context.Context
- cancel context.CancelFunc
- }
- func NewStreamStore(stream rest.IStream, ctx context.Context) *StreamStore {
- res := &StreamStore{}
- res.ctx, res.cancel = context.WithCancel(ctx)
- res.Add(stream)
- return res
- }
- func (s *StreamStore) Add(item rest.IStream) {
- s.wg.Add(1) // добавляем горутину в WaitGroup
- s.items.Store(item, struct{}{}) // используем sync.Map
- go func() {
- defer s.wg.Done()
- <-item.Context().Done()
- if s.remove(item) {
- s.cancel() // завершаем контекст, если потоков больше нет
- }
- }()
- }
- func (s *StreamStore) remove(item rest.IStream) bool {
- s.items.Delete(item)
- // Проверяем, остались ли еще элементы
- var hasItems bool
- s.items.Range(func(key, value any) bool {
- hasItems = true
- return false // останавливаем Range
- })
- return !hasItems
- }
- func (s *StreamStore) SendEvent(message rest.IRequestOut) {
- s.items.Range(func(key, value any) bool {
- key.(rest.IStream).SendMessage(message)
- return true
- })
- }
- func (s *StreamStore) Close() {
- s.items.Range(func(key, value any) bool {
- key.(rest.IStream).Close()
- return true
- })
- s.wg.Wait() // ждем завершения всех горутин
- s.cancel()
- }
- func (s *StreamStore) Context() context.Context {
- return s.ctx
- }
|