123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- package microservice
- import (
- "context"
- "log"
- "net"
- "sync/atomic"
- "time"
- "git.ali33.ru/fcg-xvii/go-tools/containers/concurrent"
- "git.ali33.ru/fcg-xvii/go-tools/json"
- )
- func NewSocket(conn net.Conn, bufSize int, ctx context.Context, receiveBuf int) *Socket {
- socket := &Socket{
- proto: MakeProtocolJSON(conn, ctx, receiveBuf),
- rCounter: 0,
- chRecvQ: make(chan *Question, bufSize),
- chRecvE: make(chan *Event, bufSize),
- questions: concurrent.NewList(),
- }
- socket.ctx, socket.cancel = context.WithCancel(ctx)
- go socket.receive()
- go socket.workQuestions()
- return socket
- }
- type Socket struct {
- ctx context.Context
- cancel context.CancelFunc
- proto *ProtocolJSON
- rCounter int64
- questions *concurrent.List
- chRecvQ chan *Question
- chRecvE chan *Event
- }
- func (s *Socket) Context() context.Context {
- return s.ctx
- }
- func (s *Socket) RecvQuestion() <-chan *Question {
- return s.chRecvQ
- }
- func (s *Socket) RecvEvent() <-chan *Event {
- return s.chRecvE
- }
- func (s *Socket) workQuestions() {
- t := time.NewTicker(time.Second)
- defer func() {
- t.Stop()
- s.cancel()
- s.questions.Each(func(elem *concurrent.Element) bool {
- q := elem.Val().(*Question)
- close(q.wAnswer)
- return true
- })
- log.Println("work-questions closed")
- }()
- for {
- select {
- case <-s.proto.Context().Done():
- return
- case <-t.C:
- now := time.Now()
- //log.Println(s.questions.Size())
- s.questions.Each(func(elem *concurrent.Element) bool {
- q := elem.Val().(*Question)
- if !q.IsActual(now) {
- s.questions.Remove(elem)
- close(q.wAnswer)
- }
- return true
- })
- }
- }
- }
- func (s *Socket) getQuestion(id int64) (elem *concurrent.Element, check bool) {
- s.questions.Each(func(e *concurrent.Element) bool {
- tq := e.Val().(*Question)
- if tq.id == id {
- elem, check = e, true
- return false
- }
- return true
- })
- return
- }
- func (s *Socket) receive() {
- defer log.Println(">>>>>>>>>>>>>>>> socket-receive closed")
- for m := range s.proto.Receiver() {
- rType := RequestType(m.Int32("type", -1))
- switch rType {
- case RequestQuestion:
- q := &Question{
- id: m.Int("id", 0),
- data: m.Map("data", json.Map{}),
- socket: s,
- created: time.Now(),
- }
- s.chRecvQ <- q
- case RequestAnswer:
- if e, check := s.getQuestion(m.Int("id", -1)); check {
- s.questions.Remove(e)
- q := e.Val().(*Question)
- q.wAnswer <- m.Map("data", json.Map{})
- }
- case RequestEvent:
- e := &Event{
- data: m.Map("data", json.Map{}),
- }
- s.chRecvE <- e
- }
- }
- }
- func (s *Socket) SendQuestion(q *Question) {
- atomic.AddInt64(&s.rCounter, 1)
- q.id, q.socket, q.created = s.rCounter, s, time.Now()
- s.questions.PushBack(q)
- if q.wAnswer == nil {
- q.wAnswer = make(chan json.Map, 1)
- }
- s.proto.Send(q.JSONMap())
- }
- func (s *Socket) SendEvent(e *Event) {
- s.proto.Send(e.JSONMap())
- }
- func (s *Socket) sendMap(m json.Map) {
- s.proto.Send(m)
- }
- func (s *Socket) Close() {
- s.proto.Close()
- }
|