GRPC使用
GRPC
gRPC(gRPC Remote Procedure Call)是由Google开发的高性能、开源的远程过程调用(RPC)框架。它是一种用于构建分布式系统的工具,允许不同的应用程序或服务之间通过网络进行通信,就像调用本地函数一样。
需求
通过编写一个简单的gRPC服务,其中包括服务器(server)和客户端(client),以监视服务器端的数据变化。
案例
Proto文件
首先需要有一个proto
文件,是得我们通过文件生成grpc
风格的接口文件,之后我们需要根据接口文件里的接口实现,在这里proto
文件的作用是为了定义接口与返回的消息结构类型。
syntax = "proto3"; // 版本声明,使用Protocol Buffers v3版本
option go_package = "./proto"; // 希望构建的包存放的路径
package alice; // 包名
// 包含人名的一个请求消息
message AliceRequest {
int64 id = 1; // 1、2、2 是固定结构用的
string name = 2;
string message = 3;
}
// 包含问候语的响应消息
message AliceReply {
int64 id = 1;
string status = 2;
}
message DeleteRequest {
int64 id = 1;
}
service AliceService {
rpc AddMessage (AliceRequest) returns (AliceReply);
rpc DeleteMessage (DeleteRequest) returns (AliceReply);
rpc WatchMessage (AliceRequest) returns (stream AliceRequest);
}
有了proto
文件后,需要创建grpc
接口
$ protoc --go_out=. --go-grpc_out=. watch.proto
执行完毕后,有了两个文件
$ ll ch18/proto/
total 24
-rw-r--r-- 1 54225 197121 9803 9月 5 17:30 watch.pb.go
-rw-r--r-- 1 54225 197121 7262 9月 5 17:30 watch_grpc.pb.go
服务端
我们需要根据接口编写server
,server 实现的功能就是怎加数据,watch 数据,删除数据。
package main
import (
"context"
"fmt"
"log"
"net"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"learning/ch18/proto"
)
type server struct {
mu sync.Mutex
proto.UnimplementedAliceServiceServer
m map[int64]message
}
type message struct {
id int64 `json:"id"`
name string `json:"name"`
message string `json:"message"`
}
func (s *server) AddMessage(ctx context.Context, in *proto.AliceRequest) (*proto.AliceReply, error) {
m := message{
id: in.Id,
name: in.Name,
message: in.Message,
}
s.m[in.Id] = m
return &proto.AliceReply{Id: in.Id, Status: "success"}, nil
}
func (s *server) DeleteMessage(ctx context.Context, in *proto.DeleteRequest) (*proto.AliceReply, error) {
delete(s.m, in.Id)
return &proto.AliceReply{Id: in.Id}, nil
}
func (s *server) WatchMessage(in *proto.AliceRequest, stream proto.AliceService_WatchMessageServer) error {
for {
select {
case <-stream.Context().Done():
return nil
default:
}
s.mu.Lock()
for _, v := range s.m {
req := &proto.AliceRequest{
Id: v.id,
Name: v.name,
Message: v.message}
if err := stream.Send(req); err != nil {
s.mu.Unlock()
return err
}
}
s.mu.Unlock()
time.Sleep(1 * time.Second)
}
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
svc := &server{
m: make(map[int64]message),
}
s := grpc.NewServer()
proto.RegisterAliceServiceServer(s, svc)
reflection.Register(s)
fmt.Println("Server is running on :50051...")
if err = s.Serve(lis); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}
客户端
服务端是提供服务的,客户端这边可以编写三种类型的客户端,一种是 watch 数据的变化,一种是添加数据,一种是删除数据
编写添加数据的客户端
package main
import (
"context"
"log"
"google.golang.org/grpc"
"learning/ch18/proto"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := proto.NewAliceServiceClient(conn)
// 创建一个上下文
ctx := context.Background()
_, err = client.AddMessage(ctx, &proto.AliceRequest{
Id: 1,
Name: "yangyangyang",
Message: "hello,yangyangyang",
})
if err != nil {
log.Fatalf("AddMessage error: %v", err)
}
}
编写删除数据的客户端
package main
import (
"context"
"log"
"google.golang.org/grpc"
"learning/ch18/proto"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := proto.NewAliceServiceClient(conn)
// 创建一个上下文
ctx := context.Background()
_, err = client.DeleteMessage(ctx, &proto.DeleteRequest{
Id: 1,
})
if err != nil {
log.Fatalf("AddMessage error: %v", err)
}
}
编写 watch 数据的客户端
package main
import (
"context"
"fmt"
"log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"learning/ch18/proto"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := proto.NewAliceServiceClient(conn)
// 创建一个上下文
ctx := context.Background()
// 创建一个监视器
watchStream, err := client.WatchMessage(ctx, &proto.AliceRequest{})
if err != nil {
log.Fatalf("WatchMessage error: %v", err)
}
fmt.Println("Watching for Message changes...")
for {
req, err := watchStream.Recv()
if err != nil {
if status.Code(err) == codes.Canceled {
fmt.Println("Watch canceled.")
break
}
log.Fatalf("WatchMessage receive error: %v", err)
}
fmt.Printf("Received Message change: %s\n", req.Message)
}
}
执行
需要启动 server 端,提供服务
$ go run ch18/server/server.go
Server is running on :50051...
启动 client 端的 watch 服务
$ go run ch18/client/watch.go
Watching for Message changes...
启动 client 端的 add 服务,之后查看 watch 服务有没有 watch 到数据
$ go run ch18/client/add.go
watch 数据
$ go run ch18/client/watch.go
Watching for Message changes...
Received Message change: hello,yangyangyang
Received Message change: hello,yangyangyang
Received Message change: hello,yangyangyang