rpc rpc 意为远程过程调用
, http, grpc 广义上讲都是rpc。 而且还有个项目叫grpc-gateway , 可以将grpc通过http的方式暴露。
grpc grpc 是rpc的一种实现,由google开源,其他还有thrift, sogorpc 等等。 并且grpc使用的http/2协议
http/1.1 与 http/2 的区别
2使用二进制,而1.1使用文本,提高效率
2将相同的tcp连接合并为一个请求,提高性能,而1.1则为每个请求创建tcp连接
2的客户端使用流,这样可以多次请求
2含有trailers,也就是尾部消息,可以用来发送body的checksume等, 当然也可以直接放到body里 …
而1.1中也已经实现服务端到客户端的流,使用’Transfer-Encoding=chunked’来替代’Content-Length’,详见rfc
1 2 A sender MUST NOT send a Content-Length header field in any message that contains a Transfer-Encoding header field.
认识proto文件 proto 文件中多个service和单个service 区别 在同一个service里的方法会codegen到同一个类,但这个类比较鸡肋。 由于RPC调用是RESTful的,所以多次调用或者多个rpc方法无法通过同一个service来共享数据,这需要使用者借助其他办法来解决。
service 还可以用以隔离相同名称的rpc, 如
service1/helloworld
service2/helloworld
而方法和方法通过RpcServiceMethod
来保存,而通过index来调用
1 2 ::grpc::Service::RequestAsyncUnary(0, context, request, response, new_call_cq, notification_cq, tag); ::grpc::Service::RequestAsyncUnary(1, context, request, response, new_call_cq, notification_cq, tag);
rpc 声明UnaryCall&StreamingCall 非流调用也称为UnaryCall
,指发送或接受的消息大小是固定的。 流调用称为StreamingCall
,可以多次发送或者接收,所以消息大小并不固定。
StreamCall 可以多次调用,直到发送WriteDone/Finish
,所以在接受的一端总是
grpc支持客户端流服务端非流、客户端非流、服务端流以及双向流,而普通的就是客户端和服务端都不流NORMAL_RPC(unary call)
grpc::internal::RpcMethod::NORMAL_RPC
grpc::internal::RpcMethod::RpcType::SERVER_STREAMING
grpc::internal::RpcMethod::RpcType::CLIENT_STREAMING
grpc::internal::RpcMethod::RpcType::BIDI_STREAMING
认识pb.h和grpc.pb.h文件 protoc 调用grpc_cpp_plugin
插件生成grpc.pb.{h,cc}文件,生成rpc方法的实现
pb.{h,cc}则是定义了protobuf消息的序列化和反序列化方法
反射、序列化和反序列化的实现 pb.h 实现grpc的请求参数和返回参数的特定语言的解析,还有pb的通用方法, 例如: has_xx
(版本3里只有自定义类型才支持), class XXX_CPP_API
生成的class都继承自google::protobuf::Message
1 2 3 4 5 class HelloRequest PROTOBUF_FINAL : public ::PROTOBUF_NAMESPACE_ID::Message #define PROTOBUF_NAMESPACE "google::protobuf" #define PROTOBUF_NAMESPACE_ID google::protobuf
而在message
中有注释说明, 关键函数是SerializeToString
和ParseFromString
,还有个array版本SerializeToArray
, 还有一个反射函数GetDescriptor()
用来动态获取指定槽位的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 // Example usage: // // Say you have a message defined as: // // message Foo { // optional string text = 1; // repeated int32 numbers = 2; // } // // Then, if you used the protocol compiler to generate a class from the above // definition, you could use it like so: // // std::string data; // Will store a serialized version of the message. // // { // // Create a message and serialize it. // Foo foo; // foo.set_text("Hello World!"); // foo.add_numbers(1); // foo.add_numbers(5); // foo.add_numbers(42); // // foo.SerializeToString(&data); // } // // { // // Parse the serialized message and check that it contains the // // correct data. // Foo foo; // foo.ParseFromString(data); // // assert(foo.text() == "Hello World!"); // assert(foo.numbers_size() == 3); // assert(foo.numbers(0) == 1); // assert(foo.numbers(1) == 5); // assert(foo.numbers(2) == 42); // }
如下可以将Message转换为基本类型
1 2 3 int size = reqMsg.ByteSizeLong(); char* array = new char[size]; reqMsg.SerializeToArray(array, size);
1 2 3 std::string bytes = reqMsg.SerializeAsString(); const char* array = bytes.data(); int size = bytes.size();
进一步看protobuf::message
继承自protobuf::message_lite
, 后者实现了SerializeAsString
和SerializeToArray
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 inline uint8* SerializeToArrayImpl(const MessageLite& msg, uint8* target, int size) { constexpr bool debug = false; if (debug) { // Force serialization to a stream with a block size of 1, which forces // all writes to the stream to cross buffers triggering all fallback paths // in the unittests when serializing to string / array. io::ArrayOutputStream stream(target, size, 1); uint8* ptr; io::EpsCopyOutputStream out( &stream, io::CodedOutputStream::IsDefaultSerializationDeterministic(), &ptr); ptr = msg._InternalSerialize(ptr, &out); out.Trim(ptr); GOOGLE_DCHECK(!out.HadError() && stream.ByteCount() == size); return target + size; } else { io::EpsCopyOutputStream out( target, size, io::CodedOutputStream::IsDefaultSerializationDeterministic()); 实际调用-> auto res = msg._InternalSerialize(target, &out); GOOGLE_DCHECK(target + size == res); return res; } }
可见,其实序列化最终调用的是pb.h文件里定义的_InternalSerialize
, 举例官方例子HelloRequest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 ::PROTOBUF_NAMESPACE_ID::uint8* HelloRequest::_InternalSerialize( ::PROTOBUF_NAMESPACE_ID::uint8* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const { // @@protoc_insertion_point(serialize_to_array_start:helloworld.HelloRequest) ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; (void) cached_has_bits; // string name = 1; if (this->name().size() > 0) { ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String( this->_internal_name().data(), static_cast<int>(this->_internal_name().length()), ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE, "helloworld.HelloRequest.name"); target = stream->WriteStringMaybeAliased( 1, this->_internal_name(), target); } if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::InternalSerializeUnknownFieldsToA rray( _internal_metadata_.unknown_fields<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(::PROTOB UF_NAMESPACE_ID::UnknownFieldSet::default_instance), target, stream); } // @@protoc_insertion_point(serialize_to_array_end:helloworld.HelloRequest) return target; }
grpc.pb生成的代码实现rpc调用 生成的框架代码用来继承实现Service和获取stub来发起rpc call。实际上这些代码并不是必须的 在下面讲了如何使用几个工厂类来创建Stub,还有直接new
出Service
1 2 3 4 5 6 7 8 9 10 11 12 13 class XXXServer { // 客户端使用的桩 class Stub // base class Service // 各种版本的rpc包装,但都继承自base class WithAsyncMethod_XXX typedef WithAsyncMethod_XXX<Service > AsyncService; typedef ExperimentalWithCallbackMethod_XXX<Service > CallbackService; class WithGenericMethod_XXX class WithRawMethod_XXX typedef WithStreamedUnaryMethod_XXX<Service > StreamedUnaryService; }
同步与异步 grpc 的异步即为使用cq事件驱动(cq-based),使用tag标记事件。另外还有callback方式
对于客户端 同步时,通过调用’::grpc::internal::BlockingUnaryCall’ 异步时,创建’ClientAsyncResponseReader’(非流), 然后通过调用’ClientAsyncResponseReader’的write和finish,并等待tag 当存在流时分别是
::grpc::ClientAsyncReader
::grpc::ClientAsyncWriter
::grpc::ClientAsyncReaderWriter
这些类型可用对应的工厂类来创建, 生成代码的stub也是这么用的
1 2 3 class ClientReaderFactory class ClientWriterFactory class ClientReaderWriterFactory
对于服务端 同步时,通过’AddMethod’来注册,生成代码会在父类构造时执行。注册后由grpc调用
1 2 3 4 5 6 7 8 9 10 11 12 Greeter::Service::Service() { AddMethod(new ::grpc::internal::RpcServiceMethod( Greeter_method_names[0], ::grpc::internal::RpcMethod::NORMAL_RPC, new ::grpc::internal::RpcMethodHandler< Greeter::Service, ::helloworld::HelloRequest, ::helloworld::HelloReply>( [](Greeter::Service* service, ::grpc_impl::ServerContext* ctx, const ::helloworld::HelloRequest* req, ::helloworld::HelloReply* resp) { return service->SayHello(ctx, req, resp); }, this))); }
异步时,类似客户端
grpc::ServerAsyncReaderWriter
grpc::ServerAsyncReader
grpc::ServerAsyncWriter
可见服务端是直接new出来的,异步时这些io操作对象也是直接new出来的, 在调用以下时传入
1 2 3 RequestAsyncBidiStreaming RequestAsyncClientStreaming RequestAsyncServerStreaming
grpc callback 只在客户端使用,callback方式的请求可以传入一个lambda, 在请求完成时调用
1 2 3 4 5 6 7 stub_->async()->SayHello(&context, &request, &reply, [&mu, &cv, &done, &status](Status s) { status = std::move(s); std::lock_guard<std::mutex> lock(mu); done = true; cv.notify_one(); });
新版本的grpc已经将实验性的标记去除,说明此方式成熟了
1 2 3 4 5 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL ::grpc::Service:: #else ::grpc::Service::experimental(). #endif
grpc异步流 官方仓库的示例代码没有异步且流的, 在实际项目中用到异步流,使用大概方法
手动创建writereader
启动时,调用’grpc::Service::RequestAsyncBidiStreaming’ 和 ‘grpc::Service::RequestAsyncClientStreaming’ 以及’RequestAsyncServerStreaming’, 向cq塞请求new_connection事件
收到’new_connection’事件返回后,再调用read事件。
一共有5个类型
1 new_connection, read, write, finish, done
我写了一个demo grpcstreamhelloworld
grpc 消息大小 老版本的grpc中,发送端是支持无限大小的,但接受端只能是4M
1 2 #define GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 #define GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024)
服务端代码
1 2 3 4 std::unique_ptr<Server> ServerBuilder::BuildAndStart() { if (max_receive_message_size_ >= 0) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); }
但在新版grpc中变了
1 2 3 4 5 6 7 8 std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { grpc::ChannelArguments args; if (max_receive_message_size_ >= -1) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); } if (max_send_message_size_ >= -1) { args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_); }
grpc 编译安装的问题 https://github.com/grpc/grpc/issues/13841
grpc异步存在问题 因为异步服务端通过completionqueue来通知rpc执行结果和执行下次调用,通常使用多queue和多线程的方式提高处理效率
通常情况是多queue, 即每个service对应一个queue, 而每个service又有多个rpc,线程去轮询这个complete_queue。这样导致高线程切换开销,而且complete_queue也占用大量内存
多线程,queue可以用多个线程去轮询,但0.13版本可能出现bug
grpc异步流存在的问题 grpc区别与其他框架很大一个优势是支持异步流,即可以多次请求和多次回复。异步是基于cq的事件驱动,所以必须等待tag回调, 连续两次发送会异常。 而真正的请求一般在业务模块处理, 不知道tag的状态即不知道是否正在发送, 那么如何在cq回调外发送消息呢?
办法是维护一个发送队列,消息先存队列里,等待cq回调时取出发送。 另外由于流同步需要显式发送结束标记(服务端调Stream::Finish, 客户端调用WriteDown和Finish), 所以需要有一个特殊消息加以区分,通常用空指针,也可以设置结束标志。另外由于发送代码会同时被业务调用和cq回调,需要对发送代码加锁
调试grpc 通过设置环境变量,让grpc向控制台打印详细信息
1 2 3 4 5 export GRPC_VERBOSITY=DEBUG bash-5.0# ./build/bin/hasync slave stdin stdout @127.0.0.1:7615 D1026 08:27:44.142802149 24658 ev_posix.cc:174] Using polling engine: epollex D1026 08:27:44.143406685 24658 dns_resolver_ares.cc:490] Using ares dns resolver I1026 08:27:44.158115785 24658 server_builder.cc:332] Synchronous server. Num CQs: 1, Min pollers: 1, Max Pollers: 2, CQ timeout (msec): 10000
项目实践 项目使用客户端异步/同步,服务端全异步, 可以兼容四种传输方式
引用 https://grpc.github.io/grpc/cpp/grpcpp_2impl_2codegen_2sync__stream_8h_source.html https://grpc.github.io/grpc/cpp/grpcpp_2impl_2codegen_2byte__buffer_8h_source.html https://grpc.github.io/grpc/cpp/call__op__set_8h_source.html