概念
gRPC 支持在客户端和服务器之间发送元数据, 元数据的生命周期就是一次 RPC 的调用。元数据可以理解为 HTTP 协议中的参数。
创建方式
Metadata
meatadata 数据类型为 map 类型, key 是 string 类型, value 是 string 类型的切片
type MD map[string][]string
可以使用 New 函数创建
md := metadata.New(map[string]string{
"name": "Gyi",
"age": "18",
})
或者设置 kv 的方式创建
md := metadata.Pairs(
"name", "Gyi",
"hobby", "Eat",
"hobby", "Drink")
新建context
md := metadata.New(map[string]string{
"name": "Gyi",
"age": "18",
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
KV
也可以使用追加的方式, 将元数据追加到上下文中, 这可以在有或没有关于上下文的现有元数据的情况下使用。当没有先前的元数据时,添加元数据;当上下文中已经存在元数据时,将合并 kv
// 使用元数据创建新的上线文
ctx := metadata.AppendToOutgoingContext(ctx, "name", "Gyi", "hobby", "Eat", "hobby", "Eat")
// 追加元数据
ctx := metadata.NewOutgoingContext(ctx, "age", "18")
一元调用
发送
client.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
roomPb "grpc-demo/grpc-proto/api/go/room"
"math/rand"
)
func main() {
// 建立grpc连接
conn, err := grpc.Dial("127.0.0.1:9101", grpc.WithInsecure())
if err != nil {
panic(any(err))
}
// 函数执行完后关闭连接
defer func(connect *grpc.ClientConn) {
_ = connect.Close()
}(conn)
// 实例化客户端
client := roomPb.NewServiceClient(conn)
// 定义上下文
ctx := metadata.AppendToOutgoingContext(context.Background(), "name", "Gyi", "Hobby", "Eat", "Hobby", "Sleep")
// 调用函数, 使用metadata上下文
userEntity, err := client.GetRoom(ctx, &roomPb.GetRoomRequest{
Id: int32(rand.Intn(100)),
})
if err != nil {
panic(any(err))
}
// 打印结果
fmt.Println(userEntity)
}
接受
server.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
roomPb "grpc-demo/grpc-proto/api/go/room"
userPb "grpc-demo/grpc-proto/api/go/user"
"math/rand"
"net"
)
type Server struct {
roomPb.UnimplementedServiceServer
}
func (s Server) GetRoom(ctx context.Context, request *roomPb.GetRoomRequest) (*roomPb.GetRoomResponse, error) {
// 接受metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
panic(any("获取metadata失败"))
}
// 打印metadata参数
for k, v := range md {
fmt.Printf("key: %s => value: %s\n", k, v)
}
if mName, ok := md["name"]; ok {
fmt.Printf("hello %s\n", mName)
}
if mHobby, ok := md["hobby"]; ok {
for _, hobby := range mHobby {
fmt.Printf("Hobby: %s\n", hobby)
}
}
// 忽略返回结果
return nil, nil
}
func main() {
// 声明新的grpc服务
GRPCServer := grpc.NewServer()
// 注册用户服务
roomPb.RegisterServiceServer(GRPCServer, &Server{})
// 监听端口
lis, err := net.Listen("tcp", "0.0.0.0:9101")
if err != nil {
panic(any(err))
}
// 启动服务
err = GRPCServer.Serve(lis)
if err != nil {
panic(any(err))
}
}
结果
输出结果
key: content-type => value: [application/grpc]
key: user-agent => value: [grpc-go/1.47.0]
key: name => value: [Gyi]
key: hobby => value: [Eat Sleep]
key: :authority => value: [127.0.0.1:9101]
hello [Gyi]
Hobby: Eat
Hobby: Sleep
流式调用
发送
client.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
carPb "grpc-demo/grpc-stream/api/go"
)
func main() {
// 连接服务端
conn, err := grpc.Dial("127.0.0.1:9102", grpc.WithInsecure())
if err != nil {
panic(any(err))
}
// 结束后关闭连接
defer func(conn *grpc.ClientConn) {
_ = conn.Close()
}(conn)
// 客户端建立连接
client := carPb.NewServiceClient(conn)
// 定义上下文
ctx := metadata.AppendToOutgoingContext(context.Background(), "name", "Yn", "Hobby", "Study")
// 调用获取司机位置. 发送metadata
res, err := client.GetDriverPosition(ctx, &carPb.Data{
Message: "user get driver position",
})
if err != nil {
panic(any(err))
}
}
接受
server.go 省略部分代码.
package main
import (
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
carPb "grpc-demo/grpc-stream/api/go"
"net"
"sync"
"time"
)
type server struct {
carPb.UnimplementedServiceServer
}
// GetDriverPosition 服务端向客户端发送司机位置
func (s *server) GetDriverPosition(data *carPb.Data, positionServer carPb.Service_GetDriverPositionServer) error {
// 接受metadata
md, ok := metadata.FromIncomingContext(positionServer.Context())
if !ok {
panic(any("获取metadata失败"))
}
// 打印metadata参数
for k, v := range md {
fmt.Printf("key: %s => value: %s\n", k, v)
}
if mName, ok := md["name"]; ok {
fmt.Printf("hello %s\n", mName)
}
if mHobby, ok := md["hobby"]; ok {
for _, hobby := range mHobby {
fmt.Printf("Hobby: %s\n", hobby)
}
}
long := 120.21201
lat := 30.2084
for i := 0; i <= 5; i++ {
_ = positionServer.Send(&carPb.DriverPosition{
Id: 1,
Location: &carPb.Location{
Longitude: long + float64(i),
Latitude: lat + float64(i),
},
Time: int32(time.Now().Unix()),
})
time.Sleep(200 * time.Millisecond)
}
return nil
}
func main() {
// 注册grpc服务
grpcServer := grpc.NewServer()
lis, err := net.Listen("tcp", "0.0.0.0:9102")
if err != nil {
panic(any(err))
}
carPb.RegisterServiceServer(grpcServer, &server{})
// 启动服务
err = grpcServer.Serve(lis)
if err != nil {
panic(any(err))
}
}
结果
输出结果
key: content-type => value: [application/grpc]
key: user-agent => value: [grpc-go/1.47.0]
key: name => value: [Yn]
key: hobby => value: [Study]
key: :authority => value: [127.0.0.1:9102]
hello [Yn]
Hobby: Study
拦截器
服务端拦截器
使用 grpc 库的 UnaryInterceptor 函数定义服务端一元服务拦截器
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
userPb "grpc-demo/grpc-other/proto"
"net"
"time"
)
type Server struct {
userPb.UnimplementedUserServiceServer
}
func (s Server) GetUser(ctx context.Context, request *userPb.GetUserRequest) (*userPb.GetUserResponse, error) {
res := &userPb.GetUserResponse{
UserEntity: &userPb.UserEntity{
Id: request.Id,
User: &userPb.User{
Name: "Gyi",
Avatar: "https://blog.qvbilam.xin",
Gender: userPb.Gender_MALE,
},
},
}
return res, nil
}
type a interface {
}
func main() {
// 服务端拦截器
interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
start := time.Now()
fmt.Println("接受到请求")
res, err := handler(ctx, req)
if err != nil { // grpc 内置错误包
return nil, status.Error(codes.Internal, "处理请求失败")
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Internal, "缺少metadata")
}
var token string
if t, ok := md["token"]; ok {
token = t[0] // 值为string切片, 只取第一个元素作为token
}
if token != "1205" {
return nil, status.Error(codes.Unauthenticated, "验证失败")
}
fmt.Printf("处理完请求用时: %s\n", time.Since(start))
return res, nil
}
// 增加grpc服务参数
opt := grpc.UnaryInterceptor(interceptor)
// 声明新的grpc服务GF
GRPCServer := grpc.NewServer(opt)
// 注册用户服务
userPb.RegisterUserServiceServer(GRPCServer, &Server{})
// 监听端口
lis, err := net.Listen("tcp", "0.0.0.0:9101")
if err != nil {
panic(any(err))
}
// 启动服务
err = GRPCServer.Serve(lis)
if err != nil {
panic(any(err))
}
}
打印结果
接受到请求
处理完请求用时: 15.741µs
客户端拦截器
使用grpc库的WithUnaryInterceptor函数定义客户端的一元服务拦截器
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
userPb "grpc-demo/grpc-other/proto"
"math/rand"
"time"
)
func main() {
// 定义客户端的拦截器
interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// 调用开始时间
start := time.Now()
ctx = metadata.AppendToOutgoingContext(ctx, "token", "1204")
// 继续调用服务
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
return err
}
fmt.Printf("use time: %s\n", time.Since(start))
return nil
}
var opts []grpc.DialOption // 定义选项切片
opts = append(opts, grpc.WithInsecure())
opts = append(opts, grpc.WithUnaryInterceptor(interceptor))
// 建立grpc连接
conn, err := grpc.Dial("127.0.0.1:9101", opts...)
if err != nil {
panic(any(err))
}
// 函数执行完后关闭连接
defer func(connect *grpc.ClientConn) {
_ = connect.Close()
}(conn)
// 实例化客户端
client := userPb.NewUserServiceClient(conn)
// 定义上下文
ctx := metadata.AppendToOutgoingContext(context.Background(), "name", "Gyi", "Hobby", "Eat", "Hobby", "Sleep")
// 调用函数, 使用metadata上下文
userEntity, err := client.GetUser(ctx, &userPb.GetUserRequest{
Id: int32(rand.Intn(100)),
})
if err != nil {
panic(any(err))
}
// 打印结果
fmt.Println(userEntity)
}
打印结果
panic: rpc error: code = Unauthenticated desc = 验证失败
# 请求token改为1205
use time: 1.58855ms
user_entity:{id:81 user:{name:"Gyi" avatar:"https://blog.qvbilam.xin" gender:MALE}}
也可以使用 gRPC 封装好的 WithPerRPCCredentials 函数定义metadata
实现 WithPerRPCCredentials 请求参数类型 credentials.PerRPCCredentials 接口
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
userPb "grpc-demo/grpc-other/proto"
"math/rand"
)
type credential struct{}
// GetRequestMetadata 获取请求metadata
func (c credential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"token": "1205",
}, nil
}
// RequireTransportSecurity 是否使用安全传输
func (c credential) RequireTransportSecurity() bool {
return false
}
func main() {
// 定义客户端的拦截器
var opts []grpc.DialOption // 定义选项切片
opts = append(opts, grpc.WithInsecure())
opts = append(opts, grpc.WithPerRPCCredentials(credential{}))
// 建立grpc连接
conn, err := grpc.Dial("127.0.0.1:9101", opts...)
if err != nil {
panic(any(err))
}
// ... 省略请求服务部分代码
}