浏览代码

receivedBuf

0x4a52466c696e74 9 月之前
父节点
当前提交
75b09054d9
共有 5 个文件被更改,包括 29 次插入25 次删除
  1. 2 2
      protocol_json.go
  2. 4 2
      server.go
  3. 2 2
      socket.go
  4. 15 13
      socket_client.go
  5. 6 6
      z_test.go

+ 2 - 2
protocol_json.go

@@ -8,9 +8,9 @@ import (
 	"git.ali33.ru/fcg-xvii/go-tools/json"
 )
 
-func MakeProtocolJSON(conn net.Conn, ctx context.Context) *ProtocolJSON {
+func MakeProtocolJSON(conn net.Conn, ctx context.Context, receiveBuf int) *ProtocolJSON {
 	proto := &ProtocolJSON{
-		receiver: make(chan json.Map, 1),
+		receiver: make(chan json.Map, receiveBuf),
 	}
 	proto.ctx, proto.cancel = context.WithCancel(ctx)
 	proto.proto = MakeProtocolData(conn, proto.ctx)

+ 4 - 2
server.go

@@ -9,7 +9,7 @@ import (
 	"git.ali33.ru/fcg-xvii/go-tools/containers/concurrent"
 )
 
-func NewServer(host string, port uint16, sockBufSize int, ctx context.Context) *Server {
+func NewServer(host string, port uint16, sockBufSize int, ctx context.Context, receiveBuf int) *Server {
 	ctx, cancel := context.WithCancel(ctx)
 	server := &Server{
 		ctx:          ctx,
@@ -18,6 +18,7 @@ func NewServer(host string, port uint16, sockBufSize int, ctx context.Context) *
 		port:         port,
 		chConnection: make(chan *Socket),
 		sockets:      concurrent.NewList(),
+		receiveBuf:   receiveBuf,
 	}
 	go server.start()
 	return server
@@ -29,6 +30,7 @@ type Server struct {
 	host         string
 	port         uint16
 	sockBufSize  int
+	receiveBuf   int
 	listener     net.Listener
 	sockets      *concurrent.List
 	chConnection chan *Socket
@@ -56,7 +58,7 @@ func (s *Server) listen() {
 			//fmt.Println("Error accepting connection:", err)
 			return
 		}
-		socket := NewSocket(conn, s.sockBufSize, s.ctx)
+		socket := NewSocket(conn, s.sockBufSize, s.ctx, s.receiveBuf)
 		s.sockets.PushBack(socket)
 		go func(sock *Socket) {
 			<-sock.Context().Done()

+ 2 - 2
socket.go

@@ -11,9 +11,9 @@ import (
 	"git.ali33.ru/fcg-xvii/go-tools/json"
 )
 
-func NewSocket(conn net.Conn, bufSize int, ctx context.Context) *Socket {
+func NewSocket(conn net.Conn, bufSize int, ctx context.Context, receiveBuf int) *Socket {
 	socket := &Socket{
-		proto:     MakeProtocolJSON(conn, ctx),
+		proto:     MakeProtocolJSON(conn, ctx, receiveBuf),
 		rCounter:  0,
 		chRecvQ:   make(chan *Question, bufSize),
 		chRecvE:   make(chan *Event, bufSize),

+ 15 - 13
socket_client.go

@@ -7,26 +7,28 @@ import (
 	"time"
 )
 
-func NewSocketClient(host string, port uint16, bufSize int, ctx context.Context) *SocketClient {
+func NewSocketClient(host string, port uint16, bufSize int, ctx context.Context, receiveBuf int) *SocketClient {
 	cctx, cancel := context.WithCancel(ctx)
 	socket := &SocketClient{
-		host:    host,
-		port:    port,
-		bufSize: bufSize,
-		ctx:     cctx,
-		cancel:  cancel,
+		host:       host,
+		port:       port,
+		bufSize:    bufSize,
+		receiveBuf: receiveBuf,
+		ctx:        cctx,
+		cancel:     cancel,
 	}
 	go socket.connect()
 	return socket
 }
 
 type SocketClient struct {
-	host    string
-	port    uint16
-	bufSize int
-	ctx     context.Context
-	cancel  context.CancelFunc
-	socket  *Socket
+	host       string
+	port       uint16
+	bufSize    int
+	receiveBuf int
+	ctx        context.Context
+	cancel     context.CancelFunc
+	socket     *Socket
 }
 
 func (s *SocketClient) start() {
@@ -54,7 +56,7 @@ func (s *SocketClient) connect() {
 		default:
 			//log.Println("CONNECT...")
 			if conn, err := net.Dial("tcp", fmt.Sprintf("%v:%v", s.host, s.port)); err == nil {
-				s.socket = NewSocket(conn, s.bufSize, s.ctx)
+				s.socket = NewSocket(conn, s.bufSize, s.ctx, s.receiveBuf)
 				s.start()
 				return
 			}

+ 6 - 6
z_test.go

@@ -39,8 +39,8 @@ func TestProtoJSON(t *testing.T) {
 		}
 	}
 	c1, c2 := net.Pipe()
-	u1 := MakeProtocolJSON(c1, context.Background())
-	u2 := MakeProtocolJSON(c2, context.Background())
+	u1 := MakeProtocolJSON(c1, context.Background(), 1000)
+	u2 := MakeProtocolJSON(c2, context.Background(), 1000)
 	go work(u1, "u1")
 	go work(u2, "u2")
 	u1.Send(json.Map{
@@ -92,8 +92,8 @@ func TestSocket(t *testing.T) {
 		}
 	}
 	c1, c2 := net.Pipe()
-	s1 := NewSocket(c1, 10, context.Background())
-	s2 := NewSocket(c2, 10, context.Background())
+	s1 := NewSocket(c1, 10, context.Background(), 1000)
+	s2 := NewSocket(c2, 10, context.Background(), 1000)
 	go work(s1, "s1")
 	go work(s2, "s2")
 	log.Println(s1, s2)
@@ -117,7 +117,7 @@ func TestSocket(t *testing.T) {
 func TestServer(t *testing.T) {
 	host, port := "127.0.0.1", uint16(4500)
 	newServer := func() *Server {
-		serv := NewServer(host, port, 10, context.Background())
+		serv := NewServer(host, port, 10, context.Background(), 1000)
 		log.Println(serv)
 		go func() {
 			socket, ok := <-serv.NewConnection()
@@ -137,7 +137,7 @@ func TestServer(t *testing.T) {
 	}
 	serv := newServer()
 	log.Println(serv)
-	cl := NewSocketClient(host, port, 10, context.Background())
+	cl := NewSocketClient(host, port, 10, context.Background(), 1000)
 	log.Println(cl)
 	time.Sleep(time.Second * 20)
 	serv.Close()