application.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package application
  2. import (
  3. "context"
  4. "log"
  5. "sync/atomic"
  6. "time"
  7. "git.ali33.ru/fcg-xvii/rest"
  8. "git.ali33.ru/fcg-xvii/rest/rest_http"
  9. "git.ali33.ru/fcg-xvii/rest/rest_websocket"
  10. )
  11. func New(appConf *AppConfig, swaggerConf *SwaggerConf, ctx context.Context) rest.IApplication {
  12. app := &Application{
  13. conf: appConf,
  14. }
  15. if swaggerConf != nil && swaggerConf.Enabled {
  16. app.swagger = NewSwagger(swaggerConf)
  17. go app.swagger.Start(ctx)
  18. }
  19. go app.start(ctx)
  20. return app
  21. }
  22. type Application struct {
  23. conf *AppConfig
  24. swagger *Swagger
  25. //core any
  26. ctx context.Context
  27. cancel context.CancelFunc
  28. chConnect chan rest.IStream
  29. }
  30. func (s *Application) start(ctx context.Context) (err error) {
  31. s.ctx, s.cancel = context.WithCancel(ctx)
  32. var server rest.IServer
  33. if s.conf.SSL {
  34. server = rest.NewServerTLS(s.conf.Addr, s.conf.Secret, s.conf.TLSKeyPath, s.conf.TLSCertPath)
  35. } else {
  36. server = rest.NewServer(s.conf.Addr, s.conf.Secret)
  37. }
  38. if s.conf.Http {
  39. restServ := rest_http.New(s, s.conf.Core, s.conf.GetHTTPHeaders()())
  40. restServ.Prepare(server, s.conf.GetHttpPrefix())
  41. //log.Println("http part prepared...")
  42. }
  43. if s.conf.Websocket {
  44. s.chConnect = make(chan rest.IStream)
  45. ws := rest_websocket.New(s, s.conf.Core)
  46. ws.Prepare(server, s.conf.GetWebsocketPrefix())
  47. go s.work()
  48. //log.Println("websocket engine prepared...")
  49. }
  50. if err = server.Listen(time.Second, s.ctx); err != nil {
  51. s.cancel()
  52. }
  53. return
  54. }
  55. func (s *Application) Stop() {
  56. s.cancel()
  57. }
  58. func (s *Application) work() {
  59. var counter atomic.Int32
  60. for {
  61. select {
  62. case <-s.ctx.Done():
  63. return
  64. case stream := <-s.chConnect:
  65. log.Println("CONNECT-------", counter.Add(1))
  66. if s.conf.OnSocketConnect != nil {
  67. s.conf.OnSocketConnect(stream)
  68. }
  69. go func(rStream rest.IStream) {
  70. <-rStream.Context().Done()
  71. log.Println("DISCONNECT-------", counter.Add(-1))
  72. if s.conf.OnSocketDisconnect != nil {
  73. s.conf.OnSocketDisconnect(rStream)
  74. }
  75. }(stream)
  76. }
  77. }
  78. }
  79. func (s *Application) Executer(r rest.IRequestIn) (rest.IExecuter, bool) {
  80. method := s.conf.GetCommandsMethod()
  81. if command, check := method(r.RCommand()); check {
  82. return command, true
  83. }
  84. return nil, false
  85. }
  86. func (s *Application) Connect() chan<- rest.IStream {
  87. return s.chConnect
  88. }