浏览代码

sockets each

0x4a52466c696e74 3 月之前
父节点
当前提交
b37752df6c
共有 1 个文件被更改,包括 16 次插入5 次删除
  1. 16 5
      application/sockets.go

+ 16 - 5
application/sockets.go

@@ -10,13 +10,14 @@ import (
 )
 
 type StreamStore struct {
-	mu    sync.RWMutex
-	items []rest.IStream
+	mu       sync.RWMutex
+	items    []rest.IStream
+	userData any
 }
 
 ///////////////////////////////////////////
 
-func NewStreamStore(stream rest.IStream) *StreamStore {
+func NewStreamStore(stream rest.IStream, userData any) *StreamStore {
 	items := []rest.IStream{}
 	if stream != nil {
 		items = append(items, stream)
@@ -26,6 +27,10 @@ func NewStreamStore(stream rest.IStream) *StreamStore {
 	}
 }
 
+func (s *StreamStore) UserData() any {
+	return s.userData
+}
+
 func (s *StreamStore) add(item rest.IStream) {
 	s.mu.Lock()
 	s.items = append(s.items, item)
@@ -126,8 +131,8 @@ func (s *Sockets) OnlineCount() int64 {
 	return s.counter.Load()
 }
 
-func (s *Sockets) ClientAuth(id any, stream rest.IStream) {
-	store, exists := s.clients.LoadOrStore(id, NewStreamStore(stream))
+func (s *Sockets) ClientAuth(id any, stream rest.IStream, userData any) {
+	store, exists := s.clients.LoadOrStore(id, NewStreamStore(stream, userData))
 	if exists {
 		store.(*StreamStore).add(stream)
 	}
@@ -144,3 +149,9 @@ func (s *Sockets) SendMessage(id any, message rest.IRequestOut) {
 		store.(*StreamStore).SendMessage(message)
 	}
 }
+
+func (s *Sockets) ClientsEach(cb func(id any, store *StreamStore) bool) {
+	s.clients.Range(func(key, val any) bool {
+		return cb(key, val.(*StreamStore))
+	})
+}