七叶笔记 » golang编程 » go grpc stream

go grpc stream

stream.proto

 syntax = "proto3";

package stream;

option go_package = ".;stream";

// StreamService declares one RPC for each of the four gRPC call shapes.
service StreamService {
  // Unary RPC: a single request answered by a single response.
  rpc SimpleRpc(empty) returns (empty) {};
  // Server-side streaming: a single request answered by a stream of responses.
  rpc ServerRpc(StreamRequest) returns (stream StreamResponse) {};
  // Client-side streaming: a stream of requests answered by a single response.
  rpc ClientRpc(stream StreamRequest) returns (StreamResponse) {};
  // Bidirectional streaming: both request and response are streams.
  rpc ServerClientRpc(stream StreamRequest) returns (stream StreamResponse) {};
}

// empty is a message with no fields; SimpleRpc uses it as both request and
// response type.
// NOTE(review): message names are conventionally PascalCase ("Empty");
// renaming would also require updating the SimpleRpc signature — confirm
// before changing.
message empty {

}

// StreamRequest is the request message of the three streaming RPCs.
message StreamRequest {
  // Payload carried by the request.
  StreamData data = 1;
}

// StreamResponse is the response message of the three streaming RPCs.
message StreamResponse {
  // Payload carried by the response.
  StreamData data = 1;
}

// StreamData is the payload shared by StreamRequest and StreamResponse.
message StreamData {
  // Numeric identifier of the payload.
  int32 id = 1;
  // Free-form name/label of the payload.
  string name = 2;
}

server.go

 package main

import (
"context"
"google.golang.org/grpc"
"io"
"log"
"net"
pb "test/rpc/stream"
)

// StreamService provides the server-side handlers for the StreamService gRPC
// service; main registers it through pb.RegisterStreamServiceServer. It has
// no fields.
type StreamService struct{}

// SimpleRpc handles the unary RPC: one request in, one response out.
//
// Drawbacks of unary RPCs noted by the original author:
//  1. An oversized payload arrives all at once and causes a burst of load.
//  2. The complete payload must be received successfully and correctly before
//     a response can be produced, so the server cannot process data while the
//     client is still sending it.
//
// It always returns an empty, non-nil message and a nil error. A nil response
// paired with a nil error hands gRPC a nil message to serialize, which —
// depending on the protobuf runtime in use — may surface to the caller as a
// marshaling error, so an explicit empty message is returned instead.
func (s *StreamService) SimpleRpc(context.Context, *pb.Empty) (*pb.Empty, error) {
	return &pb.Empty{}, nil
}

// ServerRpc handles the server-side streaming RPC: the client issues one
// ordinary request and the server answers with a stream of responses that the
// client reads with Recv.
//
// It sends seven responses. Each echoes the request's name and carries the
// request's id plus an offset running from 0 to 6. It returns the first error
// reported by stream.Send, or nil once all responses were sent.
func (s *StreamService) ServerRpc(r *pb.StreamRequest, stream pb.StreamService_ServerRpcServer) error {
	// Use the generated getters: they are nil-safe, so a request that omits
	// data yields zero values instead of a nil-pointer panic in the handler.
	data := r.GetData()
	baseID, name := data.GetId(), data.GetName()

	for n := int32(0); n <= 6; n++ {
		resp := &pb.StreamResponse{
			Data: &pb.StreamData{
				Id:   baseID + n,
				Name: name,
			},
		}
		if err := stream.Send(resp); err != nil {
			return err
		}
	}

	return nil
}

// ClientRpc handles the client-side streaming RPC: the client sends a stream
// of requests and the server replies once.
//
// Every received request is logged. When Recv reports io.EOF (the client has
// finished sending), the single response is sent with SendAndClose and its
// result is returned. Any other receive error is returned as-is.
func (s *StreamService) ClientRpc(stream pb.StreamService_ClientRpcServer) error {
	for {
		r, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.StreamResponse{Data: &pb.StreamData{Id: 1, Name: "gRPC Stream Server: ClientRpc"}})
		}
		if err != nil {
			return err
		}

		// Nil-safe getters: a request without data logs zero values instead
		// of panicking on a nil Data pointer.
		data := r.GetData()
		log.Printf("stream.Recv data.id: %d, data.name: %s", data.GetId(), data.GetName())
	}
}

