概念

grpc 是一款高性能,开源和通用的 rpc 框架,支持 C++ 、Java 、Go 、Python、Node.js、PHP等主流编程语言。grpc使用的是HTTP/2.0协议。

数据流模式

简单模式

ProtoBuf

// 定义proto版本
syntax = "proto3";
// 为go制定生成文件位置: 目录/包名
option go_package="api/go;userPb";

// 定义包名
package user;

// 定于用户对象
message User{
  string name = 1; // 第一个参数为名称
  string avatar = 2; // 第二个参数为头像
  int32 age = 3; // 第三个参数为性别
}

// 定义用户实体对象
message UserEntity{
  int32 id = 1; // 第一个参数为用户id
  User user = 2; // 第二个参数为用户对象
}

// 定义用户请求对象
message GetUserRequest {
  int32 id = 1; // 第一个请求参数为用户id
}

// 定义返回对象
message GetUserResponse {
  UserEntity user_entity = 1; // 第一个返回参数为用户实体
}

// 第一用户服务
service Service {
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
}
生成 go 语言的 proto 文件
protoc -I=./proto user.proto --go_out api/go --go_opt paths=source_relative --go-grpc_out api/go --go-grpc_opt=paths=source_relative user.proto

服务端

package main

import (
    "context"
    "google.golang.org/grpc"
    userPb "grpc-demo/grpc-proto/api/go"
    "net"
)

type Server struct {
    userPb.UnimplementedServiceServer
}

func (s *Server) GetUser(ctx context.Context, request *userPb.GetUserRequest) (*userPb.GetUserResponse, error) {
    res := &userPb.GetUserResponse{
        UserEntity: &userPb.UserEntity{
            Id: request.Id,
            User: &userPb.User{
                Name:   "user_" + string(request.Id),
                Age:    13,
                Avatar: "https://blog.qvbilam.xin",
            },
        },
    }
    return res, nil
}

func main() {
    // 声明新的grpc服务
    GRPCServer := grpc.NewServer()
    // 注册用户服务
    userPb.RegisterServiceServer(GRPCServer, &Server{})
    // 监听端口
    lis, err := net.Listen("tcp", "0.0.0.0:9101")
    if err != nil {
        panic(any(err))
    }
    // 启动服务
    err = GRPCServer.Serve(lis)
    if err != nil {
        panic(any(err))
    }
}
启动服务
# 监听9101端口
go run server/server.go

客户端

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    userPb "grpc-demo/grpc-proto/api/go"
    "math/rand"
)

func main() {
    // 建立grpc连接
    conn, err := grpc.Dial("127.0.0.1:9101", grpc.WithInsecure())
    if err != nil {
        panic(any(err))
    }
    // 函数执行完后关闭连接
    defer func(connect *grpc.ClientConn) {
        _ = connect.Close()
    }(conn)

    // 实例化客户端
    client := userPb.NewServiceClient(conn)
    // 调用函数
    userEntity, err := client.GetUser(context.Background(), &userPb.GetUserRequest{
        Id: int32(rand.Intn(100)),
    })
    if err != nil {
        panic(any(err))
    }
    // 打印结果
    fmt.Println(userEntity)
}
请求服务
go run client/client.go 

# 输出
user_entity:{id:81  user:{name:"user_Q"  avatar:"https://blog.qvbilam.xin"  age:13}}

数据流模式

目录

demo                                                # demo目录, 生成命令执行位置
├─ api                                             # proto生成目录
│  ├─ go
│     ├─ car.pb.go                   # 参数结构
│     └─ car_grpc.proto          # GRPC服务定义
│
├─ client                                     # 客户端目录
│  ├─ chat_client.go                # 聊天客户端(双向流模式
│  ├─ get_client.go                    # 获取位置(服务端流模式
│  └─ push_grpc.proto              # 上报位置(客户端流模式
│
├─ proto                                         # proto服务定义目录
│  ├─ car.proto                            # 定义用户服务proto
│
├─ server                                     # 服务目录
│  ├─ server.go                            # 打车grpc服务

