0x4a52466c696e74 vor 1 Monat
Ursprung
Commit
bbc3849c93
5 geänderte Dateien mit 314 neuen und 103 gelöschten Zeilen
  1. 12 103
      application/sockets.go
  2. 94 0
      application/stream_group.go
  3. 65 0
      application/stream_store.go
  4. 142 0
      application/z_stream_group_test.go
  5. 1 0
      owner.go

+ 12 - 103
application/sockets.go

@@ -9,74 +9,6 @@ import (
 	"git.ali33.ru/fcg-xvii/rest"
 )
 
-type StreamStore struct {
-	mu       sync.RWMutex
-	items    []rest.IStream
-	userData any
-}
-
-///////////////////////////////////////////
-
-func NewStreamStore(stream rest.IStream, userData any) *StreamStore {
-	items := []rest.IStream{}
-	if stream != nil {
-		items = append(items, stream)
-	}
-	return &StreamStore{
-		items: items,
-	}
-}
-
-func (s *StreamStore) UserData() any {
-	return s.userData
-}
-
-func (s *StreamStore) add(item rest.IStream) {
-	s.mu.Lock()
-	s.items = append(s.items, item)
-	s.mu.Unlock()
-}
-
-func (s *StreamStore) remove(item rest.IStream) int {
-	s.mu.Lock()
-	for i, val := range s.items {
-		if val == item {
-			s.items = append(s.items[:i], s.items[i+1:]...)
-			break
-		}
-	}
-	res := len(s.items)
-	s.mu.Unlock()
-	return res
-
-}
-
-func (s *StreamStore) get(index int) (res rest.IStream) {
-	s.mu.RLock()
-	if index >= 0 && index < len(s.items) {
-		res = s.items[index]
-	}
-	s.mu.RUnlock()
-	return res
-}
-
-func (s *StreamStore) Size() int {
-	s.mu.RLock()
-	res := len(s.items)
-	s.mu.RUnlock()
-	return res
-}
-
-func (s *StreamStore) SendMessage(message rest.IRequestOut) {
-	s.mu.RLock()
-	for _, val := range s.items {
-		val.SendMessage(message)
-	}
-	s.mu.RUnlock()
-}
-
-////////////////////////////////////////////////////////////
-
 func NewSockets() *Sockets {
 	res := &Sockets{
 		counter:   &atomic.Int64{},
@@ -106,18 +38,20 @@ func (s *Sockets) work() {
 			go func() {
 				<-stream.Context().Done()
 				log.Println("DISCONNECT-------", s.counter.Add(-1))
-				// auth
-				auth := stream.Auth()
-				if auth != nil {
-					id := auth.Int32("id", 0)
-					iStore, ok := s.clients.Load(id)
-					if ok {
-						store := iStore.(*StreamStore)
-						if store.remove(stream) == 0 {
-							s.clients.Delete(id)
+				/*
+					// auth
+					auth := stream.Auth()
+					if auth != nil {
+						id := auth.Int32("id", 0)
+						iStore, ok := s.clients.Load(id)
+						if ok {
+							store := iStore.(*StreamStore)
+							if store.remove(stream) == 0 {
+								s.clients.Delete(id)
+							}
 						}
 					}
-				}
+				*/
 			}()
 		}
 	}
@@ -130,28 +64,3 @@ func (s *Sockets) Connect() chan<- rest.IStream {
 func (s *Sockets) OnlineCount() int64 {
 	return s.counter.Load()
 }
-
-func (s *Sockets) ClientAuth(id any, stream rest.IStream, userData any) {
-	store, exists := s.clients.LoadOrStore(id, NewStreamStore(stream, userData))
-	if exists {
-		store.(*StreamStore).add(stream)
-	}
-}
-
-func (s *Sockets) ClientExists(id any) bool {
-	_, exists := s.clients.Load(id)
-	return exists
-}
-
-func (s *Sockets) SendMessage(id any, message rest.IRequestOut) {
-	store, exists := s.clients.Load(id)
-	if exists {
-		store.(*StreamStore).SendMessage(message)
-	}
-}
-
-func (s *Sockets) ClientsEach(cb func(id any, store *StreamStore) bool) {
-	s.clients.Range(func(key, val any) bool {
-		return cb(key, val.(*StreamStore))
-	})
-}

+ 94 - 0
application/stream_group.go

@@ -0,0 +1,94 @@
+package application
+
+import (
+	"context"
+	"log"
+	"sync"
+
+	"git.ali33.ru/fcg-xvii/rest"
+)
+
+type StreamGroup struct {
+	clients map[any]*StreamStore
+	mu      sync.RWMutex
+	ctx     context.Context
+	cancel  context.CancelFunc
+}
+
+func NewStreamGroup(ctx context.Context) *StreamGroup {
+	groupCtx, cancel := context.WithCancel(ctx)
+	return &StreamGroup{
+		clients: make(map[any]*StreamStore),
+		ctx:     groupCtx,
+		cancel:  cancel,
+	}
+}
+
+// Создание нового хранилища или добавление потока в существующее хранилище.
+func (s *StreamGroup) Store(clientID any, stream rest.IStream) *StreamStore {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Если хранилище для клиента уже существует, добавляем поток
+	if store, ok := s.clients[clientID]; ok {
+		store.Add(stream)
+		return store
+	}
+
+	// Если хранилище не существует, создаем новое
+	newStore := NewStreamStore(stream, s.ctx)
+	s.clients[clientID] = newStore
+
+	// Запускаем горутину для мониторинга состояния хранилища
+	go func(clID any, store *StreamStore) {
+		<-store.ctx.Done() // Ждем завершения хранилища
+		s.remove(clID)
+	}(clientID, newStore)
+
+	return newStore
+}
+
+// Удаление хранилища
+func (s *StreamGroup) remove(clientID any) {
+	log.Println("remove clientID", clientID)
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Получаем хранилище, если оно существует
+	if store, ok := s.clients[clientID]; ok {
+		// Закрываем хранилище, чтобы очистить ресурсы
+		store.Close()
+		delete(s.clients, clientID)
+	}
+}
+
+// Закрытие всех хранилищ и остановка группы
+func (s *StreamGroup) Close() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Закрываем каждое хранилище
+	for _, store := range s.clients {
+		store.Close()
+	}
+	s.clients = nil // Очищаем карту
+	s.cancel()      // Завершаем общий контекст
+}
+
+// Рассылает сообщение определенной группе
+func (s *StreamGroup) EventSend(clientID any, event rest.IRequestOut) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	if store, ok := s.clients[clientID]; ok {
+		store.SendEvent(event)
+	}
+}
+
+// Рассылает сообщение всем потокам в группе
+func (s *StreamGroup) EventBroadcast(event rest.IRequestOut) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	for _, store := range s.clients {
+		store.SendEvent(event)
+	}
+}

+ 65 - 0
application/stream_store.go

@@ -0,0 +1,65 @@
+package application
+
+import (
+	"context"
+	"sync"
+
+	"git.ali33.ru/fcg-xvii/rest"
+)
+
+type StreamStore struct {
+	items  sync.Map
+	wg     sync.WaitGroup
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func NewStreamStore(stream rest.IStream, ctx context.Context) *StreamStore {
+	res := &StreamStore{}
+	res.ctx, res.cancel = context.WithCancel(ctx)
+	res.Add(stream)
+	return res
+}
+
+func (s *StreamStore) Add(item rest.IStream) {
+	s.wg.Add(1)                     // добавляем горутину в WaitGroup
+	s.items.Store(item, struct{}{}) // используем sync.Map
+	go func() {
+		defer s.wg.Done()
+		<-item.Context().Done()
+		if s.remove(item) {
+			s.cancel() // завершаем контекст, если потоков больше нет
+		}
+	}()
+}
+
+func (s *StreamStore) remove(item rest.IStream) bool {
+	s.items.Delete(item)
+	// Проверяем, остались ли еще элементы
+	var hasItems bool
+	s.items.Range(func(key, value any) bool {
+		hasItems = true
+		return false // останавливаем Range
+	})
+	return !hasItems
+}
+
+func (s *StreamStore) SendEvent(message rest.IRequestOut) {
+	s.items.Range(func(key, value any) bool {
+		key.(rest.IStream).SendMessage(message)
+		return true
+	})
+}
+
+func (s *StreamStore) Close() {
+	s.items.Range(func(key, value any) bool {
+		key.(rest.IStream).Close()
+		return true
+	})
+	s.wg.Wait() // ждем завершения всех горутин
+	s.cancel()
+}
+
+func (s *StreamStore) Context() context.Context {
+	return s.ctx
+}

+ 142 - 0
application/z_stream_group_test.go

@@ -0,0 +1,142 @@
+package application
+
+import (
+	"context"
+	"io"
+	"log"
+	"testing"
+	"time"
+
+	"git.ali33.ru/fcg-xvii/go-tools/json"
+	"git.ali33.ru/fcg-xvii/rest"
+)
+
+// Пустая реализация IStream
+type TestStream struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func NewTestStream() *TestStream {
+	ctx, cancel := context.WithCancel(context.Background())
+	return &TestStream{
+		ctx:    ctx,
+		cancel: cancel,
+	}
+}
+
+func (t *TestStream) SetAuth(auth json.Map) {}
+
+func (t *TestStream) Auth() json.Map { return nil }
+
+func (t *TestStream) SetClientData(key string, val any) {}
+
+func (t *TestStream) ClientData(key string) (any, bool) { return nil, false }
+
+func (t *TestStream) Context() context.Context {
+	return t.ctx
+}
+
+func (t *TestStream) SendMessage(req rest.IRequestOut) (<-chan *rest.RequestStream, error) {
+	log.Printf("TestStream %p send message.\n", t)
+	return nil, nil
+}
+
+func (t *TestStream) Close() {
+	t.cancel()
+	log.Printf("TestStream %p closed.\n", t)
+}
+
+// Пустая реализация IRequestOut
+type TestRequestOut struct {
+	TestRequest
+}
+
+func NewTestRequestOut() *TestRequestOut {
+	return &TestRequestOut{
+		TestRequest: TestRequest{},
+	}
+}
+
+func (r *TestRequestOut) Write(w io.Writer) rest.IErrorArgs {
+	// Пустая реализация записи
+	return nil
+}
+
+// Пустая реализация IRequest
+type TestRequest struct{}
+
+func (r *TestRequest) RType() rest.RequestType {
+	return 0 // Возвращаем значение по умолчанию
+}
+
+func (r *TestRequest) RCommand() string {
+	return "" // Возвращаем пустую строку
+}
+
+func (r *TestRequest) RData() json.Map {
+	return make(json.Map) // Возвращаем пустую карту
+}
+
+func (r *TestRequest) RFiles() rest.RequestFiles {
+	return nil // Возвращаем nil
+}
+
+func (r *TestRequest) RFile(name string) (rest.IReadCloserLen, bool) {
+	return nil, false // Возвращаем nil и false
+}
+
+func (r *TestRequest) RClose() {
+	// Пустая реализация
+}
+
+////////////////////////////////////////////////////////
+
+func TestStreamClose(t *testing.T) {
+	stream := NewTestStream()
+	store := NewStreamStore(stream, context.Background())
+	go func() {
+		<-store.Context().Done()
+		log.Println("store closedddd...")
+		store.Close()
+		store.Close()
+	}()
+	stream.Close()
+	time.Sleep(time.Second * 5)
+}
+
+func TestStreamStoreClose(t *testing.T) {
+	stream := NewTestStream()
+	store := NewStreamStore(stream, context.Background())
+	store.Add(NewTestStream())
+	store.Add(NewTestStream())
+	store.Add(NewTestStream())
+	store.Add(NewTestStream())
+	store.Close()
+	time.Sleep(time.Minute * 5)
+}
+
+func TestGroupClose(t *testing.T) {
+	group := NewStreamGroup(context.Background())
+	stream := NewTestStream()
+	group.Store("1", stream)
+	group.Close()
+	time.Sleep(time.Minute * 5)
+}
+
+func TestGroupStreamClose(t *testing.T) {
+	group := NewStreamGroup(context.Background())
+	add := func() {
+		stream := NewTestStream()
+		group.Store("1", stream)
+		go func() {
+			time.Sleep(time.Second * 3)
+			log.Println("remove", stream)
+			stream.Close()
+		}()
+		log.Println("add", stream)
+	}
+	add()
+	add()
+	time.Sleep(time.Second * 10)
+}

+ 1 - 0
owner.go

@@ -17,4 +17,5 @@ type IStream interface {
 	ClientData(key string) (any, bool)
 	Context() context.Context
 	SendMessage(IRequestOut) (<-chan *RequestStream, error)
+	Close()
 }