socket_server.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package rest_websocket
  2. import (
  3. "context"
  4. "log"
  5. "time"
  6. "git.ali33.ru/fcg-xvii/go-tools/json"
  7. "git.ali33.ru/fcg-xvii/rest"
  8. )
  9. func NewSocketServer(socket *Socket, appConf *appConfig) *SocketServer {
  10. res := &SocketServer{
  11. socket: socket,
  12. appConf: appConf,
  13. auth: json.Map{},
  14. chIn: make(chan rest.IRequestIn, 1),
  15. clientData: make(map[string]any),
  16. }
  17. go res.work()
  18. return res
  19. }
  20. type SocketServer struct {
  21. socket *Socket
  22. appConf *appConfig
  23. auth json.Map
  24. chIn chan rest.IRequestIn
  25. clientData json.Map
  26. }
  27. func (s *SocketServer) ClientData(key string) (any, bool) {
  28. res, check := s.clientData[key]
  29. return res, check
  30. }
  31. func (s *SocketServer) SetClientData(key string, data any) {
  32. s.clientData[key] = data
  33. }
  34. func (s *SocketServer) Auth() json.Map {
  35. return s.auth
  36. }
  37. func (s *SocketServer) SetAuth(auth json.Map) {
  38. s.auth = auth
  39. }
  40. func (s *SocketServer) In() <-chan rest.IRequestIn {
  41. return s.chIn
  42. }
  43. func (s *SocketServer) IsStream() bool {
  44. return true
  45. }
  46. func (s *SocketServer) Context() context.Context {
  47. return s.socket.Context()
  48. }
  49. func (s *SocketServer) Close() {
  50. s.socket.Close()
  51. }
  52. func (s *SocketServer) SendMessage(req rest.IRequestOut) (<-chan *rest.RequestStream, error) {
  53. var request *rest.RequestStream
  54. if rreq, check := req.(*rest.RequestStream); check {
  55. request = rreq
  56. } else {
  57. request = &rest.RequestStream{
  58. Request: &rest.Request{
  59. Type: req.RType(),
  60. Command: req.RCommand(),
  61. Data: req.RData(),
  62. Files: req.RFiles(),
  63. },
  64. Timeout: time.Now().Add(time.Second * 10),
  65. }
  66. }
  67. return s.socket.SendMessage(request)
  68. }
  69. func (s *SocketServer) work() {
  70. loop:
  71. for {
  72. select {
  73. case <-s.socket.Context().Done():
  74. return
  75. case req := <-s.socket.MessagesIn():
  76. reqIn := &RequestIn{
  77. RequestStream: req,
  78. owner: s,
  79. core: s.appConf.core,
  80. }
  81. var reqOut rest.IRequestOut
  82. command, check := s.appConf.app.Executer(reqIn)
  83. if !check {
  84. reqOut = reqIn.OutError(rest.ErrorMessage("ErrNotFound", "command is not found"))
  85. } else {
  86. // serialize
  87. if err := rest.Serialize(reqIn.RData(), command); err != nil {
  88. //s.socket.Close()
  89. reqOut = reqIn.OutError(err)
  90. if _, err := s.SendMessage(reqOut); err != nil {
  91. log.Println("socket send error", err)
  92. }
  93. reqOut.RClose()
  94. continue loop
  95. }
  96. // validate
  97. if validator, check := command.(rest.IValidator); check {
  98. reqOut = validator.Validate(reqIn)
  99. if reqOut != nil {
  100. if _, err := s.SendMessage(reqOut); err != nil {
  101. log.Println("socket send error", err)
  102. }
  103. reqOut.RClose()
  104. continue loop
  105. }
  106. }
  107. reqOut = command.Execute(reqIn)
  108. }
  109. s.SendMessage(reqOut)
  110. reqOut.RClose()
  111. //s.chIn <- reqIn
  112. }
  113. }
  114. }