0x4a52466c696e74 1 рік тому
коміт
7befe66f62
13 змінених файлів з 839 додано та 0 видалено
  1. 5 0
      application.go
  2. 2 0
      client.go
  3. 24 0
      event.go
  4. 5 0
      go.mod
  5. 6 0
      go.sum
  6. 189 0
      protocol_data.go
  7. 73 0
      protocol_json.go
  8. 65 0
      question.go
  9. 24 0
      request_type.go
  10. 81 0
      server.go
  11. 141 0
      socket.go
  12. 70 0
      socket_client.go
  13. 154 0
      z_test.go

+ 5 - 0
application.go

@@ -0,0 +1,5 @@
+package microservice
+
+type Application interface {
+	NewConnection() chan<- *Socket
+}

+ 2 - 0
client.go

@@ -0,0 +1,2 @@
+package microservice
+

+ 24 - 0
event.go

@@ -0,0 +1,24 @@
+package microservice
+
+import "git.ali33.ru/fcg-xvii/go-tools/json"
+
+func NewEvent(data json.Map) *Event {
+	return &Event{
+		data: data,
+	}
+}
+
+type Event struct {
+	data json.Map
+}
+
+func (s *Event) JSONMap() json.Map {
+	return json.Map{
+		"type": RequestEvent,
+		"data": s.data,
+	}
+}
+
+func (s *Event) Data() json.Map {
+	return s.data
+}

+ 5 - 0
go.mod

@@ -0,0 +1,5 @@
+module git.ali33.ru/fcg-xvii/microservice
+
+go 1.18
+
+require git.ali33.ru/fcg-xvii/go-tools v0.0.0-20230514063340-d0ce885e1443

+ 6 - 0
go.sum

@@ -0,0 +1,6 @@
+git.ali33.ru/fcg-xvii/go-tools v0.0.0-20230110093640-a2f2134e5c74 h1:X0E5UmZ/IUHElzoi99GObO+7uvTyej7O9Pct2L2QQ8o=
+git.ali33.ru/fcg-xvii/go-tools v0.0.0-20230110093640-a2f2134e5c74/go.mod h1:YbBhWFFNNQIKcRisQFnpVaN5KA+XHGImSU1Z/MuntqU=
+git.ali33.ru/fcg-xvii/go-tools v0.0.0-20230514062332-23b664717b8d h1:7mTaXAwMJkgyMt/fob58STe8NekkasMWmyXLgghB71w=
+git.ali33.ru/fcg-xvii/go-tools v0.0.0-20230514062332-23b664717b8d/go.mod h1:YbBhWFFNNQIKcRisQFnpVaN5KA+XHGImSU1Z/MuntqU=
+git.ali33.ru/fcg-xvii/go-tools v0.0.0-20230514063340-d0ce885e1443 h1:BptI66LSC9g5T2f3jkL/SsAw6yqaXkIFuGVYAaDkGQ8=
+git.ali33.ru/fcg-xvii/go-tools v0.0.0-20230514063340-d0ce885e1443/go.mod h1:YbBhWFFNNQIKcRisQFnpVaN5KA+XHGImSU1Z/MuntqU=

+ 189 - 0
protocol_data.go

