stream_group.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package application
  2. import (
  3. "context"
  4. "log"
  5. "sync"
  6. "git.ali33.ru/fcg-xvii/rest"
  7. )
  8. type StreamGroup struct {
  9. clients map[any]*StreamStore
  10. mu sync.RWMutex
  11. ctx context.Context
  12. cancel context.CancelFunc
  13. }
  14. func NewStreamGroup(ctx context.Context) *StreamGroup {
  15. groupCtx, cancel := context.WithCancel(ctx)
  16. return &StreamGroup{
  17. clients: make(map[any]*StreamStore),
  18. ctx: groupCtx,
  19. cancel: cancel,
  20. }
  21. }
  22. // Создание нового хранилища или добавление потока в существующее хранилище.
  23. func (s *StreamGroup) Store(clientID any, stream rest.IStream) *StreamStore {
  24. s.mu.Lock()
  25. defer s.mu.Unlock()
  26. // Если хранилище для клиента уже существует, добавляем поток
  27. if store, ok := s.clients[clientID]; ok {
  28. store.Add(stream)
  29. return store
  30. }
  31. // Если хранилище не существует, создаем новое
  32. newStore := NewStreamStore(stream, s.ctx)
  33. s.clients[clientID] = newStore
  34. // Запускаем горутину для мониторинга состояния хранилища
  35. go func(clID any, store *StreamStore) {
  36. <-store.ctx.Done() // Ждем завершения хранилища
  37. s.remove(clID)
  38. }(clientID, newStore)
  39. return newStore
  40. }
  41. // Удаление хранилища
  42. func (s *StreamGroup) remove(clientID any) {
  43. log.Println("remove clientID", clientID)
  44. s.mu.Lock()
  45. defer s.mu.Unlock()
  46. // Получаем хранилище, если оно существует
  47. if store, ok := s.clients[clientID]; ok {
  48. // Закрываем хранилище, чтобы очистить ресурсы
  49. store.Close()
  50. delete(s.clients, clientID)
  51. }
  52. }
  53. // Закрытие всех хранилищ и остановка группы
  54. func (s *StreamGroup) Close() {
  55. s.mu.Lock()
  56. defer s.mu.Unlock()
  57. // Закрываем каждое хранилище
  58. for _, store := range s.clients {
  59. store.Close()
  60. }
  61. s.clients = nil // Очищаем карту
  62. s.cancel() // Завершаем общий контекст
  63. }
  64. // Рассылает сообщение определенной группе
  65. func (s *StreamGroup) EventSend(clientID any, event rest.IRequestOut) {
  66. s.mu.RLock()
  67. defer s.mu.RUnlock()
  68. if store, ok := s.clients[clientID]; ok {
  69. store.SendEvent(event)
  70. }
  71. }
  72. // Рассылает сообщение всем потокам в группе
  73. func (s *StreamGroup) EventBroadcast(event rest.IRequestOut) {
  74. s.mu.RLock()
  75. defer s.mu.RUnlock()
  76. for _, store := range s.clients {
  77. store.SendEvent(event)
  78. }
  79. }