123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- package microservice
- import (
- "context"
- "encoding/binary"
- "io"
- "log"
- "net"
- "time"
- )
// Frame kinds. Every frame on the wire starts with exactly one of these
// bytes; only PackageData is followed by a length-prefixed payload.
const (
	// PackageData introduces a frame that carries a payload.
	PackageData = byte(iota)
	// PackagePing is the keep-alive probe; the receiver answers it with
	// PackagePong.
	PackagePing
	// PackagePong is the reply to PackagePing.
	PackagePong
	// PackageClose announces that the connection is being shut down.
	PackageClose
)
// Package is one protocol frame queued for transmission.
type Package struct {
	// id is the frame kind (one of the Package* constants).
	id byte
	// data is the payload; it is only put on the wire for PackageData frames.
	data []byte
}
- func MakeProtocolData(conn net.Conn, ctx context.Context) *ProtocolData {
- proto := &ProtocolData{
- conn: conn,
- receiver: make(chan []byte, 1),
- sender: make(chan Package, 10),
- deadline: time.Second * 10,
- }
- proto.ctx, proto.cancel = context.WithCancel(ctx)
- // receiver
- go proto.workReceive()
- // sender
- go proto.workSend()
- return proto
- }
- type ProtocolData struct {
- conn net.Conn
- receiver chan []byte
- sender chan Package
- ctx context.Context
- cancel context.CancelFunc
- deadline time.Duration
- }
- func (s *ProtocolData) upWriteDL(t *time.Time) {
- *t = time.Now()
- s.conn.SetWriteDeadline(time.Now().Add(s.deadline).Add(time.Second))
- }
- func (s *ProtocolData) workSend() {
- t := time.NewTicker(s.deadline / 2)
- defer func() {
- //log.Println("sender closed")
- t.Stop()
- s.cancel()
- }()
- closed := false
- lastSended := time.Now()
- s.upWriteDL(&lastSended)
- for {
- select {
- case <-s.ctx.Done():
- return
- case <-t.C:
- if closed {
- return
- }
- if lastSended.Add(s.deadline / 2).Before(time.Now()) {
- // send ping
- if _, err := s.conn.Write([]byte{PackagePing}); err != nil {
- //log.Println("error sending package id:", err)
- return
- }
- s.upWriteDL(&lastSended)
- }
- case p, ok := <-s.sender:
- if !ok {
- return
- }
- if _, err := s.conn.Write([]byte{p.id}); err != nil {
- //log.Println("error sending package id:", err)
- return
- }
- if p.id == PackageData {
- size := uint32(len(p.data))
- if err := binary.Write(s.conn, binary.LittleEndian, size); err != nil {
- //log.Println("error sending data size:", err)
- return
- }
- if _, err := s.conn.Write(p.data); err != nil {
- //log.Println("error sending JSON data:", err)
- return
- }
- } else if p.id == PackageClose {
- //log.Println("--- close sended")
- closed = true
- s.conn.Close()
- return
- }
- if !closed {
- s.upWriteDL(&lastSended)
- }
- }
- }
- }
- func (s *ProtocolData) workReceive() {
- //defer log.Println("receive closed")
- for {
- select {
- case <-s.ctx.Done():
- close(s.receiver)
- return
- default:
- packageID := []byte{0}
- if _, err := s.conn.Read(packageID); err != nil {
- /*if err != io.EOF {
- log.Println("error read package id:", err)
- }*/
- return
- }
- switch packageID[0] {
- case PackageData: // data
- var size uint32
- if err := binary.Read(s.conn, binary.LittleEndian, &size); err != nil {
- /*if err != io.EOF {
- log.Println("error reading data size:", err)
- }*/
- return
- }
- // Чтение JSON-данных
- data := make([]byte, size)
- if _, err := io.ReadFull(s.conn, data); err != nil {
- if err != io.EOF {
- log.Println("error reading data:", err)
- }
- return
- }
- select {
- case s.receiver <- data:
- default:
- }
- s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second))
- case PackagePing: // ping
- s.sender <- Package{
- id: PackagePong,
- }
- s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second))
- case PackagePong: // pong
- log.Println("PONG")
- s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second))
- case PackageClose:
- log.Println("+++ close received")
- s.conn.Close()
- s.cancel()
- return
- }
- }
- }
- }
- func (s *ProtocolData) Context() context.Context {
- return s.ctx
- }
- func (s *ProtocolData) Receiver() <-chan []byte {
- return s.receiver
- }
- func (s *ProtocolData) Send(data []byte) {
- s.sender <- Package{
- id: PackageData,
- data: data,
- }
- }
- func (s *ProtocolData) Close() {
- //log.Println("CLOSE--------------------------------")
- /*
- s.sender <- Package{
- id: PackageClose,
- }
- */
- s.conn.Close()
- }
|