概念
grpc 是一款高性能,开源和通用的 rpc 框架,支持 C++ 、Java 、Go 、Python、Node.js、PHP等主流编程语言。grpc使用的是HTTP/2.0协议。
数据流模式
简单模式
ProtoBuf
// 定义proto版本
syntax = "proto3";
// 为go制定生成文件位置: 目录/包名
option go_package="api/go;userPb";
// 定义包名
package user;
// 定于用户对象
message User{
string name = 1; // 第一个参数为名称
string avatar = 2; // 第二个参数为头像
int32 age = 3; // 第三个参数为性别
}
// 定义用户实体对象
message UserEntity{
int32 id = 1; // 第一个参数为用户id
User user = 2; // 第二个参数为用户对象
}
// 定义用户请求对象
message GetUserRequest {
int32 id = 1; // 第一个请求参数为用户id
}
// 定义返回对象
message GetUserResponse {
UserEntity user_entity = 1; // 第一个返回参数为用户实体
}
// 第一用户服务
service Service {
rpc GetUser(GetUserRequest) returns (GetUserResponse);
}
生成 go 语言的 proto 文件
protoc -I=./proto user.proto --go_out api/go --go_opt paths=source_relative --go-grpc_out api/go --go-grpc_opt=paths=source_relative user.proto
服务端
package main
import (
"context"
"google.golang.org/grpc"
userPb "grpc-demo/grpc-proto/api/go"
"net"
)
type Server struct {
userPb.UnimplementedServiceServer
}
func (s *Server) GetUser(ctx context.Context, request *userPb.GetUserRequest) (*userPb.GetUserResponse, error) {
res := &userPb.GetUserResponse{
UserEntity: &userPb.UserEntity{
Id: request.Id,
User: &userPb.User{
Name: "user_" + string(request.Id),
Age: 13,
Avatar: "https://blog.qvbilam.xin",
},
},
}
return res, nil
}
func main() {
// 声明新的grpc服务
GRPCServer := grpc.NewServer()
// 注册用户服务
userPb.RegisterServiceServer(GRPCServer, &Server{})
// 监听端口
lis, err := net.Listen("tcp", "0.0.0.0:9101")
if err != nil {
panic(any(err))
}
// 启动服务
err = GRPCServer.Serve(lis)
if err != nil {
panic(any(err))
}
}
启动服务
# 监听9101端口
go run server/server.go
客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
userPb "grpc-demo/grpc-proto/api/go"
"math/rand"
)
func main() {
// 建立grpc连接
conn, err := grpc.Dial("127.0.0.1:9101", grpc.WithInsecure())
if err != nil {
panic(any(err))
}
// 函数执行完后关闭连接
defer func(connect *grpc.ClientConn) {
_ = connect.Close()
}(conn)
// 实例化客户端
client := userPb.NewServiceClient(conn)
// 调用函数
userEntity, err := client.GetUser(context.Background(), &userPb.GetUserRequest{
Id: int32(rand.Intn(100)),
})
if err != nil {
panic(any(err))
}
// 打印结果
fmt.Println(userEntity)
}
请求服务
go run client/client.go
# 输出
user_entity:{id:81 user:{name:"user_Q" avatar:"https://blog.qvbilam.xin" age:13}}
数据流模式
目录
demo # demo目录, 生成命令执行位置
├─ api # proto生成目录
│ ├─ go
│ ├─ car.pb.go # 参数结构
│ └─ car_grpc.proto # GRPC服务定义
│
├─ client # 客户端目录
│ ├─ chat_client.go # 聊天客户端(双向流模式
│ ├─ get_client.go # 获取位置(服务端流模式
│ └─ push_grpc.proto # 上报位置(客户端流模式
│
├─ proto # proto服务定义目录
│ ├─ car.proto # 定义用户服务proto
│
├─ server # 服务目录
│ ├─ server.go # 打车grpc服务
概念
服务端的数据流模式,类似于订阅,服务端源源不断的向客户端推送结果;客户端的数据流模式,类似于上报,客户度远远不断的向服务端发送数据流,结束后,服务端返回结果;
双端的数据流模式,类似于websocket聊天,服务端和客户端都可以向对方发送数据流,达到实时效果。
ProtoBuf
syntax = "proto3";
option go_package = "api/go;carPb";
message data {
string message = 1;
}
message Location {
double latitude = 1;
double longitude = 2;
}
message DriverPosition {
int32 id = 1;
Location location = 2;
int32 time = 3;
}
message UserPosition {
int32 id = 1;
Location location = 2;
int32 time = 3;
}
message ChatContent {
int32 user_id = 1;
string content = 2;
int32 time =3;
}
service Service {
// 用户上传位置: 通过请求类型为 stream 定义客户端数据流模式
rpc UserPushPosition (stream UserPosition) returns (data);
// 获取司机位置: 通过返回类型为 stream 定义服务端数据流模式
rpc GetDriverPosition (data) returns (stream DriverPosition);
// 用户司机聊天: 定义双向流模式
rpc Chat (stream ChatContent) returns (stream ChatContent);
}
生成 go 语言的 proto 文件
protoc -I=./proto car.proto --go_out api/go --go_opt paths=source_relative --go-grpc_out api/go --go-grpc_opt=paths=source_relative car.proto
服务端流模式
服务端
package main
import (
"fmt"
"google.golang.org/grpc"
carPb "grpc-demo/grpc-stream/api/go"
"net"
"sync"
"time"
)
type server struct {
carPb.UnimplementedServiceServer
}
func (s *server) UserPushPosition(positionServer carPb.Service_UserPushPositionServer) error {
return nil
}
// GetDriverPosition 服务端向客户端发送司机位置
func (s *server) GetDriverPosition(data *carPb.Data, positionServer carPb.Service_GetDriverPositionServer) error {
long := 120.21201
lat := 30.2084
for i := 0; i <= 5; i++ {
_ = positionServer.Send(&carPb.DriverPosition{
Id: 1,
Location: &carPb.Location{
Longitude: long + float64(i),
Latitude: lat + float64(i),
},
Time: int32(time.Now().Unix()),
})
time.Sleep(200 * time.Millisecond)
}
return nil
}
func (s *server) Chat(charServer carPb.Service_ChatServer) error {
return nil
}
func main() {
// 注册grpc服务
grpcServer := grpc.NewServer()
lis, err := net.Listen("tcp", "0.0.0.0:9102")
if err != nil {
panic(any(err))
}
carPb.RegisterServiceServer(grpcServer, &server{})
// 启动服务
err = grpcServer.Serve(lis)
if err != nil {
panic(any(err))
}
}
客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
carPb "grpc-demo/grpc-stream/api/go"
)
func main() {
// 连接服务端
conn, err := grpc.Dial("127.0.0.1:9102", grpc.WithInsecure())
if err != nil {
panic(any(err))
}
// 结束后关闭连接
defer func(conn *grpc.ClientConn) {
_ = conn.Close()
}(conn)
// 客户端建立连接
client := carPb.NewServiceClient(conn)
// 调用获取司机位置
res, err := client.GetDriverPosition(context.Background(), &carPb.Data{
Message: "user get driver position",
})
if err != nil {
panic(any(err))
}
// 接受消息
for {
msg, err := res.Recv()
if err != nil {
break
}
fmt.Println(msg)
}
fmt.Println("END")
}
测试
启动服务
go run server/main.go
客户端调用
go run client/get_client.go
# 输出
id:1 location:{latitude:30.2084 longitude:120.21201} time:1654522574
id:1 location:{latitude:31.2084 longitude:121.21201} time:1654522575
id:1 location:{latitude:32.2084 longitude:122.21201} time:1654522575
id:1 location:{latitude:33.2084 longitude:123.21201} time:1654522575
id:1 location:{latitude:34.2084 longitude:124.21201} time:1654522575
id:1 location:{latitude:35.2084 longitude:125.21201} time:1654522575
END
客户端流模式
服务端
package main
import (
"fmt"
"google.golang.org/grpc"
carPb "grpc-demo/grpc-stream/api/go"
"net"
"sync"
"time"
)
type server struct {
carPb.UnimplementedServiceServer
}
// UserPushPosition 接受客户端推送的位置
func (s *server) UserPushPosition(positionServer carPb.Service_UserPushPositionServer) error {
for {
msg, err := positionServer.Recv()
if err != nil { // 结束推送
fmt.Println("END")
break
}
fmt.Println(msg)
}
return nil
}
// GetDriverPosition 服务端向客户端发送司机位置
func (s *server) GetDriverPosition(data *carPb.Data, positionServer carPb.Service_GetDriverPositionServer) error {
long := 120.21201
lat := 30.2084
for i := 0; i <= 5; i++ {
_ = positionServer.Send(&carPb.DriverPosition{
Id: 1,
Location: &carPb.Location{
Longitude: long + float64(i),
Latitude: lat + float64(i),
},
Time: int32(time.Now().Unix()),
})
time.Sleep(200 * time.Millisecond)
}
return nil
}
func (s *server) Chat(charServer carPb.Service_ChatServer) error {
return nil
}
func main() {
// 注册grpc服务
grpcServer := grpc.NewServer()
lis, err := net.Listen("tcp", "0.0.0.0:9102")
if err != nil {
panic(any(err))
}
carPb.RegisterServiceServer(grpcServer, &server{})
// 启动服务
err = grpcServer.Serve(lis)
if err != nil {
panic(any(err))
}
}
客户端
package main
import (
"context"
"google.golang.org/grpc"
carPb "grpc-demo/grpc-stream/api/go"
"time"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:9102", grpc.WithInsecure())
if err != nil {
panic(any(err))
}
client := carPb.NewServiceClient(conn)
// 获取服务
pushSer, err := client.UserPushPosition(context.Background())
if err != nil {
panic(any(err))
}
// 用户初始位置
long := 115.21201
lat := 32.2084
for i := 0; i < 5; i++ {
long += float64(i)
lat += float64(i)
err := pushSer.Send(&carPb.UserPosition{
Id: 1,
Location: &carPb.Location{
Latitude: long,
Longitude: lat,
},
Time: int32(time.Now().Unix()),
})
if err != nil {
panic(any(err))
}
// 用户移动速度
time.Sleep(2 * time.Second)
}
}
测试
启动服务
go run server/main.go
# 调用后服务端输出
id:1 location:{latitude:115.21201 longitude:32.2084} time:1654522776
id:1 location:{latitude:116.21201 longitude:33.2084} time:1654522778
id:1 location:{latitude:118.21201 longitude:35.2084} time:1654522780
id:1 location:{latitude:121.21201 longitude:38.2084} time:1654522782
id:1 location:{latitude:125.21201 longitude:42.2084} time:1654522784
END
客户端调用
go run client/push_client.go
双向流模式
服务端
package main
import (
"fmt"
"google.golang.org/grpc"
carPb "grpc-demo/grpc-stream/api/go"
"net"
"sync"
"time"
)
type server struct {
carPb.UnimplementedServiceServer
}
// UserPushPosition 接受客户端推送的位置
func (s *server) UserPushPosition(positionServer carPb.Service_UserPushPositionServer) error {
for {
msg, err := positionServer.Recv()
if err != nil { // 结束推送
fmt.Println("END")
break
}
fmt.Println(msg)
}
return nil
}
// GetDriverPosition 服务端向客户端发送司机位置
func (s *server) GetDriverPosition(data *carPb.Data, positionServer carPb.Service_GetDriverPositionServer) error {
long := 120.21201
lat := 30.2084
for i := 0; i <= 5; i++ {
_ = positionServer.Send(&carPb.DriverPosition{
Id: 1,
Location: &carPb.Location{
Longitude: long + float64(i),
Latitude: lat + float64(i),
},
Time: int32(time.Now().Unix()),
})
time.Sleep(200 * time.Millisecond)
}
return nil
}
func (s *server) Chat(charServer carPb.Service_ChatServer) error {
wg := sync.WaitGroup{}
wg.Add(2)
// 接受用户消息
go func() {
for {
msg, err := charServer.Recv()
if err != nil {
fmt.Println("用户退出聊天")
break
}
fmt.Printf("【接受消息】用户%d: %s\n", msg.UserId, msg.Content)
}
defer wg.Done()
}()
// 发送服务端消息
go func() {
for i := 0; i < 5; i++ {
ct := []string{"我已到达指定位置", "快上车", "戴好口罩", "系好安全带", "行程结束,快付款"}
err := charServer.Send(&carPb.ChatContent{
UserId: 1,
Content: ct[i],
Time: int32(time.Now().Unix()),
})
if err != nil {
fmt.Println("用户退出聊天")
break
}
fmt.Printf("【发送消息】%s\n", ct[i])
time.Sleep(time.Second)
}
defer wg.Done()
}()
wg.Wait()
return nil
}
func main() {
// 注册grpc服务
grpcServer := grpc.NewServer()
lis, err := net.Listen("tcp", "0.0.0.0:9102")
if err != nil {
panic(any(err))
}
carPb.RegisterServiceServer(grpcServer, &server{})
// 启动服务
err = grpcServer.Serve(lis)
if err != nil {
panic(any(err))
}
}
客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
carPb "grpc-demo/grpc-stream/api/go"
"sync"
"time"
)
func main() {
wg := sync.WaitGroup{}
wg.Add(2)
conn, err := grpc.Dial("127.0.0.1:9102", grpc.WithInsecure())
if err != nil {
panic(any(err))
}
client := carPb.NewServiceClient(conn)
chat, err := client.Chat(context.Background())
if err != nil {
panic(any(err))
}
// 接受服务端消息
go func() {
for {
msg, err := chat.Recv()
if err != nil {
fmt.Println("司机退出聊天")
break
}
fmt.Printf("【接受消息】司机%d: %s\n", msg.UserId, msg.Content)
}
defer wg.Done()
}()
// 发送消息
go func() {
ct := "师傅, 你在哪?"
for i := 0; i <= 10; i++ {
err = chat.Send(&carPb.ChatContent{
UserId: 1,
Content: ct,
Time: int32(time.Now().Unix()),
})
if err != nil {
fmt.Println("司机退出聊天")
break
}
fmt.Printf("【发送消息】%s\n", ct)
time.Sleep(150 * time.Millisecond)
}
defer wg.Done()
}()
wg.Wait()
fmt.Println("结束聊天")
}
测试
# 启动服务
go run server/main.go
# 客户端调用
go run client/chat_client.go