فهرست منبع

application, rest

0x4a52466c696e74 1 ماه پیش
والد
کامیت
e8a9c15d3e
1فایلهای تغییر یافته به همراه146 افزوده شده و 0 حذف شده
  1. 146 0
      application/sockets.go

+ 146 - 0
application/sockets.go

@@ -0,0 +1,146 @@
+package application
+
+import (
+	"context"
+	"log"
+	"sync"
+	"sync/atomic"
+
+	"git.ali33.ru/fcg-xvii/rest"
+)
+
+type StreamStore struct {
+	mu    sync.RWMutex
+	items []rest.IStream
+}
+
+///////////////////////////////////////////
+
+func NewStreamStore(stream rest.IStream) *StreamStore {
+	items := []rest.IStream{}
+	if stream != nil {
+		items = append(items, stream)
+	}
+	return &StreamStore{
+		items: items,
+	}
+}
+
+func (s *StreamStore) add(item rest.IStream) {
+	s.mu.Lock()
+	s.items = append(s.items, item)
+	s.mu.Unlock()
+}
+
+func (s *StreamStore) remove(item rest.IStream) int {
+	s.mu.Lock()
+	for i, val := range s.items {
+		if val == item {
+			s.items = append(s.items[:i], s.items[i+1:]...)
+			break
+		}
+	}
+	res := len(s.items)
+	s.mu.Unlock()
+	return res
+
+}
+
+func (s *StreamStore) get(index int) (res rest.IStream) {
+	s.mu.RLock()
+	if index >= 0 && index < len(s.items) {
+		res = s.items[index]
+	}
+	s.mu.RUnlock()
+	return res
+}
+
+func (s *StreamStore) Size() int {
+	s.mu.RLock()
+	res := len(s.items)
+	s.mu.RUnlock()
+	return res
+}
+
+func (s *StreamStore) SendMessage(message rest.IRequestOut) {
+	s.mu.RLock()
+	for _, val := range s.items {
+		val.SendMessage(message)
+	}
+	s.mu.RUnlock()
+}
+
+////////////////////////////////////////////////////////////
+
+func NewSockets() *Sockets {
+	res := &Sockets{
+		counter:   &atomic.Int64{},
+		chConnect: make(chan rest.IStream, 5),
+		clients:   &sync.Map{},
+	}
+	go res.work()
+	return res
+}
+
+type Sockets struct {
+	ctx       context.Context
+	cancel    context.CancelFunc
+	counter   *atomic.Int64
+	chConnect chan rest.IStream
+	clients   *sync.Map
+}
+
+func (s *Sockets) work() {
+	s.ctx, s.cancel = context.WithCancel(context.Background())
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+		case stream := <-s.chConnect:
+			log.Println("CONNECT-------", s.counter.Add(1))
+			go func() {
+				<-stream.Context().Done()
+				log.Println("DISCONNECT-------", s.counter.Add(-1))
+				// auth
+				auth := stream.Auth()
+				if auth != nil {
+					id := auth.Int32("id", 0)
+					iStore, ok := s.clients.Load(id)
+					if ok {
+						store := iStore.(*StreamStore)
+						if store.remove(stream) == 0 {
+							s.clients.Delete(id)
+						}
+					}
+				}
+			}()
+		}
+	}
+}
+
+func (s *Sockets) Connect() chan<- rest.IStream {
+	return s.chConnect
+}
+
+func (s *Sockets) OnlineCount() int64 {
+	return s.counter.Load()
+}
+
+func (s *Sockets) ClientAuth(id any, stream rest.IStream) {
+	store, exists := s.clients.LoadOrStore(id, NewStreamStore(stream))
+	if exists {
+		store.(*StreamStore).add(stream)
+	}
+}
+
+func (s *Sockets) ClientExists(id any) bool {
+	_, exists := s.clients.Load(id)
+	return exists
+}
+
+func (s *Sockets) SendMessage(id any, message rest.IRequestOut) {
+	store, exists := s.clients.Load(id)
+	if exists {
+		store.(*StreamStore).SendMessage(message)
+	}
+}