概念

gRPC 支持在客户端和服务器之间发送元数据, 元数据的生命周期就是一次 RPC 的调用。元数据可以理解为 HTTP 协议中的参数。

创建方式

Metadata

meatadata 数据类型为 map 类型, key 是 string 类型, value 是 string 类型的切片
type MD map[string][]string
可以使用 New 函数创建
md := metadata.New(map[string]string{
        "name": "Gyi",
        "age":  "18",
    })
或者设置 kv 的方式创建
md := metadata.Pairs(
        "name", "Gyi", 
        "hobby", "Eat",
        "hobby", "Drink")
新建context
md := metadata.New(map[string]string{
        "name": "Gyi",
        "age":  "18",
    })


ctx := metadata.NewOutgoingContext(context.Background(), md)

KV

也可以使用追加的方式, 将元数据追加到上下文中, 这可以在有或没有关于上下文的现有元数据的情况下使用。当没有先前的元数据时,添加元数据;当上下文中已经存在元数据时,将合并 kv
// 使用元数据创建新的上线文
ctx := metadata.AppendToOutgoingContext(ctx, "name", "Gyi", "hobby", "Eat", "hobby", "Eat")

// 追加元数据
ctx := metadata.NewOutgoingContext(ctx, "age", "18")

一元调用

发送

client.go
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    roomPb "grpc-demo/grpc-proto/api/go/room"
    "math/rand"
)

func main() {
    // 建立grpc连接
    conn, err := grpc.Dial("127.0.0.1:9101", grpc.WithInsecure())
    if err != nil {
        panic(any(err))
    }
    // 函数执行完后关闭连接
    defer func(connect *grpc.ClientConn) {
        _ = connect.Close()
    }(conn)

    // 实例化客户端
    client := roomPb.NewServiceClient(conn)
    // 定义上下文
  ctx := metadata.AppendToOutgoingContext(context.Background(), "name", "Gyi", "Hobby", "Eat", "Hobby", "Sleep")
    // 调用函数, 使用metadata上下文
    userEntity, err := client.GetRoom(ctx, &roomPb.GetRoomRequest{
        Id: int32(rand.Intn(100)),
    })
    if err != nil {
        panic(any(err))
    }
    // 打印结果
    fmt.Println(userEntity)
}

接受

server.go
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    roomPb "grpc-demo/grpc-proto/api/go/room"
    userPb "grpc-demo/grpc-proto/api/go/user"
    "math/rand"
    "net"
)

type Server struct {
    roomPb.UnimplementedServiceServer
}

func (s Server) GetRoom(ctx context.Context, request *roomPb.GetRoomRequest) (*roomPb.GetRoomResponse, error) {
    // 接受metadata
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        panic(any("获取metadata失败"))
    }
    // 打印metadata参数
    for k, v := range md {
        fmt.Printf("key: %s => value: %s\n", k, v)
    }
    if mName, ok := md["name"]; ok {
        fmt.Printf("hello %s\n", mName)
    }
    if mHobby, ok := md["hobby"]; ok {
        for _, hobby := range mHobby {
            fmt.Printf("Hobby: %s\n", hobby)
        }
    }
  // 忽略返回结果
    return nil, nil
}

func main() {
    // 声明新的grpc服务
    GRPCServer := grpc.NewServer()
    // 注册用户服务
    roomPb.RegisterServiceServer(GRPCServer, &Server{})
    // 监听端口
    lis, err := net.Listen("tcp", "0.0.0.0:9101")
    if err != nil {
        panic(any(err))
    }
    // 启动服务
    err = GRPCServer.Serve(lis)
    if err != nil {
        panic(any(err))
    }
}

结果

输出结果
key: content-type => value: [application/grpc]
key: user-agent => value: [grpc-go/1.47.0]
key: name => value: [Gyi]
key: hobby => value: [Eat Sleep]
key: :authority => value: [127.0.0.1:9101]
hello [Gyi]
Hobby: Eat
Hobby: Sleep

