gRPC是一个高性能、通用的开源RPC框架,其由Google 2015年主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers)序列化协议开发,且支持众多开发语言。gRPC提供了一种简单的方法来精确地定义服务和为iOS、Android和后台支持服务自动生成可靠性很强的客户端功能库。客户端充分利用高级流和链接功能,从而有助于节省带宽、降低的TCP链接次数、节省CPU使用、电池寿命。
编码协议
gprc是基于ProtoBuf的,我们需要先了解一下ProtoBuf。
ProtoBuf(Protocol Buffers)是一种灵活,高效,自动化的机制,用于序列化结构化数据 - 类似于XML,但更小,更快,更简单,其由Google 2001年设计,2008年开源,Google内部的服务几乎都是用的PB协议,所以久经考验、充分验证。您可以定义数据的结构化结构,然后使用工具生成的源代码轻松地将结构化数据写入和读取各种数据流,并使用各种语言包括Java、Python、C++、Go、JavaNano、Ruby、C#等。您甚至可以更新数据结构,而不会破坏根据“旧”格式编译的已部署程序。
目前有很多产品使用比如:Google、Hadoop、ActiveMQ、Netty
简单使用
可以通过在.proto文件中定义ProtoBuf(Protocol Buffers)消息类型来指定您希望如何构建序列化信息。每个ProtoBuf(Protocol Buffers)消息都是一个小的逻辑信息记录,包含一系列name-value 对。以下.proto是定义包含有关人员信息的消息的文件的一个非常基本的示例:
message Person {
required string name = 1;
required int32 id = 2;
optional string email = 3;
enum PhoneType {
MOBILE = 0;
HOME = 1;
WORK = 2;
}
message PhoneNumber {
required string number = 1;
optional PhoneType type = 2 [default = HOME];
}
repeated PhoneNumber phone = 4;
}
解析
1、指定字段类型
在上面的例子中,所有字段都是标量类型:整型,string类型。当然,你也可以为字段指定其他的合成类型,包括枚举(enumerations)或其他消息类型。目前常规的语言类型都支持
2、分配标识号
正如上述文件格式,在消息定义中,每个字段都有唯一的一个数字标识符。这些标识符是用来在消息的二进制格式中识别各个字段的,一旦开始使用就不能够再改变。注:[1,15]之内的标识号在编码的时候会占用一个字节。[16,2047]之内的标识号则占用2个字节。所以应该为那些频繁出现的消息元素保留 [1,15]之内的标识号。切记:要为将来有可能添加的、频繁出现的标识号预留一些标识号。
最小的标识号可以从1开始,最大到2^29 - 1, or 536,870,911。不可以使用其中的[19000-19999]的标识号, Protobuf协议实现中对这些进行了预留。如果非要在.proto文件中使用这些预留标识号,编译时就会报警。
3、指定字段规则
所指定的消息字段修饰符必须是如下之一:
required:一个格式良好的消息一定要含有1个这种字段。表示该值是必须要设置的;
optional:消息格式中该字段可以有0个或1个值(不超过1个)。可以设置默认值,比如optional PhoneType type = 2 [default = HOME];
repeated:在一个格式良好的消息中,这种字段可以重复任意多次(包括0次)。重复的值的顺序会被保留。表示该值可以重复,相当于java中的List。
4、向.proto文件添加注释,可以使用C/C++/java风格的双斜杠(//) 语法格式,
5、消息中可以嵌套message,也可以支持枚举定义
编译
当用protocolbuffer编译器来运行.proto文件时,编译器将生成所选择语言的代码,这些代码可以操作在.proto文件中定义的消息类型,包括获取、设置字段值,将消息序列化到一个输出流中,以及从一个输入流中解析消息。
gprc的使用
创建服务端
定义服务(使用ProtoBuf )
- 1.在一个后缀名为.proto的文件内定义服务。
- 2.用protocol buffer编辑器生成服务端和客户端代码。
- 3.使用gRPC的Go API实现客户端与服务端代码。
gRPC允许您定义四种服务方法:
1、一个简单的RPC,客户端使用存根向服务器发送请求并等待响应返回,就像正常的函数调用一样。
这个用于远程调用,客户端查询服务端数据就想查询本地数据一样,一个请求一个响应
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
2、服务器端流RPC,其中客户端发送请求到服务器,并获得一个流中读取消息的序列后面。客户端从返回的流中读取,直到没有更多消息。正如您在我们的示例中所看到的,您通过将stream 关键字放在响应类型之前来指定服务器端流式方法。
这个用于远程调用,客户端查询服务端数据就想查询本地数据一样,并且是一股数据流
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
3、客户端流传输的RPC,其中客户端将消息写入的序列,并且将它们发送到服务器,再次使用提供的流。一旦客户端写完消息,它就等待服务器全部读取它们并返回它的响应。您可以通过stream在请求类型之前放置关键字来指定客户端流方法。
这个用于传输数据,使用流的方式
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
4、一个双向流动的RPC双方都派出使用读写流的消息序列。这两个流独立运行,因此客户端和服务器可以按照自己喜欢的顺序进行读写:例如,服务器可以在写入响应之前等待接收所有客户端消息,或者它可以交替地读取消息然后写入消息,或者其他一些读写组合。保留每个流中的消息顺序。您可以通过stream 在请求和响应之前放置关键字来指定此类方法。
这个用于传输读取双向方式,都是数据流,可以控制
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
定义消息类型
message UserInfoResponse {
string name = 1; // 用户姓名
uint32 age = 2; // 用户年龄
uint32 sex = 3; // 用户性别
uint32 count = 4; // 账户余额
}
如上例子每个字段每个字段都有唯一的一个数字标识符,这些标识符是用来在消息的二进制格式中识别各个字段的,一旦开始使用就不能够再改变。
注:[1,15]之内的标识号在编码的时候会占用一个字节。[16,2047]之内的标识号则占用2个字节。所以应该为那些频繁出现的消息元素保留 [1,15]之内的标识号。切记:要为将来有可能添加的、频繁出现的标识号预留一些标识号。
从.proto服务定义生成gRPC客户端和服务器接口
直接使用protoc工具生成
protoc -I routeguide/ routeguide/route_guide.proto --go_out=plugins=grpc:routeguide
运行此命令会routeguide在route_guide示例目录下的目录中生成以下文件:route_guide.pb.go
1、-I PATH:specify the directory in which to search for imports. May be specified multiple times; directories will be searched in order. If not given, the current working directory is used.
2、--go_out:指定输出go代码
3、plugins=grpc:.proto中的service是grpc扩展的功能,需要使用grpc插件进行解析才能生成对应的接口定义代码。
这个时候就生成了对应的序列化函数和服务接口
实现客户端和服务器接口
比如我们定义一个结构体
type routeGuideServer struct{}
实现对应的函数接口
1、简单的RPC
routeGuideServer实现我们所有的服务方法。让我们先看一下最简单的类型GetFeature,它只是Point从客户端获取一个并从其数据库中返回相应的特征信息Feature。
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}
// No feature was found, return an unnamed feature
return &pb.Feature{"", point}, nil
}
该方法传递RPC的上下文对象和客户端的Point 协议缓冲请求。它返回一个Feature协议缓冲区对象,其中包含响应信息和error。在方法中,我们Feature 使用适当的信息填充,然后return它与nil错误一起告诉gRPC我们已经完成了RPC的处理并且Feature可以返回到客户端。
2、服务器端流式RPC
现在让我们看一下我们的流式RPC。ListFeatures是一个服务器端流式RPC,所以我们需要将多个Features 发回给我们的客户端。
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
正如您所看到的,我们不是在方法参数中获取简单的请求和响应对象,而是这次我们得到一个请求对象(Rectangle我们的客户端想要在其中找到Features)和一个特殊的 RouteGuide_ListFeaturesServer对象来编写我们的响应。
在该方法中,我们填充了Feature我们需要返回的对象,将它们写入RouteGuide_ListFeaturesServer使用其Send()方法。最后,就像在我们简单的RPC中一样,我们返回一个nil错误告诉gRPC我们已经完成了写入响应。如果在此调用中发生任何错误,我们将返回非nil错误; gRPC层会将其转换为适当的RPC状态,以便在线路上发送。
3、客户端流式RPC
现在让我们看一些更复杂的东西:客户端流式传输方法RecordRoute,我们Point从客户端获取s 流并返回一个RouteSummary包含其行程信息的单个流。如您所见,这次该方法根本没有请求参数。相反,它获取一个RouteGuide_RecordRouteServer流,服务器可以使用它来读取和写入消息 - 它可以使用其Recv()方法接收客户端消息并使用其方法返回其单个响应SendAndClose() 。
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
在方法体中,我们使用RouteGuide_RecordRouteServer的Recv()方法在我们的客户机的请求的请求对象反复读取(在这种情况下 Point),直到没有更多的消息:服务器需要检查从返回的错误Read()每次通话后。如果是这样nil,流仍然很好,它可以继续阅读; 如果它是io.EOF消息流已经结束并且服务器可以返回它RouteSummary。如果它有任何其他值,我们将“按原样”返回错误,以便gRPC层将其转换为RPC状态。
4、双向流式RPC
最后,我们来看看我们的双向流式RPC RouteChat()。
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
... // look for notes to be sent to client
for _, note := range s.routeNotes[key] {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
这次我们得到一个RouteGuide_RouteChatServer流,就像我们的客户端流示例一样,可用于读取和写入消息。但是,这次我们通过方法的流返回值,而客户端仍在向其消息流写入消息。
这里的读写语法与我们的客户端流方法非常相似,只是服务器使用流的Send()方法而不是 SendAndClose()因为它正在编写多个响应。尽管每一方都会按照编写顺序获取对方的消息,但客户端和服务器都可以按任何顺序进行读写 - 这些流完全独立运行。
服务实现
一旦我们实现了所有方法,我们还需要启动一个gRPC服务器,以便客户端可以实际使用我们的服务。以下代码段显示了我们如何为我们的RouteGuide服务执行此操作:
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterRouteGuideServer(grpcServer, &routeGuideServer{})
... // determine whether to use TLS
grpcServer.Serve(lis)
要构建和启动服务器,我们:
- 指定我们要用于侦听客户端请求的端口lis, err := net.Listen(“tcp”, fmt.Sprintf(”:%d”, *port))。
- 使用创建gRPC服务器的实例grpc.NewServer()。
- 使用gRPC服务器注册我们的服务实现。
- Serve()使用我们的端口详细信息调用服务器以执行阻塞等待,直到进程被终止或被Stop()调用。
要编译和运行服务器,假设您在文件夹中 $GOPATH/src/google.golang.org/grpc/examples/route_guide,只需:
$ go run server/server.go
实战
1、简单调用和双向流
我们举例说明前几步,首先创建一个proto文件,包含服务和数据类型。名字为friday.proto
syntax = "proto3";
package friday;
// 请求用户信息
message UserInfoRequest {
int64 uid = 1; // 用户ID
}
// 请求用户信息的结果
message UserInfoResponse {
string name = 1; // 用户姓名
uint32 age = 2; // 用户年龄
uint32 sex = 3; // 用户性别
uint32 count = 4; // 账户余额
}
service Data {
//简单Rpc
// 获取用户数据
rpc GetUserInfo(UserInfoRequest) returns (UserInfoResponse){}
// 修改用户 双向流模式
rpc ChangeUserInfo(stream UserInfoResponse) returns (stream UserInfoResponse){}
}
然后使用工具就可以生成客户端和服务端代码
protoc -I rpc/ rpc/friday.proto --go_out=plugins=grpc:rpc--最后这个是目录
然后就可以生成一个friday.pb.go的文件,打开文件我们就可以看到该文件中定义了客户端与服务端的方法,这里就不详细的说明了,下面我们就开始动手实现。
服务端的接口实现是需要我们自己来处理的
/*服务端的方法*/
package response
import (
"golang.org/x/net/context"
pb "rpcTest/rpcbuild/rpcbuild/friday"
"fmt"
"io"
)
type Server struct{
routeNotes []*pb.UserInfoResponse
}
//简单模式
func (this *Server)GetUserInfo(ctx context.Context, in *pb.UserInfoRequest)(*pb.UserInfoResponse,error){
uid := in.GetUid()
fmt.Println("The uid is ",uid)
return &pb.UserInfoResponse{
Name : "Jim",
Age : 18,
Sex : 0,
Count:1000,
},nil
}
//双向流模式
func (this *Server) ChangeUserInfo(stream pb.Data_ChangeUserInfoServer)(error){
for {
in, err := stream.Recv()
if err == io.EOF {
fmt.Println("read done")
return nil
}
if err != nil {
fmt.Println("ERR",err)
return err
}
fmt.Println("userinfo ",in)
for _, note := range this.routeNotes{
if err := stream.Send(note); err != nil {
return err
}
}
}
}
---------------------
package main
import (
"net"
"google.golang.org/grpc"
pb "rpcTest/rpcbuild/rpcbuild/friday"
"rpcTest/rpcbuild/response"
"log"
)
const (
PORT = ":10023"
)
func main() {
lis, err := net.Listen("tcp", PORT)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterDataServer(s, &response.Server{})
s.Serve(lis)
}
---------------------
下面可以直接使用客户端进行操作,结构体已经定义好,函数也已经实现,不需要重写,但是可以封装,其实最终客户端调用的就是服务我们写的函数。如下
package main
import (
"google.golang.org/grpc"
pb "rpcTest/rpcbuild/rpcbuild/friday"
"rpcTest/rpcbuild/connect"
"fmt"
"sync"
)
const (
address = "127.0.0.1:10023"
)
func main(){
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
fmt.Println("did not connect: %v", err)
}
defer conn.Close()
// 创建连接
factory := func() (interface{}, error) {
return pb.NewDataClient(conn),nil
}
// 关闭链接,此处只是定义不需要调用了因为上面有defer conn.Close(),定义的目的在于初始化链接池。
close := func(v interface{}) error { return conn.Close()}
//初始化链接池
p,err := connect.InitThread(10,30,factory)
if err != nil{
fmt.Println("init error")
return
}
var wg sync.WaitGroup
for i := 0;i < 50;i++ {
wg.Add(1)
go func(){
defer wg.Done()
//获取连接
v,_ := p.Get()
client := v.(pb.DataClient)
info := &pb.UserInfoRequest{
Uid:10012,
}
connect.GetUserInfo(client,info)
//归还链接
p.Put(v)
}()
wg.Wait()
}
for i := 0;i < 50;i++ {
wg.Add(1)
go func(){
defer wg.Done()
//获取连接
v,_ := p.Get()
client := v.(pb.DataClient)
connect.ChangeUserInfo(client)
//归还链接
p.Put(v)
}()
wg.Wait()
}
//获取链接池大小
current := p.Len()
fmt.Println("len=", current)
}
---------------------
package connect
import (
"github.com/silenceper/pool"
"fmt"
"time"
"net"
)
/*
初始化
min // 最小链接数
max // 最大链接数
factory func() (interface{}, error) //创建链接的方法
close func(v interface{}) error //关闭链接的方法
*/
func InitThread(min,max int,factory func() (interface{}, error),close func(v interface{}) error)(pool.Pool,error){
poolConfig := &pool.PoolConfig{
InitialCap: min,
MaxCap: max,
Factory: factory,
Close: close,
//链接最大空闲时间,超过该时间的链接 将会关闭,可避免空闲时链接EOF,自动失效的问题
IdleTimeout: 15 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)
if err != nil {
fmt.Println("Init err=", err)
return nil,err
}
return p,nil
}
---------------------
/*客户端方法*/
package connect
import (
"golang.org/x/net/context"
pb "rpcTest/rpcbuild/rpcbuild/friday"
"fmt"
"io"
)
//简单模式
func GetUserInfo(client pb.DataClient, info *pb.UserInfoRequest) {
req, err := client.GetUserInfo(context.Background(),info)
if err != nil {
fmt.Println("Could not create Customer: %v", err)
}
fmt.Println("userinfo is ",req.GetAge(),req.GetCount(),req.GetName(),req.GetSex())
}
//双向流模式
func ChangeUserInfo(client pb.DataClient){
notes := []*pb.UserInfoResponse{
{Name:"jim",Age:18,Sex:2,Count:100},
{Name:"Tom",Age:20,Sex:1,Count:666},
}
stream, err := client.ChangeUserInfo(context.Background())
if err != nil {
fmt.Println("%v.RouteChat(_) = _, %v", client, err)
}
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
fmt.Println("read done ")
close(waitc)
return
}
if err != nil {
fmt.Println("Failed to receive a note : %v", err)
}
fmt.Println("Got message %s at point(%d, %d)",in.Count,in.Sex,in.Age,in.Name)
}
}()
fmt.Println("notes",notes)
for _, note := range notes {
if err := stream.Send(note); err != nil {
fmt.Println("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
}
---------------------
2、单向流式
客户端流式传输实例
先定义数据和接口helloworld.proto
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (stream HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
使用proto的工具生成这些文件定义中的数据的结构体定义以及使用这些结构体的接口的具体定义,还包含一些描述和序列化的函数
然后就可以开发对应的服务端和客户端
服务端server.go
import (
"io"
"log"
"net"
"strings"
"time"
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
)
const (
port = ":50051"
)
// server is used to implement helloworld.GreeterServer.
type server struct{}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(stream pb.Greeter_SayHelloServer) ( error) {
time.Sleep(1*time.Second)
var names []string
i := 0
for {
in, err := stream.Recv()
if err == io.EOF {
stream.SendAndClose(&pb.HelloReply{Message: "Hello " + strings.Join(names, ",")})
return nil
}
if err != nil {
log.Printf("failed to recv: %v", err)
return err
}
names = append(names, in.Name)
i++
log.Printf("Received: %v,%d", in.Name,i)
}
}
func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
客户端client.go
import (
"context"
"fmt"
"log"
"sync"
"time"
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
)
const (
address = "localhost:50051"
defaultName = "world"
)
var wg sync.WaitGroup
func main() {
// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
//name := defaultName
//if len(os.Args) > 1 {
// name = os.Args[1]
//}
wg.Add(1000)
start := time.Now()
for i := 0;i < 1000;i++ {
go getData(c)
}
wg.Wait()
log.Println("used time : ",time.Now().Sub(start))
}
func getData(c pb.GreeterClient){
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := c.SayHello(ctx)
if err != nil {
log.Fatalf("could not greet: %v", err)
}
for i := 0;i < 100;i++ {
stream.Send(&pb.HelloRequest{Name: defaultName})
}
r, err := stream.CloseAndRecv()
if err != nil {
fmt.Printf("failed to recv: %v", err)
}
log.Printf("Greeting: %s", r.Message)
wg.Done()
}
这样就能完成客户端批量发送数据的作用
服务端流式方式,用于批量响应
比如下面简单介绍一下proto文件
syntax = "proto3";
package pb;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello1 (HelloRequest) returns (stream HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
服务端函数
func (s *server) SayHello1(in *pb.HelloRequest, gs pb.Greeter_SayHello1Server) error {
name := in.Name
for i := 0; i < 100; i++ {
gs.Send(&pb.HelloReply{Message: "Hello " + name + strconv.Itoa(i)})
}
return nil
}
客户端接受
conn, err := grpc.Dial(*address, grpc.WithInsecure())
if err != nil {
log.Fatalf("faild to connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
stream, err := c.SayHello1(context.Background(), &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
for {
reply, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("failed to recv: %v", err)
}
log.Printf("Greeting: %s", reply.Message)
}
几种基本情况的使用大体都包含到了。
创建客户端
首先需要创建一个gRPC 通道来与服务器通信。我们通过传递服务器地址和端口号来创建它, grpc.Dial()如下所示:
conn, err := grpc.Dial(*serverAddr)
if err != nil {
...
}
defer conn.Close()
如果您请求的服务需要,您可以使用DialOptions设置身份验证凭据(例如,TLS,GCE凭据,JWT凭证)。
设置gRPC 通道后,我们需要一个客户端来执行RPC。我们使用从.proto生成NewRouteGuideClient的pb包中提供的方法得到这个。
client := pb.NewRouteGuideClient(conn)
调用服务方法
现在让我们来看看我们如何调用我们的服务方法。请注意,在gRPC-Go中,RPC以阻塞/同步模式运行,这意味着RPC调用等待服务器响应,并将返回响应或错误。
1、简单的RPC
调用简单的RPC GetFeature几乎与调用本地方法一样简单。
feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
...
}
log.Println(feature)
如您所见,我们在之前获得的存根上调用该方法。在我们的方法参数中,我们创建并填充请求协议缓冲区对象(在我们的例子中 Point)。我们还传递了一个context.Context对象,它允许我们在必要时更改RPC的行为,例如超时/取消正在运行的RPC。如果调用没有返回错误,那么我们可以从第一个返回值中读取服务器的响应信息。
2、服务器端流式RPC
这里我们称之为服务器端流方法ListFeatures,它返回地理流Feature。如果您已经阅读过创建服务器,其中一些可能看起来非常熟悉 - 流式RPC在双方都以类似的方式实现。
rect := &pb.Rectangle{ ... } // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
...
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
与简单的RPC一样,我们将方法传递给上下文和请求。但是,我们不再获取响应对象,而是返回一个实例 RouteGuide_ListFeaturesClient。客户端可以使用 RouteGuide_ListFeaturesClient流来读取服务器的响应。
我们使用RouteGuide_ListFeaturesClient的Recv()方法重复读取服务器对响应协议缓冲区对象(在本例中为a Feature)的响应, 直到没有更多消息:客户端需要检查每次调用后err返回的错误 Recv()。如果nil,流仍然很好,它可以继续阅读; 如果是,io.EOF那么消息流已经结束; 否则必须有一个RPC错误,它会被传递通过err。
3、客户端流式RPC
客户端流方法RecordRoute类似于服务器端方法,除了我们只传递方法一个上下文并获得一个 RouteGuide_RecordRouteClient流,我们可以用它来写和 读消息。
// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {
if err == io.EOF {
break
}
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
该RouteGuide_RecordRouteClient有一个Send(),我们可以用它来发送请求到服务器的方法。一旦我们使用完成了客户端对流的请求Send(),我们需要调用CloseAndRecv()流来让gRPC知道我们已完成写入并期望收到响应。我们从err返回的中获取RPC状态CloseAndRecv()。如果状态为nil,则第一个返回值CloseAndRecv()将是有效的服务器响应。
4、双向流式RPC
最后,我们来看看我们的双向流式RPC RouteChat()。在这种情况下RecordRoute,我们只传递方法一个上下文对象并获取一个我们可用于写入和读取消息的流。但是,这次我们通过方法的流返回值,而服务器仍在向其消息流写入消息。
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
这里的读写语法与我们的客户端流方法非常相似,只是在CloseSend()我们完成调用后使用流的方法。尽管每一方都会按照编写顺序获取对方的消息,但客户端和服务器都可以按任何顺序进行读写 - 这些流完全独立运行。
启动客户端
$ go run client/client.go
到这边基本就完成基本使用了
使用总结
为什么要使用grpc?
首先rpc的好处就不多说了,其次再看grpc很多优势和封装好的功能特性都能完善架构,提高开发效率。
1. 支持多种语言。完成不同语言之间的调用,还能自动生成代码。
2. 二进制的数据可以加快传输速度
3. 基于http2的多路复用可以减少服务之间的连接次数,也天然的支持了TLS和流控。
4. 和函数一样的调用方式也有效的提升了开发效率。
5. 支持流式调用方式
6. 使用 protobuf 作为序列化库, 专门的 IDL 语法也使得接口定义更加简洁规范了,protobuf 内置支持兼容性处理,proto3 支持 json 序列化,更容易和其它系统交互;
7. 底层数据传输基于 HTTP/2,所以也会带来 flow control, header compression 等 HTTP/2 特性,数据传输还支持压缩、加密、重试(支持 Backoff)等,性能相应该会有一定提升(没有做测试);
8. 很多大型项目在用,比如etcd v3 和 docker containerd,k8s,tidb
缺点:
1. 开发调试就不像 REST 接口那样有很多现成的命令行或图形界面工具可以使用,grpc 的数据通信传输的是二进制的 protobuf 数据,不利于调试,而且 grpc 的 cli 还不够完善,所以我们测试基本上都是通过写代码进行的;
rpc和http
http 和 rpc 并不是一个并行概念。
rpc是远端调用协议, 包含传输协议和编码协议。传输协议包含: 如著名的 gRPC 使用的 http2 协议,也有如dubbo一类的自定义报文的tcp协议。编码协议包含: 如基于文本编码的 xml json,也有二进制编码的 protobuf binpack 等。
把一个http server容器上封装一层服务发现和函数代理调用,那它就已经可以做一个rpc框架了。所以为什么要用rpc调用?因为良好的rpc调用是面向服务的封装,针对服务的可用性和效率等都做了优化。单纯使用http调用则缺少了这些特性。
http1.1 和2.0
grpc使用的是http2.0,相对http1.1协议,http2.0协议已经优化编码效率问题,grpc这种rpc库使用的就是http2.0协议。
1. http协议是支持连接池复用的,也就是建立一定数量的连接不断开,并不会频繁的创建和销毁连接。
2. 要说的是http也可以使用protobuf这种二进制编码协议对内容进行编码,
为什么用tcp而不是http1.1
通用定义的http1.1协议的tcp报文包含太多废信息,一个POST协议的格式大致如下
HTTP/1.0 200 OK
Content-Type: text/plain
Content-Length: 137582
Expires: Thu, 05 Dec 1997 16:00:00 GMT
Last-Modified: Wed, 5 August 1996 15:55:28 GMT
Server: Apache 0.84
<html>
<body>Hello World</body>
</html>
即使编码协议也就是body是使用二进制编码协议,报文元数据也就是header头的键值对却用了文本编码,非常占字节数。如上图所使用的报文中有效字节数仅仅占约 30%,也就是70%的时间用于传输元数据废编码。当然实际情况下报文内容可能会比这个长,但是报头所占的比例也是非常可观的。那么假如我们使用自定义tcp协议的报文报头占用的字节数也就只有16个byte,极大地精简了传输内容。这也就是为什么后端进程间通常会采用自定义tcp协议的rpc来进行通信的原因。
rpc和rest也不是一个概念
rest是一种规范,一种资源抽象的设计思想。是靠原生的http api实现的,rpc是对http的一种封装,(最重要的是实现了批量操作和本地化调用方式)有大量已经开发的框架,能够直接使用,算的上一种上层协议。
gprc控制并发
func runServer() error {
...
srv := grpc.NewServer(grpc.Creds(tlsCreds),
ServerInterceptor(),
grpc.MaxConcurrentStreams(64),
grpc.InTapHandle(NewTap().Handler))
rpc.RegisterCacheServer(srv, NewCacheService(accounts))
l, err := net.Listen("tcp", "localhost:5051")
if err != nil {
return err
}
l = netutil.LimitListener(l, 1024)
return srv.Serve(l)
}
这里使用了 netutil.LimitListener(l, 1024) 控制了总共可以有多少个连接
然后用 grpc.MaxConcurrentStreams(64) 指定了每个 grpc 的连接可以有多少个并发流(stream)。一个链接可以传输多个流,比如我们可以使用一个链接来传输jvm和call的流
这两个结合起来基本控制了并发的总数。
拦截器 Interceptor
grpc服务端和客户端都提供了interceptor功能,功能类似middleware,很适合在这里处理验证、日志等流程。
在自定义Token认证的示例中,认证信息是由每个服务中的方法处理并认证的,如果有大量的接口方法,这种姿势就太不优雅了,每个接口实现都要先处理认证信息。这个时候interceptor就可以用来解决了这个问题,在请求被转到具体接口之前处理认证信息,一处认证,到处无忧。
可以在服务端接收到请求后,先对请求中的数据做一些处理后再转交给指定的服务处理并响应,常见的如权限校验、日志、接口调用延迟等,这里简单打印下客户端的 IP 地址。
增加拦截器
在注册之前加入对应函数的参数
package main
import (
"fmt"
"log"
"net"
pb "./proto/helloworld"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/reflection"
)
const (
port = ":50051"
)
// server is used to implement helloworld.GreeterServer.
type server struct{}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
func getClietIP(ctx context.Context) (string, error) {
pr, ok := peer.FromContext(ctx)
if !ok {
return "", fmt.Errorf("getClinetIP, invoke FromContext() failed")
}
if pr.Addr == net.Addr(nil) {
return "", fmt.Errorf("getClientIP, peer.Addr is nil")
}
return pr.Addr.String(), nil
}
func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
// Register interceptor
var interceptor grpc.UnaryServerInterceptor
interceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
cli, err := getClietIP(ctx)
if err != nil {
log.Println("Failed to get client address")
}
log.Println("Client address is", cli)
return handler(ctx, req)
}
opts = append(opts, grpc.UnaryInterceptor(interceptor))
s := grpc.NewServer(opts...)
pb.RegisterGreeterServer(s, &server{})
// Register reflection service on gRPC server.
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
获取IP
gRPC 服务和客户端之间是通过 http2 进行交互,其中包含了客户端的地址信息,
在 gRPC 源码 peer/peer.go 中包含了创建的上下文信息,其中就记录的远端地址;而且在 gRPC 请求中默认都会含有 Context 值,这样就可以通过如下方法获取。
func getClietIP(ctx context.Context) (string, error) {
pr, ok := peer.FromContext(ctx)
if !ok {
return "", fmt.Errorf("getClinetIP, invoke FromContext() failed")
}
if pr.Addr == net.Addr(nil) {
return "", fmt.Errorf("getClientIP, peer.Addr is nil")
}
addSlice := strings.Split(pr.Addr.String(), ":")
return addSlice[0], nil
}
注意:在使用 stream 方式时 context 值可以直接从 stream 中获取,也就是 stream.Context() 。
这边也有端口相关信息
负载均衡
gRPC开源组件官方并未直接提供服务注册与发现的功能实现,但其设计文档已提供实现的思路,并在不同语言的gRPC代码API中已提供了命名解析和负载均衡接口供扩展。
我们先谈谈LB,据负载均衡实现所在的位置不同,根据具体实现通常可分为以下三种解决方案:
1、集中式LB(Proxy Model)
在服务消费者和服务提供者之间有一个独立的LB,通常是专门的硬件设备如 F5,或者基于软件如 LVS,HAproxy等实现。LB上有所有服务的地址映射表,通常由运维配置注册,当服务消费方调用某个目标服务时,它向LB发起请求,由LB以某种策略,比如轮询(Round-Robin)做负载均衡后将请求转发到目标服务。LB一般具备健康检查能力,能自动摘除不健康的服务实例。 该方案主要问题:
单点问题,所有服务调用流量都经过LB,当服务数量和调用量大的时候,LB容易成为瓶颈,且一旦LB发生故障影响整个系统;
服务消费方、提供方之间增加了一级,有一定性能开销。
2、进程内LB(Balancing-aware Client)
针对第一个方案的不足,此方案将LB的功能集成到服务消费方进程里,也被称为软负载或者客户端负载方案。服务提供方启动时,首先将服务地址注册到服务注册表,同时定期报心跳到服务注册表以表明服务的存活状态,相当于健康检查,服务消费方要访问某个服务时,它通过内置的LB组件向服务注册表查询,同时缓存并定期刷新目标服务地址列表,然后以某种负载均衡策略选择一个目标服务地址,最后向目标服务发起请求。LB和服务发现能力被分散到每一个服务消费者的进程内部,同时服务消费方和服务提供方之间是直接调用,没有额外开销,性能比较好。该方案主要问题:
开发成本,该方案将服务调用方集成到客户端的进程里头,如果有多种不同的语言栈,就要配合开发多种不同的客户端,有一定的研发和维护成本;
另外生产环境中,后续如果要对客户库进行升级,势必要求服务调用方修改代码并重新发布,升级较复杂。
3、独立 LB 进程(External Load Balancing Service)
该方案是针对第二种方案的不足而提出的一种折中方案,原理和第二种方案基本类似。 不同之处是将LB和服务发现功能从进程内移出来,变成主机上的一个独立进程。主机上的一个或者多个服务要访问目标服务时,他们都通过同一主机上的独立LB进程做服务发现和负载均衡。该方案也是一种分布式方案没有单点问题,一个LB进程挂了只影响该主机上的服务调用方,服务调用方和LB之间是进程内调用性能好,同时该方案还简化了服务调用方,不需要为不同语言开发客户库,LB的升级不需要服务调用方改代码。 该方案主要问题:部署较复杂,环节多,出错调试排查问题不方便。
再来看看gRPC服务发现及负载均衡实现
基本实现原理:
服务启动后gRPC客户端向命名服务器发出名称解析请求,名称将解析为一个或多个IP地址,每个IP地址标示它是服务器地址还是负载均衡器地址,以及标示要使用那个客户端负载均衡策略或服务配置。
客户端实例化负载均衡策略,如果解析返回的地址是负载均衡器地址,则客户端将使用grpclb策略,否则客户端使用服务配置请求的负载均衡策略。
负载均衡策略为每个服务器地址创建一个子通道(channel)。
当有rpc请求时,负载均衡策略决定那个子通道即grpc服务器将接收请求,当可用服务器为空时客户端的请求将被阻塞。
根据gRPC官方提供的设计思路,基于进程内LB方案(即第2个案,阿里开源的服务框架 Dubbo 也是采用类似机制),结合分布式一致的组件(如Zookeeper、Consul、Etcd),可找到gRPC服务发现和负载均衡的可行解决方案。
接下来以GO语言为例,简单介绍下基于Etcd3的关键代码实现:
1)命名解析实现:resolver.go
package etcdv3
import (
"errors"
"fmt"
"strings"
etcd3 "github.com/coreos/etcd/clientv3"
"google.golang.org/grpc/naming"
)
// resolver is the implementaion of grpc.naming.Resolver
type resolver struct {
serviceName string // service name to resolve
}
// NewResolver return resolver with service name
func NewResolver(serviceName string) *resolver {
return &resolver{serviceName: serviceName}
}
// Resolve to resolve the service from etcd, target is the dial address of etcd
// target example: "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"
func (re *resolver) Resolve(target string) (naming.Watcher, error) {
if re.serviceName == "" {
return nil, errors.New("grpclb: no service name provided")
}
// generate etcd client
client, err := etcd3.New(etcd3.Config{
Endpoints: strings.Split(target, ","),
})
if err != nil {
return nil, fmt.Errorf("grpclb: creat etcd3 client failed: %s", err.Error())
}
// Return watcher
return &watcher{re: re, client: *client}, nil
}
2)服务发现实现:watcher.go
package etcdv3
import (
"fmt"
etcd3 "github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"google.golang.org/grpc/naming"
"github.com/coreos/etcd/mvcc/mvccpb"
)
// watcher is the implementaion of grpc.naming.Watcher
type watcher struct {
re *resolver // re: Etcd Resolver
client etcd3.Client
isInitialized bool
}
// Close do nothing
func (w *watcher) Close() {
}
// Next to return the updates
func (w *watcher) Next() ([]*naming.Update, error) {
// prefix is the etcd prefix/value to watch
prefix := fmt.Sprintf("/%s/%s/", Prefix, w.re.serviceName)
// check if is initialized
if !w.isInitialized {
// query addresses from etcd
resp, err := w.client.Get(context.Background(), prefix, etcd3.WithPrefix())
w.isInitialized = true
if err == nil {
addrs := extractAddrs(resp)
//if not empty, return the updates or watcher new dir
if l := len(addrs); l != 0 {
updates := make([]*naming.Update, l)
for i := range addrs {
updates[i] = &naming.Update{Op: naming.Add, Addr: addrs[i]}
}
return updates, nil
}
}
}
// generate etcd Watcher
rch := w.client.Watch(context.Background(), prefix, etcd3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT:
return []*naming.Update{{Op: naming.Add, Addr: string(ev.Kv.Value)}}, nil
case mvccpb.DELETE:
return []*naming.Update{{Op: naming.Delete, Addr: string(ev.Kv.Value)}}, nil
}
}
}
return nil, nil
}
func extractAddrs(resp *etcd3.GetResponse) []string {
addrs := []string{}
if resp == nil || resp.Kvs == nil {
return addrs
}
for i := range resp.Kvs {
if v := resp.Kvs[i].Value; v != nil {
addrs = append(addrs, string(v))
}
}
return addrs
}
3)服务注册实现:register.go
package etcdv3
import (
"fmt"
"log"
"strings"
"time"
etcd3 "github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)
// Prefix should start and end with no slash
var Prefix = "etcd3_naming"
var client etcd3.Client
var serviceKey string
var stopSignal = make(chan bool, 1)
// Register
func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error {
serviceValue := fmt.Sprintf("%s:%d", host, port)
serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue)
// get endpoints for register dial address
var err error
client, err := etcd3.New(etcd3.Config{
Endpoints: strings.Split(target, ","),
})
if err != nil {
return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)
}
go func() {
// invoke self-register with ticker
ticker := time.NewTicker(interval)
for {
// minimum lease TTL is ttl-second
resp, _ := client.Grant(context.TODO(), int64(ttl))
// should get first, if not exist, set it
_, err := client.Get(context.Background(), serviceKey)
if err != nil {
if err == rpctypes.ErrKeyNotFound {
if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error())
}
} else {
log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error())
}
} else {
// refresh set to true for not notifying the watcher
if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error())
}
}
select {
case <-stopSignal:
return
case <-ticker.C:
}
}
}()
return nil
}
// UnRegister delete registered service from etcd
func UnRegister() error {
stopSignal <- true
stopSignal = make(chan bool, 1) // just a hack to avoid multi UnRegister deadlock
var err error;
if _, err := client.Delete(context.Background(), serviceKey); err != nil {
log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error())
} else {
log.Printf("grpclb: deregister '%s' ok.", serviceKey)
}
return err
}
4)接口描述文件:helloworld.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.midea.jr.test.grpc";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {
}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
5)实现服务端接口:helloworldserver.go
package main
import (
"flag"
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
grpclb "com.midea/jr/grpclb/naming/etcd/v3"
"com.midea/jr/grpclb/example/pb"
)
var (
serv = flag.String("service", "hello_service", "service name")
port = flag.Int("port", 50001, "listening port")
reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
if err != nil {
panic(err)
}
err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15)
if err != nil {
panic(err)
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
s := <-ch
log.Printf("receive signal '%v'", s)
grpclb.UnRegister()
os.Exit(1)
}()
log.Printf("starting hello service at %d", *port)
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
s.Serve(lis)
}
// server is used to implement helloworld.GreeterServer.
type server struct{}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name)
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
6)实现客户端接口:helloworldclient.go
package main
import (
"flag"
"fmt"
"time"
grpclb "com.midea/jr/grpclb/naming/etcd/v3"
"com.midea/jr/grpclb/example/pb"
"golang.org/x/net/context"
"google.golang.org/grpc"
"strconv"
)
var (
serv = flag.String("service", "hello_service", "service name")
reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)
func main() {
flag.Parse()
r := grpclb.NewResolver(*serv)
b := grpc.RoundRobin(r)
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b))
if err != nil {
panic(err)
}
ticker := time.NewTicker(1 * time.Second)
for t := range ticker.C {
client := pb.NewGreeterClient(conn)
resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})
if err == nil {
fmt.Printf("%v: Reply is %s\n", t, resp.Message)
}
}
}
7)运行测试
运行3个服务端S1、S2、S3,1个客户端C,观察各服务端接收的请求数是否相等?
关闭1个服务端S1,观察请求是否会转移到另外2个服务端?
重新启动S1服务端,观察另外2个服务端请求是否会平均分配到S1?
关闭Etcd3服务器,观察客户端与服务端通信是否正常?
关闭通信仍然正常,但新服务端不会注册进来,服务端掉线了也无法摘除掉。
重新启动Etcd3服务器,服务端上下线可自动恢复正常。
关闭所有服务端,客户端请求将被阻塞。
基本原理
Server端接收到客户端建立的连接后,使用一个goroutine专门处理此客户端的连接(即一个tcp连接或者说一个http2连接),所以同一个grpc客户端连接上服务端后,后续的请求都是通过同一个tcp连接。
客户端和服务端的连接在应用层由Transport抽象(类似通常多路复用实现中的封装的channel),在客户端是ClientTransport,在服务端是ServerTransport。Server端接收到一个客户端的http2请求后即打开一个新的流,ClientTransport和ServerTransport之间使用这个新打开的流以http2帧的形式交换数据。
客户端的每个http2请求会打开一个新的流。流可以从两边关闭,对于单次请求来说,客户端会主动关闭流,对于流式请求客户端不会主动关闭(即使使用了CloseSend也只是发送了数据发送结束的标识,还是由服务端关闭)。
gRPC-Go中的单次方法和流式方法
无论是单次方法还是流式方法,服务端在调用完用户的处理逻辑函数返回后,都会关闭流(这也是为什么ServerStream不需要实现CloseSend的原因)。区别只是对于服务端的流式方法来说,可循环多次读取这个流中的帧数据并处理,以此”复用”这个流。
客户端如果是流式方法,需要显示调用CloseSend,表示数据发送的结束
rpc的调用服务端是使用了workpool,不管是正常的请求,还是流式调用,每个request或者stream就是一个work,其实流式就是相当于有很长的传参或者返回参数,原理都是一样的,stream就是批量处理的概念。具体实现应该是封装好了,需要去查看源码。