package microservice import ( "context" "log" "net" "sync/atomic" "time" "git.ali33.ru/fcg-xvii/go-tools/containers/concurrent" "git.ali33.ru/fcg-xvii/go-tools/json" ) func NewSocket(conn net.Conn, bufSize int, ctx context.Context, receiveBuf int) *Socket { socket := &Socket{ proto: MakeProtocolJSON(conn, ctx, receiveBuf), rCounter: 0, chRecvQ: make(chan *Question, bufSize), chRecvE: make(chan *Event, bufSize), questions: concurrent.NewList(), } socket.ctx, socket.cancel = context.WithCancel(ctx) go socket.receive() go socket.workQuestions() return socket } type Socket struct { ctx context.Context cancel context.CancelFunc proto *ProtocolJSON rCounter int64 questions *concurrent.List chRecvQ chan *Question chRecvE chan *Event } func (s *Socket) Context() context.Context { return s.ctx } func (s *Socket) RecvQuestion() <-chan *Question { return s.chRecvQ } func (s *Socket) RecvEvent() <-chan *Event { return s.chRecvE } func (s *Socket) workQuestions() { t := time.NewTicker(time.Second) defer func() { t.Stop() s.cancel() s.questions.Each(func(elem *concurrent.Element) bool { q := elem.Val().(*Question) close(q.wAnswer) return true }) log.Println("work-questions closed") }() for { select { case <-s.proto.Context().Done(): return case <-t.C: now := time.Now() //log.Println(s.questions.Size()) s.questions.Each(func(elem *concurrent.Element) bool { q := elem.Val().(*Question) if !q.IsActual(now) { s.questions.Remove(elem) close(q.wAnswer) } return true }) } } } func (s *Socket) getQuestion(id int64) (elem *concurrent.Element, check bool) { s.questions.Each(func(e *concurrent.Element) bool { tq := e.Val().(*Question) if tq.id == id { elem, check = e, true return false } return true }) return } func (s *Socket) receive() { defer log.Println(">>>>>>>>>>>>>>>> socket-receive closed") for m := range s.proto.Receiver() { rType := RequestType(m.Int32("type", -1)) switch rType { case RequestQuestion: q := &Question{ id: m.Int("id", 0), data: m.Map("data", json.Map{}), socket: s, created: time.Now(), } s.chRecvQ <- q case RequestAnswer: if e, check := s.getQuestion(m.Int("id", -1)); check { s.questions.Remove(e) q := e.Val().(*Question) q.wAnswer <- m.Map("data", json.Map{}) } case RequestEvent: e := &Event{ data: m.Map("data", json.Map{}), } s.chRecvE <- e } } } func (s *Socket) SendQuestion(q *Question) { atomic.AddInt64(&s.rCounter, 1) q.id, q.socket, q.created = s.rCounter, s, time.Now() s.questions.PushBack(q) if q.wAnswer == nil { q.wAnswer = make(chan json.Map, 1) } s.proto.Send(q.JSONMap()) } func (s *Socket) SendEvent(e *Event) { s.proto.Send(e.JSONMap()) } func (s *Socket) sendMap(m json.Map) { s.proto.Send(m) } func (s *Socket) Close() { s.proto.Close() }