概念

服务端的数据流模式,类似于订阅,服务端源源不断的向客户端推送结果;

客户端的数据流模式,类似于上报,客户度远远不断的向服务端发送数据流,结束后,服务端返回结果;

双端的数据流模式,类似于websocket聊天,服务端和客户端都可以向对方发送数据流,达到实时效果。

ProtoBuf

syntax = "proto3";
option go_package = "api/go;carPb";

message data {
  string message = 1;
}

message Location {
  double latitude = 1;
  double longitude = 2;
}

message DriverPosition {
  int32 id = 1;
  Location location = 2;
  int32  time = 3;
}

message UserPosition {
  int32 id = 1;
  Location location = 2;
  int32 time = 3;
}

message ChatContent {
  int32 user_id = 1;
  string content = 2;
  int32 time =3;
}


service Service {
  // 用户上传位置: 通过请求类型为 stream 定义客户端数据流模式
  rpc UserPushPosition (stream UserPosition) returns (data);
  // 获取司机位置: 通过返回类型为 stream 定义服务端数据流模式
  rpc GetDriverPosition (data) returns (stream DriverPosition);
  // 用户司机聊天: 定义双向流模式
  rpc Chat (stream ChatContent) returns (stream ChatContent);
}
生成 go 语言的 proto 文件
protoc -I=./proto car.proto --go_out api/go --go_opt paths=source_relative --go-grpc_out api/go --go-grpc_opt=paths=source_relative car.proto

服务端流模式

服务端
package main

import (
    "fmt"
    "google.golang.org/grpc"
    carPb "grpc-demo/grpc-stream/api/go"
    "net"
    "sync"
    "time"
)

type server struct {
    carPb.UnimplementedServiceServer
}

func (s *server) UserPushPosition(positionServer carPb.Service_UserPushPositionServer) error {
    return nil
}

// GetDriverPosition 服务端向客户端发送司机位置
func (s *server) GetDriverPosition(data *carPb.Data, positionServer carPb.Service_GetDriverPositionServer) error {
    long := 120.21201
    lat := 30.2084
    for i := 0; i <= 5; i++ {
        _ = positionServer.Send(&carPb.DriverPosition{
            Id: 1,
            Location: &carPb.Location{
                Longitude: long + float64(i),
                Latitude:  lat + float64(i),
            },
            Time: int32(time.Now().Unix()),
        })
        time.Sleep(200 * time.Millisecond)
    }
    return nil
}

func (s *server) Chat(charServer carPb.Service_ChatServer) error {
    return nil
}

func main() {
    // 注册grpc服务
    grpcServer := grpc.NewServer()
    lis, err := net.Listen("tcp", "0.0.0.0:9102")
    if err != nil {
        panic(any(err))
    }
    carPb.RegisterServiceServer(grpcServer, &server{})

    // 启动服务
    err = grpcServer.Serve(lis)
    if err != nil {
        panic(any(err))
    }
}
客户端
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    carPb "grpc-demo/grpc-stream/api/go"
)

func main() {
    // 连接服务端
    conn, err := grpc.Dial("127.0.0.1:9102", grpc.WithInsecure())
    if err != nil {
        panic(any(err))
    }

    // 结束后关闭连接
    defer func(conn *grpc.ClientConn) {
        _ = conn.Close()
    }(conn)

    // 客户端建立连接
    client := carPb.NewServiceClient(conn)

    // 调用获取司机位置
    res, err := client.GetDriverPosition(context.Background(), &carPb.Data{
        Message: "user get driver position",
    })
    if err != nil {
        panic(any(err))
    }

    // 接受消息
    for {
        msg, err := res.Recv()
        if err != nil {
            break
        }
        fmt.Println(msg)
    }
    fmt.Println("END")
}
测试
启动服务
go run server/main.go
客户端调用
go run client/get_client.go

