stream.proto
syntax = "proto3";
package stream;
option go_package = ".;stream";
// StreamService declares one RPC for each gRPC call shape: unary,
// server streaming, client streaming, and bidirectional streaming.
service StreamService {
// Unary RPC: a single request message and a single response message.
rpc SimpleRpc(empty) returns (empty) {};
// Server-streaming RPC: a single request, a stream of responses.
rpc ServerRpc(StreamRequest) returns (stream StreamResponse) {};
// Client-streaming RPC: a stream of requests, a single response.
rpc ClientRpc(stream StreamRequest) returns (StreamResponse) {};
// Bidirectional-streaming RPC: both sides send a stream of messages.
rpc ServerClientRpc(stream StreamRequest) returns (stream StreamResponse) {};
}
// empty has no fields; it is both the request and the response type of
// SimpleRpc.
message empty {
}
// StreamRequest is the request message of the streaming RPCs; it wraps a
// single StreamData payload.
message StreamRequest {
StreamData data = 1;
}
// StreamResponse is the response message of the streaming RPCs; it wraps a
// single StreamData payload.
message StreamResponse {
StreamData data = 1;
}
// StreamData is the payload carried by StreamRequest and StreamResponse:
// a 32-bit integer id and a string name.
message StreamData {
int32 id = 1;
string name = 2;
}
server.go
package main
import (
"context"
"google.golang.org/grpc"
"io"
"log"
"net"
pb "test/rpc/stream"
)
// StreamService is the handler type registered with the gRPC server; its
// methods implement the four RPCs of the StreamService service.
type StreamService struct{}
// SimpleRpc handles the unary RPC and replies with an empty message.
//
// Limitations of a unary RPC compared with the streaming variants:
//  1. A very large payload arrives all at once and causes a burst of load.
//  2. The whole request has to be received successfully and validated before
//     the handler can respond; the server cannot process data while the
//     client is still sending.
func (s *StreamService) SimpleRpc(context.Context, *pb.Empty) (*pb.Empty, error) {
	// Return a non-nil message: a nil response paired with a nil error is not a
	// valid reply and, depending on the protobuf runtime in use, can fail to
	// marshal and surface to the client as an internal error.
	return &pb.Empty{}, nil
}
// ServerRpc handles the server-streaming RPC: the client issues one ordinary
// request and the server answers with a stream of responses that the client
// reads with Recv.
//
// Seven responses are sent. Each echoes the request's Name and carries the
// request's Id plus the response index (0 through 6). The first Send error
// aborts the stream and is returned.
func (s *StreamService) ServerRpc(r *pb.StreamRequest, stream pb.StreamService_ServerRpcServer) error {
	// The generated getters are nil-safe, so a request that carries no Data
	// yields zero values instead of panicking the handler.
	data := r.GetData()
	for n := 0; n <= 6; n++ {
		resp := &pb.StreamResponse{
			Data: &pb.StreamData{
				Id:   data.GetId() + int32(n),
				Name: data.GetName(),
			},
		}
		if err := stream.Send(resp); err != nil {
			return err
		}
	}
	return nil
}
// ClientRpc handles the client-streaming RPC: the client sends a stream of
// requests and the server answers once.
//
// Every received request is logged. When the client finishes sending (Recv
// returns io.EOF) the single response is sent with SendAndClose. Any other
// Recv error is returned as-is.
func (s *StreamService) ClientRpc(stream pb.StreamService_ClientRpcServer) error {
	for {
		r, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.StreamResponse{Data: &pb.StreamData{Id: 1, Name: "gRPC Stream Server: ClientRpc"}})
		}
		if err != nil {
			return err
		}
		// Nil-safe getters: a request without Data logs zero values instead of
		// panicking the handler.
		data := r.GetData()
		log.Printf("stream.Recv data.id: %d, data.name: %s", data.GetId(), data.GetName())
	}
}
// ServerClientRpc handles the bidirectional-streaming RPC.
//
// The handler works in lock-step: each iteration first sends a response whose
// Id is the number of requests received so far, then waits for the next
// request and logs it. It returns nil once the client finishes sending (Recv
// returns io.EOF) and returns any other Send or Recv error as-is.
func (s *StreamService) ServerClientRpc(stream pb.StreamService_ServerClientRpcServer) error {
	n := 0
	for {
		resp := &pb.StreamResponse{
			Data: &pb.StreamData{
				Id:   int32(n),
				Name: "gPRC Stream Client: ServerClientRpc",
			},
		}
		if err := stream.Send(resp); err != nil {
			return err
		}
		r, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		n++
		// Nil-safe getters: a request without Data logs zero values instead of
		// panicking the handler.
		data := r.GetData()
		log.Printf("stream.Recv data.id: %d, data.name: %s", data.GetId(), data.GetName())
	}
}
// main registers StreamService on a new gRPC server and serves it on TCP
// port 8888. The process exits with a logged error if the listener cannot be
// created or if serving stops with an error.
func main() {
	server := grpc.NewServer()
	pb.RegisterStreamServiceServer(server, &StreamService{})
	lis, err := net.Listen("tcp", ":8888")
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	// Serve blocks until the server stops; report its error rather than
	// exiting silently.
	if err := server.Serve(lis); err != nil {
		log.Fatalf("server.Serve err: %v", err)
	}
}
client.go
package main
import (
"context"
"io"
"log"
pb "test/rpc/stream"
"google.golang.org/grpc"
)
// serverRpc calls the server-streaming ServerRpc with request r and logs every
// response until the server ends the stream.
//
// It returns nil when the stream ends normally (Recv returns io.EOF) and the
// first error from opening the stream or from Recv otherwise.
func serverRpc(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.ServerRpc(context.Background(), r)
	if err != nil {
		return err
	}
	for {
		resp, err := stream.Recv()
		// io.EOF signals that the server closed the stream successfully.
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		// Nil-safe getters: a response without Data logs zero values instead of
		// panicking.
		data := resp.GetData()
		log.Printf("resp: data.id: %d, data.name: %s", data.GetId(), data.GetName())
	}
}
// clientRpc calls the client-streaming ClientRpc: it sends r six times, closes
// the send side, and logs the single response from the server.
//
// It returns the first error from opening the stream, from Send, or from
// CloseAndRecv.
func clientRpc(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.ClientRpc(context.Background())
	if err != nil {
		return err
	}
	for n := 0; n < 6; n++ {
		if err := stream.Send(r); err != nil {
			if err == io.EOF {
				// io.EOF from Send means the stream was ended by the server;
				// the actual status is reported by the receive below, so stop
				// sending instead of returning a bare io.EOF.
				break
			}
			return err
		}
	}
	resp, err := stream.CloseAndRecv()
	if err != nil {
		return err
	}
	// Nil-safe getters: a response without Data logs zero values instead of
	// panicking.
	data := resp.GetData()
	log.Printf("resp: data.id: %d, data.name: %s", data.GetId(), data.GetName())
	return nil
}
// serverClientRpc calls the bidirectional ServerClientRpc in lock-step: for up
// to seven rounds it sends r and then logs one response. The loop stops early
// if the server ends the stream (Recv returns io.EOF). Afterwards the send
// side is closed.
//
// It returns the first error from opening the stream, from Send, from Recv,
// or from CloseSend.
func serverClientRpc(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.ServerClientRpc(context.Background())
	if err != nil {
		return err
	}
	for n := 0; n <= 6; n++ {
		// io.EOF from Send means the stream was ended by the server; fall
		// through to Recv, which reports the actual status of the stream.
		if err := stream.Send(r); err != nil && err != io.EOF {
			return err
		}
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		// Nil-safe getters: a response without Data logs zero values instead of
		// panicking.
		data := resp.GetData()
		log.Printf("resp: data.id: %d, data.name: %s", data.GetId(), data.GetName())
	}
	return stream.CloseSend()
}
func main() {
conn, err := grpc.Dial(":8888", grpc.WithInsecure())
if err != nil {
log.Fatalf("grpc.Dial err: %v", err)
}
defer conn.Close()
client := pb.NewStreamServiceClient(conn)
err = serverRpc(client, &pb.StreamRequest{Data: &pb.StreamData{Name: "gRPC Stream Client: ServerRpc", Id: 2022}})
if err != nil {
log.Fatalf("serverRpc.err: %v", err)
}
err = clientRpc(client, &pb.StreamRequest{Data: &pb.StreamData{Name: "gRPC Stream Client: ClientRpc", Id: 2022}})
if err != nil {
log.Fatalf("clientRpc.err: %v", err)
}
err = serverClientRpc(client, &pb.StreamRequest{Data: &pb.StreamData{Name: "gRPC Stream Client: ServerClientRpc", Id: 2022}})
if err != nil {
log.Fatalf("serverClientRpc.err: %v", err)
}
}
输出:
$ go run server.go
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
2022/01/28 15:11:43 stream.Recv data.id: 2022, data.name: gRPC Stream Client: ServerClientRpc
$ go run client.go
2022/01/28 15:11:43 resp: data.id: 2022, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2023, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2024, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2025, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2026, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2027, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 2028, data.name: gRPC Stream Client: ServerRpc
2022/01/28 15:11:43 resp: data.id: 1, data.name: gRPC Stream Server: ClientRpc
2022/01/28 15:11:43 resp: data.id: 0, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 1, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 2, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 3, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 4, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 5, data.name: gPRC Stream Client: ServerClientRpc
2022/01/28 15:11:43 resp: data.id: 6, data.name: gPRC Stream Client: ServerClientRpc