// protocol_data.go (3.8 KB)

  1. package microservice
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "io"
  6. "log"
  7. "net"
  8. "time"
  9. )
  10. const (
  11. PackageData byte = iota
  12. PackagePing
  13. PackagePong
  14. PackageClose
  15. )
  16. type Package struct {
  17. id byte
  18. data []byte
  19. }
  20. func MakeProtocolData(conn net.Conn, ctx context.Context) *ProtocolData {
  21. proto := &ProtocolData{
  22. conn: conn,
  23. receiver: make(chan []byte, 1),
  24. sender: make(chan Package, 10),
  25. deadline: time.Second * 10,
  26. }
  27. proto.ctx, proto.cancel = context.WithCancel(ctx)
  28. // receiver
  29. go proto.workReceive()
  30. // sender
  31. go proto.workSend()
  32. return proto
  33. }
  34. type ProtocolData struct {
  35. conn net.Conn
  36. receiver chan []byte
  37. sender chan Package
  38. ctx context.Context
  39. cancel context.CancelFunc
  40. deadline time.Duration
  41. }
  42. func (s *ProtocolData) upWriteDL(t *time.Time) {
  43. *t = time.Now()
  44. s.conn.SetWriteDeadline(time.Now().Add(s.deadline).Add(time.Second))
  45. }
  46. func (s *ProtocolData) workSend() {
  47. t := time.NewTicker(s.deadline / 2)
  48. defer func() {
  49. //log.Println("sender closed")
  50. t.Stop()
  51. s.cancel()
  52. }()
  53. closed := false
  54. lastSended := time.Now()
  55. s.upWriteDL(&lastSended)
  56. for {
  57. select {
  58. case <-s.ctx.Done():
  59. return
  60. case <-t.C:
  61. if closed {
  62. return
  63. }
  64. if lastSended.Add(s.deadline / 2).Before(time.Now()) {
  65. // send ping
  66. if _, err := s.conn.Write([]byte{PackagePing}); err != nil {
  67. //log.Println("error sending package id:", err)
  68. return
  69. }
  70. s.upWriteDL(&lastSended)
  71. }
  72. case p, ok := <-s.sender:
  73. if !ok {
  74. return
  75. }
  76. if _, err := s.conn.Write([]byte{p.id}); err != nil {
  77. //log.Println("error sending package id:", err)
  78. return
  79. }
  80. if p.id == PackageData {
  81. size := uint32(len(p.data))
  82. if err := binary.Write(s.conn, binary.LittleEndian, size); err != nil {
  83. //log.Println("error sending data size:", err)
  84. return
  85. }
  86. if _, err := s.conn.Write(p.data); err != nil {
  87. //log.Println("error sending JSON data:", err)
  88. return
  89. }
  90. } else if p.id == PackageClose {
  91. //log.Println("--- close sended")
  92. closed = true
  93. s.conn.Close()
  94. return
  95. }
  96. if !closed {
  97. s.upWriteDL(&lastSended)
  98. }
  99. }
  100. }
  101. }
  102. func (s *ProtocolData) workReceive() {
  103. //defer log.Println("receive closed")
  104. for {
  105. select {
  106. case <-s.ctx.Done():
  107. close(s.receiver)
  108. return
  109. default:
  110. packageID := []byte{0}
  111. if _, err := s.conn.Read(packageID); err != nil {
  112. /*if err != io.EOF {
  113. log.Println("error read package id:", err)
  114. }*/
  115. return
  116. }
  117. switch packageID[0] {
  118. case PackageData: // data
  119. var size uint32
  120. if err := binary.Read(s.conn, binary.LittleEndian, &size); err != nil {
  121. /*if err != io.EOF {
  122. log.Println("error reading data size:", err)
  123. }*/
  124. return
  125. }
  126. // Чтение JSON-данных
  127. data := make([]byte, size)
  128. if _, err := io.ReadFull(s.conn, data); err != nil {
  129. if err != io.EOF {
  130. log.Println("error reading data:", err)
  131. }
  132. return
  133. }
  134. select {
  135. case s.receiver <- data:
  136. default:
  137. }
  138. s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second))
  139. case PackagePing: // ping
  140. s.sender <- Package{
  141. id: PackagePong,
  142. }
  143. s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second))
  144. case PackagePong: // pong
  145. log.Println("PONG")
  146. s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second))
  147. case PackageClose:
  148. log.Println("+++ close received")
  149. s.conn.Close()
  150. s.cancel()
  151. return
  152. }
  153. }
  154. }
  155. }
  156. func (s *ProtocolData) Context() context.Context {
  157. return s.ctx
  158. }
  159. func (s *ProtocolData) Receiver() <-chan []byte {
  160. return s.receiver
  161. }
  162. func (s *ProtocolData) Send(data []byte) {
  163. s.sender <- Package{
  164. id: PackageData,
  165. data: data,
  166. }
  167. }
  168. func (s *ProtocolData) Close() {
  169. //log.Println("CLOSE--------------------------------")
  170. /*
  171. s.sender <- Package{
  172. id: PackageClose,
  173. }
  174. */
  175. s.conn.Close()
  176. }