@@ -0,0 +1,189 @@
+package microservice
+
+import (
+	"context"
+	"encoding/binary"
+	"io"
+	"log"
+	"net"
+	"time"
+)
+
+const (
+	PackageData byte = iota
+	PackagePing
+	PackagePong
+	PackageClose
+)
+
+type Package struct {
+	id   byte
+	data []byte
+}
+
+func MakeProtocolData(conn net.Conn, ctx context.Context) *ProtocolData {
+	proto := &ProtocolData{
+		conn:     conn,
+		receiver: make(chan []byte, 1),
+		sender:   make(chan Package, 10),
+		deadline: time.Second * 10,
+	}
+	proto.ctx, proto.cancel = context.WithCancel(ctx)
+
+	// receiver
+	go proto.workReceive()
+	// sender
+	go proto.workSend()
+	return proto
+}
+
+type ProtocolData struct {
+	conn     net.Conn
+	receiver chan []byte
+	sender   chan Package
+	ctx      context.Context
+	cancel   context.CancelFunc
+	deadline time.Duration
+}
+
+func (s *ProtocolData) upWriteDL(t *time.Time) {
+	*t = time.Now()
+	s.conn.SetWriteDeadline(time.Now().Add(s.deadline).Add(time.Second))
+}
+
+func (s *ProtocolData) workSend() {
+	t := time.NewTicker(s.deadline / 2)
+	defer func() {
+		//log.Println("sender closed")
+		t.Stop()
+		s.cancel()
+		close(s.receiver)
+	}()
+	closed := false
+	lastSended := time.Now()
+	s.upWriteDL(&lastSended)
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+		case <-t.C:
+			if closed {
+				return
+			}
+			if lastSended.Add(s.deadline / 2).Before(time.Now()) {
+				// send ping
+				if _, err := s.conn.Write([]byte{PackagePing}); err != nil {
+					//log.Println("error sending package id:", err)
+					return
+				}
+				s.upWriteDL(&lastSended)
+			}
+		case p, ok := <-s.sender:
+			if !ok {
+				return
+			}
+			if _, err := s.conn.Write([]byte{p.id}); err != nil {
+				//log.Println("error sending package id:", err)
+				return
+			}
+			if p.id == PackageData {
+				size := uint32(len(p.data))
+				if err := binary.Write(s.conn, binary.LittleEndian, size); err != nil {
+					//log.Println("error sending data size:", err)
+					return
+				}
+				if _, err := s.conn.Write(p.data); err != nil {
+					//log.Println("error sending JSON data:", err)
+					return
+				}
+			} else if p.id == PackageClose {
+				//log.Println("--- close sended")
+				closed = true
+				s.conn.Close()
+				return
+			}
+			if !closed {
+				s.upWriteDL(&lastSended)
+			}
+		}
+	}
+}
+
+func (s *ProtocolData) workReceive() {
+	//defer log.Println("receive closed")
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+		default:
+			packageID := []byte{0}
+			if _, err := s.conn.Read(packageID); err != nil {
+				/*if err != io.EOF {
+					log.Println("error read package id:", err)
+				}*/
+				return
+			}
+			switch packageID[0] {
+			case PackageData: // data
+				var size uint32
+				if err := binary.Read(s.conn, binary.LittleEndian, &size); err != nil {
+					/*if err != io.EOF {
+						log.Println("error reading data size:", err)
+					}*/
+					return
+				}
+				// Чтение JSON-данных
+				data := make([]byte, size)
+				if _, err := io.ReadFull(s.conn, data); err != nil {
+					if err != io.EOF {
+						log.Println("error reading data:", err)
+					}
+					return
+				}
+				select {
+				case s.receiver <- data:
+				default:
+				}
+				s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second))
+			case PackagePing: // ping
+				s.sender <- Package{
+					id: PackagePong,
+				}
+				s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second))
+			case PackagePong: // pong
+				log.Println("PONG")
+				s.conn.SetReadDeadline(time.Now().Add(s.deadline).Add(time.Second))
+			case PackageClose:
+				log.Println("+++ close received")
+				s.conn.Close()
+				s.cancel()
+				return
+			}
+		}
+	}
+}
+
+func (s *ProtocolData) Context() context.Context {
+	return s.ctx
+}
+
+func (s *ProtocolData) Receiver() <-chan []byte {
+	return s.receiver
+}
+
+func (s *ProtocolData) Send(data []byte) {
+	s.sender <- Package{
+		id:   PackageData,
+		data: data,
+	}
+}
+
+func (s *ProtocolData) Close() {
+	//log.Println("CLOSE--------------------------------")
+	/*
+		s.sender <- Package{
+			id: PackageClose,
+		}
+	*/
+	s.conn.Close()
+}

+ 73 - 0
protocol_json.go