流式调用

发送

client.go
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    carPb "grpc-demo/grpc-stream/api/go"
)

func main() {
    // 连接服务端
    conn, err := grpc.Dial("127.0.0.1:9102", grpc.WithInsecure())
    if err != nil {
        panic(any(err))
    }

    // 结束后关闭连接
    defer func(conn *grpc.ClientConn) {
        _ = conn.Close()
    }(conn)

    // 客户端建立连接
    client := carPb.NewServiceClient(conn)

    // 定义上下文
    ctx := metadata.AppendToOutgoingContext(context.Background(), "name", "Yn", "Hobby", "Study")

    // 调用获取司机位置. 发送metadata
    res, err := client.GetDriverPosition(ctx, &carPb.Data{
        Message: "user get driver position",
    })
    if err != nil {
        panic(any(err))
    }
}

接受

server.go 省略部分代码.
package main

import (
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    carPb "grpc-demo/grpc-stream/api/go"
    "net"
    "sync"
    "time"
)

type server struct {
    carPb.UnimplementedServiceServer
}

// GetDriverPosition 服务端向客户端发送司机位置
func (s *server) GetDriverPosition(data *carPb.Data, positionServer carPb.Service_GetDriverPositionServer) error {
    // 接受metadata
    md, ok := metadata.FromIncomingContext(positionServer.Context())
    if !ok {
        panic(any("获取metadata失败"))
    }
    // 打印metadata参数
    for k, v := range md {
        fmt.Printf("key: %s => value: %s\n", k, v)
    }
    if mName, ok := md["name"]; ok {
        fmt.Printf("hello %s\n", mName)
    }
    if mHobby, ok := md["hobby"]; ok {
        for _, hobby := range mHobby {
            fmt.Printf("Hobby: %s\n", hobby)
        }
    }

    long := 120.21201
    lat := 30.2084
    for i := 0; i <= 5; i++ {
        _ = positionServer.Send(&carPb.DriverPosition{
            Id: 1,
            Location: &carPb.Location{
                Longitude: long + float64(i),
                Latitude:  lat + float64(i),
            },
            Time: int32(time.Now().Unix()),
        })
        time.Sleep(200 * time.Millisecond)
    }
    return nil
}


func main() {
    // 注册grpc服务
    grpcServer := grpc.NewServer()
    lis, err := net.Listen("tcp", "0.0.0.0:9102")
    if err != nil {
        panic(any(err))
    }
    carPb.RegisterServiceServer(grpcServer, &server{})

    // 启动服务
    err = grpcServer.Serve(lis)
    if err != nil {
        panic(any(err))
    }
}

结果

输出结果
key: content-type => value: [application/grpc]
key: user-agent => value: [grpc-go/1.47.0]
key: name => value: [Yn]
key: hobby => value: [Study]
key: :authority => value: [127.0.0.1:9102]
hello [Yn]
Hobby: Study

拦截器

服务端拦截器

使用 grpc 库的 UnaryInterceptor 函数定义服务端一元服务拦截器
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
    userPb "grpc-demo/grpc-other/proto"
    "net"
    "time"
)

type Server struct {
    userPb.UnimplementedUserServiceServer
}

func (s Server) GetUser(ctx context.Context, request *userPb.GetUserRequest) (*userPb.GetUserResponse, error) {
    res := &userPb.GetUserResponse{
        UserEntity: &userPb.UserEntity{
            Id: request.Id,
            User: &userPb.User{
                Name:   "Gyi",
                Avatar: "https://blog.qvbilam.xin",
                Gender: userPb.Gender_MALE,
            },
        },
    }

    return res, nil
}

type a interface {
}

