stream_store.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package application
  2. import (
  3. "context"
  4. "sync"
  5. "git.ali33.ru/fcg-xvii/rest"
  6. )
  7. type StreamStore struct {
  8. items sync.Map
  9. wg sync.WaitGroup
  10. ctx context.Context
  11. cancel context.CancelFunc
  12. }
  13. func NewStreamStore(stream rest.IStream, ctx context.Context) *StreamStore {
  14. res := &StreamStore{}
  15. res.ctx, res.cancel = context.WithCancel(ctx)
  16. res.Add(stream)
  17. return res
  18. }
  19. func (s *StreamStore) Add(item rest.IStream) {
  20. s.wg.Add(1) // добавляем горутину в WaitGroup
  21. s.items.Store(item, struct{}{}) // используем sync.Map
  22. go func() {
  23. defer s.wg.Done()
  24. <-item.Context().Done()
  25. if s.remove(item) {
  26. s.cancel() // завершаем контекст, если потоков больше нет
  27. }
  28. }()
  29. }
  30. func (s *StreamStore) remove(item rest.IStream) bool {
  31. s.items.Delete(item)
  32. // Проверяем, остались ли еще элементы
  33. var hasItems bool
  34. s.items.Range(func(key, value any) bool {
  35. hasItems = true
  36. return false // останавливаем Range
  37. })
  38. return !hasItems
  39. }
  40. func (s *StreamStore) SendEvent(message rest.IRequestOut) {
  41. s.items.Range(func(key, value any) bool {
  42. key.(rest.IStream).SendMessage(message)
  43. return true
  44. })
  45. }
  46. func (s *StreamStore) Close() {
  47. s.items.Range(func(key, value any) bool {
  48. key.(rest.IStream).Close()
  49. return true
  50. })
  51. s.wg.Wait() // ждем завершения всех горутин
  52. s.cancel()
  53. }
  54. func (s *StreamStore) Context() context.Context {
  55. return s.ctx
  56. }