@@ -0,0 +1,73 @@
+package microservice
+
+import (
+	"context"
+	"log"
+	"net"
+
+	"git.ali33.ru/fcg-xvii/go-tools/json"
+)
+
+func MakeProtocolJSON(conn net.Conn, ctx context.Context) *ProtocolJSON {
+	proto := &ProtocolJSON{
+		receiver: make(chan json.Map, 1),
+	}
+	proto.ctx, proto.cancel = context.WithCancel(ctx)
+	proto.proto = MakeProtocolData(conn, proto.ctx)
+	// receiver
+	go proto.receive()
+	return proto
+}
+
+type ProtocolJSON struct {
+	proto    *ProtocolData
+	receiver chan json.Map
+	ctx      context.Context
+	cancel   context.CancelFunc
+}
+
+func (s *ProtocolJSON) receive() {
+	defer func() {
+		log.Println("JSON receiver closed")
+		s.cancel()
+		close(s.receiver)
+	}()
+	for {
+		select {
+		case <-s.ctx.Done():
+			//s.proto.Close()
+			return
+		case <-s.proto.Context().Done():
+			return
+		case data, ok := <-s.proto.Receiver():
+			if !ok {
+				return
+			}
+			var m json.Map
+			if err := json.Unmarshal(data, &m); err != nil {
+				log.Println("Protocol JSON parse error:", err)
+				return
+			}
+			select {
+			case s.receiver <- m:
+			default:
+			}
+		}
+	}
+}
+
+func (s *ProtocolJSON) Context() context.Context {
+	return s.ctx
+}
+
+func (s *ProtocolJSON) Receiver() <-chan json.Map {
+	return s.receiver
+}
+
+func (s *ProtocolJSON) Send(m json.Map) {
+	s.proto.Send(m.JSON())
+}
+
+func (s *ProtocolJSON) Close() {
+	s.proto.Close()
+}

+ 65 - 0
question.go

@@ -0,0 +1,65 @@
+package microservice
+
+import (
+	"time"
+
+	"git.ali33.ru/fcg-xvii/go-tools/json"
+)
+
+func NewQuestion(data json.Map, timeout time.Duration) *Question {
+	if timeout == 0 {
+		timeout = time.Minute
+	}
+	q := &Question{
+		data:    data,
+		timeout: timeout,
+		wAnswer: make(chan json.Map, 1),
+	}
+	return q
+}
+
+// Request
+type Question struct {
+	id      int64
+	data    json.Map
+	socket  *Socket
+	created time.Time
+	timeout time.Duration
+	wAnswer chan json.Map
+}
+
+func (s *Question) IsActual(t time.Time) bool {
+	return s.created.Add(s.timeout).After(t)
+}
+
+func (s *Question) WaitAnswer() <-chan json.Map {
+	return s.wAnswer
+}
+
+func (s *Question) Type() RequestType {
+	return RequestQuestion
+}
+
+func (s *Question) JSONMap() json.Map {
+	return json.Map{
+		"id":   s.id,
+		"type": RequestQuestion,
+		"data": s.data,
+	}
+}
+
+func (s *Question) Answer(data json.Map) {
+	if s.timeout > 0 {
+		s.timeout = time.Minute
+	}
+	m := json.Map{
+		"id":   s.id,
+		"type": RequestAnswer,
+		"data": data,
+	}
+	s.socket.sendMap(m)
+}
+
+func (s *Question) Data() json.Map {
+	return s.data
+}

+ 24 - 0
request_type.go

@@ -0,0 +1,24 @@
+package microservice
+
+import "git.ali33.ru/fcg-xvii/go-tools/json"
+
+type RequestType byte
+
+const (
+	RequestQuestion RequestType = iota
+	RequestEvent
+	RequestAnswer
+	RequestClose
+)
+
+// Data
+type Data interface {
+	Type() RequestType
+	JSONMap() json.Map
+}
+
+func MakeQuestion(data json.Map) *Question {
+	return &Question{
+		data: data,
+	}
+}

+ 81 - 0
server.go

