在从零开始搭建 gRPC 服务 – Golang 篇(一)中介绍了如何搭建 gRPC
环境并构建一个简单的 gRPC
服务,本文将介绍 gRPC
的 streaming
。
流式 RPC
gRPC
基于标准的 HTTP/2 进行传输,可以方便的实现 streaming
功能。要在 gRPC
中使用 streaming
,只需要在 proto
中在请求或响应前加上 stream
即可。
服务端流式 RPC:Server-side streaming RPC
客户端向服务器发送请求并获取流以读取消息序列;客户端从返回的流中读取,直到没有更多消息; gRPC 保证单个 RPC 调用中的消息排序。
在从零开始搭建 gRPC 服务 – Golang 篇(一)中的 helloworld.proto
中增加接口 LotsOfReplies
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
syntax = "proto3"; option go_package = "github.com/grpc/example/helloworld"; package helloworld; // The greeting service definition. service Greeter { rpc LotsOfReplies (HelloRequest) returns (stream HelloReply){} } // The request message containing the user's name. message HelloRequest { string name = 1; } // The response message containing the greetings message HelloReply { string message = 1; } |
编译 .proto
文件
1 2 3 4 5 6 7 8 9 10 11 12 13 |
$ protoc helloworld.proto --go_out=output $ tree . . ├── helloworld.proto └── output └── github.com └── grpc └── example └── helloworld └── helloworld.pb.go 5 directories, 2 files |
此时生成的代码就已经包含了流的处理,在使用上需要注意:服务器端代码的实现要通过流的方式发送响应。
编写 server.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
package main import ( "context" "fmt" "log" "net" "google.golang.org/grpc" pb "./output/github.com/grpc/example/helloworld" "google.golang.org/grpc/reflection" ) const ( port = ":50051" ) // server is used to implement helloworld.GreeterServer. type server struct{} func (s *server) LotsOfReplies(in *pb.HelloRequest, stream pb.Greeter_LotsOfRepliesServer) error { for idx := 0; idx < 10; idx ++ { stream.Send(&pb.HelloReply{Message: fmt.Sprintf("Hello %s %d", in.Name, idx)}) } return nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) // Register reflection service on gRPC server. reflection.Register(s) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } |
如上代码所示,服务端在接收到请求后通过 stream 返回了 10 个响应。
编写 client.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package main import ( "context" "io" "log" "os" "time" "google.golang.org/grpc" pb "./output/github.com/grpc/example/helloworld" ) const ( address = "localhost:50051" defaultName = "world" ) func main() { // Set up a connection to the server. conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGreeterClient(conn) // Contact the server and print out its response. name := defaultName if len(os.Args) > 1 { name = os.Args[1] } ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() stream, err := c.LotsOfReplies(ctx, &pb.HelloRequest{Name: name}) if err != nil { log.Fatalf("could not greet: %v", err) } for { reply, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("%v.LotsOfReplies() = _, %v", c, err) } log.Printf("Greeting: %s\n", reply.Message) } } |
客户端从 stream 中读取到若干响应,直到读到 EOF 结束。
运行 gRPC
服务
打开两个会话窗口,在其中之一执行:
1 2 |
$ go run server.go |
在另一个会话窗口运行:
1 2 3 4 5 6 7 8 9 10 11 12 |
$ go run client.go gRPC_stream 2018/12/23 21:31:38 Greeting: Hello gRPC_stream 0 2018/12/23 21:31:38 Greeting: Hello gRPC_stream 1 2018/12/23 21:31:38 Greeting: Hello gRPC_stream 2 2018/12/23 21:31:38 Greeting: Hello gRPC_stream 3 2018/12/23 21:31:38 Greeting: Hello gRPC_stream 4 2018/12/23 21:31:38 Greeting: Hello gRPC_stream 5 2018/12/23 21:31:38 Greeting: Hello gRPC_stream 6 2018/12/23 21:31:38 Greeting: Hello gRPC_stream 7 2018/12/23 21:31:38 Greeting: Hello gRPC_stream 8 2018/12/23 21:31:38 Greeting: Hello gRPC_stream 9 |
客户端流式 RPC:Client-side streaming RPC
客户端再次使用提供的流写入一系列消息并将其发送到服务器;一旦客户端写完消息,它就等待服务器读取它们并返回它的响应; gRPC 保证在单个 RPC 调用中的消息排序。
改写 helloworld.proto
,增加 LotsOfGreetings
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
syntax = "proto3"; option go_package = "github.com/grpc/example/helloworld"; package helloworld; // The greeting service definition. service Greeter { rpc LotsOfGreetings (stream HelloRequest) returns (HelloReply) {} } // The request message containing the user's name. message HelloRequest { string name = 1; int32 index = 2; } // The response message containing the greetings message HelloReply { string message = 1; } |
编写 server.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
package main import ( "context" "fmt" "io" "log" "net" "google.golang.org/grpc" pb "./output/github.com/grpc/example/helloworld" "google.golang.org/grpc/reflection" ) const ( port = ":50051" ) // server is used to implement helloworld.GreeterServer. type server struct{} func (s *server) LotsOfGreetings(stream pb.Greeter_LotsOfGreetingsServer) error { var total int32 var name string for { greeting, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.HelloReply{ Message: fmt.Sprintf("Hello %s, total %d", name, total), }) } if err != nil { return err } name = greeting.Name total += greeting.Index } return nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) // Register reflection service on gRPC server. reflection.Register(s) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } |
服务端通过 stream 接收到若干请求,直到读到 EOF 后再返回响应。
编写 client.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
package main import ( "context" "log" "os" "time" "google.golang.org/grpc" pb "./output/github.com/grpc/example/helloworld" ) const ( address = "localhost:50051" defaultName = "world" ) func main() { // Set up a connection to the server. conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGreeterClient(conn) // Contact the server and print out its response. name := defaultName if len(os.Args) > 1 { name = os.Args[1] } ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() stream, err := c.LotsOfGreetings(ctx) if err != nil { log.Fatalf("could not greet: %v", err) } for idx := 0; idx < 10; idx ++ { if err := stream.Send(&pb.HelloRequest{ Name: name, Index: int32(idx), }); err != nil { log.Fatalf("send err: %v", err) } } reply, err := stream.CloseAndRecv() if err != nil { log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) } log.Printf("Greeting: %s\n", reply.Message) } |
客户端通过 stream 发起 10 个请求,然后关闭 stream 并接收响应。
运行 gRPC
服务
打开两个会话窗口,在其中之一执行:
1 2 |
$ go run server.go |
在另一个会话窗口运行:
1 2 3 |
$ go run client.go gRPC_stream 2018/12/23 22:06:43 Greeting: Hello gRPC_stream, total 45 |
双向流式 RPC:Bidirectional streaming RPC
双方使用读写流发送一系列消息,这两个流独立运行,因此客户端和服务器可以按照他们喜欢的顺序进行读写:例如服务器可以在写入响应之前等待接收所有客户端消息,或者它可以交替读取消息然后写入消息,或其他一些读写组合;gRPC 保证在单个 RPC 调用中的消息排序。
改写 helloworld.proto
,增加 BidiHello
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
syntax = "proto3"; option go_package = "github.com/grpc/example/helloworld"; package helloworld; // The greeting service definition. service Greeter { rpc BidiHello(stream HelloRequest) returns (stream HelloReply) {} } // The request message containing the user's name. message HelloRequest { string name = 1; int32 index = 2; } // The response message containing the greetings message HelloReply { string message = 1; } |
编写 server.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
package main import ( "context" "fmt" "io" "log" "net" "strings" "google.golang.org/grpc" pb "./output/github.com/grpc/example/helloworld" "google.golang.org/grpc/reflection" ) const ( port = ":50051" ) // server is used to implement helloworld.GreeterServer. type server struct{} func (s *server) BidiHello(stream pb.Greeter_BidiHelloServer) error { for { in, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } message := strings.Replace(in.Name, "吗", "", -1) message = strings.Replace(message, "?", "!", -1) err = stream.Send(&pb.HelloReply{Message: message}) if err != nil { return err } } return nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) // Register reflection service on gRPC server. reflection.Register(s) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } |
服务端从 stream 中读取到请求后立即返回。
编写 client.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
package main import ( "context" "fmt" "log" "io" "time" "google.golang.org/grpc" pb "./output/github.com/grpc/example/helloworld" ) const ( address = "localhost:50051" ) func main() { // Set up a connection to the server. conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() stream, err := c.BidiHello(ctx) if err != nil { log.Fatalf("%v.BidiHello(_) = _, %v", c, err) } waitc := make(chan struct{}) go func() { for { in, err := stream.Recv() if err == io.EOF { // read done. close(waitc) return } if err != nil { log.Fatalf("Failed to receive a note : %v", err) } fmt.Printf("AI: %s\n", in.Message) } }() for { request := &pb.HelloRequest{} fmt.Scanln(&request.Name) if request.Name == "quit" { break } if err := stream.Send(request); err != nil { log.Fatalf("Failed to send a req: %v", err) } } stream.CloseSend() <-waitc } |
客户端从标准输出接收输入,然后通过 stream 发送请求,另一个 goroutine
则不断从 stream 中接收响应。
运行 gRPC
服务
打开两个会话窗口,在其中之一执行:
1 2 |
$ go run server.go |
在另一个会话窗口运行:
1 2 3 4 5 6 7 8 9 10 11 |
$ go run client.go 在吗? AI: 在! 你好 AI: 你好 能听懂汉语吗? AI: 能听懂汉语! 真的吗? AI: 真的! quit |
本文作者:任立翔