# 输出
id:1  location:{latitude:30.2084  longitude:120.21201}  time:1654522574
id:1  location:{latitude:31.2084  longitude:121.21201}  time:1654522575
id:1  location:{latitude:32.2084  longitude:122.21201}  time:1654522575
id:1  location:{latitude:33.2084  longitude:123.21201}  time:1654522575
id:1  location:{latitude:34.2084  longitude:124.21201}  time:1654522575
id:1  location:{latitude:35.2084  longitude:125.21201}  time:1654522575
END

客户端流模式

服务端
package main

import (
    "fmt"
    "google.golang.org/grpc"
    carPb "grpc-demo/grpc-stream/api/go"
    "net"
    "sync"
    "time"
)

type server struct {
    carPb.UnimplementedServiceServer
}

// UserPushPosition 接受客户端推送的位置
func (s *server) UserPushPosition(positionServer carPb.Service_UserPushPositionServer) error {
    for {
        msg, err := positionServer.Recv()
        if err != nil { // 结束推送
            fmt.Println("END")
            break
        }
        fmt.Println(msg)
    }
    return nil
}

// GetDriverPosition 服务端向客户端发送司机位置
func (s *server) GetDriverPosition(data *carPb.Data, positionServer carPb.Service_GetDriverPositionServer) error {
    long := 120.21201
    lat := 30.2084
    for i := 0; i <= 5; i++ {
        _ = positionServer.Send(&carPb.DriverPosition{
            Id: 1,
            Location: &carPb.Location{
                Longitude: long + float64(i),
                Latitude:  lat + float64(i),
            },
            Time: int32(time.Now().Unix()),
        })
        time.Sleep(200 * time.Millisecond)
    }
    return nil
}

func (s *server) Chat(charServer carPb.Service_ChatServer) error {
    return nil
}

func main() {
    // 注册grpc服务
    grpcServer := grpc.NewServer()
    lis, err := net.Listen("tcp", "0.0.0.0:9102")
    if err != nil {
        panic(any(err))
    }
    carPb.RegisterServiceServer(grpcServer, &server{})

    // 启动服务
    err = grpcServer.Serve(lis)
    if err != nil {
        panic(any(err))
    }
}
客户端
package main

import (
    "context"
    "google.golang.org/grpc"
    carPb "grpc-demo/grpc-stream/api/go"
    "time"
)

func main() {
    conn, err := grpc.Dial("127.0.0.1:9102", grpc.WithInsecure())
    if err != nil {
        panic(any(err))
    }
    client := carPb.NewServiceClient(conn)

    // 获取服务
    pushSer, err := client.UserPushPosition(context.Background())
    if err != nil {
        panic(any(err))
    }

    // 用户初始位置
    long := 115.21201
    lat := 32.2084
    for i := 0; i < 5; i++ {
        long += float64(i)
        lat += float64(i)
        err := pushSer.Send(&carPb.UserPosition{
            Id: 1,
            Location: &carPb.Location{
                Latitude:  long,
                Longitude: lat,
            },
            Time: int32(time.Now().Unix()),
        })
        if err != nil {
            panic(any(err))
        }
        // 用户移动速度
        time.Sleep(2 * time.Second)
    }
}
测试
启动服务
go run server/main.go

# 调用后服务端输出
id:1  location:{latitude:115.21201  longitude:32.2084}  time:1654522776
id:1  location:{latitude:116.21201  longitude:33.2084}  time:1654522778
id:1  location:{latitude:118.21201  longitude:35.2084}  time:1654522780
id:1  location:{latitude:121.21201  longitude:38.2084}  time:1654522782
id:1  location:{latitude:125.21201  longitude:42.2084}  time:1654522784
END
客户端调用
go run client/push_client.go

双向流模式

服务端
package main

import (
    "fmt"
    "google.golang.org/grpc"
    carPb "grpc-demo/grpc-stream/api/go"
    "net"
    "sync"
    "time"
)