@@ -0,0 +1,81 @@
+package microservice
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"net"
+
+	"git.ali33.ru/fcg-xvii/go-tools/containers/concurrent"
+)
+
+func NewServer(host string, port uint16, sockBufSize int, ctx context.Context) *Server {
+	ctx, cancel := context.WithCancel(ctx)
+	server := &Server{
+		ctx:          ctx,
+		cancel:       cancel,
+		host:         host,
+		port:         port,
+		chConnection: make(chan *Socket),
+		sockets:      concurrent.NewList(),
+	}
+	go server.start()
+	return server
+}
+
+type Server struct {
+	ctx          context.Context
+	cancel       context.CancelFunc
+	host         string
+	port         uint16
+	sockBufSize  int
+	listener     net.Listener
+	sockets      *concurrent.List
+	chConnection chan *Socket
+}
+
+func (s *Server) NewConnection() <-chan *Socket {
+	return s.chConnection
+}
+
+func (s *Server) start() {
+	go s.listen()
+	<-s.ctx.Done()
+	s.listener.Close()
+	close(s.chConnection)
+}
+
+func (s *Server) listen() {
+	var err error
+	if s.listener, err = net.Listen("tcp", fmt.Sprintf("%v:%v", s.host, s.port)); err != nil {
+		log.Fatal("Error starting all trade server:", err)
+	}
+	var conn net.Conn
+	for {
+		if conn, err = s.listener.Accept(); err != nil {
+			//fmt.Println("Error accepting connection:", err)
+			return
+		}
+		socket := NewSocket(conn, s.sockBufSize, s.ctx)
+		s.sockets.PushBack(socket)
+		go func(sock *Socket) {
+			<-sock.Context().Done()
+			log.Println("close sock accepted....")
+			if elem := s.sockets.Search(sock); elem != nil {
+				log.Println("socket remove from list")
+				s.sockets.Remove(elem)
+			}
+			log.Println(s.sockets.Size())
+		}(socket)
+		s.chConnection <- socket
+	}
+}
+
+func (s *Server) Close() {
+	s.cancel()
+	s.sockets.Each(func(val *concurrent.Element) bool {
+		log.Println("CLOSE socket..........")
+		val.Val().(*Socket).Close()
+		return true
+	})
+}

+ 141 - 0
socket.go

@@ -0,0 +1,141 @@
+package microservice
+
+import (
+	"context"
+	"log"
+	"net"
+	"sync/atomic"
+	"time"
+
+	"git.ali33.ru/fcg-xvii/go-tools/containers/concurrent"
+	"git.ali33.ru/fcg-xvii/go-tools/json"
+)
+
+func NewSocket(conn net.Conn, bufSize int, ctx context.Context) *Socket {
+	socket := &Socket{
+		proto:     MakeProtocolJSON(conn, ctx),
+		rCounter:  0,
+		chRecvQ:   make(chan *Question, bufSize),
+		chRecvE:   make(chan *Event, bufSize),
+		questions: concurrent.NewList(),
+	}
+	socket.ctx, socket.cancel = context.WithCancel(ctx)
+	go socket.receive()
+	go socket.workQuestions()
+	return socket
+}
+
+type Socket struct {
+	ctx       context.Context
+	cancel    context.CancelFunc
+	proto     *ProtocolJSON
+	rCounter  int64
+	questions *concurrent.List
+	chRecvQ   chan *Question
+	chRecvE   chan *Event
+}
+
+func (s *Socket) Context() context.Context {
+	return s.ctx
+}
+
+func (s *Socket) RecvQuestion() <-chan *Question {
+	return s.chRecvQ
+}
+
+func (s *Socket) RecvEvent() <-chan *Event {
+	return s.chRecvE
+}
+
+func (s *Socket) workQuestions() {
+	t := time.NewTicker(time.Second)
+	defer func() {
+		t.Stop()
+		s.cancel()
+		s.questions.Each(func(elem *concurrent.Element) bool {
+			q := elem.Val().(*Question)
+			close(q.wAnswer)
+			return true
+		})
+		log.Println("work-questions closed")
+	}()
+	for {
+		select {
+		case <-s.proto.Context().Done():
+			return
+		case <-t.C:
+			now := time.Now()
+			//log.Println(s.questions.Size())
+			s.questions.Each(func(elem *concurrent.Element) bool {
+				q := elem.Val().(*Question)
+				if !q.IsActual(now) {
+					s.questions.Remove(elem)
+					close(q.wAnswer)
+				}
+				return true
+			})
+		}
+	}
+}
+
+func (s *Socket) getQuestion(id int64) (elem *concurrent.Element, check bool) {
+	s.questions.Each(func(e *concurrent.Element) bool {
+		tq := e.Val().(*Question)
+		if tq.id == id {
+			elem, check = e, true
+			return false
+		}
+		return true
+	})
+	return
+}
+
+func (s *Socket) receive() {
+	defer log.Println(">>>>>>>>>>>>>>>> socket-receive closed")
+	for m := range s.proto.Receiver() {
+		rType := RequestType(m.Int32("type", -1))
+		switch rType {
+		case RequestQuestion:
+			q := &Question{
+				id:      m.Int("id", 0),
+				data:    m.Map("data", json.Map{}),
+				socket:  s,
+				created: time.Now(),
+			}
+			s.chRecvQ <- q
+		case RequestAnswer:
+			if e, check := s.getQuestion(m.Int("id", -1)); check {
+				s.questions.Remove(e)
+				q := e.Val().(*Question)
+				q.wAnswer <- m.Map("data", json.Map{})
+			}
+		case RequestEvent:
+			e := &Event{
+				data: m.Map("data", json.Map{}),
+			}
+			s.chRecvE <- e
+		}
+	}
+}
+
+func (s *Socket) SendQuestion(q *Question) {
+	atomic.AddInt64(&s.rCounter, 1)
+	q.id, q.socket, q.created = s.rCounter, s, time.Now()
+	s.questions.PushBack(q)
+	if q.wAnswer == nil {
+		q.wAnswer = make(chan json.Map, 1)
+	}
+	s.proto.Send(q.JSONMap())
+}
+
+func (s *Socket) SendEvent(e *Event) {
+	s.proto.Send(e.JSONMap())
+}
+
+func (s *Socket) sendMap(m json.Map) {
+	s.proto.Send(m)
+}
+
+func (s *Socket) Close() {
+	s.proto.Close()
+}