func main() {
    // 服务端拦截器
    interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        start := time.Now()
        fmt.Println("接受到请求")
        res, err := handler(ctx, req)
        if err != nil { // grpc 内置错误包
            return nil, status.Error(codes.Internal, "处理请求失败")
        }
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
            return nil, status.Error(codes.Internal, "缺少metadata")
        }
        var token string
        if t, ok := md["token"]; ok {
            token = t[0] // 值为string切片, 只取第一个元素作为token
        }

        if token != "1205" {
            return nil, status.Error(codes.Unauthenticated, "验证失败")
        }

        fmt.Printf("处理完请求用时: %s\n", time.Since(start))
        return res, nil
    }
    // 增加grpc服务参数
    opt := grpc.UnaryInterceptor(interceptor)
    // 声明新的grpc服务GF
    GRPCServer := grpc.NewServer(opt)
    // 注册用户服务
    userPb.RegisterUserServiceServer(GRPCServer, &Server{})
    // 监听端口
    lis, err := net.Listen("tcp", "0.0.0.0:9101")
    if err != nil {
        panic(any(err))
    }
    // 启动服务
    err = GRPCServer.Serve(lis)
    if err != nil {
        panic(any(err))
    }
}
打印结果
接受到请求
处理完请求用时: 15.741µs

客户端拦截器

使用grpc库的WithUnaryInterceptor函数定义客户端的一元服务拦截器
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    userPb "grpc-demo/grpc-other/proto"
    "math/rand"
    "time"
)

func main() {
    // 定义客户端的拦截器
    interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        // 调用开始时间
        start := time.Now()
        ctx = metadata.AppendToOutgoingContext(ctx, "token", "1204")
        // 继续调用服务
        err := invoker(ctx, method, req, reply, cc, opts...)
        if err != nil {
            return err
        }
        fmt.Printf("use time: %s\n", time.Since(start))
        return nil
    }
    var opts []grpc.DialOption // 定义选项切片
    opts = append(opts, grpc.WithInsecure())
    opts = append(opts, grpc.WithUnaryInterceptor(interceptor))
    // 建立grpc连接
    conn, err := grpc.Dial("127.0.0.1:9101", opts...)
    if err != nil {
        panic(any(err))
    }
    // 函数执行完后关闭连接
    defer func(connect *grpc.ClientConn) {
        _ = connect.Close()
    }(conn)

    // 实例化客户端
    client := userPb.NewUserServiceClient(conn)
    // 定义上下文
    ctx := metadata.AppendToOutgoingContext(context.Background(), "name", "Gyi", "Hobby", "Eat", "Hobby", "Sleep")
    // 调用函数, 使用metadata上下文
    userEntity, err := client.GetUser(ctx, &userPb.GetUserRequest{
        Id: int32(rand.Intn(100)),
    })
    if err != nil {
        panic(any(err))
    }
    // 打印结果
    fmt.Println(userEntity)
}
打印结果
panic: rpc error: code = Unauthenticated desc = 验证失败

# 请求token改为1205
use time: 1.58855ms
user_entity:{id:81  user:{name:"Gyi"  avatar:"https://blog.qvbilam.xin"  gender:MALE}}

也可以使用 gRPC 封装好的 WithPerRPCCredentials 函数定义metadata

实现 WithPerRPCCredentials 请求参数类型 credentials.PerRPCCredentials 接口
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    userPb "grpc-demo/grpc-other/proto"
    "math/rand"
)

type credential struct{}

// GetRequestMetadata 获取请求metadata
func (c credential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
    return map[string]string{
        "token": "1205",
    }, nil
}

// RequireTransportSecurity 是否使用安全传输
func (c credential) RequireTransportSecurity() bool {
    return false
}

func main() {
    // 定义客户端的拦截器
    var opts []grpc.DialOption // 定义选项切片
    opts = append(opts, grpc.WithInsecure())
    opts = append(opts, grpc.WithPerRPCCredentials(credential{}))
    // 建立grpc连接
    conn, err := grpc.Dial("127.0.0.1:9101", opts...)
    if err != nil {
        panic(any(err))
    }
    // ... 省略请求服务部分代码
}
Last modification:June 16th, 2022 at 10:34 pm