package rest_websocket import ( "context" "io" "log" "sync" "sync/atomic" "time" "git.ali33.ru/fcg-xvii/go-tools/json" "git.ali33.ru/fcg-xvii/rest" "github.com/gorilla/websocket" ) func newSocket(conn *websocket.Conn, appConf *appConfig) *Socket { ctx, cancel := context.WithCancel(context.Background()) ws := &Socket{ conn: conn, waitMessages: new(sync.Map), ctx: ctx, cancel: cancel, sendLocker: &sync.Mutex{}, appConf: appConf, chMessageIncoming: make(chan *Request, 10), } go ws.read() return ws } type Socket struct { conn *websocket.Conn ctx context.Context cancel context.CancelFunc sendLocker *sync.Mutex appConf *appConfig chMessageIncoming chan *Request auth json.Map clientData json.Map waitMessages *sync.Map idCounter atomic.Int64 } func (s *Socket) IsSocket() bool { return true } func (s *Socket) Context() context.Context { return s.ctx } func (s *Socket) MessagesIncoming() <-chan *Request { return s.chMessageIncoming } func (s *Socket) Close() { s.cancel() s.conn.Close() } func (s *Socket) ID() int64 { return s.idCounter.Add(1) } func (s *Socket) NextWriter(messageType int) (w io.WriteCloser, err rest.IErrorArgs) { s.sendLocker.Lock() var wErr error w, wErr = s.conn.NextWriter(websocket.TextMessage) if wErr != nil { err = rest.ErrorMessage("ErrWriterInit", err.Error()) } s.sendLocker.Unlock() return } func (s *Socket) SendMessage(msg rest.IRequestOut) rest.IErrorArgs { if msg.RType() == rest.RequestTypeIn { id := s.ID() if req, check := msg.(*Request); check { req.id = id } else { msg = &Request{ id: id, Timeout: time.Now().Add(time.Second * 5), Request: &rest.Request{ Type: rest.RequestTypeIn, Command: msg.RCommand(), Data: msg.RData(), Files: msg.RFiles(), }, } } s.waitMessages.Store(id, msg) } w, err := s.NextWriter(websocket.BinaryMessage) if err != nil { return err } err = msg.Write(w) w.Close() return err } func (s *Socket) execMessage(reqIn *RequestIn) { defer reqIn.RClose() log.Println("Message", reqIn) var reqOut rest.IRequestOut command, check := s.appConf.app.Executer(reqIn) if !check { reqOut = reqIn.OutError(rest.ErrorMessage("ErrNotFound", "command is not found")) } else { // serialize if err := rest.Serialize(reqIn.RData(), command); err != nil { log.Println("serialize error", err) return } // validate if validator, check := command.(rest.IValidator); check { reqOut = validator.Validate(reqIn) if reqOut != nil { if err := s.SendMessage(reqOut); err != nil { log.Println("socket send error", err.Map()) } return } } reqOut = command.Execute(reqIn) } log.Println("RESP", reqOut) s.SendMessage(reqOut) reqOut.RClose() } func (s *Socket) messageIncoming() chan<- *Request { chIncoming := make(chan *Request, 100) tClean := time.NewTicker(time.Second * 60) defer tClean.Stop() go func() { for { select { case <-s.ctx.Done(): return case req, ok := <-chIncoming: if !ok { return } log.Println("INCOMING!!!!!") switch req.Type { case rest.RequestTypeIn, rest.RequestTypeEvent: reqIn := &RequestIn{ Request: req, owner: s, core: s.appConf.core, } s.execMessage(reqIn) case rest.RequestTypeOut: log.Println("ANSWER") } case <-tClean.C: now := time.Now() s.waitMessages.Range(func(key, value any) bool { if value.(*Request).Timeout.Before(now) { s.waitMessages.Delete(key) } return true }) } } }() return chIncoming } func (s *Socket) read() { chIncoming := s.messageIncoming() defer s.cancel() for { // Read message from server mType, r, err := s.conn.NextReader() if err != nil { log.Println(err) return } log.Println("MTYPE...", mType) switch mType { case websocket.TextMessage, websocket.BinaryMessage: // Обработка текстового сообщения mes, err := ReadRequest(r) if err != nil { log.Println("data error: ", err) return } chIncoming <- mes case websocket.PingMessage: // Отправка Pong в ответ на Ping log.Println("PING......") s.sendLocker.Lock() err := s.conn.WriteControl(websocket.PongMessage, nil, time.Now().Add(time.Second)) s.sendLocker.Unlock() if err != nil { log.Println("pong write:", err) return } case websocket.CloseMessage: // Обработка закрытия соединения log.Println("websocket connection closed") return } } }