123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- package rest_websocket
- import (
- "context"
- "io"
- "log"
- "sync"
- "sync/atomic"
- "time"
- "git.ali33.ru/fcg-xvii/go-tools/json"
- "git.ali33.ru/fcg-xvii/rest"
- "github.com/gorilla/websocket"
- )
- func newSocket(conn *websocket.Conn, appConf *appConfig) *Socket {
- ctx, cancel := context.WithCancel(context.Background())
- ws := &Socket{
- conn: conn,
- waitMessages: new(sync.Map),
- ctx: ctx,
- cancel: cancel,
- sendLocker: &sync.Mutex{},
- appConf: appConf,
- chMessageIncoming: make(chan *Request, 10),
- }
- go ws.read()
- return ws
- }
- type Socket struct {
- conn *websocket.Conn
- ctx context.Context
- cancel context.CancelFunc
- sendLocker *sync.Mutex
- appConf *appConfig
- chMessageIncoming chan *Request
- auth json.Map
- clientData json.Map
- waitMessages *sync.Map
- idCounter atomic.Int64
- }
- func (s *Socket) IsSocket() bool {
- return true
- }
- func (s *Socket) Context() context.Context {
- return s.ctx
- }
- func (s *Socket) MessagesIncoming() <-chan *Request {
- return s.chMessageIncoming
- }
- func (s *Socket) Close() {
- s.cancel()
- s.conn.Close()
- }
- func (s *Socket) ID() int64 {
- return s.idCounter.Add(1)
- }
- func (s *Socket) NextWriter(messageType int) (w io.WriteCloser, err rest.IErrorArgs) {
- s.sendLocker.Lock()
- var wErr error
- w, wErr = s.conn.NextWriter(websocket.TextMessage)
- if wErr != nil {
- err = rest.ErrorMessage("ErrWriterInit", err.Error())
- }
- s.sendLocker.Unlock()
- return
- }
- func (s *Socket) SendMessage(msg rest.IRequestOut) rest.IErrorArgs {
- if msg.RType() == rest.RequestTypeIn {
- id := s.ID()
- if req, check := msg.(*Request); check {
- req.id = id
- } else {
- msg = &Request{
- id: id,
- Timeout: time.Now().Add(time.Second * 5),
- Request: &rest.Request{
- Type: rest.RequestTypeIn,
- Command: msg.RCommand(),
- Data: msg.RData(),
- Files: msg.RFiles(),
- },
- }
- }
- s.waitMessages.Store(id, msg)
- }
- w, err := s.NextWriter(websocket.BinaryMessage)
- if err != nil {
- return err
- }
- err = msg.Write(w)
- w.Close()
- return err
- }
- func (s *Socket) execMessage(reqIn *RequestIn) {
- defer reqIn.RClose()
- log.Println("Message", reqIn)
- var reqOut rest.IRequestOut
- command, check := s.appConf.app.Executer(reqIn)
- if !check {
- reqOut = reqIn.OutError(rest.ErrorMessage("ErrNotFound", "command is not found"))
- } else {
- // serialize
- if err := rest.Serialize(reqIn.RData(), command); err != nil {
- log.Println("serialize error", err)
- return
- }
- // validate
- if validator, check := command.(rest.IValidator); check {
- reqOut = validator.Validate(reqIn)
- if reqOut != nil {
- if err := s.SendMessage(reqOut); err != nil {
- log.Println("socket send error", err.Map())
- }
- return
- }
- }
- reqOut = command.Execute(reqIn)
- }
- log.Println("RESP", reqOut)
- s.SendMessage(reqOut)
- reqOut.RClose()
- }
- func (s *Socket) messageIncoming() chan<- *Request {
- chIncoming := make(chan *Request, 100)
- tClean := time.NewTicker(time.Second * 60)
- defer tClean.Stop()
- go func() {
- for {
- select {
- case <-s.ctx.Done():
- return
- case req, ok := <-chIncoming:
- if !ok {
- return
- }
- log.Println("INCOMING!!!!!")
- switch req.Type {
- case rest.RequestTypeIn, rest.RequestTypeEvent:
- reqIn := &RequestIn{
- Request: req,
- owner: s,
- core: s.appConf.core,
- }
- s.execMessage(reqIn)
- case rest.RequestTypeOut:
- log.Println("ANSWER")
- }
- case <-tClean.C:
- now := time.Now()
- s.waitMessages.Range(func(key, value any) bool {
- if value.(*Request).Timeout.Before(now) {
- s.waitMessages.Delete(key)
- }
- return true
- })
- }
- }
- }()
- return chIncoming
- }
- func (s *Socket) read() {
- chIncoming := s.messageIncoming()
- defer s.cancel()
- for {
- // Read message from server
- mType, r, err := s.conn.NextReader()
- if err != nil {
- log.Println(err)
- return
- }
- log.Println("MTYPE...", mType)
- switch mType {
- case websocket.TextMessage, websocket.BinaryMessage:
- // Обработка текстового сообщения
- mes, err := ReadRequest(r)
- if err != nil {
- log.Println("data error: ", err)
- return
- }
- chIncoming <- mes
- case websocket.PingMessage:
- // Отправка Pong в ответ на Ping
- log.Println("PING......")
- s.sendLocker.Lock()
- err := s.conn.WriteControl(websocket.PongMessage, nil, time.Now().Add(time.Second))
- s.sendLocker.Unlock()
- if err != nil {
- log.Println("pong write:", err)
- return
- }
- case websocket.CloseMessage:
- // Обработка закрытия соединения
- log.Println("websocket connection closed")
- return
- }
- }
- }
|