+ 70 - 0
socket_client.go

@@ -0,0 +1,70 @@
+package microservice
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"time"
+)
+
+func NewSocketClient(host string, port uint16, bufSize int, ctx context.Context) *SocketClient {
+	cctx, cancel := context.WithCancel(ctx)
+	socket := &SocketClient{
+		host:    host,
+		port:    port,
+		bufSize: bufSize,
+		ctx:     cctx,
+		cancel:  cancel,
+	}
+	go socket.connect()
+	return socket
+}
+
+type SocketClient struct {
+	host    string
+	port    uint16
+	bufSize int
+	ctx     context.Context
+	cancel  context.CancelFunc
+	socket  *Socket
+}
+
+func (s *SocketClient) start() {
+	//log.Println("Start(X)...")
+	for {
+		select {
+		case <-s.ctx.Done():
+			//log.Println("XDONE--------------------------")
+			return
+		case <-s.socket.Context().Done():
+			//log.Println("DONE>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
+			s.connect()
+			return
+		}
+	}
+}
+
+func (s *SocketClient) connect() {
+	//log.Println("CONNECT(X)..............")
+	for {
+		select {
+		case <-s.ctx.Done():
+			s.socket.Close()
+			return
+		default:
+			//log.Println("CONNECT...")
+			if conn, err := net.Dial("tcp", fmt.Sprintf("%v:%v", s.host, s.port)); err == nil {
+				s.socket = NewSocket(conn, s.bufSize, s.ctx)
+				s.start()
+				return
+			}
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+func (s *SocketClient) Close() {
+	//log.Println("CLIENT-CLOSE...")
+	s.socket.Close()
+	s.cancel()
+}

+ 154 - 0
z_test.go

@@ -0,0 +1,154 @@
+package microservice
+
+import (
+	"context"
+	"log"
+	"net"
+	"testing"
+	"time"
+
+	"git.ali33.ru/fcg-xvii/go-tools/json"
+)
+
+func TestProtoData(t *testing.T) {
+	work := func(u *ProtocolData, name string) {
+		for data := range u.Receiver() {
+			t.Logf("%v data: %v", name, data)
+			time.Sleep(time.Second / 3)
+			u.Send(data)
+		}
+	}
+	c1, c2 := net.Pipe()
+	u1 := MakeProtocolData(c1, context.Background())
+	u2 := MakeProtocolData(c2, context.Background())
+	go work(u1, "u1")
+	go work(u2, "u2")
+	time.Sleep(time.Second * 2)
+	u1.Send([]byte{1, 2, 3, 4, 5})
+	time.Sleep(time.Second * 100)
+	u1.Close()
+	time.Sleep(time.Second * 10)
+}
+
+func TestProtoJSON(t *testing.T) {
+	work := func(u *ProtocolJSON, name string) {
+		for m := range u.Receiver() {
+			t.Logf("%v data: %v", name, m)
+			time.Sleep(time.Second * 3)
+			u.Send(m)
+		}
+	}
+	c1, c2 := net.Pipe()
+	u1 := MakeProtocolJSON(c1, context.Background())
+	u2 := MakeProtocolJSON(c2, context.Background())
+	go work(u1, "u1")
+	go work(u2, "u2")
+	u1.Send(json.Map{
+		"one": 1,
+		"two": 2,
+	})
+	time.Sleep(time.Second * 20)
+	u1.Close()
+	time.Sleep(time.Second * 10)
+}
+
+func TestSocket(t *testing.T) {
+	work := func(s *Socket, name string) {
+		for {
+			select {
+			case <-s.Context().Done():
+				log.Println("DONE....")
+				return
+			case q, ok := <-s.RecvQuestion():
+				if ok {
+					log.Println("Question", name, q.Data())
+				}
+				q.Answer(json.Map{
+					"answer": true,
+					"ts":     time.Now().Unix(),
+				})
+				time.Sleep(time.Second)
+				s.SendQuestion(
+					NewQuestion(
+						json.Map{
+							"ts": time.Now().Unix(),
+						},
+						0,
+					),
+				)
+			case e, ok := <-s.RecvEvent():
+				if ok {
+					log.Println("Event", e.Data())
+					q := NewQuestion(
+						json.Map{
+							"r-answer": "OK",
+						},
+						0,
+					)
+					time.Sleep(time.Second)
+					s.SendQuestion(q)
+				}
+			}
+		}
+	}
+	c1, c2 := net.Pipe()
+	s1 := NewSocket(c1, 10, context.Background())
+	s2 := NewSocket(c2, 10, context.Background())
+	go work(s1, "s1")
+	go work(s2, "s2")
+	log.Println(s1, s2)
+	time.Sleep(time.Second * 2)
+	s1.SendEvent(NewEvent(json.Map{
+		"one": 1,
+	}))
+	time.Sleep(time.Second * 2)
+	q := NewQuestion(
+		json.Map{
+			"ok": true,
+		},
+		time.Second*5,
+	)
+	s1.SendQuestion(q)
+	time.Sleep(time.Second * 20)
+	c1.Close()
+	time.Sleep(time.Second * 30)
+}
+
+func TestServer(t *testing.T) {
+	host, port := "127.0.0.1", uint16(4500)
+	newServer := func() *Server {
+		serv := NewServer(host, port, 10, context.Background())
+		log.Println(serv)
+		go func() {
+			socket, ok := <-serv.NewConnection()
+			if !ok {
+				log.Println("connection error")
+			}
+			for {
+				select {
+				case e := <-socket.RecvEvent():
+					log.Println("event", e)
+				case q := <-socket.RecvQuestion():
+					log.Println("question", q)
+				}
+			}
+		}()
+		return serv
+	}
+	serv := newServer()
+	log.Println(serv)
+	cl := NewSocketClient(host, port, 10, context.Background())
+	log.Println(cl)
+	time.Sleep(time.Second * 20)
+	serv.Close()
+	//cl.Close()
+	log.Println("serv closed")
+	time.Sleep(time.Second * 30)
+	serv = newServer()
+	time.Sleep(time.Second * 30)
+	serv.Close()
+	log.Println("serv-closed-2")
+	time.Sleep(time.Second * 50)
+	cl.Close()
+	time.Sleep(time.Second * 50)
+}