package application import ( "context" "log" "sync" "sync/atomic" "git.ali33.ru/fcg-xvii/rest" ) type StreamStore struct { mu sync.RWMutex items []rest.IStream userData any } /////////////////////////////////////////// func NewStreamStore(stream rest.IStream, userData any) *StreamStore { items := []rest.IStream{} if stream != nil { items = append(items, stream) } return &StreamStore{ items: items, } } func (s *StreamStore) UserData() any { return s.userData } func (s *StreamStore) add(item rest.IStream) { s.mu.Lock() s.items = append(s.items, item) s.mu.Unlock() } func (s *StreamStore) remove(item rest.IStream) int { s.mu.Lock() for i, val := range s.items { if val == item { s.items = append(s.items[:i], s.items[i+1:]...) break } } res := len(s.items) s.mu.Unlock() return res } func (s *StreamStore) get(index int) (res rest.IStream) { s.mu.RLock() if index >= 0 && index < len(s.items) { res = s.items[index] } s.mu.RUnlock() return res } func (s *StreamStore) Size() int { s.mu.RLock() res := len(s.items) s.mu.RUnlock() return res } func (s *StreamStore) SendMessage(message rest.IRequestOut) { s.mu.RLock() for _, val := range s.items { val.SendMessage(message) } s.mu.RUnlock() } //////////////////////////////////////////////////////////// func NewSockets() *Sockets { res := &Sockets{ counter: &atomic.Int64{}, chConnect: make(chan rest.IStream, 5), clients: &sync.Map{}, } go res.work() return res } type Sockets struct { ctx context.Context cancel context.CancelFunc counter *atomic.Int64 chConnect chan rest.IStream clients *sync.Map } func (s *Sockets) work() { s.ctx, s.cancel = context.WithCancel(context.Background()) for { select { case <-s.ctx.Done(): return case stream := <-s.chConnect: log.Println("CONNECT-------", s.counter.Add(1)) go func() { <-stream.Context().Done() log.Println("DISCONNECT-------", s.counter.Add(-1)) // auth auth := stream.Auth() if auth != nil { id := auth.Int32("id", 0) iStore, ok := s.clients.Load(id) if ok { store := iStore.(*StreamStore) if store.remove(stream) == 0 { s.clients.Delete(id) } } } }() } } } func (s *Sockets) Connect() chan<- rest.IStream { return s.chConnect } func (s *Sockets) OnlineCount() int64 { return s.counter.Load() } func (s *Sockets) ClientAuth(id any, stream rest.IStream, userData any) { store, exists := s.clients.LoadOrStore(id, NewStreamStore(stream, userData)) if exists { store.(*StreamStore).add(stream) } } func (s *Sockets) ClientExists(id any) bool { _, exists := s.clients.Load(id) return exists } func (s *Sockets) SendMessage(id any, message rest.IRequestOut) { store, exists := s.clients.Load(id) if exists { store.(*StreamStore).SendMessage(message) } } func (s *Sockets) ClientsEach(cb func(id any, store *StreamStore) bool) { s.clients.Range(func(key, val any) bool { return cb(key, val.(*StreamStore)) }) }