protocol_json.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package microservice
  2. import (
  3. "context"
  4. "log"
  5. "net"
  6. "git.ali33.ru/fcg-xvii/go-tools/json"
  7. )
  8. func MakeProtocolJSON(conn net.Conn, ctx context.Context, receiveBuf int) *ProtocolJSON {
  9. proto := &ProtocolJSON{
  10. receiver: make(chan json.Map, receiveBuf),
  11. }
  12. proto.ctx, proto.cancel = context.WithCancel(ctx)
  13. proto.proto = MakeProtocolData(conn, proto.ctx)
  14. // receiver
  15. go proto.receive()
  16. return proto
  17. }
  18. type ProtocolJSON struct {
  19. proto *ProtocolData
  20. receiver chan json.Map
  21. ctx context.Context
  22. cancel context.CancelFunc
  23. }
  24. func (s *ProtocolJSON) receive() {
  25. defer func() {
  26. log.Println("JSON receiver closed")
  27. s.cancel()
  28. close(s.receiver)
  29. }()
  30. for {
  31. select {
  32. case <-s.ctx.Done():
  33. //s.proto.Close()
  34. return
  35. case <-s.proto.Context().Done():
  36. return
  37. case data, ok := <-s.proto.Receiver():
  38. if !ok {
  39. return
  40. }
  41. var m json.Map
  42. if err := json.Unmarshal(data, &m); err != nil {
  43. log.Println("Protocol JSON parse error:", err)
  44. log.Println(string(data))
  45. return
  46. }
  47. select {
  48. case s.receiver <- m:
  49. default:
  50. }
  51. }
  52. }
  53. }
  54. func (s *ProtocolJSON) Context() context.Context {
  55. return s.ctx
  56. }
  57. func (s *ProtocolJSON) Receiver() <-chan json.Map {
  58. return s.receiver
  59. }
  60. func (s *ProtocolJSON) Send(m json.Map) {
  61. s.proto.Send(m.JSON())
  62. }
  63. func (s *ProtocolJSON) Close() {
  64. s.proto.Close()
  65. }