socket.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package rest_websocket
  2. import (
  3. "context"
  4. "errors"
  5. "io"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.ali33.ru/fcg-xvii/rest"
  11. "github.com/gorilla/websocket"
  12. )
  13. func NewSocket(conn *websocket.Conn) *Socket {
  14. ctx, cancel := context.WithCancel(context.Background())
  15. ws := &Socket{
  16. conn: conn,
  17. wait: new(sync.Map),
  18. ctx: ctx,
  19. cancel: cancel,
  20. writeLocker: &sync.Mutex{},
  21. chIn: make(chan *rest.RequestStream, 10),
  22. }
  23. ws.lastWrite.Store(time.Now().Unix())
  24. go ws.read()
  25. return ws
  26. }
  27. type Socket struct {
  28. ctx context.Context
  29. cancel context.CancelFunc
  30. conn *websocket.Conn
  31. wait *sync.Map
  32. chIn chan *rest.RequestStream
  33. writeLocker *sync.Mutex
  34. idCounter atomic.Int64
  35. lastWrite atomic.Int64
  36. pingEnable bool
  37. }
  38. func (s *Socket) Context() context.Context {
  39. return s.ctx
  40. }
  41. // MessagesIn возвращает канал, в который будут переданы все входящие сообщения (rest.RequestTypeMessage и rest.RequestTypeEvent)
  42. func (s *Socket) MessagesIn() <-chan *rest.RequestStream {
  43. return s.chIn
  44. }
  45. // getID увеличивает счётчик сообщений на единицу и возвраащает результат. используется для маркирования идентификаторами исходящих сообщений
  46. func (s *Socket) getID() int64 {
  47. return s.idCounter.Add(1)
  48. }
  49. // read реализует чтение входящих сообщений
  50. func (s *Socket) read() {
  51. //defer log.Println("work close...")
  52. // контекст
  53. s.ctx, s.cancel = context.WithCancel(context.Background())
  54. // создаем канал для обработки входящих сообщений
  55. chIn := s.exec()
  56. for {
  57. // Read message from server
  58. mType, r, err := s.conn.NextReader()
  59. if err != nil {
  60. s.cancel()
  61. log.Println(err)
  62. return
  63. }
  64. switch mType {
  65. case websocket.TextMessage, websocket.BinaryMessage:
  66. // Обработка текстового или бинарного сообщения
  67. req, err := rest.ReadRequestStream(r)
  68. if err != nil {
  69. log.Println("data error: ", err)
  70. return
  71. }
  72. log.Println("RESPONSE", req)
  73. chIn <- req
  74. }
  75. }
  76. }
  77. // exec реализует обработку сообщений.
  78. func (s *Socket) exec() chan<- *rest.RequestStream {
  79. ch := make(chan *rest.RequestStream)
  80. go func() {
  81. //defer log.Println("exec close...")
  82. for {
  83. select {
  84. // закрытие контекста
  85. case <-s.ctx.Done():
  86. return
  87. // новое сообщение
  88. case req, ok := <-ch:
  89. if !ok {
  90. return
  91. }
  92. //log.Println("OOOOOOOOOOOOOOOOOOOOOOOOOOO")
  93. switch req.Type {
  94. case rest.RequestTypeIn:
  95. s.chIn <- req
  96. case rest.RequestTypeEvent:
  97. s.chIn <- req
  98. case rest.RequestTypeOut:
  99. log.Println("answer in", req.ID)
  100. ir, check := s.wait.Load(req.ID)
  101. if check {
  102. rreq := ir.(*waitRequest)
  103. s.wait.Delete(rreq.id)
  104. rreq.answerIn <- req
  105. }
  106. }
  107. // чистка просроченных сообщений и отправка пинга (при необходимости)
  108. case <-time.After(time.Second * 10):
  109. // чистим сообщения без ответа по дедлайну
  110. now := time.Now()
  111. s.wait.Range(func(key, val any) bool {
  112. if val.(*waitRequest).timeout.Before(now) {
  113. log.Println("CLEAN...", key)
  114. s.wait.Delete(key)
  115. }
  116. return true
  117. })
  118. // отправляем пинг для проверки, живое соединение или нет
  119. if s.pingEnable && now.Unix()-s.lastWrite.Load() > 10 {
  120. log.Println("PING")
  121. //s.writeLocker.Lock()
  122. err := s.conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(time.Second))
  123. //s.writeLocker.Unlock()
  124. if err != nil {
  125. log.Println("ping send error")
  126. s.conn.Close()
  127. return
  128. }
  129. s.lastWrite.Store(time.Now().Unix())
  130. }
  131. }
  132. }
  133. }()
  134. return ch
  135. }
  136. func (s *Socket) nextWriter(messageType int) (io.WriteCloser, error) {
  137. s.writeLocker.Lock()
  138. res, err := s.conn.NextWriter(messageType)
  139. s.writeLocker.Unlock()
  140. s.lastWrite.Store(time.Now().Unix())
  141. return res, err
  142. }
  143. func (s *Socket) SendMessage(req *rest.RequestStream) (ch <-chan *rest.RequestStream, err error) {
  144. switch req.Type {
  145. case rest.RequestTypeIn:
  146. req.ID = s.getID()
  147. clReq := newWaitRequest(req.ID, req.Timeout)
  148. ch = clReq.answer
  149. s.wait.Store(req.ID, clReq)
  150. case rest.RequestTypeEvent, rest.RequestTypeOut:
  151. default:
  152. return nil, errors.New("unexpected request type")
  153. }
  154. var writer io.WriteCloser
  155. if writer, err = s.nextWriter(websocket.BinaryMessage); err != nil {
  156. return
  157. }
  158. err = req.Write(writer)
  159. writer.Close()
  160. return
  161. }
  162. func (s *Socket) Close() {
  163. s.cancel()
  164. s.conn.Close()
  165. }