123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- package application
- import (
- "context"
- "log"
- "sync"
- "sync/atomic"
- "git.ali33.ru/fcg-xvii/rest"
- )
- type StreamStore struct {
- mu sync.RWMutex
- items []rest.IStream
- userData any
- }
- ///////////////////////////////////////////
- func NewStreamStore(stream rest.IStream, userData any) *StreamStore {
- items := []rest.IStream{}
- if stream != nil {
- items = append(items, stream)
- }
- return &StreamStore{
- items: items,
- }
- }
- func (s *StreamStore) UserData() any {
- return s.userData
- }
- func (s *StreamStore) add(item rest.IStream) {
- s.mu.Lock()
- s.items = append(s.items, item)
- s.mu.Unlock()
- }
- func (s *StreamStore) remove(item rest.IStream) int {
- s.mu.Lock()
- for i, val := range s.items {
- if val == item {
- s.items = append(s.items[:i], s.items[i+1:]...)
- break
- }
- }
- res := len(s.items)
- s.mu.Unlock()
- return res
- }
- func (s *StreamStore) get(index int) (res rest.IStream) {
- s.mu.RLock()
- if index >= 0 && index < len(s.items) {
- res = s.items[index]
- }
- s.mu.RUnlock()
- return res
- }
- func (s *StreamStore) Size() int {
- s.mu.RLock()
- res := len(s.items)
- s.mu.RUnlock()
- return res
- }
- func (s *StreamStore) SendMessage(message rest.IRequestOut) {
- s.mu.RLock()
- for _, val := range s.items {
- val.SendMessage(message)
- }
- s.mu.RUnlock()
- }
- ////////////////////////////////////////////////////////////
- func NewSockets() *Sockets {
- res := &Sockets{
- counter: &atomic.Int64{},
- chConnect: make(chan rest.IStream, 5),
- clients: &sync.Map{},
- }
- go res.work()
- return res
- }
- type Sockets struct {
- ctx context.Context
- cancel context.CancelFunc
- counter *atomic.Int64
- chConnect chan rest.IStream
- clients *sync.Map
- }
- func (s *Sockets) work() {
- s.ctx, s.cancel = context.WithCancel(context.Background())
- for {
- select {
- case <-s.ctx.Done():
- return
- case stream := <-s.chConnect:
- log.Println("CONNECT-------", s.counter.Add(1))
- go func() {
- <-stream.Context().Done()
- log.Println("DISCONNECT-------", s.counter.Add(-1))
- // auth
- auth := stream.Auth()
- if auth != nil {
- id := auth.Int32("id", 0)
- iStore, ok := s.clients.Load(id)
- if ok {
- store := iStore.(*StreamStore)
- if store.remove(stream) == 0 {
- s.clients.Delete(id)
- }
- }
- }
- }()
- }
- }
- }
- func (s *Sockets) Connect() chan<- rest.IStream {
- return s.chConnect
- }
- func (s *Sockets) OnlineCount() int64 {
- return s.counter.Load()
- }
- func (s *Sockets) ClientAuth(id any, stream rest.IStream, userData any) {
- store, exists := s.clients.LoadOrStore(id, NewStreamStore(stream, userData))
- if exists {
- store.(*StreamStore).add(stream)
- }
- }
- func (s *Sockets) ClientExists(id any) bool {
- _, exists := s.clients.Load(id)
- return exists
- }
- func (s *Sockets) SendMessage(id any, message rest.IRequestOut) {
- store, exists := s.clients.Load(id)
- if exists {
- store.(*StreamStore).SendMessage(message)
- }
- }
- func (s *Sockets) ClientsEach(cb func(id any, store *StreamStore) bool) {
- s.clients.Range(func(key, val any) bool {
- return cb(key, val.(*StreamStore))
- })
- }
|