socket.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package microservice
  2. import (
  3. "context"
  4. "log"
  5. "net"
  6. "sync/atomic"
  7. "time"
  8. "git.ali33.ru/fcg-xvii/go-tools/containers/concurrent"
  9. "git.ali33.ru/fcg-xvii/go-tools/json"
  10. )
  11. func NewSocket(conn net.Conn, bufSize int, ctx context.Context, receiveBuf int) *Socket {
  12. socket := &Socket{
  13. proto: MakeProtocolJSON(conn, ctx, receiveBuf),
  14. rCounter: 0,
  15. chRecvQ: make(chan *Question, bufSize),
  16. chRecvE: make(chan *Event, bufSize),
  17. questions: concurrent.NewList(),
  18. }
  19. socket.ctx, socket.cancel = context.WithCancel(ctx)
  20. go socket.receive()
  21. go socket.workQuestions()
  22. return socket
  23. }
  24. type Socket struct {
  25. ctx context.Context
  26. cancel context.CancelFunc
  27. proto *ProtocolJSON
  28. rCounter int64
  29. questions *concurrent.List
  30. chRecvQ chan *Question
  31. chRecvE chan *Event
  32. }
  33. func (s *Socket) Context() context.Context {
  34. return s.ctx
  35. }
  36. func (s *Socket) RecvQuestion() <-chan *Question {
  37. return s.chRecvQ
  38. }
  39. func (s *Socket) RecvEvent() <-chan *Event {
  40. return s.chRecvE
  41. }
  42. func (s *Socket) workQuestions() {
  43. t := time.NewTicker(time.Second)
  44. defer func() {
  45. t.Stop()
  46. s.cancel()
  47. s.questions.Each(func(elem *concurrent.Element) bool {
  48. q := elem.Val().(*Question)
  49. close(q.wAnswer)
  50. return true
  51. })
  52. log.Println("work-questions closed")
  53. }()
  54. for {
  55. select {
  56. case <-s.proto.Context().Done():
  57. return
  58. case <-t.C:
  59. now := time.Now()
  60. //log.Println(s.questions.Size())
  61. s.questions.Each(func(elem *concurrent.Element) bool {
  62. q := elem.Val().(*Question)
  63. if !q.IsActual(now) {
  64. s.questions.Remove(elem)
  65. close(q.wAnswer)
  66. }
  67. return true
  68. })
  69. }
  70. }
  71. }
  72. func (s *Socket) getQuestion(id int64) (elem *concurrent.Element, check bool) {
  73. s.questions.Each(func(e *concurrent.Element) bool {
  74. tq := e.Val().(*Question)
  75. if tq.id == id {
  76. elem, check = e, true
  77. return false
  78. }
  79. return true
  80. })
  81. return
  82. }
  83. func (s *Socket) receive() {
  84. defer log.Println(">>>>>>>>>>>>>>>> socket-receive closed")
  85. for m := range s.proto.Receiver() {
  86. rType := RequestType(m.Int32("type", -1))
  87. switch rType {
  88. case RequestQuestion:
  89. q := &Question{
  90. id: m.Int("id", 0),
  91. data: m.Map("data", json.Map{}),
  92. socket: s,
  93. created: time.Now(),
  94. }
  95. s.chRecvQ <- q
  96. case RequestAnswer:
  97. if e, check := s.getQuestion(m.Int("id", -1)); check {
  98. s.questions.Remove(e)
  99. q := e.Val().(*Question)
  100. q.wAnswer <- m.Map("data", json.Map{})
  101. }
  102. case RequestEvent:
  103. e := &Event{
  104. data: m.Map("data", json.Map{}),
  105. }
  106. s.chRecvE <- e
  107. }
  108. }
  109. }
  110. func (s *Socket) SendQuestion(q *Question) {
  111. atomic.AddInt64(&s.rCounter, 1)
  112. q.id, q.socket, q.created = s.rCounter, s, time.Now()
  113. s.questions.PushBack(q)
  114. if q.wAnswer == nil {
  115. q.wAnswer = make(chan json.Map, 1)
  116. }
  117. s.proto.Send(q.JSONMap())
  118. }
  119. func (s *Socket) SendEvent(e *Event) {
  120. s.proto.Send(e.JSONMap())
  121. }
  122. func (s *Socket) sendMap(m json.Map) {
  123. s.proto.Send(m)
  124. }
  125. func (s *Socket) Close() {
  126. s.proto.Close()
  127. }