sockets.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package application
  2. import (
  3. "context"
  4. "log"
  5. "sync"
  6. "sync/atomic"
  7. "git.ali33.ru/fcg-xvii/rest"
  8. )
  9. type StreamStore struct {
  10. mu sync.RWMutex
  11. items []rest.IStream
  12. }
  13. ///////////////////////////////////////////
  14. func NewStreamStore(stream rest.IStream) *StreamStore {
  15. items := []rest.IStream{}
  16. if stream != nil {
  17. items = append(items, stream)
  18. }
  19. return &StreamStore{
  20. items: items,
  21. }
  22. }
  23. func (s *StreamStore) add(item rest.IStream) {
  24. s.mu.Lock()
  25. s.items = append(s.items, item)
  26. s.mu.Unlock()
  27. }
  28. func (s *StreamStore) remove(item rest.IStream) int {
  29. s.mu.Lock()
  30. for i, val := range s.items {
  31. if val == item {
  32. s.items = append(s.items[:i], s.items[i+1:]...)
  33. break
  34. }
  35. }
  36. res := len(s.items)
  37. s.mu.Unlock()
  38. return res
  39. }
  40. func (s *StreamStore) get(index int) (res rest.IStream) {
  41. s.mu.RLock()
  42. if index >= 0 && index < len(s.items) {
  43. res = s.items[index]
  44. }
  45. s.mu.RUnlock()
  46. return res
  47. }
  48. func (s *StreamStore) Size() int {
  49. s.mu.RLock()
  50. res := len(s.items)
  51. s.mu.RUnlock()
  52. return res
  53. }
  54. func (s *StreamStore) SendMessage(message rest.IRequestOut) {
  55. s.mu.RLock()
  56. for _, val := range s.items {
  57. val.SendMessage(message)
  58. }
  59. s.mu.RUnlock()
  60. }
  61. ////////////////////////////////////////////////////////////
  62. func NewSockets() *Sockets {
  63. res := &Sockets{
  64. counter: &atomic.Int64{},
  65. chConnect: make(chan rest.IStream, 5),
  66. clients: &sync.Map{},
  67. }
  68. go res.work()
  69. return res
  70. }
  71. type Sockets struct {
  72. ctx context.Context
  73. cancel context.CancelFunc
  74. counter *atomic.Int64
  75. chConnect chan rest.IStream
  76. clients *sync.Map
  77. }
  78. func (s *Sockets) work() {
  79. s.ctx, s.cancel = context.WithCancel(context.Background())
  80. for {
  81. select {
  82. case <-s.ctx.Done():
  83. return
  84. case stream := <-s.chConnect:
  85. log.Println("CONNECT-------", s.counter.Add(1))
  86. go func() {
  87. <-stream.Context().Done()
  88. log.Println("DISCONNECT-------", s.counter.Add(-1))
  89. // auth
  90. auth := stream.Auth()
  91. if auth != nil {
  92. id := auth.Int32("id", 0)
  93. iStore, ok := s.clients.Load(id)
  94. if ok {
  95. store := iStore.(*StreamStore)
  96. if store.remove(stream) == 0 {
  97. s.clients.Delete(id)
  98. }
  99. }
  100. }
  101. }()
  102. }
  103. }
  104. }
  105. func (s *Sockets) Connect() chan<- rest.IStream {
  106. return s.chConnect
  107. }
  108. func (s *Sockets) OnlineCount() int64 {
  109. return s.counter.Load()
  110. }
  111. func (s *Sockets) ClientAuth(id any, stream rest.IStream) {
  112. store, exists := s.clients.LoadOrStore(id, NewStreamStore(stream))
  113. if exists {
  114. store.(*StreamStore).add(stream)
  115. }
  116. }
  117. func (s *Sockets) ClientExists(id any) bool {
  118. _, exists := s.clients.Load(id)
  119. return exists
  120. }
  121. func (s *Sockets) SendMessage(id any, message rest.IRequestOut) {
  122. store, exists := s.clients.Load(id)
  123. if exists {
  124. store.(*StreamStore).SendMessage(message)
  125. }
  126. }