前言
gRPC的流式分为三种类型,本文只是实现双向流的简单示例,更多了解可以访问gRPC官网,
- Server-side streaming RPC:服务器端流式 RPC
- Client-side streaming RPC:客户端流式 RPC
- Bidirectional streaming RPC:双向流式 RPC
srteam顾名思义就是一种流,可以源源不断的推送数据,很适合传输一些大数据,或者服务端和客户端长时间数据交互,比如客户端可以向服务端订阅一个数据,服务端就可以利用stream,源源不断地推送数据。
github地址
项目目录
grpc | go.mod | go.sum | readme.md | +---client | stream.go | +---proto | counter.proto | +---server stream.go
定义protobuf
syntax = "proto3"; // 声明protobuf版本 option go_package = ".;pb"; service Counter { rpc Sum (stream NumberReq) returns (stream NumberRep) {} } message NumberReq { int64 x = 1; int64 y = 2; } message NumberRep { int64 result = 1; }
编译proto文件
protoc -I proto/ --go-grpc_out=proto/ proto/counter.proto
服务端代码
package main
import (
"fmt"
"google.golang.org/grpc"
pb "grpc/proto"
"io"
"log"
"net"
)
const Port = 8080
type server struct {
pb.CounterServer
}
func (s *server) Sum(stream pb.Counter_SumServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
fmt.Printf("Server Received: [X = %d, Y = %d]\n", req.X, req.Y)
resp := &pb.NumberRep{Result: req.X + req.Y}
stream.Send(resp)
fmt.Printf("Server Sended: [Result = %d]\n", resp.Result)
}
return nil
}
func main() {
fmt.Printf("启动服务[Port=%d]...\n", Port)
conn, err := net.Listen("tcp", fmt.Sprintf(":%d", Port))
if err != nil {
log.Fatalf("Error: %v\n", err)
}
s := grpc.NewServer()
pb.RegisterCounterServer(s, &server{})
s.Serve(conn)
}
客户端代码
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc/proto"
"io"
"log"
"math/rand"
"time"
)
func init() {
rand.Seed(time.Now().Unix())
}
func main() {
conn, err := grpc.Dial(":8080", grpc.WithInsecure())
if err != nil {
log.Fatalf("failed to connect: %v\n", err)
}
defer conn.Close()
c := pb.NewCounterClient(conn)
r, err := c.Sum(context.Background())
for {
req := &pb.NumberReq{X: rand.Int63n(100), Y: rand.Int63n(100)}
err := r.Send(req)
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Error: %v\n", err)
}
fmt.Printf("Client Sended: [X = %d, Y = %d]\n", req.X, req.Y)
resp, err := r.Recv()
if err != nil {
log.Fatalf("Error: %v\n", err)
}
fmt.Printf("Client Received: [Result = %d]\n", resp.Result)
time.Sleep(time.Second)
}
}
Protoc版本
$ protoc --version
libprotoc 3.14.0