setup sftp service
curlftpfs and sshfs clients
On Debian I used to mount remote directories locally with curlftpfs. After a system upgrade the package was gone; it turns out plain FTP is considered insecure, so sshfs is now the recommended replacement.
reference
https://www.linuxtechi.com/configure-sftp-chroot-debian10/
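A minimal sshfs usage sketch (the host and paths below are placeholders, not from the original post):
# install sshfs and mount a remote directory over SSH
sudo apt install sshfs
mkdir -p ~/remote
sshfs user@example.com:/var/data ~/remote
# unmount when finished
fusermount -u ~/remote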
# accepts a tuple or a list
def calc(numbers):
    sum = 0
    for n in numbers:
        sum = sum + n
    print(sum)

calc([1, 2])
calc((1, 2, 3))
# accepts variadic (positional) arguments
def calc2(*numbers):
    sum = 0
    for n in numbers:
        sum = sum + n
    return sum

calc2(0)
calc2(1, 2, 3, 4)

numbers = [1, 2, 3]
# unpack the list into variadic arguments
print(calc2(*numbers))
# variadic keyword arguments are collected into a dict
# accepts only keyword (named) variadic arguments
def person2(**kw):
    if 'hello' in kw:
        pass
    print(kw)
# positional parameters combined with keyword variadic arguments, in that order
def person(name, age, **kw):
    print('name', name, 'age', age, 'other', kw)

person('mike', 12)
person('mike', 12, city='sh')
person('mike', 12, city='sh', birth=1990)
person2(name='mike', age=12, city='sh', birth=1990)
# combine positional, default, variadic, and keyword-variadic parameters
def f2(a, b, c=0, *d, **kw):
    print('a =', a, 'b =', b, 'c =', c, 'd =', d, 'kw =', kw)

cs = [3, 4]
f2(1, 2, *cs, hello='hello')
# use a bare * to mark the parameters after it as keyword-only
def f3(a, b, c, *, d, e):
    print('a =', a, 'b =', b, 'c =', c, 'd =', d, 'e =', e)

# other keyword arguments are not accepted
f3(1, 2, 3, d=4, e=5)
# a tuple and a dict can be used to call any function,
# so *args and **kw are commonly used to forward all arguments to another function, whatever its signature
args = (1, 2)
kw = {'c': 1, 'd': 2}
f2(*args, **kw)
# finally, use immutable types for default parameter values
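That last comment refers to the classic mutable-default-argument pitfall; a short illustration (the function names are made up):
# bad: the default list is created once and shared across calls
def append_bad(item, bucket=[]):
    bucket.append(item)
    return bucket

append_bad(1)        # [1]
append_bad(2)        # [1, 2]  -- surprising

# good: use an immutable sentinel such as None
def append_good(item, bucket=None):
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket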
The Linux command line is an extremely powerful productivity tool: many real-world problems can be solved with a few lines of shell commands instead of complex programs.
When running which, e.g. which echo,
if it prints 'shell built-in command' instead of a path, the command is a shell builtin.
The shell executes builtins directly, whereas external commands are run by forking a child process.
xargs can fan its input out to multiple concurrent processes, which is very powerful combined with find;
combined with wget it becomes a simple concurrent crawler (see the sketch after the examples below).
EXAMPLES
find /tmp -name core -type f -print | xargs /bin/rm -f
Find files named core in or below the directory /tmp and delete them. Note that this will work incorrectly if there are any filenames containing newlines or spaces.
find /tmp -name core -type f -print0 | xargs -0 /bin/rm -f
Find files named core in or below the directory /tmp and delete them, processing filenames in such a way that file or directory names containing spaces or newlines are correctly handled.
find /tmp -depth -name core -type f -delete
Find files named core in or below the directory /tmp and delete them, but more efficiently than in the previous example (because we avoid the need to use fork(2) and exec(2) to launch rm and we don't need the extra xargs process).
cut -d: -f1 < /etc/passwd | sort | xargs echo
Generates a compact listing of all the users on the system.
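The concurrency mentioned above comes from xargs -P; a hedged sketch combining it with find and with wget (urls.txt is a made-up input file):
# delete the found files with up to 4 parallel rm processes
find /tmp -name core -type f -print0 | xargs -0 -P 4 /bin/rm -f
# fetch a list of URLs with 8 parallel wget processes, one URL per process
cat urls.txt | xargs -n 1 -P 8 wget -q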
ls output is newline-separated by default. awk can turn ls's newlines into a | separator:
ls | awk '{ ORS="|"; print; }'
while echo $(ls)
turns the newlines into spaces instead.
declare can give a variable a type: '-i' for integer, '-r' for read-only (equivalent to the shell's readonly), '-g' for global.
Shell variables are strings by default; with '-i', arithmetic no longer needs let:
#retry=0
declare -i retry=0
while [ $retry -lt 30 ] ; do
    ps aux --cols=1024 | grep xxx
    if [ $? -eq 0 ] ; then
        exit 0
    fi
    sleep 1
    #let retry=$retry+1
    retry=$retry+1
done
Because many functions need to handle errors, using := inside a function can accidentally shadow a global variable.
var global_var *os.File

func foo() {
	// := declares a NEW local global_var here, shadowing the package-level variable
	global_var, err := os.Open("file")
	if err != nil {
		return
	}
	global_var.Close() // operates on the local copy; the global is untouched
}
var global_var *os.File

func foo() {
	var err error
	// plain assignment: this really assigns to the package-level global_var
	global_var, err = os.Open("file")
	if err != nil {
		return
	}
}
Arrays and slices are used much like std::array and std::vector in C++: an array has a fixed length, while a slice grows (roughly doubling its capacity).
There is a big difference in parameter passing, though: in Go an array is passed by value, i.e. the whole array is copied, while a slice is passed by reference, which is why slices are used far more often; the same applies to maps. (See the sketch after the definitions below.)
Defining arrays and slices:
// arrays
var a [3]int
arr := [5]int{1, 2, 3, 4, 5}
var array2 = [...]int{6, 7, 8}
q := [...]int{1, 2, 3}  // length inferred from the literal
q2 := [...]int{99: -1}  // elements 0..98 are 0, element 99 (the 100th) is -1
// slices
s1 := []int{1, 2, 3}
a2 := [10]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0} // a2 is an array
s2 := a2[2:8]
s3 := make([]int, 10, 20)
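A small self-contained sketch of the value-versus-reference difference mentioned above:
package main

import "fmt"

func modifyArray(a [3]int) { a[0] = 100 } // receives a copy of the whole array
func modifySlice(s []int)  { s[0] = 100 } // shares the underlying array with the caller

func main() {
	arr := [3]int{1, 2, 3}
	sl := []int{1, 2, 3}
	modifyArray(arr)
	modifySlice(sl)
	fmt.Println(arr) // [1 2 3]   - the caller's array is unchanged
	fmt.Println(sl)  // [100 2 3] - the caller sees the modification
}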
The two can often be substituted for each other, but behavior differs by type.
For the basic types string, int, and bool (and for arrays), a var declaration already initializes the value:
var str string // automatically initialized to the empty string
var inter int // automatically initialized to 0
var bo bool // automatically initialized to false
// ready to use directly
fmt.Printf("%s %d %t", str, inter, bo)
For slice, map, and chan types, however, var m map[int]string
is only a declaration; make is needed
to allocate memory and initialize it before use:
var m map[int]string // m is nil here; writing to it would panic
m = make(map[int]string) // allocate first
m[1] = "a"
m[2] = "b"
fmt.Println(m[1])
// or declare and initialize in one step:
m2 := make(map[int]string)
m3 := map[int]string{1: "a", 2: "b"}
VS Code with clangd as the C++ LSP is excellent: it gives go-to-definition, completion, clang-tidy, and inline hints, but it is also demanding; my 12 GiB of RAM is often not enough and the machine becomes very sluggish.
After some digging I found two problems:
1. clangd itself uses a lot of memory
2. while processing source files, clangd writes pch files to /tmp by default, which also consume memory
This post shows how to deal with both problems and uses the systemd-tmpfiles tool along the way.
The codebase is fairly large; after jumping between just a few files things start to stutter and soon the machine freezes. Switching from X to a console and running free
shows that the shared and buff/cache columns occupy a lot of memory:
~ > free -h
total used free shared buff/cache available
Mem: 11Gi 7.1Gi 721Mi 1.0Gi 3.7Gi 3.1Gi
Swap: 0B 0B 0B
The man page explains these columns; on my system shared essentially means '/tmp', because tmpfs is mounted on /tmp:
shared
Memory used (mostly) by tmpfs (Shmem in /proc/meminfo)
buffers
Memory used by kernel buffers (Buffers in /proc/meminfo)
cache
Memory used by the page cache and slabs (Cached and SReclaimable in /proc/meminfo)
buff/cache
Sum of buffers and cache
cat /proc/meminfo
shows that the buffer usage is small, so this is not a kernel problem (I was suspicious because I run the latest kernel):
~ > cat /proc/meminfo | grep "Buffers\|Cached"
Buffers: 308440 kB
Cached: 3297168 kB
So the problem is mainly that the /tmp directory uses too much memory, and that clangd itself uses too much memory.
Only by setting
-background-index=0
can the excessive memory usage be avoided. The pch files under /tmp:
~ > ls /tmp/*.pch -lh
-rw-r--r-- 1 100M May 13 11:18 /tmp/preamble-133598.pch
-rw-r--r-- 1 104M May 13 11:17 /tmp/preamble-457bd6.pch
-rw-r--r-- 1 104M May 13 11:17 /tmp/preamble-465a46.pch
-rw-r--r-- 1 104M May 13 11:18 /tmp/preamble-99f5e5.pch
-rw-r--r-- 1 99M May 13 11:17 /tmp/preamble-b56fad.pch
systemd-tmpfiles cleans up temporary files on a schedule: systemd-tmpfiles-clean.timer
periodically triggers systemd-tmpfiles-clean.service
, and both are enabled by default.
Although /tmp is backed by memory via tmpfs
(some distributions mount it on a physical disk instead) and is emptied on reboot, a server that stays up for years needs a mechanism like this to reclaim memory/disk.
In the timer's default configuration, OnBootSec=15min
means the first run happens 15 minutes after boot, and OnUnitActiveSec=1d
means each subsequent run happens one day after the previous one:
~ > cat /usr/lib/systemd/system/systemd-tmpfiles-clean.timer
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# This file is part of systemd.
#
# systemd is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
[Unit]
Description=Daily Cleanup of Temporary Directories
Documentation=man:tmpfiles.d(5) man:systemd-tmpfiles(8)
[Timer]
OnBootSec=15min
OnUnitActiveSec=1d
The timer above periodically triggers the systemd-tmpfiles-clean.service
unit:
~ > sudo systemctl status systemd-tmpfiles-clean.service
○ systemd-tmpfiles-clean.service - Cleanup of Temporary Directories
Loaded: loaded (/usr/lib/systemd/system/systemd-tmpfiles-clean.service; static)
Active: inactive (dead) since Wed 2021-05-12 16:37:27 HKT; 22h ago
TriggeredBy: ● systemd-tmpfiles-clean.timer
Docs: man:tmpfiles.d(5)
man:systemd-tmpfiles(8)
Process: 36637 ExecStart=systemd-tmpfiles --clean (code=exited, status=0/SUCCESS)
That is the stock timer setup; it suits servers and it is not recommended to modify it. Instead, add your own rules and, when needed, run systemd-tmpfiles --clean manually.
How to write rules is documented in man 5 tmpfiles.d
, but it is best to add rules only under /etc or under ~/: the former applies to all users, the latter only to yourself.
TMPFILES.D(5) tmpfiles.d TMPFILES.D(5)
NAME
tmpfiles.d - Configuration for creation, deletion and cleaning of volatile and temporary files
SYNOPSIS
/etc/tmpfiles.d/*.conf
/run/tmpfiles.d/*.conf
/usr/lib/tmpfiles.d/*.conf
~/.config/user-tmpfiles.d/*.conf
$XDG_RUNTIME_DIR/user-tmpfiles.d/*.conf
~/.local/share/user-tmpfiles.d/*.conf
The easiest way is to copy a suitable config from /usr/lib/tmpfiles.d/ into your own directory and adapt it.
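As a hedged example (the type letter, mode, and age are only guesses to adapt; check man 5 tmpfiles.d and the file shipped by your distribution), a drop-in could shorten the cleanup age for /tmp and then be applied manually:
# /etc/tmpfiles.d/tmp.conf -- overrides the file of the same name shipped in /usr/lib/tmpfiles.d/
# clean entries under /tmp that are older than 12 hours
d /tmp 1777 root root 12h
Then run the cleanup by hand:
sudo systemd-tmpfiles --clean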
The effect of deleting the /tmp/*.pch files:
~ > free
total used free shared buff/cache available
Mem: 12158548 7808240 854296 1010820 3496012 3019688
Swap: 0 0 0
~ > rm /tmp/*.pch
~ > free
total used free shared buff/cache available
Mem: 12158548 7804356 1169052 699936 3185140 3334460
Swap: 0 0 0
The shared figure dropped considerably. After also setting background-index=0
, memory usage is much lower.
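In VS Code the flag can be passed through the clangd extension; a hedged sketch of settings.json (clangd.arguments is the vscode-clangd extension's setting, and the exact flag spelling may vary between clangd versions):
// settings.json
"clangd.arguments": [
    "-background-index=0"
]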
Using systemd-tmpfiles to clear these files is not a perfect fit for this scenario, but it is a very capable management tool in its own right.
Storage duration concerns when a variable is created and destroyed; linkage concerns whether identifiers in different files refer to the same entity in memory; scope concerns where a variable or function is visible.
The identifiers discussed here include both variables and functions.
Storage duration specifiers
control when a variable is allocated and released. The kinds are:
- automatic: the most common local variables, not declared static or thread_local; they live on the stack and are allocated and destroyed as the enclosing block is entered and left
- static: static variables, created at program start and destroyed at program end, but initialized the first time their initialization code runs
- thread: allocated and destroyed when the thread starts and ends
- dynamic: the familiar heap variables, which require explicit new and delete
In C++11 auto no longer declares a storage duration but is the type-deduction keyword; the automatic storage duration itself still exists (local variables).
An identifier (variable or function) denotes a value in memory or a function body; linkage determines whether other identifiers with the same name refer to that same memory. C/C++ has three kinds of linkage: no linkage, internal linkage, and external linkage.
static
declares internal linkage, so two identifiers with the same name and type may be declared static in different files; they refer to different memory, and the static in one file has no effect on the other.
extern
declares external linkage: declaring a variable with extern lets you use the variable that refers to the same memory defined elsewhere, and declaring a function (extern is optional there) lets you call the function that refers to the same memory.
Both static
and extern
express a storage duration as well as a linkage. static is relatively simple; extern is trickier, as in the following cases:
int g_x = 1; // defines an initialized global variable (extern is optional here)
int g_x; // defines a global variable without an explicit initializer (do not add extern); it is zero-initialized
extern int g_x; // forward declaration of a global variable; must not be initialized
extern const int g_y { 1 }; // defines a global constant; const must be initialized
extern const int g_y; // forward declaration of a global constant; must not be initialized
So when defining a global variable without an initializer, do not add extern, or it becomes a forward declaration instead.
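A minimal two-file sketch of the cases above (the file names are made up):
// globals.cpp (made-up file name)
int g_x = 1;              // definition, external linkage
extern const int g_y{2};  // const needs extern to get external linkage
static int s_count = 0;   // internal linkage: each file declaring this gets its own copy

// main.cpp
#include <iostream>
extern int g_x;           // forward declaration, refers to g_x in globals.cpp
extern const int g_y;     // forward declaration of the constant
int main() {
    std::cout << g_x << ' ' << g_y << '\n';  // prints 1 2
}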
Although adding extern
to a constexpr gives it external linkage, it still cannot be forward declared in other files: constexpr values are substituted at compile time, and the compiler only sees one file at a time, so another file cannot know the value at compile time and therefore cannot merely declare it; it can only define it.
A local variable's scope, its (lack of) linkage, and its duration coincide, running from {
to }
. Conceptually, global scope covers file scope, and linkage determines whether the identifier can also be used from other files.
A local class is not allowed to have static data members.
https://en.cppreference.com/w/cpp/language/storage_duration
OS vendors are reluctant to discuss system tuning: the topic is endless, it is complicated, and arguably the need to tune implies the defaults are not good enough.
SUSE's support policy even states:
explanations of internal mechanisms and system tuning are outside the scope of technical support.
Here are a few related notes.
A buffer holds data that is about to be written out to disk (a block device), while a cache holds data that has been read from disk. Both exist to improve I/O performance.
- buffer: buffering smooths the hand-off between a fast side and a slow side; the fast side accumulates data in the buffer and feeds it to the slow side bit by bit.
For example, data written from memory to disk is not written directly; it is buffered until a certain amount has accumulated and is then flushed to disk.
A buffer is something that has yet to be "written" to disk.
In short, both buffers and caches sit between memory and disk: the former on the write-to-disk path, the latter on the read-into-memory path.
They can be reclaimed with drop_caches:
#sync;sync;sync
#echo 3 > /proc/sys/vm/drop_caches
Swap is the swap partition, a partition on disk. The kernel moves memory pages out of RAM into the swap partition (swap out).
Swap behaviour is controlled by the vm.swappiness kernel parameter, whose default is 60. cat /proc/sys/vm/swappiness
shows the current value.
The parameter controls how eagerly the kernel uses swap and ranges from 0 to 100.
Setting it to 0 tells the kernel to avoid swapping processes out of physical memory whenever possible;
setting it to 100 tells the kernel to swap pages out to the swap partition aggressively.
Note: 0 does not disable the swap partition; it only tells the kernel to use swap as little as possible, while vm.swappiness=100 means to prefer swap. (A quick way to change it is sketched below.)
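A quick way to check and change the value (10 is only an example):
cat /proc/sys/vm/swappiness                                          # current value, default 60
sudo sysctl vm.swappiness=10                                         # change until next reboot
echo "vm.swappiness=10" | sudo tee /etc/sysctl.d/99-swappiness.conf  # persist across reboots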
The actual behaviour involves more complex algorithms than swappiness alone. It is not the case that swap is used only after all physical memory has been consumed: I have seen a machine with only 10 MB of physical memory left that still had not touched swap, and another with 15 GB free that was using a little swap. Light swap usage does not hurt performance; serious problems only appear when memory pressure or a memory leak causes frequent, heavy swapping.
As noted above, this is hard to pin down: in theory, when physical memory runs short and more data must be read in, pages belonging to long-idle programs are swapped out.
In practice, though, the kernel often uses some swap even when memory is plentiful,
as the test below shows.
Run swapoff, and optionally sudo sysctl vm.swappiness=0
to temporarily keep the kernel from swapping out;
swapoff loads the data held in swap back into memory, and swapon re-enables swap:
#swapoff -a
#swapon -a
On kernel 5.10.0-8-amd64, before restarting swap (note that swap is in use even though plenty of memory is free):
total used free shared buff/cache available
Mem: 12162380 4911564 5605744 459364 1645072 6466572
Swap: 1000444 763040 237404
After restarting swap:
total used free shared buff/cache available
Mem: 12162380 5605800 4843176 524984 1713404 5707112
Swap: 1000444 0 1000444
As can be seen, after swap was turned off most of swap's used amount moved into Mem used
, and a small part into Mem shared.
perf + flame graphs: profile where time goes, down to individual function calls; for your own program this shows which functions need optimizing. (A hedged perf invocation is sketched below.)
vmstat shows disk I/O pressure: with vmstat -t 3
, if the number in the b column stays large, disk access is badly blocked, which may mean a failing disk or a poor program design.
There are also top, iperf, and so on.
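A hedged example of the perf part of that workflow (producing the actual flame graph needs the external FlameGraph scripts, which are not shown here):
# sample the whole system with call stacks for 30 seconds (or use -p <pid> for one process)
sudo perf record -F 99 -a -g -- sleep 30
# interactive report of the hottest call chains
sudo perf report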
"""
Update
"""
parser = ArgumentParser(description=__description__,
                        epilog=__doc__, formatter_class=RawTextHelpFormatter)
parser.add_argument('-v', '--version',
                    action='version', version=__version__)
parser.add_argument('-c', '--config',
                    default="config.json", help="run with config file [path to config file]")
config_file = parser.parse_args().config
get_config(path=config_file)
# Dynamically import the dns module named in the configuration
dns_provider = str(get_config('dns', 'dnspod').lower())
dns = getattr(__import__('dns', fromlist=[dns_provider]), dns_provider)
dns.Config.ID = get_config('id')
dns.Config.TOKEN = get_config('token')
dns.Config.TTL = get_config('ttl')
if get_config('debug'):
    ip.DEBUG = get_config('debug')
    basicConfig(
        level=DEBUG,
        format='%(asctime)s <%(module)s.%(funcName)s> %(lineno)d@%(pathname)s \n[%(levelname)s] %(message)s')
    print("DDNS[", __version__, "] run:", os_name, sys.platform)
    print("Configuration was loaded from <==", path.abspath(config_file))
    print("=" * 25, ctime(), "=" * 25, sep=' ')

proxy = get_config('proxy') or 'DIRECT'
proxy_list = proxy.strip('; ').split(';')

cache = get_config('cache', True) and Cache(CACHE_FILE)
if cache is False:
    info("Cache is disabled!")
elif get_config.time >= cache.time:
    warning("Cache file is out of dated.")
    cache.clear()
elif not cache:
    debug("Cache is empty.")
update_ip('4', cache, dns, proxy_list)
update_ip('6', cache, dns, proxy_list)
if __name__ == '__main__':
main()
{
"$schema": "https://ddns.newfuture.cc/schema/v2.8.json",
"id": "",
"token": "",
"dns": "alidns",
"ipv4": ["", ""],
"index4": "public",
"ttl": 600,
"proxy": "DIRECT",
"debug": false
}
Provide an asynchronous gRPC API for C++ in which the completion of RPC actions in the library will result in callbacks to user code,
Since its initial release, gRPC has provided two C++ APIs: a synchronous API and a completion-queue-based asynchronous API.
The goal of the synchronous version is to be easy to program. However, this comes at the cost of high thread-switching overhead and high thread storage for systems with many concurrent RPCs. On the other hand, the asynchronous API allows the application full control over its threading and thus can scale further. The biggest problem with the asynchronous API is that it is just difficult to use. Server RPCs must be explicitly requested, RPC polling must be explicitly controlled by the application, lifetime management is complicated, etc. These have proved sufficiently difficult that the full features of the asynchronous API are basically never used by applications. Even if one can use the async API correctly, it also presents challenges in deciding how many completion queues to use and how many threads to use for polling them, as one can either optimize for reducing thread hops, avoiding stranding, reducing CQ contention, or improving locality. These goals are often in conflict and require substantial tuning.
The callback API is designed to have the performance and thread scalability of an asynchronous API without the burdensome programming model of the completion-queue-based model. In particular, the following are fundamental guiding principles of the API:
The most general form of the callback API is built around a reactor model. Each type of RPC has a reactor base class provided by the library. These types are:
- ClientUnaryReactor and ServerUnaryReactor for unary RPCs
- ClientBidiReactor and ServerBidiReactor for bidi-streaming RPCs
- ClientReadReactor and ServerWriteReactor for server-streaming RPCs
- ClientWriteReactor and ServerReadReactor for client-streaming RPCs

Client RPC invocations from a stub provide a reactor pointer as one of their arguments, and the method handler of a server RPC must return a reactor pointer.
These base classes provide three types of methods:

Operation-initiation methods: start an asynchronous operation and are invoked by the application logic. All of these have a void return type. The ReadMessageType below is the request type for a server RPC and the response type for a client RPC; the WriteMessageType is the response type for a server RPC or the request type for a client RPC.

- void StartCall(): (Client only) Initiates the operations of a call from the client, including sending any client-side initial metadata associated with the RPC. Must be called exactly once. No reads or writes will actually be started until this is called (i.e., any previous calls to StartRead, StartWrite, or StartWritesDone will be queued until StartCall is invoked). This operation is not needed at the server side since streaming operations at the server are released from backlog automatically by the library as soon as the application returns a reactor from the method handler, and because there is a separate method just for sending initial metadata.
- void StartSendInitialMetadata(): (Server only) Sends server-side initial metadata. To be used in cases where initial metadata should be sent without sending a message. Optional; if not called, initial metadata will be sent when StartWrite or Finish is called. May not be invoked more than once or after StartWrite or Finish has been called. This does not exist at the client because sending initial metadata is part of StartCall.
- void StartRead(ReadMessageType*): Starts a read of a message into the object pointed to by the argument. OnReadDone will be invoked when the read is complete. Only one read may be outstanding at any given time for an RPC (though a read and a write can be concurrent with each other). If this operation is invoked by a client before calling StartCall or by a server before returning from the method handler, it will be queued until one of those events happens and will not actually trigger any activity or reactions until it is thereby released from the queue.
- void StartWrite(const WriteMessageType*): Starts a write of the object pointed to by the argument. OnWriteDone will be invoked when the write is complete. Only one write may be outstanding at any given time for an RPC (though a read and a write can be concurrent with each other). As with StartRead, if this operation is invoked by a client before calling StartCall or by a server before returning from the method handler, it will be queued until one of those events happens and will not actually trigger any activity or reactions until it is thereby released from the queue.
- void StartWritesDone(): (Client only) For client RPCs to indicate that there are no more writes coming in this stream. OnWritesDoneDone will be invoked when this operation is complete. This causes future read operations on the server RPC to indicate that there is no more data available. Highly recommended but technically optional; may not be called more than once per call. As with StartRead and StartWrite, if this operation is invoked by a client before calling StartCall or by a server before returning from the method handler, it will be queued until one of those events happens and will not actually trigger any activity or reactions until it is thereby released from the queue.
- void Finish(Status): (Server only) Sends completion status to the client, asynchronously. Must be called exactly once for all server RPCs, even for those that have already been cancelled. No further operation-initiation methods may be invoked after Finish.

Operation-completion reactions: notify the application that an asynchronous operation has finished. These have a default empty implementation ({}) but may be overridden by the application's reactor definition. These are invoked by the library. All of these have a void return type. Most take a bool ok argument to indicate whether the operation completed "normally," as explained below.

- void OnReadInitialMetadataDone(bool ok): (Client only) Invoked by the library to notify that the server has sent an initial metadata response to a client RPC. If ok is true, then the RPC received initial metadata normally. If it is false, there is no initial metadata either because the call has failed or because the call received a trailers-only response (which means that there was no actual message and that any information normally sent in initial metadata has been dispatched instead to trailing metadata, which is allowed in the gRPC HTTP/2 transport protocol). This reaction is automatically invoked by the library for RPCs of all varieties; it is uncommonly used as an application-defined reaction however.
- void OnReadDone(bool ok): Invoked by the library in response to a StartRead operation. The ok argument indicates whether a message was read as expected. A false ok could mean a failed RPC (e.g., cancellation) or a case where no data is possible because the other side has already ended its writes (e.g., seen at the server-side after the client has called StartWritesDone).
- void OnWriteDone(bool ok): Invoked by the library in response to a StartWrite operation. The ok argument indicates whether the write was successfully sent; a false value indicates an RPC failure.
- void OnWritesDoneDone(bool ok): (Client only) Invoked by the library in response to a StartWritesDone operation. The bool ok argument indicates whether the writes-done operation was successfully completed; a false value indicates an RPC failure.
- void OnCancel(): (Server only) Invoked by the library if an RPC is canceled before it has a chance to successfully send status to the client side. The reaction may be used for any cleanup associated with cancellation or to guide the behavior of other parts of the system (e.g., by setting a flag in the service logic associated with this RPC to stop further processing since the RPC won't be able to send outbound data anyway). Note that servers must call Finish even for RPCs that have already been canceled as this is required to clean up all their library state and move them to a state that allows for calling OnDone.
- void OnDone(const Status&) at the client, void OnDone() at the server: Invoked by the library when all outstanding and required RPC operations are completed for a given RPC. For the client-side, it additionally provides the status of the RPC (either as sent by the server with its Finish call or as provided by the library to indicate a failure), in which case the signature is void OnDone(const Status&). The server version has no argument, and thus has a signature of void OnDone(). Should be used for any application-level RPC-specific cleanup. OnDone will always take place after all other reactions. No further RPC operations are permitted to be issued after OnDone is invoked.

Hold methods: keep an RPC from being considered complete while the application still intends to start operations on it from outside a reaction (see AddHold below). These methods are invoked by the application logic. All of these have a void return type.

- void AddHold(): (Client only) This prevents the RPC from being considered complete (ready for OnDone) until each AddHold on an RPC's reactor is matched to a corresponding RemoveHold. An application uses this operation before it performs any extra-reaction flows, which refers to streaming operations initiated from outside a reaction method. Note that an RPC cannot complete before StartCall, so holds are not needed for any extra-reaction flows that take place before StartCall. As long as there are any holds present on an RPC, though, it may not have OnDone called on it, even if it has already received server status and has no other operations outstanding. May be called 0 or more times on any client RPC.
- void AddMultipleHolds(int holds): (Client only) Shorthand for holds invocations of AddHold.
- void RemoveHold(): (Client only) Removes a hold reference on this client RPC. Must be called exactly as many times as AddHold was called on the RPC, and may not be called more times than AddHold has been called so far for any RPC. Once all holds have been removed, the server has provided status, and all outstanding or required operations have completed for an RPC, the library will invoke OnDone for that RPC.

Examples are provided in the PR to de-experimentalize the callback API.
As a shortcut, client-side unary RPCs may bypass the reactor model by directly providing a std::function
for the library to call at completion rather than a reactor object pointer. This is passed as the final argument to the stub call, just as the reactor would be in the more general case. This is semantically equivalent to a reactor in which the OnDone
function simply invokes the specified function (but can be implemented in a slightly faster way since such an RPC will definitely not wait separately for initial metadata from the server) and all other reactions are left empty. In practice, this is the common and recommended model for client-side unary RPCs, unless they have a specific need to wait for initial metadata before getting their full response message. As in the reactor model, the function provided as a callback may not include operations that block for an arbitrary amount of time.
Server-side unary RPCs have the option of returning a library-provided default reactor when their method handler is invoked. This is provided by calling DefaultReactor
on the CallbackServerContext
. This default reactor provides a Finish
method, but does not provide a user callback for OnCancel
and OnDone
. In practice, this is the common and recommended model for most server-side unary RPCs unless they specifically need to react to an OnCancel
callback or do cleanup work after the RPC fully completes.
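A hedged sketch of such a unary handler using the default reactor, based on the helloworld proto (the service and method names are assumed from that example):
#include <grpcpp/grpcpp.h>
#include "helloworld.grpc.pb.h"

class GreeterService final : public helloworld::Greeter::CallbackService {
  grpc::ServerUnaryReactor* SayHello(grpc::CallbackServerContext* ctx,
                                     const helloworld::HelloRequest* request,
                                     helloworld::HelloReply* reply) override {
    reply->set_message("Hello " + request->name());
    grpc::ServerUnaryReactor* reactor = ctx->DefaultReactor();  // library-provided reactor
    reactor->Finish(grpc::Status::OK);                          // send status to the client
    return reactor;
  }
};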
ServerContext
is now made a derived class of ServerContextBase
. There is a new derived class of ServerContextBase
called CallbackServerContext
which provides a few additional functions:
- ServerUnaryReactor* DefaultReactor() may be used by a method handler to return a default reactor from a unary RPC.
- RpcAllocatorState* GetRpcAllocatorState: see the advanced topics section.

Additionally, the AsyncNotifyWhenDone function is not present in the CallbackServerContext.
All method handler functions for the callback API take a CallbackServerContext*
as their first argument. ServerContext
(used for the sync and CQ-based async APIs) and CallbackServerContext
(used for the callback API) actually use the same underlying structure and thus their object pointers are meaningfully convertible to each other via a static_cast
to ServerContextBase*
. We recommend that any helper functions that need to work across API variants should use a ServerContextBase
pointer or reference as their argument rather than a ServerContext
or CallbackServerContext
pointer or reference. For example, ClientContext::FromServerContext
now uses a ServerContextBase*
as its argument; this is not a breaking API change since the argument is now a parent class of the previous argument's class.
Callback services must allocate an object for the CallbackServerContext
and for the request and response objects of a unary call. Applications can supply a per-method custom memory allocator for gRPC server to use to allocate and deallocate the request and response messages, as well as a per-server custom memory allocator for context objects. These can be used for purposes like early or delayed release, freelist-based allocation, or arena-based allocation. For each unary RPC method, there is a generated method in the server called SetMessageAllocatorFor_*MethodName*
. For each server, there is a method called SetContextAllocator
. Each of these has numerous classes involved, and the best examples for how to use these features lives in the gRPC tests directory.
RegisterCallbackGenericService
is a new method of ServerBuilder
to allow for processing of generic (unparsed) RPCs. This is similar to the pre-existing RegisterAsyncGenericService
but uses the callback API and reactors rather than the CQ-based async API. It is expected to be used primarily for generic gRPC proxies where the exact serialization format or list of supported methods is unknown.
Just as with async services, callback services may also be specified on a method-by-method basis (using the syntax WithCallbackMethod_*MethodName*
), with any unlisted methods being treated as sync RPCs. The shorthand CallbackService
declares every method as being processed by the callback API. For example:
- Foo::Service -- purely synchronous service
- Foo::CallbackService -- purely callback service
- Foo::WithCallbackMethod_Bar<Service> -- synchronous service except for callback method Bar
- Foo::WithCallbackMethod_Bar<WithCallbackMethod_Baz<Service>> -- synchronous service except for callback methods Bar and Baz
Besides the content described in the background section, the rationale also includes early and consistent user demand for this feature as well as the fact that many users were simply spinning up a callback model on top of gRPC's completion queue-based asynchronous model.
There is more than one mechanism available for implementing the background polling required by the C++ callback API. One has been implemented on top of the C++ completion queue API. In this approach, the callback API uses a number of library-owned threads to call Next
on an async CQ that is owned by the internal implementation. Currently, the thread count is automatically selected by the library with no user input and is set to half the system's core count, but no less than 2 and no more than 16. This selection is subject to change in the future based on our team's ongoing performance analysis and tuning efforts. Despite being built on the CQ-based async API, the developer using the callback API does not need to consider any of the CQ details (e.g., shutdown, polling, or even the existence of a CQ).
It is the gRPC team's intention that that implementation is only a temporary solution. A new structure called an EventEngine
is being developed to provide the background threads needed for polling, and this system is also intended to provide a direct API for application use. This event engine would also allow the direct use of the core callback API that is currently only used by the Python async implementation. If this solution is adopted, there will be a new gRFC for it. This new implementation will not change the callback API at all but rather will only affect its performance. The C++ code for the callback API already has if
branches in place to support the use of a poller that directly supplies the background threads, so the callback API will naturally layer on top of the EventEngine
without further development effort.
N/A. The gRPC C++ callback API has been used internally at Google for two years now, and the code and API have evolved substantially during that period.
RPC means remote procedure call
; broadly speaking, HTTP and gRPC are both forms of RPC.
There is also a project called grpc-gateway, which can expose gRPC services over HTTP.
gRPC is one RPC implementation, open-sourced by Google; others include Thrift, sogorpc, and so on. gRPC uses the HTTP/2 protocol.
HTTP/1.1 already supports server-to-client streaming by using 'Transfer-Encoding: chunked' instead of 'Content-Length'; see the RFC:
A sender MUST NOT send a Content-Length header field in any message
that contains a Transfer-Encoding header field.
Methods in the same service are generated into the same class, but that class is of limited use: RPC calls are stateless (RESTful), so multiple calls or multiple rpc methods cannot share data through the service itself; callers have to solve that some other way.
A service can also be used to namespace rpc methods with the same name, e.g. - service1/helloworld - service2/helloworld
Each rpc method is stored as an RpcServiceMethod
and is invoked by its index:
::grpc::Service::RequestAsyncUnary(0, context, request, response, new_call_cq, notification_cq, tag);
::grpc::Service::RequestAsyncUnary(1, context, request, response, new_call_cq, notification_cq, tag);
A non-streaming call is also called a UnaryCall
: it sends exactly one request message and receives exactly one response message.
A streaming call (StreamingCall
) can send or receive multiple times, so the amount of data is not fixed.
A stream can keep being written until WritesDone/Finish
is sent, so the receiving side always loops:
while(read stream){}
gRPC supports client streaming (server non-streaming), server streaming (client non-streaming), and bidirectional streaming; a plain call streams in neither direction:
- grpc::internal::RpcMethod::NORMAL_RPC (unary call)
- grpc::internal::RpcMethod::RpcType::SERVER_STREAMING
- grpc::internal::RpcMethod::RpcType::CLIENT_STREAMING
- grpc::internal::RpcMethod::RpcType::BIDI_STREAMING
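For example, the read loop of a synchronous client for an assumed server-streaming method ListReplies (not part of the stock helloworld proto) looks roughly like this:
#include <iostream>
#include <memory>
#include <grpcpp/grpcpp.h>
#include "helloworld.grpc.pb.h"

void ReadAll(helloworld::Greeter::Stub* stub) {
  grpc::ClientContext ctx;
  helloworld::HelloRequest request;
  helloworld::HelloReply reply;
  // ListReplies is an assumed server-streaming method added to the proto for illustration
  std::unique_ptr<grpc::ClientReader<helloworld::HelloReply>> reader(
      stub->ListReplies(&ctx, request));
  while (reader->Read(&reply)) {            // returns false once the server finishes the stream
    std::cout << reply.message() << std::endl;
  }
  grpc::Status status = reader->Finish();   // collect the trailing status
}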
protoc invokes the grpc_cpp_plugin
plugin to generate the grpc.pb.{h,cc} files, which contain the implementation of the rpc methods;
pb.{h,cc} define the serialization and deserialization of the protobuf messages.
pb.h implements the language-specific accessors for the request and response messages, plus the generic protobuf methods,
e.g. has_xx
(in proto3 only fields of custom message types support it); every generated class XXX_CPP_API
class inherits from google::protobuf::Message:
class HelloRequest PROTOBUF_FINAL :
public ::PROTOBUF_NAMESPACE_ID::Message
#define PROTOBUF_NAMESPACE "google::protobuf"
#define PROTOBUF_NAMESPACE_ID google::protobuf
The message
class carries usage comments; the key functions are SerializeToString
and ParseFromString
, plus the array variant SerializeToArray
; GetDescriptor()
can be used to inspect the fields dynamically:
// Example usage:
//
// Say you have a message defined as:
//
// message Foo {
// optional string text = 1;
// repeated int32 numbers = 2;
// }
//
// Then, if you used the protocol compiler to generate a class from the above
// definition, you could use it like so:
//
// std::string data; // Will store a serialized version of the message.
//
// {
// // Create a message and serialize it.
// Foo foo;
// foo.set_text("Hello World!");
// foo.add_numbers(1);
// foo.add_numbers(5);
// foo.add_numbers(42);
//
// foo.SerializeToString(&data);
// }
//
// {
// // Parse the serialized message and check that it contains the
// // correct data.
// Foo foo;
// foo.ParseFromString(data);
//
// assert(foo.text() == "Hello World!");
// assert(foo.numbers_size() == 3);
// assert(foo.numbers(0) == 1);
// assert(foo.numbers(1) == 5);
// assert(foo.numbers(2) == 42);
// }
A Message can be converted to raw bytes like this:
int size = reqMsg.ByteSizeLong();
char* array = new char[size];
reqMsg.SerializeToArray(array, size);
std::string bytes = reqMsg.SerializeAsString();
const char* array = bytes.data();
int size = bytes.size();
Looking further, protobuf::Message
inherits from protobuf::MessageLite
, which implements SerializeAsString
and SerializeToArray:
inline uint8* SerializeToArrayImpl(const MessageLite& msg, uint8* target,
int size) {
constexpr bool debug = false;
if (debug) {
// Force serialization to a stream with a block size of 1, which forces
// all writes to the stream to cross buffers triggering all fallback paths
// in the unittests when serializing to string / array.
io::ArrayOutputStream stream(target, size, 1);
uint8* ptr;
io::EpsCopyOutputStream out(
&stream, io::CodedOutputStream::IsDefaultSerializationDeterministic(),
&ptr);
ptr = msg._InternalSerialize(ptr, &out);
out.Trim(ptr);
GOOGLE_DCHECK(!out.HadError() && stream.ByteCount() == size);
return target + size;
} else {
io::EpsCopyOutputStream out(
target, size,
io::CodedOutputStream::IsDefaultSerializationDeterministic());
    auto res = msg._InternalSerialize(target, &out);  // <- the actual serialization call
GOOGLE_DCHECK(target + size == res);
return res;
}
}
_InternalSerialize
, using the official HelloRequest as an example:
::PROTOBUF_NAMESPACE_ID::uint8* HelloRequest::_InternalSerialize(
::PROTOBUF_NAMESPACE_ID::uint8* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const {
// @@protoc_insertion_point(serialize_to_array_start:helloworld.HelloRequest)
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
(void) cached_has_bits;
// string name = 1;
if (this->name().size() > 0) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
this->_internal_name().data(), static_cast<int>(this->_internal_name().length()),
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE,
"helloworld.HelloRequest.name");
target = stream->WriteStringMaybeAliased(
1, this->_internal_name(), target);
}
if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::InternalSerializeUnknownFieldsToArray(
_internal_metadata_.unknown_fields<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(::PROTOBUF_NAMESPACE_ID::UnknownFieldSet::default_instance), target, stream);
}
// @@protoc_insertion_point(serialize_to_array_end:helloworld.HelloRequest)
return target;
}
The generated framework code is used to derive your Service implementation and to obtain a Stub for issuing rpc calls. Strictly speaking this code is not required:
below we will see the factory classes that can create a Stub, and a Service can also simply be new
'ed directly.
class XXXServer {
// the stub used by the client
class Stub
// base
class Service
// wrappers for the various rpc flavours, all derived from the base Service
class WithAsyncMethod_XXX
typedef WithAsyncMethod_XXX<Service > AsyncService;
typedef ExperimentalWithCallbackMethod_XXX<Service > CallbackService;
class WithGenericMethod_XXX
class WithRawMethod_XXX
typedef WithStreamedUnaryMethod_XXX<Service > StreamedUnaryService;
}
gRPC's asynchronous mode means CQ-based event-driven calls, with tags marking the events. There is also the callback style.
Synchronously, a call goes through '::grpc::internal::BlockingUnaryCall'.
Asynchronously, a (non-streaming) call creates a 'ClientAsyncResponseReader', then calls Finish on it and waits for the tag.
When streams are involved, the types are:
- ::grpc::ClientAsyncReader
- ::grpc::ClientAsyncWriter
- ::grpc::ClientAsyncReaderWriter
These types can be created with the corresponding factory classes; the generated stub uses them in exactly this way:
class ClientReaderFactory
class ClientWriterFactory
class ClientReaderWriterFactory
On the server, synchronous methods are registered with 'AddMethod'; the generated code does this in the Service constructor, and after registration gRPC invokes the handler:
Greeter::Service::Service() {
AddMethod(new ::grpc::internal::RpcServiceMethod(
Greeter_method_names[0],
::grpc::internal::RpcMethod::NORMAL_RPC,
new ::grpc::internal::RpcMethodHandler< Greeter::Service, ::helloworld::HelloRequest, ::helloworld::HelloReply>(
[](Greeter::Service* service,
::grpc_impl::ServerContext* ctx,
const ::helloworld::HelloRequest* req,
::helloworld::HelloReply* resp) {
return service->SayHello(ctx, req, resp);
}, this)));
}
Asynchronously the server side mirrors the client:
- grpc::ServerAsyncReaderWriter
- grpc::ServerAsyncReader
- grpc::ServerAsyncWriter
As can be seen, the service object is new'ed directly, and in the async case these IO objects are also new'ed directly and passed in when calling:
RequestAsyncBidiStreaming
RequestAsyncClientStreaming
RequestAsyncServerStreaming
Used only on the client: a callback-style request can take a lambda that is invoked when the request completes:
stub_->async()->SayHello(&context, &request, &reply,
[&mu, &cv, &done, &status](Status s) {
status = std::move(s);
std::lock_guard<std::mutex> lock(mu);
done = true;
cv.notify_one();
});
Newer gRPC releases have dropped the experimental marker, which suggests this style is now considered mature:
#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
::grpc::Service::
#else
::grpc::Service::experimental().
#endif
The official repository has no example that is both asynchronous and streaming. In a real project that needed async streaming, the rough approach was: 1. manually create the writer/reader objects 2. at startup call 'grpc::Service::RequestAsyncBidiStreaming', 'grpc::Service::RequestAsyncClientStreaming', and 'RequestAsyncServerStreaming' to enqueue new_connection requests on the cq 3. once a 'new_connection' event comes back, issue the read events.
There are five event types in total:
new_connection, read, write, finish, done
In older gRPC versions the sending side allowed unlimited message sizes, but the receiving side was capped at 4 MB:
#define GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH -1
#define GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024)
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
if (max_receive_message_size_ >= 0) {
args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
}
but this changed in newer gRPC:
std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
grpc::ChannelArguments args;
if (max_receive_message_size_ >= -1) {
args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
}
if (max_send_message_size_ >= -1) {
args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
}
https://github.com/grpc/grpc/issues/13841
Because an asynchronous server learns rpc results and drives the next call through completion queues, multiple queues and multiple threads are normally used for throughput: 1. the usual setup is one queue per service (each service having several rpcs) with threads polling each completion queue, which brings high thread-switching overhead, and the completion queues themselves use a lot of memory 2. a queue can also be polled by several threads, but version 0.13 reportedly had bugs with this.
One big advantage gRPC has over other frameworks is asynchronous streaming: multiple requests and multiple replies on one call. Async mode is CQ-event driven, so you must wait for the tag callback; issuing two sends back to back raises an error. The actual requests are usually produced by business code that does not know the tag state, i.e. whether a send is currently in flight. So how do you send messages outside the cq callback?
The answer is to keep a send queue: messages are first pushed onto the queue and popped for sending when the cq callback fires. Because streams must signal completion explicitly (the server calls Stream::Finish, the client calls WritesDone and Finish), a special message is needed to mark the end, usually a null pointer or an explicit end flag. And since the send path is entered both by business code and by the cq callback, it must be protected by a lock. (A hedged sketch follows.)
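A hedged C++ sketch of that send-queue pattern; all names are illustrative, and the cq loop that delivers the write/finish tags is assumed to exist elsewhere:
#include <memory>
#include <mutex>
#include <queue>
#include <grpcpp/grpcpp.h>
#include "helloworld.grpc.pb.h"

class SendQueue {
 public:
  SendQueue(grpc::ServerAsyncReaderWriter<helloworld::HelloReply,
                                          helloworld::HelloRequest>* stream,
            void* write_tag, void* finish_tag)
      : stream_(stream), write_tag_(write_tag), finish_tag_(finish_tag) {}

  // Called by business code from any thread; nullptr marks end-of-stream.
  void Post(std::shared_ptr<helloworld::HelloReply> msg) {
    std::lock_guard<std::mutex> lock(mu_);
    queue_.push(std::move(msg));
    if (!writing_) StartNextLocked();
  }

  // Called from the cq loop when the previous Write completed (tag == write_tag_).
  void OnWriteDone() {
    std::lock_guard<std::mutex> lock(mu_);
    writing_ = false;
    if (!queue_.empty()) StartNextLocked();
  }

 private:
  void StartNextLocked() {  // caller must hold mu_
    auto msg = queue_.front();
    queue_.pop();
    writing_ = true;
    if (msg) {
      stream_->Write(*msg, write_tag_);                // normal message
    } else {
      stream_->Finish(grpc::Status::OK, finish_tag_);  // the special "end" message
    }
  }

  grpc::ServerAsyncReaderWriter<helloworld::HelloReply, helloworld::HelloRequest>* stream_;
  void* write_tag_;
  void* finish_tag_;
  std::mutex mu_;
  std::queue<std::shared_ptr<helloworld::HelloReply>> queue_;
  bool writing_ = false;
};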
Setting an environment variable makes gRPC print detailed debug information to the console:
export GRPC_VERBOSITY=DEBUG
bash-5.0# ./build/bin/hasync slave stdin stdout @127.0.0.1:7615
D1026 08:27:44.142802149 24658 ev_posix.cc:174] Using polling engine: epollex
D1026 08:27:44.143406685 24658 dns_resolver_ares.cc:490] Using ares dns resolver
I1026 08:27:44.158115785 24658 server_builder.cc:332] Synchronous server. Num CQs: 1, Min pollers: 1, Max Pollers: 2, CQ timeout (msec): 10000
The project uses a synchronous/asynchronous client and a fully asynchronous server, and supports all four streaming modes.
https://grpc.github.io/grpc/cpp/grpcpp_2impl_2codegen_2sync__stream_8h_source.html https://grpc.github.io/grpc/cpp/grpcpp_2impl_2codegen_2byte__buffer_8h_source.html https://grpc.github.io/grpc/cpp/call__op__set_8h_source.html