GRPC使用

Administrator
发布于 2023-09-26 / 109 阅读 / 0 评论 / 0 点赞

GRPC使用

GRPC

gRPC(gRPC Remote Procedure Call)是由Google开发的高性能、开源的远程过程调用(RPC)框架。它是一种用于构建分布式系统的工具,允许不同的应用程序或服务之间通过网络进行通信,就像调用本地函数一样。

需求

通过编写一个简单的gRPC服务,其中包括服务器(server)和客户端(client),以监视服务器端的数据变化。

案例

Proto文件

首先需要有一个proto文件,是得我们通过文件生成grpc风格的接口文件,之后我们需要根据接口文件里的接口实现,在这里proto文件的作用是为了定义接口与返回的消息结构类型。

syntax = "proto3"; // 版本声明,使用Protocol Buffers v3版本

option go_package = "./proto"; // 希望构建的包存放的路径

package alice; // 包名

// 包含人名的一个请求消息
message AliceRequest {
  int64  id   = 1;     // 1、2、2 是固定结构用的
  string name = 2;
  string message = 3;
}

// 包含问候语的响应消息
message AliceReply {
  int64 id = 1;
  string status = 2;
}

message DeleteRequest {
  int64 id = 1;
}

service AliceService {
  rpc AddMessage (AliceRequest) returns (AliceReply);
  rpc DeleteMessage (DeleteRequest) returns (AliceReply);
  rpc WatchMessage (AliceRequest) returns (stream AliceRequest);
}

有了proto文件后,需要创建grpc接口

$ protoc --go_out=. --go-grpc_out=. watch.proto

执行完毕后,有了两个文件

$ ll ch18/proto/
total 24
-rw-r--r-- 1 54225 197121 9803  9月  5 17:30 watch.pb.go
-rw-r--r-- 1 54225 197121 7262  9月  5 17:30 watch_grpc.pb.go

服务端

我们需要根据接口编写server,server 实现的功能就是怎加数据,watch 数据,删除数据。

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
	
	"learning/ch18/proto"
)

type server struct {
	mu sync.Mutex
	proto.UnimplementedAliceServiceServer
	m map[int64]message
}

type message struct {
	id      int64  `json:"id"`
	name    string `json:"name"`
	message string `json:"message"`
}

func (s *server) AddMessage(ctx context.Context, in *proto.AliceRequest) (*proto.AliceReply, error) {
	m := message{
		id:      in.Id,
		name:    in.Name,
		message: in.Message,
	}
	s.m[in.Id] = m
	return &proto.AliceReply{Id: in.Id, Status: "success"}, nil
}

func (s *server) DeleteMessage(ctx context.Context, in *proto.DeleteRequest) (*proto.AliceReply, error) {
	delete(s.m, in.Id)
	return &proto.AliceReply{Id: in.Id}, nil

}

func (s *server) WatchMessage(in *proto.AliceRequest, stream proto.AliceService_WatchMessageServer) error {
	for {
		select {
		case <-stream.Context().Done():
			return nil
		default:
		}

		s.mu.Lock()
		for _, v := range s.m {
			req := &proto.AliceRequest{
				Id:      v.id,
				Name:    v.name,
				Message: v.message}

			if err := stream.Send(req); err != nil {
				s.mu.Unlock()
				return err
			}
		}
		s.mu.Unlock()

		time.Sleep(1 * time.Second)
	}
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}

	svc := &server{
		m: make(map[int64]message),
	}

	s := grpc.NewServer()
	proto.RegisterAliceServiceServer(s, svc)
	reflection.Register(s)

	fmt.Println("Server is running on :50051...")
	if err = s.Serve(lis); err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}

客户端

服务端是提供服务的,客户端这边可以编写三种类型的客户端,一种是 watch 数据的变化,一种是添加数据,一种是删除数据

编写添加数据的客户端

package main

import (
	"context"
	"log"

	"google.golang.org/grpc"

	"learning/ch18/proto"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := proto.NewAliceServiceClient(conn)

	// 创建一个上下文
	ctx := context.Background()

	_, err = client.AddMessage(ctx, &proto.AliceRequest{
		Id:      1,
		Name:    "yangyangyang",
		Message: "hello,yangyangyang",
	})

	if err != nil {
		log.Fatalf("AddMessage error: %v", err)
	}
}

编写删除数据的客户端

package main

import (
	"context"
	"log"

	"google.golang.org/grpc"

	"learning/ch18/proto"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := proto.NewAliceServiceClient(conn)

	// 创建一个上下文
	ctx := context.Background()

	_, err = client.DeleteMessage(ctx, &proto.DeleteRequest{
		Id: 1,
	})

	if err != nil {
		log.Fatalf("AddMessage error: %v", err)
	}
}

编写 watch 数据的客户端

package main

import (
	"context"
	"fmt"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"learning/ch18/proto"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := proto.NewAliceServiceClient(conn)

	// 创建一个上下文
	ctx := context.Background()

	// 创建一个监视器
	watchStream, err := client.WatchMessage(ctx, &proto.AliceRequest{})
	if err != nil {
		log.Fatalf("WatchMessage error: %v", err)
	}

	fmt.Println("Watching for Message changes...")

	for {
		req, err := watchStream.Recv()
		if err != nil {
			if status.Code(err) == codes.Canceled {
				fmt.Println("Watch canceled.")
				break
			}
			log.Fatalf("WatchMessage receive error: %v", err)
		}

		fmt.Printf("Received Message change: %s\n", req.Message)
	}
}

执行

需要启动 server 端,提供服务

$ go run ch18/server/server.go
Server is running on :50051...

启动 client 端的 watch 服务

$ go run ch18/client/watch.go
Watching for Message changes...

启动 client 端的 add 服务,之后查看 watch 服务有没有 watch 到数据

$ go run ch18/client/add.go 

watch 数据

$ go run ch18/client/watch.go 
Watching for Message changes...
Received Message change: hello,yangyangyang
Received Message change: hello,yangyangyang
Received Message change: hello,yangyangyang