// ServerClientRpc handles the bidirectional streaming RPC.
//
// Although both directions are streams, this handler works in lock-step: it
// first sends a response carrying the current counter, then waits for one
// request, logs it and increments the counter. It returns nil when Recv
// reports io.EOF (the client closed its sending side) and returns any other
// send or receive error as-is.
func (s *StreamService) ServerClientRpc(stream pb.StreamService_ServerClientRpcServer) error {
	// The counter only advances after a request was received successfully;
	// every failure path returns before the post statement runs.
	for n := int32(0); ; n++ {
		resp := &pb.StreamResponse{
			Data: &pb.StreamData{
				Id:   n,
				Name: "gPRC Stream Client: ServerClientRpc",
			},
		}
		if err := stream.Send(resp); err != nil {
			return err
		}

		r, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		// Nil-safe getters: a request without data logs zero values instead
		// of panicking on a nil Data pointer.
		data := r.GetData()
		log.Printf("stream.Recv data.id: %d, data.name: %s", data.GetId(), data.GetName())
	}
}

// main creates the gRPC server, registers StreamService on it, listens on
// TCP port 8888 and serves until the server stops. Listen and serve failures
// are fatal.
func main() {
	server := grpc.NewServer()
	pb.RegisterStreamServiceServer(server, &StreamService{})

	lis, err := net.Listen("tcp", ":8888")
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}

	// Serve blocks while the server is running; report its error instead of
	// exiting silently with a success status.
	if err := server.Serve(lis); err != nil {
		log.Fatalf("server.Serve err: %v", err)
	}
}
  

client.go

 package main

import (
"context"
"io"
"log"
pb "test/rpc/stream"

"google.golang.org/grpc"
)

// serverRpc calls the server-side streaming RPC with r and logs every
// response until the stream ends. It returns the error from opening the
// stream or from Recv; a normal end of stream yields nil.
func serverRpc(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.ServerRpc(context.Background(), r)
	if err != nil {
		return err
	}

	for {
		msg, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// io.EOF is how Recv signals that the stream completed.
			return nil
		case recvErr != nil:
			return recvErr
		}

		data := msg.Data
		log.Printf("resp: data.id: %d, data.name: %s", data.Id, data.Name)
	}
}

// clientRpc calls the client-side streaming RPC: it sends r six times, then
// closes the stream, waits for the single response and logs it. The first
// error from opening the stream, sending, or CloseAndRecv is returned.
func clientRpc(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.ClientRpc(context.Background())
	if err != nil {
		return err
	}

	const sends = 6
	for sent := 0; sent != sends; sent++ {
		if sendErr := stream.Send(r); sendErr != nil {
			return sendErr
		}
	}

	reply, err := stream.CloseAndRecv()
	if err != nil {
		return err
	}

	data := reply.Data
	log.Printf("resp: data.id: %d, data.name: %s", data.Id, data.Name)
	return nil
}

// serverClientRpc calls the bidirectional streaming RPC in lock-step: up to
// seven times it sends r and then reads and logs one response. The exchange
// stops early if Recv reports io.EOF. Afterwards the sending side is closed.
//
// It returns the first error from opening the stream, Send, Recv (other than
// io.EOF) or CloseSend.
func serverClientRpc(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.ServerClientRpc(context.Background())
	if err != nil {
		return err
	}

	for n := 0; n <= 6; n++ {
		if err := stream.Send(r); err != nil {
			return err
		}

		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		// Nil-safe getters: a response without data logs zero values instead
		// of panicking on a nil Data pointer.
		data := resp.GetData()
		log.Printf("resp: data.id: %d, data.name: %s", data.GetId(), data.GetName())
	}

	// Report a failed half-close to the caller instead of dropping it.
	return stream.CloseSend()
}

// main dials the server on port 8888 and runs the three streaming demos in
// order: server streaming, client streaming, bidirectional streaming. A dial
// failure or the first failing demo terminates the program via log.Fatalf.
func main() {
	conn, err := grpc.Dial(":8888", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()

	client := pb.NewStreamServiceClient(conn)

	// One entry per demo: the function to call, the name placed in the
	// request, and the format used to report a failure.
	demos := []struct {
		call    func(pb.StreamServiceClient, *pb.StreamRequest) error
		reqName string
		failFmt string
	}{
		{serverRpc, "gRPC Stream Client: ServerRpc", "serverRpc.err: %v"},
		{clientRpc, "gRPC Stream Client: ClientRpc", "clientRpc.err: %v"},
		{serverClientRpc, "gRPC Stream Client: ServerClientRpc", "serverClientRpc.err: %v"},
	}

	for _, d := range demos {
		req := &pb.StreamRequest{Data: &pb.StreamData{Name: d.reqName, Id: 2022}}
		if callErr := d.call(client, req); callErr != nil {
			log.Fatalf(d.failFmt, callErr)
		}
	}
}
  

输出:

 $ go run server.go
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc

$ go run client.go
2022/01/28 15:11:43 resp: data.id: 2022, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2023, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2024, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2025, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2026, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2027, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2028, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 1, data.name: gRPC Stream Server: ClientRpc
2022/01/28 15:11:43 resp: data.id: 0, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 1, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 2, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 3, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 4, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 5, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 6, data.name: gPRC Stream Client: ServerClientRpc  

相关文章