socket.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package rest_websocket
  2. import (
  3. "context"
  4. "errors"
  5. "io"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.ali33.ru/fcg-xvii/rest"
  11. "github.com/gorilla/websocket"
  12. )
  13. func NewSocket(conn *websocket.Conn, pingEnable bool) *Socket {
  14. ctx, cancel := context.WithCancel(context.Background())
  15. ws := &Socket{
  16. conn: conn,
  17. wait: new(sync.Map),
  18. ctx: ctx,
  19. cancel: cancel,
  20. writeLocker: &sync.Mutex{},
  21. chIn: make(chan *rest.RequestStream, 10),
  22. pingEnable: pingEnable,
  23. }
  24. ws.lastWrite.Store(time.Now().Unix())
  25. ws.read()
  26. return ws
  27. }
  28. type Socket struct {
  29. ctx context.Context
  30. cancel context.CancelFunc
  31. conn *websocket.Conn
  32. wait *sync.Map
  33. chIn chan *rest.RequestStream
  34. writeLocker *sync.Mutex
  35. idCounter atomic.Int64
  36. lastWrite atomic.Int64
  37. pingEnable bool
  38. }
  39. func (s *Socket) Context() context.Context {
  40. return s.ctx
  41. }
  42. // MessagesIn возвращает канал, в который будут переданы все входящие сообщения (rest.RequestTypeMessage и rest.RequestTypeEvent)
  43. func (s *Socket) MessagesIn() <-chan *rest.RequestStream {
  44. return s.chIn
  45. }
  46. // getID увеличивает счётчик сообщений на единицу и возвраащает результат. используется для маркирования идентификаторами исходящих сообщений
  47. func (s *Socket) getID() int64 {
  48. return s.idCounter.Add(1)
  49. }
  50. // read реализует чтение входящих сообщений
  51. func (s *Socket) read() {
  52. //defer log.Println("work close...")
  53. // контекст
  54. s.ctx, s.cancel = context.WithCancel(context.Background())
  55. // создаем канал для обработки входящих сообщений
  56. chIn := s.exec()
  57. go func() {
  58. defer func() {
  59. s.cancel()
  60. s.conn.Close()
  61. }()
  62. for {
  63. // Read message from server
  64. mType, r, err := s.conn.NextReader()
  65. if err != nil {
  66. s.cancel()
  67. log.Println(err)
  68. return
  69. }
  70. switch mType {
  71. case websocket.TextMessage, websocket.BinaryMessage:
  72. // Обработка текстового или бинарного сообщения
  73. req, err := rest.ReadRequestStream(r)
  74. if err != nil {
  75. log.Println("data error: ", err)
  76. return
  77. }
  78. //log.Println("REQUEST", req.Request.Command, req.Request.Data.JSONPrettyString())
  79. chIn <- req
  80. }
  81. }
  82. }()
  83. }
  84. // exec реализует обработку сообщений.
  85. func (s *Socket) exec() chan<- *rest.RequestStream {
  86. ch := make(chan *rest.RequestStream)
  87. go func() {
  88. //defer log.Println("exec close...")
  89. for {
  90. select {
  91. // закрытие контекста
  92. case <-s.ctx.Done():
  93. return
  94. // новое сообщение
  95. case req, ok := <-ch:
  96. if !ok {
  97. return
  98. }
  99. //log.Println("OOOOOOOOOOOOOOOOOOOOOOOOOOO")
  100. switch req.Type {
  101. case rest.RequestTypeIn:
  102. s.chIn <- req
  103. case rest.RequestTypeEvent:
  104. s.chIn <- req
  105. case rest.RequestTypeOut:
  106. //log.Println("answer in", req.ID)
  107. ir, check := s.wait.Load(req.ID)
  108. if check {
  109. rreq := ir.(*waitRequest)
  110. s.wait.Delete(rreq.id)
  111. rreq.answerIn <- req
  112. }
  113. }
  114. // чистка просроченных сообщений и отправка пинга (при необходимости)
  115. case <-time.After(time.Second * 10):
  116. // чистим сообщения без ответа по дедлайну
  117. now := time.Now()
  118. s.wait.Range(func(key, val any) bool {
  119. if val.(*waitRequest).timeout.Before(now) {
  120. log.Println("CLEAN...", key)
  121. s.wait.Delete(key)
  122. }
  123. return true
  124. })
  125. // отправляем пинг для проверки, живое соединение или нет
  126. if s.pingEnable && now.Unix()-s.lastWrite.Load() > 10 {
  127. //log.Println("PING")
  128. //s.writeLocker.Lock()
  129. err := s.conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(time.Second))
  130. //s.writeLocker.Unlock()
  131. if err != nil {
  132. log.Println("ping send error")
  133. s.conn.Close()
  134. return
  135. }
  136. s.lastWrite.Store(time.Now().Unix())
  137. }
  138. }
  139. }
  140. }()
  141. return ch
  142. }
  143. func (s *Socket) nextWriter(messageType int) (io.WriteCloser, error) {
  144. s.writeLocker.Lock()
  145. res, err := s.conn.NextWriter(messageType)
  146. s.writeLocker.Unlock()
  147. s.lastWrite.Store(time.Now().Unix())
  148. return res, err
  149. }
  150. func (s *Socket) SendMessage(req *rest.RequestStream) (ch <-chan *rest.RequestStream, err error) {
  151. switch req.Type {
  152. case rest.RequestTypeIn:
  153. req.ID = s.getID()
  154. clReq := newWaitRequest(req.ID, req.Timeout)
  155. ch = clReq.answer
  156. s.wait.Store(req.ID, clReq)
  157. case rest.RequestTypeEvent, rest.RequestTypeOut:
  158. default:
  159. return nil, errors.New("unexpected request type")
  160. }
  161. var writer io.WriteCloser
  162. if writer, err = s.nextWriter(websocket.BinaryMessage); err != nil {
  163. return
  164. }
  165. err = req.Write(writer)
  166. writer.Close()
  167. return
  168. }
  169. func (s *Socket) Close() {
  170. s.cancel()
  171. s.conn.Close()
  172. }