socket.goo 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package rest_websocket
  2. import (
  3. "context"
  4. "io"
  5. "log"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "git.ali33.ru/fcg-xvii/go-tools/json"
  10. "git.ali33.ru/fcg-xvii/rest"
  11. "github.com/gorilla/websocket"
  12. )
  13. func newSocket(conn *websocket.Conn, appConf *appConfig) *Socket {
  14. ctx, cancel := context.WithCancel(context.Background())
  15. ws := &Socket{
  16. conn: conn,
  17. waitMessages: new(sync.Map),
  18. ctx: ctx,
  19. cancel: cancel,
  20. sendLocker: &sync.Mutex{},
  21. appConf: appConf,
  22. chMessageIncoming: make(chan *Request, 10),
  23. }
  24. go ws.read()
  25. return ws
  26. }
  27. type Socket struct {
  28. conn *websocket.Conn
  29. ctx context.Context
  30. cancel context.CancelFunc
  31. sendLocker *sync.Mutex
  32. appConf *appConfig
  33. chMessageIncoming chan *Request
  34. auth json.Map
  35. clientData json.Map
  36. waitMessages *sync.Map
  37. idCounter atomic.Int64
  38. }
  39. func (s *Socket) IsSocket() bool {
  40. return true
  41. }
  42. func (s *Socket) Context() context.Context {
  43. return s.ctx
  44. }
  45. func (s *Socket) MessagesIncoming() <-chan *Request {
  46. return s.chMessageIncoming
  47. }
  48. func (s *Socket) Close() {
  49. s.cancel()
  50. s.conn.Close()
  51. }
  52. func (s *Socket) ID() int64 {
  53. return s.idCounter.Add(1)
  54. }
  55. func (s *Socket) NextWriter(messageType int) (w io.WriteCloser, err rest.IErrorArgs) {
  56. s.sendLocker.Lock()
  57. var wErr error
  58. w, wErr = s.conn.NextWriter(websocket.TextMessage)
  59. if wErr != nil {
  60. err = rest.ErrorMessage("ErrWriterInit", err.Error())
  61. }
  62. s.sendLocker.Unlock()
  63. return
  64. }
  65. func (s *Socket) SendMessage(msg rest.IRequestOut) rest.IErrorArgs {
  66. if msg.RType() == rest.RequestTypeIn {
  67. id := s.ID()
  68. if req, check := msg.(*Request); check {
  69. req.id = id
  70. } else {
  71. msg = &Request{
  72. id: id,
  73. Timeout: time.Now().Add(time.Second * 5),
  74. Request: &rest.Request{
  75. Type: rest.RequestTypeIn,
  76. Command: msg.RCommand(),
  77. Data: msg.RData(),
  78. Files: msg.RFiles(),
  79. },
  80. }
  81. }
  82. s.waitMessages.Store(id, msg)
  83. }
  84. w, err := s.NextWriter(websocket.BinaryMessage)
  85. if err != nil {
  86. return err
  87. }
  88. err = msg.Write(w)
  89. w.Close()
  90. return err
  91. }
  92. func (s *Socket) execMessage(reqIn *RequestIn) {
  93. defer reqIn.RClose()
  94. log.Println("Message", reqIn)
  95. var reqOut rest.IRequestOut
  96. command, check := s.appConf.app.Executer(reqIn)
  97. if !check {
  98. reqOut = reqIn.OutError(rest.ErrorMessage("ErrNotFound", "command is not found"))
  99. } else {
  100. // serialize
  101. if err := rest.Serialize(reqIn.RData(), command); err != nil {
  102. log.Println("serialize error", err)
  103. return
  104. }
  105. // validate
  106. if validator, check := command.(rest.IValidator); check {
  107. reqOut = validator.Validate(reqIn)
  108. if reqOut != nil {
  109. if err := s.SendMessage(reqOut); err != nil {
  110. log.Println("socket send error", err.Map())
  111. }
  112. return
  113. }
  114. }
  115. reqOut = command.Execute(reqIn)
  116. }
  117. log.Println("RESP", reqOut)
  118. s.SendMessage(reqOut)
  119. reqOut.RClose()
  120. }
  121. func (s *Socket) messageIncoming() chan<- *Request {
  122. chIncoming := make(chan *Request, 100)
  123. tClean := time.NewTicker(time.Second * 60)
  124. defer tClean.Stop()
  125. go func() {
  126. for {
  127. select {
  128. case <-s.ctx.Done():
  129. return
  130. case req, ok := <-chIncoming:
  131. if !ok {
  132. return
  133. }
  134. log.Println("INCOMING!!!!!")
  135. switch req.Type {
  136. case rest.RequestTypeIn, rest.RequestTypeEvent:
  137. reqIn := &RequestIn{
  138. Request: req,
  139. owner: s,
  140. core: s.appConf.core,
  141. }
  142. s.execMessage(reqIn)
  143. case rest.RequestTypeOut:
  144. log.Println("ANSWER")
  145. }
  146. case <-tClean.C:
  147. now := time.Now()
  148. s.waitMessages.Range(func(key, value any) bool {
  149. if value.(*Request).Timeout.Before(now) {
  150. s.waitMessages.Delete(key)
  151. }
  152. return true
  153. })
  154. }
  155. }
  156. }()
  157. return chIncoming
  158. }
  159. func (s *Socket) read() {
  160. chIncoming := s.messageIncoming()
  161. defer s.cancel()
  162. for {
  163. // Read message from server
  164. mType, r, err := s.conn.NextReader()
  165. if err != nil {
  166. log.Println(err)
  167. return
  168. }
  169. log.Println("MTYPE...", mType)
  170. switch mType {
  171. case websocket.TextMessage, websocket.BinaryMessage:
  172. // Обработка текстового сообщения
  173. mes, err := ReadRequest(r)
  174. if err != nil {
  175. log.Println("data error: ", err)
  176. return
  177. }
  178. chIncoming <- mes
  179. case websocket.PingMessage:
  180. // Отправка Pong в ответ на Ping
  181. log.Println("PING......")
  182. s.sendLocker.Lock()
  183. err := s.conn.WriteControl(websocket.PongMessage, nil, time.Now().Add(time.Second))
  184. s.sendLocker.Unlock()
  185. if err != nil {
  186. log.Println("pong write:", err)
  187. return
  188. }
  189. case websocket.CloseMessage:
  190. // Обработка закрытия соединения
  191. log.Println("websocket connection closed")
  192. return
  193. }
  194. }
  195. }