socket.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package rest_websocket
  2. import (
  3. "context"
  4. "errors"
  5. "io"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.ali33.ru/fcg-xvii/rest"
  11. "github.com/gorilla/websocket"
  12. )
  13. func NewSocket(conn *websocket.Conn) *Socket {
  14. ctx, cancel := context.WithCancel(context.Background())
  15. ws := &Socket{
  16. conn: conn,
  17. wait: new(sync.Map),
  18. ctx: ctx,
  19. cancel: cancel,
  20. writeLocker: &sync.Mutex{},
  21. chIn: make(chan *rest.RequestStream, 10),
  22. }
  23. ws.lastWrite.Store(time.Now().Unix())
  24. go ws.read()
  25. return ws
  26. }
  27. type Socket struct {
  28. ctx context.Context
  29. cancel context.CancelFunc
  30. conn *websocket.Conn
  31. wait *sync.Map
  32. chIn chan *rest.RequestStream
  33. writeLocker *sync.Mutex
  34. idCounter atomic.Int64
  35. lastWrite atomic.Int64
  36. pingEnable bool
  37. }
  38. func (s *Socket) Context() context.Context {
  39. return s.ctx
  40. }
  41. // MessagesIn возвращает канал, в который будут переданы все входящие сообщения (rest.RequestTypeMessage и rest.RequestTypeEvent)
  42. func (s *Socket) MessagesIn() <-chan *rest.RequestStream {
  43. return s.chIn
  44. }
  45. // getID увеличивает счётчик сообщений на единицу и возвраащает результат. используется для маркирования идентификаторами исходящих сообщений
  46. func (s *Socket) getID() int64 {
  47. return s.idCounter.Add(1)
  48. }
  49. // read реализует чтение входящих сообщений
  50. func (s *Socket) read() {
  51. //defer log.Println("work close...")
  52. // контекст
  53. s.ctx, s.cancel = context.WithCancel(context.Background())
  54. // создаем канал для обработки входящих сообщений
  55. chIn := s.exec()
  56. for {
  57. // Read message from server
  58. mType, r, err := s.conn.NextReader()
  59. if err != nil {
  60. s.cancel()
  61. log.Println(err)
  62. return
  63. }
  64. switch mType {
  65. case websocket.TextMessage, websocket.BinaryMessage:
  66. // Обработка текстового или бинарного сообщения
  67. req, err := rest.ReadRequestStream(r)
  68. if err != nil {
  69. s.conn.Close()
  70. log.Println("data error: ", err)
  71. return
  72. }
  73. log.Println("RESPONSE", req)
  74. chIn <- req
  75. }
  76. }
  77. }
  78. // exec реализует обработку сообщений.
  79. func (s *Socket) exec() chan<- *rest.RequestStream {
  80. ch := make(chan *rest.RequestStream)
  81. go func() {
  82. //defer log.Println("exec close...")
  83. for {
  84. select {
  85. // закрытие контекста
  86. case <-s.ctx.Done():
  87. return
  88. // новое сообщение
  89. case req, ok := <-ch:
  90. if !ok {
  91. return
  92. }
  93. //log.Println("OOOOOOOOOOOOOOOOOOOOOOOOOOO")
  94. switch req.Type {
  95. case rest.RequestTypeIn:
  96. s.chIn <- req
  97. case rest.RequestTypeEvent:
  98. s.chIn <- req
  99. case rest.RequestTypeOut:
  100. log.Println("answer in", req.ID)
  101. ir, check := s.wait.Load(req.ID)
  102. if check {
  103. rreq := ir.(*waitRequest)
  104. s.wait.Delete(rreq.id)
  105. rreq.answerIn <- req
  106. }
  107. }
  108. // чистка просроченных сообщений и отправка пинга (при необходимости)
  109. case <-time.After(time.Second * 10):
  110. // чистим сообщения без ответа по дедлайну
  111. now := time.Now()
  112. s.wait.Range(func(key, val any) bool {
  113. if val.(*waitRequest).timeout.Before(now) {
  114. log.Println("CLEAN...", key)
  115. s.wait.Delete(key)
  116. }
  117. return true
  118. })
  119. // отправляем пинг для проверки, живое соединение или нет
  120. if s.pingEnable && now.Unix()-s.lastWrite.Load() > 10 {
  121. log.Println("PING")
  122. //s.writeLocker.Lock()
  123. err := s.conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(time.Second))
  124. //s.writeLocker.Unlock()
  125. if err != nil {
  126. log.Println("ping send error")
  127. s.conn.Close()
  128. return
  129. }
  130. s.lastWrite.Store(time.Now().Unix())
  131. }
  132. }
  133. }
  134. }()
  135. return ch
  136. }
  137. func (s *Socket) nextWriter(messageType int) (io.WriteCloser, error) {
  138. s.writeLocker.Lock()
  139. res, err := s.conn.NextWriter(messageType)
  140. s.writeLocker.Unlock()
  141. s.lastWrite.Store(time.Now().Unix())
  142. return res, err
  143. }
  144. func (s *Socket) SendMessage(req *rest.RequestStream) (ch <-chan *rest.RequestStream, err error) {
  145. switch req.Type {
  146. case rest.RequestTypeIn:
  147. req.ID = s.getID()
  148. clReq := newWaitRequest(req.ID, req.Timeout)
  149. ch = clReq.answer
  150. s.wait.Store(req.ID, clReq)
  151. case rest.RequestTypeEvent, rest.RequestTypeOut:
  152. default:
  153. return nil, errors.New("unexpected request type")
  154. }
  155. var writer io.WriteCloser
  156. if writer, err = s.nextWriter(websocket.BinaryMessage); err != nil {
  157. return
  158. }
  159. err = req.Write(writer)
  160. writer.Close()
  161. return
  162. }
  163. func (s *Socket) Close() {
  164. s.cancel()
  165. s.conn.Close()
  166. }