type server struct {
    carPb.UnimplementedServiceServer
}

// UserPushPosition 接受客户端推送的位置
func (s *server) UserPushPosition(positionServer carPb.Service_UserPushPositionServer) error {
    for {
        msg, err := positionServer.Recv()
        if err != nil { // 结束推送
            fmt.Println("END")
            break
        }
        fmt.Println(msg)
    }
    return nil
}

// GetDriverPosition 服务端向客户端发送司机位置
func (s *server) GetDriverPosition(data *carPb.Data, positionServer carPb.Service_GetDriverPositionServer) error {
    long := 120.21201
    lat := 30.2084
    for i := 0; i <= 5; i++ {
        _ = positionServer.Send(&carPb.DriverPosition{
            Id: 1,
            Location: &carPb.Location{
                Longitude: long + float64(i),
                Latitude:  lat + float64(i),
            },
            Time: int32(time.Now().Unix()),
        })
        time.Sleep(200 * time.Millisecond)
    }
    return nil
}

func (s *server) Chat(charServer carPb.Service_ChatServer) error {
    wg := sync.WaitGroup{}
    wg.Add(2)
    // 接受用户消息
    go func() {
        for {
            msg, err := charServer.Recv()
            if err != nil {
                fmt.Println("用户退出聊天")
                break
            }
            fmt.Printf("【接受消息】用户%d: %s\n", msg.UserId, msg.Content)
        }
        defer wg.Done()
    }()

    // 发送服务端消息
    go func() {
        for i := 0; i < 5; i++ {
            ct := []string{"我已到达指定位置", "快上车", "戴好口罩", "系好安全带", "行程结束,快付款"}
            err := charServer.Send(&carPb.ChatContent{
                UserId:  1,
                Content: ct[i],
                Time:    int32(time.Now().Unix()),
            })
            if err != nil {
                fmt.Println("用户退出聊天")
                break
            }

            fmt.Printf("【发送消息】%s\n", ct[i])
            time.Sleep(time.Second)
        }
        defer wg.Done()
    }()

    wg.Wait()
    return nil
}

func main() {
    // 注册grpc服务
    grpcServer := grpc.NewServer()
    lis, err := net.Listen("tcp", "0.0.0.0:9102")
    if err != nil {
        panic(any(err))
    }
    carPb.RegisterServiceServer(grpcServer, &server{})

    // 启动服务
    err = grpcServer.Serve(lis)
    if err != nil {
        panic(any(err))
    }
}
客户端
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    carPb "grpc-demo/grpc-stream/api/go"
    "sync"
    "time"
)

func main() {
    wg := sync.WaitGroup{}
    wg.Add(2)
    conn, err := grpc.Dial("127.0.0.1:9102", grpc.WithInsecure())
    if err != nil {
        panic(any(err))
    }
    client := carPb.NewServiceClient(conn)

    chat, err := client.Chat(context.Background())
    if err != nil {
        panic(any(err))
    }

    // 接受服务端消息
    go func() {
        for {
            msg, err := chat.Recv()
            if err != nil {
                fmt.Println("司机退出聊天")
                break
            }
            fmt.Printf("【接受消息】司机%d: %s\n", msg.UserId, msg.Content)
        }
        defer wg.Done()
    }()

    // 发送消息
    go func() {
        ct := "师傅, 你在哪?"
        for i := 0; i <= 10; i++ {
            err = chat.Send(&carPb.ChatContent{
                UserId:  1,
                Content: ct,
                Time:    int32(time.Now().Unix()),
            })
            if err != nil {
                fmt.Println("司机退出聊天")
                break
            }
            fmt.Printf("【发送消息】%s\n", ct)
            time.Sleep(150 * time.Millisecond)
        }
        defer wg.Done()
    }()

    wg.Wait()
    fmt.Println("结束聊天")
}
测试
# 启动服务
go run server/main.go

# 客户端调用
go run client/chat_client.go
Last modification:June 16th, 2022 at 10:34 pm