sockets.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package application
  2. import (
  3. "context"
  4. "log"
  5. "sync"
  6. "sync/atomic"
  7. "git.ali33.ru/fcg-xvii/rest"
  8. )
  9. func NewSockets() *Sockets {
  10. res := &Sockets{
  11. counter: &atomic.Int64{},
  12. chConnect: make(chan rest.IStream, 5),
  13. clients: &sync.Map{},
  14. }
  15. go res.work()
  16. return res
  17. }
  18. type Sockets struct {
  19. ctx context.Context
  20. cancel context.CancelFunc
  21. counter *atomic.Int64
  22. chConnect chan rest.IStream
  23. clients *sync.Map
  24. }
  25. func (s *Sockets) work() {
  26. s.ctx, s.cancel = context.WithCancel(context.Background())
  27. for {
  28. select {
  29. case <-s.ctx.Done():
  30. return
  31. case stream := <-s.chConnect:
  32. log.Println("CONNECT-------", s.counter.Add(1))
  33. go func() {
  34. <-stream.Context().Done()
  35. log.Println("DISCONNECT-------", s.counter.Add(-1))
  36. /*
  37. // auth
  38. auth := stream.Auth()
  39. if auth != nil {
  40. id := auth.Int32("id", 0)
  41. iStore, ok := s.clients.Load(id)
  42. if ok {
  43. store := iStore.(*StreamStore)
  44. if store.remove(stream) == 0 {
  45. s.clients.Delete(id)
  46. }
  47. }
  48. }
  49. */
  50. }()
  51. }
  52. }
  53. }
  54. func (s *Sockets) Connect() chan<- rest.IStream {
  55. return s.chConnect
  56. }
  57. func (s *Sockets) OnlineCount() int64 {
  58. return s.counter.Load()
  59. }