package microservice import ( "context" "encoding/binary" "io" "log" "net" "time" ) const ( PackageData byte = iota PackagePing PackagePong PackageClose ) type Package struct { id byte data []byte } func MakeProtocolData(conn net.Conn, ctx context.Context) *ProtocolData { proto := &ProtocolData{ conn: conn, receiver: make(chan []byte, 1), sender: make(chan Package, 10), deadline: time.Second * 10, } proto.ctx, proto.cancel = context.WithCancel(ctx) // receiver go proto.workReceive() // sender go proto.workSend() return proto } type ProtocolData struct { conn net.Conn receiver chan []byte sender chan Package ctx context.Context cancel context.CancelFunc deadline time.Duration } func (s *ProtocolData) upWriteDL(t *time.Time) { *t = time.Now() s.conn.SetWriteDeadline(time.Now().Add(s.deadline).Add(time.Second)) } func (s *ProtocolData) workSend() { t := time.NewTicker(s.deadline / 2) defer func() { //log.Println("sender closed") t.Stop() s.cancel() }() closed := false lastSended := time.Now() s.upWriteDL(&lastSended) for { select { case <-s.ctx.Done(): return case <-t.C: if closed { return } if lastSended.Add(s.deadline / 2).Before(time.Now()) { // send ping if _, err := s.conn.Write([]byte{PackagePing}); err != nil { //log.Println("error sending package id:", err) return } s.upWriteDL(&lastSended) } case p, ok := <-s.sender: if !ok { return } if _, err := s.conn.Write([]byte{p.id}); err != nil { //log.Println("error sending package id:", err) return } if p.id == PackageData { size := uint32(len(p.data)) if err := binary.Write(s.conn, binary.LittleEndian, size); err != nil { //log.Println("error sending data size:", err) return } if _, err := s.conn.Write(p.data); err != nil { //log.Println("error sending JSON data:", err) return } } else if p.id == PackageClose { //log.Println("--- close sended") closed = true s.conn.Close() return } if !closed { s.upWriteDL(&lastSended) } } } } func (s *ProtocolData) workReceive() { //defer log.Println("receive closed") for { select { case <-s.ctx.Done(): close(s.receiver) return default: packageID := []byte{0} if _, err := s.conn.Read(packageID); err != nil { /*if err != io.EOF { log.Println("error read package id:", err) }*/ return } switch packageID[0] { case PackageData: // data var size uint32 if err := binary.Read(s.conn, binary.LittleEndian, &size); err != nil { /*if err != io.EOF { log.Println("error reading data size:", err) }*/ return } // Чтение JSON-данных data := make([]byte, size) if _, err := io.ReadFull(s.conn, data); err != nil { if err != io.EOF { log.Println("error reading data:", err) } return } select { case s.receiver <- data: default: } s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second)) case PackagePing: // ping s.sender <- Package{ id: PackagePong, } s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second)) case PackagePong: // pong log.Println("PONG") s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second)) case PackageClose: log.Println("+++ close received") s.conn.Close() s.cancel() return } } } } func (s *ProtocolData) Context() context.Context { return s.ctx } func (s *ProtocolData) Receiver() <-chan []byte { return s.receiver } func (s *ProtocolData) Send(data []byte) { s.sender <- Package{ id: PackageData, data: data, } } func (s *ProtocolData) Close() { //log.Println("CLOSE--------------------------------") /* s.sender <- Package{ id: PackageClose, } */ s.conn.Close() }