sockets.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package application
  2. import (
  3. "context"
  4. "log"
  5. "sync"
  6. "sync/atomic"
  7. "git.ali33.ru/fcg-xvii/rest"
  8. )
  9. type StreamStore struct {
  10. mu sync.RWMutex
  11. items []rest.IStream
  12. userData any
  13. }
  14. ///////////////////////////////////////////
  15. func NewStreamStore(stream rest.IStream, userData any) *StreamStore {
  16. items := []rest.IStream{}
  17. if stream != nil {
  18. items = append(items, stream)
  19. }
  20. return &StreamStore{
  21. items: items,
  22. }
  23. }
  24. func (s *StreamStore) UserData() any {
  25. return s.userData
  26. }
  27. func (s *StreamStore) add(item rest.IStream) {
  28. s.mu.Lock()
  29. s.items = append(s.items, item)
  30. s.mu.Unlock()
  31. }
  32. func (s *StreamStore) remove(item rest.IStream) int {
  33. s.mu.Lock()
  34. for i, val := range s.items {
  35. if val == item {
  36. s.items = append(s.items[:i], s.items[i+1:]...)
  37. break
  38. }
  39. }
  40. res := len(s.items)
  41. s.mu.Unlock()
  42. return res
  43. }
  44. func (s *StreamStore) get(index int) (res rest.IStream) {
  45. s.mu.RLock()
  46. if index >= 0 && index < len(s.items) {
  47. res = s.items[index]
  48. }
  49. s.mu.RUnlock()
  50. return res
  51. }
  52. func (s *StreamStore) Size() int {
  53. s.mu.RLock()
  54. res := len(s.items)
  55. s.mu.RUnlock()
  56. return res
  57. }
  58. func (s *StreamStore) SendMessage(message rest.IRequestOut) {
  59. s.mu.RLock()
  60. for _, val := range s.items {
  61. val.SendMessage(message)
  62. }
  63. s.mu.RUnlock()
  64. }
  65. ////////////////////////////////////////////////////////////
  66. func NewSockets() *Sockets {
  67. res := &Sockets{
  68. counter: &atomic.Int64{},
  69. chConnect: make(chan rest.IStream, 5),
  70. clients: &sync.Map{},
  71. }
  72. go res.work()
  73. return res
  74. }
  75. type Sockets struct {
  76. ctx context.Context
  77. cancel context.CancelFunc
  78. counter *atomic.Int64
  79. chConnect chan rest.IStream
  80. clients *sync.Map
  81. }
  82. func (s *Sockets) work() {
  83. s.ctx, s.cancel = context.WithCancel(context.Background())
  84. for {
  85. select {
  86. case <-s.ctx.Done():
  87. return
  88. case stream := <-s.chConnect:
  89. log.Println("CONNECT-------", s.counter.Add(1))
  90. go func() {
  91. <-stream.Context().Done()
  92. log.Println("DISCONNECT-------", s.counter.Add(-1))
  93. // auth
  94. auth := stream.Auth()
  95. if auth != nil {
  96. id := auth.Int32("id", 0)
  97. iStore, ok := s.clients.Load(id)
  98. if ok {
  99. store := iStore.(*StreamStore)
  100. if store.remove(stream) == 0 {
  101. s.clients.Delete(id)
  102. }
  103. }
  104. }
  105. }()
  106. }
  107. }
  108. }
  109. func (s *Sockets) Connect() chan<- rest.IStream {
  110. return s.chConnect
  111. }
  112. func (s *Sockets) OnlineCount() int64 {
  113. return s.counter.Load()
  114. }
  115. func (s *Sockets) ClientAuth(id any, stream rest.IStream, userData any) {
  116. store, exists := s.clients.LoadOrStore(id, NewStreamStore(stream, userData))
  117. if exists {
  118. store.(*StreamStore).add(stream)
  119. }
  120. }
  121. func (s *Sockets) ClientExists(id any) bool {
  122. _, exists := s.clients.Load(id)
  123. return exists
  124. }
  125. func (s *Sockets) SendMessage(id any, message rest.IRequestOut) {
  126. store, exists := s.clients.Load(id)
  127. if exists {
  128. store.(*StreamStore).SendMessage(message)
  129. }
  130. }
  131. func (s *Sockets) ClientsEach(cb func(id any, store *StreamStore) bool) {
  132. s.clients.Range(func(key, val any) bool {
  133. return cb(key, val.(*StreamStore))
  134. })
  135. }