z_stream_group_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package application
  2. import (
  3. "context"
  4. "io"
  5. "log"
  6. "testing"
  7. "time"
  8. "git.ali33.ru/fcg-xvii/go-tools/json"
  9. "git.ali33.ru/fcg-xvii/rest"
  10. )
  11. // Пустая реализация IStream
  12. type TestStream struct {
  13. ctx context.Context
  14. cancel context.CancelFunc
  15. }
  16. func NewTestStream() *TestStream {
  17. ctx, cancel := context.WithCancel(context.Background())
  18. return &TestStream{
  19. ctx: ctx,
  20. cancel: cancel,
  21. }
  22. }
  23. func (t *TestStream) SetAuth(auth json.Map) {}
  24. func (t *TestStream) Auth() json.Map { return nil }
  25. func (t *TestStream) SetClientData(key string, val any) {}
  26. func (t *TestStream) ClientData(key string) (any, bool) { return nil, false }
  27. func (t *TestStream) Context() context.Context {
  28. return t.ctx
  29. }
  30. func (t *TestStream) SendMessage(req rest.IRequestOut) (<-chan *rest.RequestStream, error) {
  31. log.Printf("TestStream %p send message.\n", t)
  32. return nil, nil
  33. }
  34. func (t *TestStream) Close() {
  35. t.cancel()
  36. log.Printf("TestStream %p closed.\n", t)
  37. }
  38. // Пустая реализация IRequestOut
  39. type TestRequestOut struct {
  40. TestRequest
  41. }
  42. func NewTestRequestOut() *TestRequestOut {
  43. return &TestRequestOut{
  44. TestRequest: TestRequest{},
  45. }
  46. }
  47. func (r *TestRequestOut) Write(w io.Writer) rest.IErrorArgs {
  48. // Пустая реализация записи
  49. return nil
  50. }
  51. // Пустая реализация IRequest
  52. type TestRequest struct{}
  53. func (r *TestRequest) RType() rest.RequestType {
  54. return 0 // Возвращаем значение по умолчанию
  55. }
  56. func (r *TestRequest) RCommand() string {
  57. return "" // Возвращаем пустую строку
  58. }
  59. func (r *TestRequest) RData() json.Map {
  60. return make(json.Map) // Возвращаем пустую карту
  61. }
  62. func (r *TestRequest) RFiles() rest.RequestFiles {
  63. return nil // Возвращаем nil
  64. }
  65. func (r *TestRequest) RFile(name string) (rest.IReadCloserLen, bool) {
  66. return nil, false // Возвращаем nil и false
  67. }
  68. func (r *TestRequest) RClose() {
  69. // Пустая реализация
  70. }
  71. ////////////////////////////////////////////////////////
  72. func TestStreamClose(t *testing.T) {
  73. stream := NewTestStream()
  74. store := NewStreamStore(stream, context.Background())
  75. go func() {
  76. <-store.Context().Done()
  77. log.Println("store closedddd...")
  78. store.Close()
  79. store.Close()
  80. }()
  81. stream.Close()
  82. time.Sleep(time.Second * 5)
  83. }
  84. func TestStreamStoreClose(t *testing.T) {
  85. stream := NewTestStream()
  86. store := NewStreamStore(stream, context.Background())
  87. store.Add(NewTestStream())
  88. store.Add(NewTestStream())
  89. store.Add(NewTestStream())
  90. store.Add(NewTestStream())
  91. store.Close()
  92. time.Sleep(time.Minute * 5)
  93. }
  94. func TestGroupClose(t *testing.T) {
  95. group := NewStreamGroup(context.Background())
  96. stream := NewTestStream()
  97. group.Store("1", stream)
  98. group.Close()
  99. time.Sleep(time.Minute * 5)
  100. }
  101. func TestGroupStreamClose(t *testing.T) {
  102. group := NewStreamGroup(context.Background())
  103. add := func() {
  104. stream := NewTestStream()
  105. group.Store("1", stream)
  106. go func() {
  107. time.Sleep(time.Second * 3)
  108. log.Println("remove", stream)
  109. stream.Close()
  110. }()
  111. log.Println("add", stream)
  112. }
  113. add()
  114. add()
  115. time.Sleep(time.Second * 10)
  116. }
  117. func TestGroupBroadcast(t *testing.T) {
  118. group := NewStreamGroup(context.Background())
  119. group.Store("1", NewTestStream())
  120. group.Store("12", NewTestStream())
  121. group.EventBroadcast(NewTestRequestOut())
  122. time.Sleep(time.Second * 10)
  123. }