socket.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package rest_websocket
  2. import (
  3. "context"
  4. "errors"
  5. "io"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.ali33.ru/fcg-xvii/rest"
  11. "github.com/gorilla/websocket"
  12. )
  13. func NewSocket(conn *websocket.Conn) *Socket {
  14. ctx, cancel := context.WithCancel(context.Background())
  15. ws := &Socket{
  16. conn: conn,
  17. wait: new(sync.Map),
  18. ctx: ctx,
  19. cancel: cancel,
  20. writeLocker: &sync.Mutex{},
  21. chIn: make(chan *rest.RequestStream, 10),
  22. }
  23. ws.lastWrite.Store(time.Now().Unix())
  24. go ws.read()
  25. return ws
  26. }
  27. type Socket struct {
  28. ctx context.Context
  29. cancel context.CancelFunc
  30. conn *websocket.Conn
  31. wait *sync.Map
  32. chIn chan *rest.RequestStream
  33. writeLocker *sync.Mutex
  34. idCounter atomic.Int64
  35. lastWrite atomic.Int64
  36. pingEnable bool
  37. }
  38. func (s *Socket) Context() context.Context {
  39. return s.ctx
  40. }
  41. // MessagesIn возвращает канал, в который будут переданы все входящие сообщения (rest.RequestTypeMessage и rest.RequestTypeEvent)
  42. func (s *Socket) MessagesIn() <-chan *rest.RequestStream {
  43. return s.chIn
  44. }
  45. // getID увеличивает счётчик сообщений на единицу и возвраащает результат. используется для маркирования идентификаторами исходящих сообщений
  46. func (s *Socket) getID() int64 {
  47. return s.idCounter.Add(1)
  48. }
  49. // read реализует чтение входящих сообщений
  50. func (s *Socket) read() {
  51. //defer log.Println("work close...")
  52. // контекст
  53. s.ctx, s.cancel = context.WithCancel(context.Background())
  54. defer func() {
  55. s.cancel()
  56. s.conn.Close()
  57. }()
  58. // создаем канал для обработки входящих сообщений
  59. chIn := s.exec()
  60. for {
  61. // Read message from server
  62. mType, r, err := s.conn.NextReader()
  63. if err != nil {
  64. s.cancel()
  65. log.Println(err)
  66. return
  67. }
  68. switch mType {
  69. case websocket.TextMessage, websocket.BinaryMessage:
  70. // Обработка текстового или бинарного сообщения
  71. req, err := rest.ReadRequestStream(r)
  72. if err != nil {
  73. log.Println("data error: ", err)
  74. return
  75. }
  76. log.Println("RESPONSE", req)
  77. chIn <- req
  78. }
  79. }
  80. }
  81. // exec реализует обработку сообщений.
  82. func (s *Socket) exec() chan<- *rest.RequestStream {
  83. ch := make(chan *rest.RequestStream)
  84. go func() {
  85. //defer log.Println("exec close...")
  86. for {
  87. select {
  88. // закрытие контекста
  89. case <-s.ctx.Done():
  90. return
  91. // новое сообщение
  92. case req, ok := <-ch:
  93. if !ok {
  94. return
  95. }
  96. //log.Println("OOOOOOOOOOOOOOOOOOOOOOOOOOO")
  97. switch req.Type {
  98. case rest.RequestTypeIn:
  99. s.chIn <- req
  100. case rest.RequestTypeEvent:
  101. s.chIn <- req
  102. case rest.RequestTypeOut:
  103. log.Println("answer in", req.ID)
  104. ir, check := s.wait.Load(req.ID)
  105. if check {
  106. rreq := ir.(*waitRequest)
  107. s.wait.Delete(rreq.id)
  108. rreq.answerIn <- req
  109. }
  110. }
  111. // чистка просроченных сообщений и отправка пинга (при необходимости)
  112. case <-time.After(time.Second * 10):
  113. // чистим сообщения без ответа по дедлайну
  114. now := time.Now()
  115. s.wait.Range(func(key, val any) bool {
  116. if val.(*waitRequest).timeout.Before(now) {
  117. log.Println("CLEAN...", key)
  118. s.wait.Delete(key)
  119. }
  120. return true
  121. })
  122. // отправляем пинг для проверки, живое соединение или нет
  123. if s.pingEnable && now.Unix()-s.lastWrite.Load() > 10 {
  124. log.Println("PING")
  125. //s.writeLocker.Lock()
  126. err := s.conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(time.Second))
  127. //s.writeLocker.Unlock()
  128. if err != nil {
  129. log.Println("ping send error")
  130. s.conn.Close()
  131. return
  132. }
  133. s.lastWrite.Store(time.Now().Unix())
  134. }
  135. }
  136. }
  137. }()
  138. return ch
  139. }
  140. func (s *Socket) nextWriter(messageType int) (io.WriteCloser, error) {
  141. s.writeLocker.Lock()
  142. res, err := s.conn.NextWriter(messageType)
  143. s.writeLocker.Unlock()
  144. s.lastWrite.Store(time.Now().Unix())
  145. return res, err
  146. }
  147. func (s *Socket) SendMessage(req *rest.RequestStream) (ch <-chan *rest.RequestStream, err error) {
  148. switch req.Type {
  149. case rest.RequestTypeIn:
  150. req.ID = s.getID()
  151. clReq := newWaitRequest(req.ID, req.Timeout)
  152. ch = clReq.answer
  153. s.wait.Store(req.ID, clReq)
  154. case rest.RequestTypeEvent, rest.RequestTypeOut:
  155. default:
  156. return nil, errors.New("unexpected request type")
  157. }
  158. var writer io.WriteCloser
  159. if writer, err = s.nextWriter(websocket.BinaryMessage); err != nil {
  160. return
  161. }
  162. err = req.Write(writer)
  163. writer.Close()
  164. return
  165. }
  166. func (s *Socket) Close() {
  167. s.cancel()
  168. s.conn.Close()
  169. }