sysrepo
sysrepo - 1.4.2 笔记
1. sysrepo 概述 Sysrepo
是 Linux/Unix
系统下一个基于 YANG
模型的配置和操作数据库,为应用程序提供统一的操作数据的接口。应用程序使用 YANG
模型来建模,通过利用 YANG
模型完成数据合法性的检查,保证的风格的一致,不需要应用程序直接操作配置文件的一种数据管理方式。
1.1 基本特性与原则
sysrepo
只是一个库,不是一个独立的进程
全部的数据始终由 Yang
模型区分,这就可能造成许多严重的后果,例如,允许同时使用不同的模型进行工作,这将可 导致数据访问时异常。
在所有有 IPC
中使用共享内存的方式,取代了之前的 UNIX中进程间通信的方式,这样更高效,性能更优,扩展性更强
在 sysrepo
中几乎不存在 CPU 时间浪费,没有活动等待或者定期检查
完全可定制化的事件处理,从定期检查或者 poll/select
到自动线程处理
访问控制严格受制于文件系统的权限
sysrepo
操作期间可以修改 Yang
模型
1.2 主要特点
sysrepo
的主要功能是使用 YANG
模型对数据进行操作并订阅各种事件。但是,在执行任何操作时,都需要创建会话,连接会话,并要 install
所支持的各类 Yang
模型。假如设置了日志操作记录,sysrepo
在运行时,也可以保留它的行为记录。
通过 Yang
的 xpath
来修改与获取数据,所以要求了解 xpath
的基础知识。
最常见的操作订阅事件和修改订阅事件,订阅事件是允许应用程序根据特定的事件回调相应的数据执行,更改操作。操作执行成功后,会将对应配置操作保存,这样 sysrepo
可以充当更智能的配置文件,从而保证配置的可恢复性。
也支持 Rpc/Action/Notify
的订阅,这样可以通过执行特别的 Rpc
,就可以分别向其他 sysrepo
客户端通知各种生成的事件。
1.3 访问方法 应用程序可以通过两种方法来访问 sysrepo
,一种是直接的方法,即当应用程序需要配置数据或者执行相应的 callback
来响应配置变化时,可以通过 sysrepo
自带的应用程序来触发用 sysrepo
的功能函数来实现。这种方法一般用于开发人员自测或验证某个模块时使用;另一种是间接的方法,即应用程序通过创建 Deamon
进程的方法,该方法是通过将对 sysrepo
的调用转化为对应用程序的特定操作,该方法也最容易扩展,也无需为了使用 sysrepo
数据库而做相应的更改。如果有多个类似的 Deamon
进程,可以将这些进程合成一个 plugind
,最后由一个进程统一纳管。可扩展性得到大大的提高。间接方法的使用如图所示:
1.4 数据库 数据库结构大多是遵循 NMDA
(网络管理数据存储区体)所定义的体系架构。sysrepo
同样也不例外,sysrepo
中定义了四类数据库,分别是 startup
,running
,candidate
和 operational
。
startup
库,是 sysrepo
中唯一的持久性数据存储库,它包含设备启动时的配置文件,系统启动后创建的第一个 sysrepo
连接(共享内存)时,会将配置文件从 startup
库 copy
到 running
库;
running
数据库,是保存当前所运行时系统配置,当一个配置发生变化时并且设备需要重新配置时, running
数据库需要修改。系统重启时不会存在,如果需要,可以将配置 copy
到 startup
库。
candidate
数据库,候选库,顾名思义,它是一个准备配置的数据但又不影响实际设备使用。虽然该库中的数据不限制设备的正常使用,可以不必严格按照 NETCONF协议的定义,但也是需要遵循一般的数据存储规则。该库正常是无效的,实际使用时,需要将该库 mirror
到 running
,由 running
完成修改和配置下发,最后通过 sr_copy_config()
, 将 candidate
库重置。整个会话的过程中可能需要相应的 lock
操作,来保证操作的一致与完整性。
operational
库,维护当前使用的配置,并且该库只可读。它通常与对应的 running
库有所不同,而且,只包含任何状态数据结点。在默认的情况下,该库是空的,对于用户来说,全部的订阅数据和操作数据都存储于 operational
库中。并且 Notificationg RPC/Action
数据的校验都是在 operational
库是完成。
1.5 运行模式
对于连接与会话来说,会话是不同步的,所以不会在多个线程中共享一个会话。每个线程都需要建立属于自己的会话,来确保本线程运行的正确。
对于订阅来说,可以由应用程序对感兴趣的事件通过 *_subscribe()
函数来做相应的订阅。订阅在原则上是将全部的的事件一并处理,应用程序也可以将根据不同的事件类型拆分成多个不同的订阅,用于保证事件的并发处理。
每个订阅可以由不同的方式处理,这个由 sysrepo
做统一管理。sysrepo
创建一个单独的线程来捕获各种订阅事件的发生,然后通过订阅所注册的回调函数不处理它们。
2. sysrepo 常用操作命令 sysrepo
提供两个独立的,非常实用的程序。方便开发者便捷地使用 sysrepo
来开发与调试自己的应用。
2.1 Sysrepoctl sysrepoctl
,它用于列出,安装,卸载或更新 sysrepo
模块,也能用于修改一个 sysrepo
模块的特性,权限等。开发过程中经常使用的命令如下
1 2 3 4 # 列出全部已经安装在 sysrepo 中的 Yang 模块,并包含模块的基本信息 $ sysrepoctl -l, --list # 例如: $ sysrepoctl -l
1 2 3 4 5 6 7 8 # 安装指定 Yang 模型 $ sysrepoctl -i, --install # 例如: # 以默认权限安装 ietf-interfaces.yang 模型 $ sysrepoctl --install /root/ietf-interfaces.yang # 为特定 admin 用户安装可访问权限为 644 的 ietf-interfaces.yang 模型 $ sysrepoctl --install /root/ietf-interfaces.yang --owner=admin:admin --permissions=644
1 2 3 4 # 卸载已安装的 Yang 模型 $ sysrepoctl -u, --uninstall # 例如: $ sysrepoctl --uninstall ietf-interfaces
1 2 3 4 # 修改 Yang 模型,常用的是设置模型支持的特性 $ sysrepoctl -c, --chang # 例如: $ sysrepoctl --change ietf-interfaces --(disable |enable )-feature if-mib
1 2 3 4 # 更新 Yang 模型,如果已安装的 Yang 模型有更新,可以执行该命令 $ sysrepoctl -U, --update # 例如: $ sysrepoctl --update /root/ietf-netconf@2013-09-29.yang
更多 sysrepoctl
的使用,请参考 sysrepoctl -h
。
2.2 sysrepocfg sysrepocfg
是用于 importing
,exporting
,editing
,replacing
配置到指定的数据库中。命令默认是操作 running
库,也支持多种数据格式,json
, xml
, lyb
,除非通过 –format
特定指出,默认的采用 xml
格式。常用的命令如下:
1 2 3 4 5 6 7 8 # 导入一个配置 $ sysrepocfg -I, --import[=] # 例如: # 将 ietf-interfaces 配置导入默认 running 下的 ietf-interfaces 模块 $ sysrepocfg --import=/root/ietf-interfaces.xml # 将 json 格式的 ietf-interfaces 配置导入 startup 的 ietf-interfaces 模块 $ sysrepocfg --import=/root/ietf-interfaces_startup.json --datastore startup --module ietf-interfaces
1 2 3 4 5 # 导出一个配置 $ sysrepocfg -X, --export [=] # 例如: # 将 running 库 ietf-interfaces 的配置 xml 的格式导入,并以 ietf-interfaces_running.xml 名字命令配置文件 $ sysrepocfg --export =ietf-interfaces_running.xml --format xml --module ietf-interfaces
1 2 3 4 5 6 # 编辑或修改配置文件,应用到指定的数据库 $ sysrepocfg -E, --edit[=/] # 例如: $ sysrepocfg --edit=candidate.xml --datastore candidate # 如果是修改 running 库,需要加相应的锁 $ sysrepocfg --edit=vim --lock
1 2 3 4 # 发一个 RPC 请求,RPC 返回的结果直接输出于终端 $ sysrepocfg -R, --rpc[=/] # 例如: $ sysrepocfg --rpc=vim
更多 sysrepocfg
的使用,请参考 sysrepocfg -h
。
3. sysrepo-plugind 源码分析 应用程序通过将对 sysrepo
的调用通过 sysrepo
提供的相应的 API接口访问方法,称为 syrepo
的间接访问方法。该方法是应用程序通过创建 Deamon进程,通过 IPC Shm
机制与 sysrepo
通信。可以做到对 sysrepo
的即插即用,最后由 sysrepo
纳管,这就是 Plugind
,命名为 sysrepo-plugind
。要快速的使用 sysrepo
,并快速开发出适配于 sysrepo
的插件,就要先了解 sysrepo-plugind
的实现原理与机制,就需要先从实现 sysrepo-plugind
的源码处着手。sysrepo
源码路径:git clone https://github.com/sysrepo/sysrepo.git
。 Sysrepo-plugind
实现的路径为 sysrepo/src/executables/sysrepo-plugind.c
。下面也就从该文件开始说。
3.1 数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 struct srpd_plugin_s { void *handle; srp_init_cb_t init_cb; srp_cleanup_cb_t cleanup_cb; void *private_data; }; handle: 动态库句柄,在load_plugin中细说 srp_init_cb_t :typedef int (*srp_init_cb_t ) (sr_session_ctx_t *session, void **private_data) ; srp_cleanup_cb_t :typedef void (*srp_cleanup_cb_t ) (sr_session_ctx_t *session, void *private_data) ; private_data: Private context opaque to sysrepo
3.2 main 函数实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 int main (int argc, char ** argv) { struct srpd_plugin_s *plugins = NULL ; sr_conn_ctx_t *conn = NULL ; sr_session_ctx_t *sess = NULL ; sr_log_level_t log_level = SR_LL_ERR; int plugin_count = 0 , i, r, rc = EXIT_FAILURE, opt, debug = 0 ; struct option options[] = { {"help" , no_argument, NULL , 'h' }, {"version" , no_argument, NULL , 'V' }, {"verbosity" , required_argument, NULL , 'v' }, {"debug" , no_argument, NULL , 'd' }, {NULL , 0 , NULL , 0 }, }; opterr = 0 ; while ((opt = getopt_long (argc, argv, "hVv:d" , options, NULL )) != -1 ) { switch (opt) { case 'h' : version_print (); help_print (); rc = EXIT_SUCCESS; goto cleanup; case 'V' : version_print (); rc = EXIT_SUCCESS; goto cleanup; case 'v' : if (!strcmp (optarg, "none" )) { log_level = SR_LL_NONE; } else if (!strcmp (optarg, "error" )) { log_level = SR_LL_ERR; } else if (!strcmp (optarg, "warning" )) { log_level = SR_LL_WRN; } else if (!strcmp (optarg, "info" )) { log_level = SR_LL_INF; } else if (!strcmp (optarg, "debug" )) { log_level = SR_LL_DBG; } else if ((strlen (optarg) == 1 ) && (optarg[0 ] >= '0' ) && (optarg[0 ] <= '4' )) { log_level = atoi (optarg); } else { error_print (0 , "Invalid verbosity \"%s\"" , optarg); goto cleanup; } break ; case 'd' : debug = 1 ; break ; default : error_print (0 , "Invalid option or missing argument: -%c" , optopt); goto cleanup; } } if (optind < argc) { error_print (0 , "Redundant parameters" ); goto cleanup; } if (load_plugins (&plugins, &plugin_count)) { goto cleanup; } daemon_init (debug, log_level); if ((r = sr_connect (0 , &conn)) != SR_ERR_OK) { error_print (r, "Failed to connect" ); goto cleanup; } if ((r = sr_session_start (conn, SR_DS_RUNNING, &sess)) != SR_ERR_OK) { error_print (r, "Failed to start new session" ); goto cleanup; } for (i = 0 ; i < plugin_count; ++i) { r = plugins[i].init_cb (sess, &plugins[i].private_data); if (r != SR_ERR_OK) { SRP_LOG_ERR ("Plugin initialization failed (%s)." , sr_strerror (r)); goto cleanup; } } pthread_mutex_lock (&lock); while (!loop_finish) { pthread_cond_wait (&cond, &lock); } pthread_mutex_unlock (&lock); for (i = 0 ; i < plugin_count; ++i) { plugins[i].cleanup_cb (sess, plugins[i].private_data); } rc = EXIT_SUCCESS; cleanup: for (i = 0 ; i < plugin_count; ++i) { dlclose (plugins[i].handle); } free (plugins); sr_disconnect (conn); return rc; }
3.3 load_plugins 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 static int load_plugins (struct srpd_plugin_s **plugins, int *plugin_count) { void *mem, *handle; DIR *dir; struct dirent *ent; const char *plugins_dir; char *path; int rc = 0 ; *plugins = NULL ; *plugin_count = 0 ; plugins_dir = getenv ("SRPD_PLUGINS_PATH" ); if (!plugins_dir) { plugins_dir = SRPD_PLUGINS_PATH; } if (access (plugins_dir, F_OK) == -1 ) { if (errno != ENOENT) { error_print (0 , "Checking plugins dir existence failed (%s)." , strerror (errno)); return -1 ; } if (sr_mkpath (plugins_dir, 00777 ) == -1 ) { error_print (0 , "Creating plugins dir \"%s\" failed (%s)." , plugins_dir, strerror (errno)); return -1 ; } } dir = opendir (plugins_dir); if (!dir) { error_print (0 , "Opening \"%s\" directory failed (%s)." , plugins_dir, strerror (errno)); return -1 ; } while ((ent = readdir (dir))) { if (!strcmp (ent->d_name, "." ) || !strcmp (ent->d_name, ".." )) { continue ; } if (asprintf (&path, "%s/%s" , plugins_dir, ent->d_name) == -1 ) { error_print (0 , "asprintf() failed (%s)." , strerror (errno)); rc = -1 ; break ; } handle = dlopen (path, RTLD_LAZY); if (!handle) { error_print (0 , "Opening plugin \"%s\" failed (%s)." , path, dlerror ()); free (path); rc = -1 ; break ; } free (path); mem = realloc (*plugins, (*plugin_count + 1 ) * sizeof **plugins); if (!mem) { error_print (0 , "realloc() failed (%s)." , strerror (errno)); dlclose (handle); rc = -1 ; break ; } *plugins = mem; *(void **)&(*plugins)[*plugin_count].init_cb = dlsym (handle, SRP_INIT_CB); if (!(*plugins)[*plugin_count].init_cb) { error_print (0 , "Failed to find function \"%s\" in plugin \"%s\"." , SRP_INIT_CB, ent->d_name); dlclose (handle); rc = -1 ; break ; } *(void **)&(*plugins)[*plugin_count].cleanup_cb = dlsym (handle, SRP_CLEANUP_CB); if (!(*plugins)[*plugin_count].cleanup_cb) { error_print (0 , "Failed to find function \"%s\" in plugin \"%s\"." , SRP_CLEANUP_CB, ent->d_name); dlclose (handle); rc = -1 ; break ; } (*plugins)[*plugin_count].handle = handle; (*plugins)[*plugin_count].private_data = NULL ; ++(*plugin_count); } closedir (dir); return rc; }
3.4 init_cb 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 typedef int (*srp_init_cb_t ) (sr_session_ctx_t *session, void **private_data) ;#define SRP_INIT_CB "sr_plugin_init_cb" init_cb = dlsym (handle, SRP_INIT_CB); int sr_plugin_init_cb (sr_session_ctx_t *session, void **private_ctx) { int rc; struct plugind_ctx *ctx; ctx = calloc (1 , sizeof *ctx); if (!ctx) { rc = SR_ERR_NOMEM; goto error; } ... SRP_LOG_DBGMSG ("plugin initialized successfully." ); ctx->session = session; *private_ctx = ctx; return SR_ERR_OK; error: SRP_LOG_ERR ("plugin initialization failed (%s)." , sr_strerror (rc)); sr_unsubscribe (ctx->subscription); free (ctx); return rc; }
3.5 cleanup_cb 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 typedef void (*srp_cleanup_cb_t ) (sr_session_ctx_t *session, void *private_data) ;#define SRP_CLEANUP_CB "sr_plugin_cleanup_cb" cleanup_cb = dlsym (handle, SRP_CLEANUP_CB); void sr_plugin_cleanup_cb (sr_session_ctx_t *session, void *private_ctx) { (void )session; struct plugind_ctx *ctx = (struct plugind_ctx *)private_ctx; sr_unsubscribe (ctx->subscription); free (ctx); nb_terminate (); yang_terminate (); SRP_LOG_DBGMSG ("plugin cleanup finished." ); }
整个 sysrepo-plugind.c
代码结构简单,注释丰富,没有使用复杂的语法,还是非常容易理解的。
4. sysrepo 连接与会话 4.1 何为连接与会话 开发者要开始使用 sysrepo
,首先必须创建一个连接。一个应用程序或者进程即使可以允许创建多个连接,但是一般情况只会创建一个连接。sysrepo
允许同时创建多个连接。简单的举个例子,通常情况下,sysrepo-plugin
在 init_cb
初始时就会创建一个连接,这是一个由 sysrepo-plugin
与 sysrepo
所创建的连接,只要发生异常不释放,该连接会一直存在整个 sysrepo-plugin
进程的生命周期,此外,例如用户通过 sysrepoctl -l |grep ***
看某个 Yang
模型是否已经加载,sysrepoctl
应用程序也创建一个短连接,该连接在命令执行结束后立即释放,假如是极端修改,不释放该连接,再使用 sysrepocfg
来配置 runing
库,这时有 3 个与 sysrepo
连接。并且 3 个连接不干扰,也不影响 sysrepo
的正常工作。
而会话,是建立在连接之下,一个连接下可以创建多个会话,每个会话都有一个唯一的标识,每个会话总是可以选择一个可随时更改的数据库,使用些会话的所有 API 调用都将在该数据库下操作。
连接与会话的关系如下所示,可能不是特别准备,但大概就是这个意思。
4.2 核心数据结构 connection
的数据结构主要是存储 sysrepo
连接与 Libyang
的上下文,该连接所创建的共享内存结构。数据结构定义如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 struct sr_conn_ctx_s { struct ly_ctx *ly_ctx; sr_conn_options_t opts; sr_diff_check_cb diff_check_cb; pthread_mutex_t ptr_lock; sr_session_ctx_t **sessions; uint32_t session_count; int main_create_lock; sr_rwlock_t ext_remap_lock; sr_shm_t main_shm; sr_shm_t ext_shm; struct sr_mod_cache_s { sr_rwlock_t lock; struct lyd_node *data; struct { const struct lys_module *ly_mod; uint32_t ver; } *mods; uint32_t mod_count; } mod_cache; }
cache
需要特别说明:如果一个会话工作在 running
的数据库下操作,并且该会话的连接使能 cache
功能,则不会每次都从 sysrepo
中加载数据,可以从 cache
中复制数据,这样,可以大幅度提高 sysrepo
的处理性能。
session
的主要数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 struct sr_session_ctx_s { sr_conn_ctx_t *conn; sr_datastore_t ds; sr_sub_event_t ev; sr_sid_t sid; sr_error_info_t *err_info; pthread_mutex_t ptr_lock; sr_subscription_ctx_t **subscriptions; uint32_t subscription_count; struct { struct lyd_node *edit; struct lyd_node *diff; } dt[SR_DS_COUNT]; struct sr_sess_notif_buf { ATOMIC_T thread_running; pthread_t tid; sr_rwlock_t lock; struct sr_sess_notif_buf_node { char *notif_lyb; time_t notif_ts; const struct lys_module *notif_mod; struct sr_sess_notif_buf_node *next; } *first; struct sr_sess_notif_buf_node *last; } notif_buf; }
从 session
结构中主要是用于该次 session
的连接,该次 session
要连接的数据库类型(4种,runing
, startup
, candidate
, operation
),以及重中之重的 sr_subscription_ctx_t **subscriptions
, sysrepo
的所支持操作的订阅都在该结构中定义,不多说,直接看数据结构定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 struct sr_subscription_ctx_s { sr_conn_ctx_t *conn; uint32_t evpipe_num; int evpipe; ATOMIC_T thread_running; pthread_t tid; pthread_mutex_t subs_lock; struct modsub_change_s { char *module_name; sr_datastore_t ds; struct modsub_changesub_s { char *xpath; uint32_t priority; sr_subscr_options_t opts; sr_module_change_cb cb; void *private_data; sr_session_ctx_t *sess; uint32_t request_id; sr_sub_event_t event; } *subs; uint32_t sub_count; sr_shm_t sub_shm; } *change_subs; uint32_t change_sub_count; struct modsub_oper_s { char *module_name; struct modsub_opersub_s { char *xpath; sr_oper_get_items_cb cb; void *private_data; sr_session_ctx_t *sess; uint32_t request_id; sr_shm_t sub_shm; } *subs; uint32_t sub_count; } *oper_subs; uint32_t oper_sub_count; struct modsub_notif_s { char *module_name; struct modsub_notifsub_s { char *xpath; time_t start_time; int replayed; time_t stop_time; sr_event_notif_cb cb; sr_event_notif_tree_cb tree_cb; void *private_data; sr_session_ctx_t *sess; } *subs; uint32_t sub_count; uint32_t request_id; sr_shm_t sub_shm; } *notif_subs; uint32_t notif_sub_count; struct opsub_rpc_s { char *op_path; struct opsub_rpcsub_s { char *xpath; uint32_t priority; sr_rpc_cb cb; sr_rpc_tree_cb tree_cb; void *private_data; sr_session_ctx_t *sess; uint32_t request_id; sr_sub_event_t event; } *subs; uint32_t sub_count; sr_shm_t sub_shm; } *rpc_subs; uint32_t rpc_sub_count; }
4.3 connection 函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 API int sr_connect (const sr_conn_options_t opts, sr_conn_ctx_t **conn_p) { sr_error_info_t *err_info = NULL ; sr_conn_ctx_t *conn = NULL ; struct lyd_node *sr_mods = NULL ; int created = 0 , changed; sr_main_shm_t *main_shm; uint32_t conn_count; SR_CHECK_ARG_APIRET (!conn_p, NULL , err_info); if ((err_info = sr_shmmain_check_dirs ())) { goto cleanup; } if ((err_info = sr_conn_new (opts, &conn))) { goto cleanup; } if ((err_info = sr_shmmain_createlock (conn->main_create_lock))) { goto cleanup; } if ((err_info = sr_shmmain_main_open (&conn->main_shm, &created))) { goto cleanup_unlock; } if ((err_info = sr_shmmain_ext_open (&conn->ext_shm, created))) { goto cleanup_unlock; } /*Sysrepo SHM使用主+扩展SHM机制,整体机制在后面细谈,此处主要是将连接的创建。先略过*/ if ((err_info = sr_conn_lydmods_ctx_update (conn, created || !(opts & SR_CONN_NO_SCHED_CHANGES), &sr_mods, &changed))) { goto cleanup_unlock; } if (changed || created) { if ((err_info = sr_shm_remap (&conn->main_shm, sizeof (sr_main_shm_t )))) { goto cleanup_unlock; } main_shm = (sr_main_shm_t *)conn->main_shm.addr; main_shm->mod_count = 0 ; if ((err_info = sr_shm_remap (&conn->ext_shm, sizeof (size_t )))) { goto cleanup_unlock; } *((size_t *)conn->ext_shm.addr) = 0 ; if ((err_info = sr_shmmain_add (conn, sr_mods->child))) { goto cleanup_unlock; } if ((err_info = sr_shmmain_files_startup2running (conn, created))) { goto cleanup_unlock; } if ((err_info = sr_shmmain_check_data_files (conn))) { goto cleanup_unlock; } } main_shm = (sr_main_shm_t *)conn->main_shm.addr; conn_count = main_shm->conn_state.conn_count; sr_shmmain_createunlock (conn->main_create_lock); if ((err_info = sr_shmmain_lock_remap (conn, SR_LOCK_NONE, 1 , 0 , __func__))) { goto cleanup; } if (conn_count && !(opts & SR_CONN_NO_SCHED_CHANGES) && !main_shm->conn_state.conn_count) { sr_shmmain_unlock (conn, SR_LOCK_NONE, 1 , 0 , __func__); assert (!err_info); lyd_free_withsiblings (sr_mods); sr_conn_free (conn); return sr_connect (opts, conn_p); } err_info = sr_shmmain_conn_state_add (conn); sr_shmmain_unlock (conn, SR_LOCK_NONE, 1 , 0 , __func__); goto cleanup; cleanup_unlock: sr_shmmain_createunlock (conn->main_create_lock); cleanup: lyd_free_withsiblings (sr_mods); if (err_info) { sr_conn_free (conn); if (created) { shm_unlink (SR_MAIN_SHM); shm_unlink (SR_EXT_SHM); } } else { *conn_p = conn; } return sr_api_ret (NULL , err_info); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 for (i = 0 ; i < conn->session_count; ++i) { while (conn->sessions[i]->subscription_count && conn->sessions[i]->subscriptions[0 ]) { if (!wr_lock) { lock_err = sr_shmmain_lock_remap (conn, SR_LOCK_WRITE, 1 , 0 , __func__); sr_errinfo_merge (&err_info, lock_err); wr_lock = 1 ; } tmp_err = _sr_unsubscribe(conn->sessions[i]->subscriptions[0 ]); sr_errinfo_merge (&err_info, tmp_err); } } if (!wr_lock) { lock_err = sr_shmmain_lock_remap (conn, SR_LOCK_NONE, 1 , 0 , __func__); sr_errinfo_merge (&err_info, lock_err); } while (conn->session_count) { tmp_err = _sr_session_stop(conn->sessions[0 ]); sr_errinfo_merge (&err_info, tmp_err); } tmp_err = sr_shmmod_oper_stored_del_conn (conn, conn, getpid ()); sr_errinfo_merge (&err_info, tmp_err); main_shm = (sr_main_shm_t *)conn->main_shm.addr; tmp_err = sr_mlock (&main_shm->conn_state.lock, SR_CONN_STATE_LOCK_TIMEOUT, __func__); sr_errinfo_merge (&err_info, tmp_err); sr_shmmain_conn_state_del (main_shm, conn->ext_shm.addr, conn, getpid ()); sr_munlock (&main_shm->conn_state.lock); if (!lock_err) { if (wr_lock) { sr_shmmain_unlock (conn, SR_LOCK_WRITE, 1 , 0 , __func__); } else { sr_shmmain_unlock (conn, SR_LOCK_NONE, 1 , 0 , __func__); } } sr_conn_free (conn); return sr_api_ret (NULL , err_info); }
4.4 session 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 API int sr_session_start (sr_conn_ctx_t *conn, const sr_datastore_t datastore, sr_session_ctx_t **session) { sr_error_info_t *err_info = NULL ; sr_main_shm_t *main_shm; uid_t uid; SR_CHECK_ARG_APIRET (!conn || !session, NULL , err_info); *session = calloc (1 , sizeof **session); if (!*session) { SR_ERRINFO_MEM (&err_info); return sr_api_ret (NULL , err_info); } main_shm = (sr_main_shm_t *)conn->main_shm.addr; (*session)->sid.sr = ATOMIC_INC_RELAXED (main_shm->new_sr_sid); if ((*session)->sid.sr == (uint32_t )(ATOMIC_T_MAX - 1 )) { ATOMIC_STORE_RELAXED (main_shm->new_sr_sid, 1 ); } uid = getuid (); if ((err_info = sr_get_pwd (&uid, &(*session)->sid.user))) { goto error; } if ((err_info = sr_ptr_add (&conn->ptr_lock, (void ***)&conn->sessions, &conn->session_count, *session))) { goto error; } (*session)->conn = conn; (*session)->ds = datastore; if ((err_info = sr_mutex_init (&(*session)->ptr_lock, 0 ))) { goto error; } if ((err_info = sr_rwlock_init (&(*session)->notif_buf.lock, 0 ))) { goto error; } SR_LOG_INF ("Session %u (user \"%s\") created." , (*session)->sid.sr, (*session)->sid.user); return sr_api_ret (NULL , NULL ); error: free ((*session)->sid.user); free (*session); *session = NULL ; return sr_api_ret (NULL , err_info); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 ####函数清晰,简单,注释丰富,一看就懂,就不多废话. API int sr_session_stop (sr_session_ctx_t *session) { sr_error_info_t *err_info = NULL , *lock_err = NULL , *tmp_err; sr_conn_ctx_t *conn; int wr_lock = 0 ; if (!session) { return sr_api_ret (NULL , NULL ); } conn = session->conn; while (session->subscription_count) { if (!wr_lock) { lock_err = sr_shmmain_lock_remap (conn, SR_LOCK_WRITE, 1 , 0 , __func__); sr_errinfo_merge (&err_info, lock_err); wr_lock = 1 ; } tmp_err = sr_subs_session_del (session, session->subscriptions[0 ]); sr_errinfo_merge (&err_info, tmp_err); } if (wr_lock && !lock_err) { sr_shmmain_unlock (conn, SR_LOCK_WRITE, 1 , 0 , __func__); } tmp_err = _sr_session_stop(session); sr_errinfo_merge (&err_info, tmp_err); return sr_api_ret (NULL , err_info); }
连接与会话核心处就是这 4 个 API 函数, 其它与连接与会话有关的 API 都是对相关的补充,想要进一步了解的.请阅读源码.
接下来会分析 sysrepo
的共享内存机制. SHM
机制是新 sysrepo
的核心,需要好好说道说道.
5. sysrepo 共享内存机制 5.1 共享内存机制 sysrepo0.X.X
版本使用的进程间通信的机制,在实际的使用过程中,出现了诸如数据不同步、数据处理TimeOut
、完成一次 Get
请求时,但实际处理的请求会较多,导致性能与规格上不去的各类问题。sysrepo-devel
分支开始引入共享机制后,合入到 sysrepo
的 Master
分支,也就是现在的 sysrepo1.X.X
版本。
简单说一说什么是共享内存,共享内存就是允许两个或多个进程共享一定的存储区,说白了,就是两个进程访问同一块内存区域,当一个进程改变了这块地址中的内容的时候,其它进程都会察觉到这个更改,所以数据不需要在客户机和服务器端之间复制,数据直接写到内存,不用若干次数据拷贝,是一种最快的 IPC
。原理图如下所示,需要注意的是,共享内存本向并没有任何的同步与互斥机制,所以必须使用信号量来实现对共享内存的存取的同步。其它有关的共享内存的概念使用,网上有很多,可自行查阅理解。本处这分析与 sysrepo
相关的共享内存机制的使用。
5.2 数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 typedef struct sr_shm_s { int fd; size_t size; char *addr; } sr_shm_t ; #define SR_MAIN_SHM "/sr_main" #define SR_EXT_SHM "/sr_ext" typedef struct sr_main_shm_s { sr_rwlock_t lock; pthread_mutex_t lydmods_lock; uint32_t mod_count; off_t rpc_subs; uint16_t rpc_sub_count; ATOMIC_T new_sr_sid; ATOMIC_T new_evpipe_num; struct { pthread_mutex_t lock; off_t conns; uint32_t conn_count; } conn_state; } sr_main_shm_t ; typedef struct sr_mod_s sr_mod_t ;struct sr_mod_s { struct sr_mod_lock_s { sr_rwlock_t lock; uint8_t write_locked; uint8_t ds_locked; sr_sid_t sid; time_t ds_ts; } data_lock_info[SR_DS_COUNT]; sr_rwlock_t replay_lock; uint32_t ver; off_t name; char rev[11 ]; uint8_t flags; off_t features; uint16_t feat_count; off_t data_deps; uint16_t data_dep_count; off_t inv_data_deps; uint16_t inv_data_dep_count; off_t op_deps; uint16_t op_dep_count; struct { off_t subs; uint16_t sub_count; } change_sub[SR_DS_COUNT]; off_t oper_subs; uint16_t oper_sub_count; off_t notif_subs; uint16_t notif_sub_count; };
5.3 源码分析
此添加 shm main
的入口代码,将全部模块以 lydmod
数据形式添加到 main SHM
中。参考前一章的 sr_connect
函数,这就是将在与 sysrepo
连接时,会将全部模块的加载到共享内存中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 sr_error_info_t *sr_shmmain_add (sr_conn_ctx_t *conn, struct lyd_node *sr_mod) { sr_error_info_t *err_info = NULL ; struct lyd_node *next; sr_mod_t *shm_mod; sr_main_shm_t *main_shm; off_t main_end, ext_end; size_t *wasted_ext, new_ext_size, new_mod_count; new_mod_count = 0 ; LY_TREE_FOR (sr_mod, next) { ++new_mod_count; } main_end = conn->main_shm.size; ext_end = conn->ext_shm.size; if ((err_info = sr_shm_remap (&conn->main_shm, conn->main_shm.size + new_mod_count * sizeof *shm_mod))) { return err_info; } wasted_ext = (size_t *)conn->ext_shm.addr; new_ext_size = sizeof (size_t ) + sr_shmmain_ext_get_size_main_shm (&conn->main_shm, conn->ext_shm.addr) + sr_shmmain_ext_get_lydmods_size (sr_mod->parent); if ((err_info = sr_shm_remap (&conn->ext_shm, new_ext_size + *wasted_ext))) { return err_info; } wasted_ext = (size_t *)conn->ext_shm.addr; if ((err_info = sr_shmmain_add_modules (conn->ext_shm.addr, sr_mod, (sr_mod_t *)(conn->main_shm.addr + main_end), &ext_end))) { return err_info; } main_shm = (sr_main_shm_t *)conn->main_shm.addr; main_shm->mod_count += new_mod_count; assert (main_shm->mod_count == (conn->main_shm.size - sizeof *main_shm) / sizeof *shm_mod); sr_shmmain_del_modules_deps (&conn->main_shm, conn->ext_shm.addr, SR_FIRST_SHM_MOD (conn->main_shm.addr)); if ((err_info = sr_shm_remap (&conn->ext_shm, new_ext_size + *wasted_ext))) { return err_info; } wasted_ext = (size_t *)conn->ext_shm.addr; if ((err_info = sr_shmmain_add_modules_deps (&conn->main_shm, conn->ext_shm.addr, sr_mod->parent->child, SR_FIRST_SHM_MOD (conn->main_shm.addr), &ext_end))) { return err_info; } SR_CHECK_INT_RET ((unsigned )ext_end != new_ext_size + *wasted_ext, err_info); return NULL ; } sr_shmmain_add_modules (char *ext_shm_addr, struct lyd_node *first_sr_mod, sr_mod_t *first_shm_mod, off_t *ext_end) sr_shmmain_add_modules_deps (sr_shm_t *shm_main, char *ext_shm_addr, struct lyd_node *first_sr_mod, sr_mod_t *first_shm_mod, off_t *ext_end)
共享内存间在初始操作,包括信号的创建与初始化,也是在 sr_connet
函数中处理。``sr_connet是
plugind与
sysrepo 的连接入口,
SHM是在入口中初始的一种机制,用来保证
sysrepo与
plugind` 的通信高效,快速。
先用 sysrepo
共享内存机制为后面的各类订阅打个底。先了解一下 sysrepo
的共享内存机理的实现。
Reference
libyang – GitHub
netopeer2 – GitHub
sysrepo – GitHub
pyang – GitHub
libyang – Doc
libnetconf2 – Doc
sysrepo – Doc
pyang – Doc
XPath 教程 – RUNOOB.COM
XPath教程 – 易百教程
netopeer2 + sysrepo研究总结
sysrepo简单使用
第三章 sysrepo-plugind源码分析