An introduction to LXC containers: how lxcfs implements container isolation (loadavg)


We know that runc does not fully isolate the files under /proc and /sys, so the data that commands like top and free display inside a container is actually the physical host's. With lxcfs we can isolate the container's /proc and /sys from the host, so that top and similar commands show the container's real data. This article explains how in detail.

This article is reprinted from 360 Cloud Computing (360云计算).

What is lxcfs

We know that runc does not fully isolate the files under /proc and /sys, so the data that commands like top and free display inside a container is the physical host's. That is unfriendly to people used to VMs and physical machines, and it largely defeats the purpose of these commands. lxcfs's job is to isolate the container's /proc and /sys from the host so that top and similar commands show the container's real data.

Notes

lxcfs is a userspace virtual filesystem built on FUSE (Filesystem in Userspace) and implemented on top of cgroups. Having a rough understanding of FUSE and cgroups beforehand will make this article easier to follow. We will not cover installing or using lxcfs; there is no shortage of good articles on that online. Instead we focus on how lxcfs implements cpuonline and loadavg; once you understand these two, the rest works in much the same way.

Reading the lxcfs filesystem from inside a container

When the lxcfs program starts, it is given a path (in the figure below, /var/lib/lxcfs) as its mount point; from then on, reads of the files under that path (cgroup, proc, sys) go through the VFS into the kernel's FUSE module, and FUSE calls back into the file operations implemented by lxcfs. When a container reads a file in the lxcfs filesystem, glibc issues a system call into the VFS interface, which hands it to the FUSE kernel module; FUSE invokes the callback implemented in the lxcfs program, which looks up the container's cgroup, reads the corresponding cgroup files on the host, and computes the container's actual mem, cpu, and other figures. lxcfs mounts the host's cgroups under its runtime directory /run/lxcfs/controllers, but you cannot see them directly on the host, because the program uses unshare to create a private mount namespace. All cgroup information used by lxcfs is obtained from /run/lxcfs/controllers.
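This private-mount trick is easy to reproduce. Below is a minimal, hypothetical sketch (the /tmp paths and the choice of the cpu controller are illustrative, error handling is trimmed, and it assumes a cgroup v1 host and root privileges), showing how a mount made after unshare(CLONE_NEWNS) stays invisible to the rest of the system:

#include <sched.h>      /* unshare, CLONE_NEWNS */
#include <stdio.h>
#include <sys/mount.h>  /* mount, MS_REC, MS_PRIVATE */
#include <sys/stat.h>   /* mkdir */

int main(void)
{
	/* Leave the shared mount namespace so the host will not see our mounts. */
	if (unshare(CLONE_NEWNS) < 0) {
		perror("unshare");
		return 1;
	}

	/* Mark existing mounts private so changes do not propagate back. */
	if (mount(NULL, "/", NULL, MS_REC | MS_PRIVATE, NULL) < 0) {
		perror("mount MS_PRIVATE");
		return 1;
	}

	mkdir("/tmp/controllers", 0755);
	mkdir("/tmp/controllers/cpu", 0755);

	/* Mount the cpu cgroup controller; visible only inside this process. */
	if (mount("cpu", "/tmp/controllers/cpu", "cgroup", 0, "cpu") < 0) {
		perror("mount cgroup");
		return 1;
	}

	printf("cpu controller privately mounted at /tmp/controllers/cpu\n");
	return 0;
}

Run as root: the mount succeeds inside the process, yet another shell will not see anything under /tmp/controllers, which is exactly the effect lxcfs relies on for /run/lxcfs/controllers.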

[Figure: the lxcfs mount point (/var/lib/lxcfs) and the read path from the container through VFS and FUSE into lxcfs]

Source code

Since my own work happened to need these two pieces, this article mainly covers the implementation of cpuonline and loadavg. Programs such as nginx and Java spawn as many worker processes as there are CPU cores, and cpuonline is the data source behind the system calls they use for that. Without isolation, the CPU count seen inside the container is the host's: a container that should create 2 workers creates 40 instead (container: 2 cores, host: 40 cores), and the extra context switching causes a noticeable performance drop. I have not seen any analysis of loadavg so far, so it gets a brief introduction here too.
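As a quick demonstration of why this matters, the snippet below (illustrative, not lxcfs code) shows the calls programs typically use to size their worker pools. Depending on the glibc version these are backed by /sys/devices/system/cpu/online or by sched_getaffinity; the sysfs file is exactly what lxcfs virtualizes.

#include <stdio.h>
#include <unistd.h>
#include <sys/sysinfo.h>

int main(void)
{
	/* What nginx/JVM-style worker sizing ultimately relies on. */
	printf("sysconf(_SC_NPROCESSORS_ONLN) = %ld\n",
	       sysconf(_SC_NPROCESSORS_ONLN));
	printf("get_nprocs() = %d\n", get_nprocs());

	/* Reading the sysfs file directly shows the underlying data. */
	FILE *f = fopen("/sys/devices/system/cpu/online", "r");
	if (f) {
		char buf[64];
		if (fgets(buf, sizeof(buf), f))
			printf("/sys/devices/system/cpu/online: %s", buf);
		fclose(f);
	}
	return 0;
}

In a 2-core container without lxcfs the sysfs file reports the host's full range; with lxcfs mounted over /sys/devices/system/cpu/online, reading it returns 0-1.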

Isolation effect

Host: 40 cores / 128 GB

1. cpuonline

Host

Container: 2 cores / 4 GB

[Screenshot: cpu online output on the host vs. inside the 2c4g container]

2. loadavg

Host

[Screenshot: loadavg on the host]

Container: 2 cores / 4 GB

[Screenshot: loadavg inside the 2c4g container]

As the screenshots show, both cpuonline and the load average are now isolated.

Implementation analysis

Note: each cgroup controller directory is opened before main() executes and saved in fd_hierarchies; later code simply calls openat() against these fds instead of open()ing and close()ing files every time. This is achieved by declaring the function collect_and_mount_subsystems with C's __attribute__((constructor)), which makes it run before main(). A tiny demo of the attribute follows.
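Here is a small standalone demo of that attribute (illustrative only, not lxcfs code):

#include <stdio.h>

/* Functions marked constructor run before main(); this is how lxcfs
 * gets its private cgroup mounts and fd_hierarchies ready ahead of time. */
static void __attribute__((constructor)) init_before_main(void)
{
	printf("constructor: runs before main\n");
}

int main(void)
{
	printf("main: runs after all constructors\n");
	return 0;
}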

Let's look at collect_and_mount_subsystems:

static void __attribute__((constructor)) collect_and_mount_subsystems(void)
{
	FILE *f;
	char *cret, *line = NULL;
	char cwd[MAXPATHLEN];
	size_t len = 0;
	int i, init_ns = -1;
	bool found_unified = false;

	if ((f = fopen("/proc/self/cgroup", "r")) == NULL) {
		lxcfs_error("Error opening /proc/self/cgroup: %s\n", strerror(errno));
		return;
	}

	/* Read the host's cgroup controllers and store them in hierarchies. */
	while (getline(&line, &len, f) != -1) {
		......
		if (!store_hierarchy(line, p))
			goto out;
	}

	/* Preserve initial namespace. */
	init_ns = preserve_mnt_ns(getpid());
	if (init_ns < 0) {
		lxcfs_error("%s\n", "Failed to preserve initial mount namespace.");
		goto out;
	}

	fd_hierarchies = malloc(sizeof(int) * num_hierarchies);
	if (!fd_hierarchies) {
		lxcfs_error("%s\n", strerror(errno));
		goto out;
	}

	for (i = 0; i < num_hierarchies; i++)
		fd_hierarchies[i] = -1;

	cret = getcwd(cwd, MAXPATHLEN);
	if (!cret)
		lxcfs_debug("Could not retrieve current working directory: %s.\n", strerror(errno));

	/* This function calls unshare(CLONE_NEWNS) our initial mount namespace
	 * to privately mount lxcfs cgroups. */
	/* The key step: mount each cgroup controller into lxcfs's private
	 * mount namespace (/run/lxcfs/controllers). */
	if (!cgfs_setup_controllers()) {
		lxcfs_error("%s\n", "Failed to setup private cgroup mounts for lxcfs.");
		goto out;
	}
	......
}

static bool cgfs_setup_controllers(void)
{
	/* Mainly calls unshare() to create a private mount namespace. */
	if (!cgfs_prepare_mounts())
		return false;

	if (!cgfs_mount_hierarchies()) {
		lxcfs_error("%s\n", "Failed to set up private lxcfs cgroup mounts.");
		return false;
	}

	if (!permute_root())
		return false;

	return true;
}

static bool cgfs_mount_hierarchies(void)
{
	char *target;
	size_t clen, len;
	int i, ret;

	for (i = 0; i < num_hierarchies; i++) {
		char *controller = hierarchies[i];

		clen = strlen(controller);
		len = strlen(BASEDIR) + clen + 2;
		target = malloc(len);
		if (!target)
			return false;

		ret = snprintf(target, len, "%s/%s", BASEDIR, controller);
		if (ret < 0 || ret >= len) {
			free(target);
			return false;
		}
		if (mkdir(target, 0755) < 0 && errno != EEXIST) {
			free(target);
			return false;
		}
		if (!strcmp(controller, "unified"))
			ret = mount("none", target, "cgroup2", 0, NULL);
		else
			ret = mount(controller, target, "cgroup", 0, controller);
		if (ret < 0) {
			lxcfs_error("Failed mounting cgroup %s: %s\n", controller, strerror(errno));
			free(target);
			return false;
		}

		/* Open every cgroup controller directory and keep the fd. */
		fd_hierarchies[i] = open(target, O_DIRECTORY);
		if (fd_hierarchies[i] < 0) {
			free(target);
			return false;
		}
		free(target);
	}

	return true;
}

In lxcfs.c, main() mainly parses the command-line arguments, then calls FUSE's fuse_main() to register lxcfs's file operations and pass in the mount point.

......
if (!fuse_main(nargs, newargv, &lxcfs_ops, opts))
......

const struct fuse_operations lxcfs_ops = {
	.getattr	= lxcfs_getattr,
	.readlink	= NULL,
	.getdir		= NULL,
	.mknod		= NULL,
	.mkdir		= lxcfs_mkdir,
	.unlink		= NULL,
	.rmdir		= lxcfs_rmdir,
	.symlink	= NULL,
	.rename		= NULL,
	.link		= NULL,
	.chmod		= lxcfs_chmod,
	.chown		= lxcfs_chown,
	.truncate	= lxcfs_truncate,
	.utime		= NULL,
	.open		= lxcfs_open,
	.read		= lxcfs_read,
	.release	= lxcfs_release,
	.write		= lxcfs_write,
	.statfs		= NULL,
	.flush		= lxcfs_flush,
	.fsync		= lxcfs_fsync,
	.setxattr	= NULL,
	.getxattr	= NULL,
	.listxattr	= NULL,
	.removexattr	= NULL,
	.opendir	= lxcfs_opendir,
	.readdir	= lxcfs_readdir,
	.releasedir	= lxcfs_releasedir,
	.fsyncdir	= NULL,
	.init		= NULL,
	.destroy	= NULL,
	.access		= lxcfs_access,
	.create		= NULL,
	.ftruncate	= NULL,
	.fgetattr	= NULL,
};

cpuonline

1. The cpuonline information lives under /sys/devices/system/cpu/. lxcfs registers its handlers for /sys (any path would work here, of course) with FUSE.

lxcfs.c:

const struct fuse_operations lxcfs_ops = {
	......
	.open    = lxcfs_open,
	.read    = lxcfs_read,
	.release = lxcfs_release,
	.write   = lxcfs_write,
	......
};

static int lxcfs_read(const char *path, char *buf, size_t size, off_t offset,
		      struct fuse_file_info *fi)
{
	int ret;

	if (strncmp(path, "/cgroup", 7) == 0) {
		up_users();
		ret = do_cg_read(path, buf, size, offset, fi);
		down_users();
		return ret;
	}

	if (strncmp(path, "/proc", 5) == 0) {
		up_users();
		ret = do_proc_read(path, buf, size, offset, fi);
		down_users();
		return ret;
	}

	if (strncmp(path, "/sys", 4) == 0) {
		up_users();
		ret = do_sys_read(path, buf, size, offset, fi);
		down_users();
		return ret;
	}

	return -EINVAL;
}

static int do_sys_read(const char *path, char *buf, size_t size, off_t offset,
		       struct fuse_file_info *fi)
{
	int (*sys_read)(const char *path, char *buf, size_t size, off_t offset,
			struct fuse_file_info *fi);
	char *error;

	dlerror(); /* Clear any existing error */
	sys_read = (int (*)(const char *, char *, size_t, off_t,
			    struct fuse_file_info *))dlsym(dlopen_handle, "sys_read");
	error = dlerror();
	if (error != NULL) {
		lxcfs_error("%s\n", error);
		return -1;
	}

	return sys_read(path, buf, size, offset, fi);
}

The file-operation code (bindings.c, sysfs_fuse.c, cpuset.c) is packaged into the liblxcfs.so shared library, which lxcfs.c calls into. do_sys_read above fetches the sys_read function from liblxcfs.so via dlsym, as sketched below.
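The pattern itself is plain dlopen/dlsym. A self-contained sketch (libdemo.so and demo_read are hypothetical names; the real sys_read takes FUSE types):

#include <dlfcn.h>
#include <stdio.h>

int main(void)
{
	void *handle = dlopen("./libdemo.so", RTLD_LAZY);
	if (!handle) {
		fprintf(stderr, "dlopen: %s\n", dlerror());
		return 1;
	}

	dlerror(); /* clear any stale error, exactly as do_sys_read does */
	int (*demo_read)(const char *) =
		(int (*)(const char *))dlsym(handle, "demo_read");
	char *error = dlerror();
	if (error != NULL) {
		fprintf(stderr, "dlsym: %s\n", error);
		dlclose(handle);
		return 1;
	}

	demo_read("/sys/devices/system/cpu/online");
	dlclose(handle);
	return 0;
}

Compile with -ldl. Checking dlerror() after dlsym, rather than testing the returned pointer, is the documented way to distinguish a missing symbol from a symbol whose value is legitimately NULL.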

2. Next, the read path for cpuonline.

sysfs_fuse.c:

int sys_read(const char *path, char *buf, size_t size, off_t offset,
	     struct fuse_file_info *fi)
{
	struct file_info *f = (struct file_info *)fi->fh;

	switch (f->type) {
	/* The cpuonline case; type is set at open() time, which we will not
	 * dwell on here. The interesting part is
	 * sys_devices_system_cpu_online_read below. */
	case LXC_TYPE_SYS_DEVICES_SYSTEM_CPU_ONLINE:
		return sys_devices_system_cpu_online_read(buf, size, offset, fi);
	case LXC_TYPE_SYS_DEVICES:
	case LXC_TYPE_SYS_DEVICES_SYSTEM:
	case LXC_TYPE_SYS_DEVICES_SYSTEM_CPU:
	default:
		return -EINVAL;
	}
}

static int sys_devices_system_cpu_online_read(char *buf, size_t size,
					      off_t offset,
					      struct fuse_file_info *fi)
{
	/* Get the FUSE context, chiefly the pid of the process reading
	 * cpuonline (e.g. the cat in `cat /sys/devices/system/cpu/online`,
	 * hereafter "the caller"). */
	struct fuse_context *fc = fuse_get_context();
	struct file_info *d = (struct file_info *)fi->fh;
	char *cache = d->buf;
	char *cg;
	char *cpuset = NULL;
	bool use_view;
	int max_cpus = 0;
	pid_t initpid;
	ssize_t total_len = 0;

	if (offset) {
		if (!d->cached)
			return 0;
		if (offset > d->size)
			return -EINVAL;
		int left = d->size - offset;
		total_len = left > size ? size : left;
		memcpy(buf, cache + offset, total_len);
		return total_len;
	}

	/* Get the host pid of the container's init (pid 1); if initpid comes
	 * back <= 0, the caller is a host process. */
	initpid = lookup_initpid_in_store(fc->pid);
	if (initpid <= 0)
		initpid = fc->pid;

	/* Get the cgroup of the container's init process, e.g.
	 * docker/368adedeb87172d68388cee9818e873d73503a5b1d1d2a6b47fbd053f6d68601 */
	cg = get_pid_cgroup(initpid, "cpuset");
	if (!cg)
		return read_file("/sys/devices/system/cpu/online", buf, size, d);
	prune_init_slice(cg);

	cpuset = get_cpuset(cg);
	if (!cpuset)
		goto err;

	/* Check whether the cpu and cpuacct controllers exist; if not, return
	 * the host's cpuonline data as-is. */
	use_view = use_cpuview(cg);
	if (use_view)
		/* Work out how many CPUs the container can actually use; if no
		 * cpu quota is configured (default -1), return the host data. */
		max_cpus = max_cpu_count(cg);

	if (max_cpus == 0)
		return read_file("/sys/devices/system/cpu/online", buf, size, d);
	if (max_cpus > 1)
		total_len = snprintf(d->buf, d->buflen, "0-%d\n", max_cpus - 1);
	else
		total_len = snprintf(d->buf, d->buflen, "0\n");
	if (total_len < 0 || total_len >= d->buflen) {
		lxcfs_error("%s\n", "failed to write to cache");
		return 0;
	}
	d->size = (int)total_len;
	d->cached = 1;

	if (total_len > size)
		total_len = size;

	memcpy(buf, d->buf, total_len);

err:
	free(cpuset);
	free(cg);
	return total_len;
}

/*
 * Return the maximum number of visible CPUs based on CPU quotas.
 * If there is no quota set, zero is returned.
 */
int max_cpu_count(const char *cg)
{
	int rv, nprocs;
	int64_t cfs_quota, cfs_period;
	int nr_cpus_in_cpuset = 0;
	char *cpuset = NULL;

	/* Read the container's cpu quota from the host cgroup. */
	if (!read_cpu_cfs_param(cg, "quota", &cfs_quota))
		return 0;

	/* Read the container's cpu period from the host cgroup. */
	if (!read_cpu_cfs_param(cg, "period", &cfs_period))
		return 0;

	cpuset = get_cpuset(cg);
	if (cpuset)
		nr_cpus_in_cpuset = cpu_number_in_cpuset(cpuset);

	if (cfs_quota <= 0 || cfs_period <= 0) {
		if (nr_cpus_in_cpuset > 0)
			return nr_cpus_in_cpuset;
		return 0;
	}

	/* Compute the CPUs available to the container. */
	rv = cfs_quota / cfs_period;

	/* In case quota/period does not yield a whole number, add one CPU for
	 * the remainder: a 0.5-CPU limit is shown as 1 core, 1.5 as 2. */
	if ((cfs_quota % cfs_period) > 0)
		rv += 1;

	/* Number of CPUs online on the host, i.e. sysconf(_SC_NPROCESSORS_ONLN). */
	nprocs = get_nprocs();
	if (rv > nprocs)
		rv = nprocs;

	/* use min value in cpu quota and cpuset */
	if (nr_cpus_in_cpuset > 0 && nr_cpus_in_cpuset < rv)
		rv = nr_cpus_in_cpuset;

	return rv;
}

/* Now let's see how the quota is read. */

/*
 * Read cgroup CPU quota parameters from `cpu.cfs_quota_us` or
 * `cpu.cfs_period_us`, depending on `param`. The parameter value is
 * returned through `value`.
 */
static bool read_cpu_cfs_param(const char *cg, const char *param, int64_t *value)
{
	bool rv = false;
	char file[11 + 6 + 1]; /* "cpu.cfs__us" + "quota"/"period" + '\0' */
	char *str = NULL;

	sprintf(file, "cpu.cfs_%s_us", param);

	/* The important part is here. */
	if (!cgfs_get_value("cpu", cg, file, &str))
		goto err;
	......
}

bool cgfs_get_value(const char *controller, const char *cgroup, const char *file,
		    char **value)
{
	int ret, fd, cfd;
	size_t len;
	char *fnam, *tmpc;

	/* Look up the controller's fd in the fd_hierarchies table populated
	 * before main() ran. */
	tmpc = find_mounted_controller(controller, &cfd);
	if (!tmpc)
		return false;

	/* Make sure we pass a relative path to *at() family of functions.
	 * . + /cgroup + / + file + \0
	 */
	len = strlen(cgroup) + strlen(file) + 3;
	fnam = alloca(len);
	ret = snprintf(fnam, len, "%s%s/%s", *cgroup == '/' ? "." : "", cgroup, file);
	if (ret < 0 || (size_t)ret >= len)
		return false;

	/* fd is effectively /run/lxcfs/controllers/cpu/docker/<dockerid>/cpu.cfs_quota_us */
	fd = openat(cfd, fnam, O_RDONLY);
	if (fd < 0)
		return false;

	/* Read the cfs_quota_us value. */
	*value = slurp_file(fnam, fd);
	return *value != NULL;
}
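To make the rounding rule concrete, here is a tiny standalone sketch (visible_cpus is a hypothetical helper for illustration, not lxcfs code) of the arithmetic max_cpu_count applies to quota and period:

#include <stdio.h>
#include <stdint.h>

static int visible_cpus(int64_t quota_us, int64_t period_us)
{
	if (quota_us <= 0 || period_us <= 0)
		return 0;                      /* no quota: fall back to the host view */
	int cpus = quota_us / period_us;       /* whole CPUs */
	if (quota_us % period_us > 0)
		cpus += 1;                     /* round any remainder up to one more CPU */
	return cpus;
}

int main(void)
{
	/* cpu.cfs_quota_us = 150000, cpu.cfs_period_us = 100000 -> 1.5 CPUs */
	printf("%d\n", visible_cpus(150000, 100000)); /* prints 2 */
	/* cpu.cfs_quota_us = 50000 -> 0.5 CPUs */
	printf("%d\n", visible_cpus(50000, 100000));  /* prints 1 */
	return 0;
}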

loadavg

  • The concept of load average: it is the average size of the active task queue over a period of time, where active tasks are those in the TASK_RUNNING or TASK_UNINTERRUPTIBLE state. Readers interested in how the kernel computes loadavg can look at the kernel source.
  • Unlike the other files, loadavg requires lxcfs to run a daemon that computes the averages itself, because what we want is the load of a container (that is, of a particular cgroup of processes), and the host has no such data. lxcfs computes the load in exactly the same way as the kernel (see the fixed-point sketch after this list), so the loadavg values are quite accurate. The host's load average is computed over all tasks (processes and threads); a container's is computed only over the tasks inside the container.
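For the curious, here is a minimal sketch of that fixed-point computation, using the kernel's constants and the calc_load variant lxcfs carries (which scales the task count by FIXED_1 and rounds up while converging upward); treat it as an illustration rather than verbatim lxcfs source:

#include <stdio.h>

#define FSHIFT   11                /* bits of fixed-point precision */
#define FIXED_1  (1 << FSHIFT)     /* 1.0 in fixed point */
#define EXP_1    1884              /* FIXED_1 * exp(-5s/1min)  */
#define EXP_5    2014              /* FIXED_1 * exp(-5s/5min)  */
#define EXP_15   2037              /* FIXED_1 * exp(-5s/15min) */

#define LOAD_INT(x)  ((x) >> FSHIFT)
#define LOAD_FRAC(x) LOAD_INT(((x) & (FIXED_1 - 1)) * 100)

/* load(t) = load(t-1) * e^(-5/60T) + n * (1 - e^(-5/60T)), in fixed point */
static unsigned long calc_load(unsigned long load, unsigned long exp,
			       unsigned long active)
{
	unsigned long newload;

	active = active > 0 ? active * FIXED_1 : 0;
	newload = load * exp + active * (FIXED_1 - exp);
	if (active >= load)
		newload += FIXED_1 - 1;
	return newload / FIXED_1;
}

int main(void)
{
	unsigned long avenrun = 0;
	int i;

	/* One minute of 5-second ticks with 2 tasks in R/D state. */
	for (i = 0; i < 12; i++)
		avenrun = calc_load(avenrun, EXP_1, 2);

	/* Converges toward 2.00; after one minute this prints 1.26. */
	printf("1-min load: %lu.%02lu\n", LOAD_INT(avenrun), LOAD_FRAC(avenrun));
	return 0;
}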

1. Analysis of the loadavg daemon

Call flow of the load daemon: main -> start_loadavg -> load_daemon -> load_begin

As its comment says, load_begin walks the load hash table every 5 seconds and refreshes the load values.

/*
 * Traverse the hash table and update it.
 */
void *load_begin(void *arg)
{
	......
	while (1) {
		if (loadavg_stop == 1)
			return NULL;

		time1 = clock();
		for (i = 0; i < LOAD_SIZE; i++) {
			pthread_mutex_lock(&load_hash[i].lock);
			if (load_hash[i].next == NULL) {
				pthread_mutex_unlock(&load_hash[i].lock);
				continue;
			}
			f = load_hash[i].next;
			first_node = 1;
			while (f) {
				......
				/* Refresh the load values. */
				sum = refresh_load(f, path);
				if (sum == 0) {
					f = del_node(f, i);
				} else {
out:					f = f->next;
				}
				free(path);
				......
			}
		}

		if (loadavg_stop == 1)
			return NULL;

		time2 = clock();
		usleep(FLUSH_TIME * 1000000 -
		       (int)((time2 - time1) * 1000000 / CLOCKS_PER_SEC));
	}
}

The interesting part is refresh_load:

/*
 * Return 0 means that container p->cg is closed.
 * Return -1 means that error occurred in refresh.
 * Positive num equals the total number of pid.
 */
static int refresh_load(struct load_node *p, char *path)
{
	FILE *f = NULL;
	char **idbuf;
	char proc_path[256];
	int i, ret, run_pid = 0, total_pid = 0, last_pid = 0;
	char *line = NULL;
	size_t linelen = 0;
	int sum, length;
	DIR *dp;
	struct dirent *file;

	do {
		idbuf = malloc(sizeof(char *));
	} while (!idbuf);

	/* Read /sys/fs/cgroup/cpu/docker/<containerid>/cgroup.procs to find
	 * the pids of the container's processes. */
	sum = calc_pid(&idbuf, path, DEPTH_DIR, 0, p->cfd);
	/* normal exit */
	if (sum == 0)
		goto out;

	for (i = 0; i < sum; i++) {
		/* clean up '\n' */
		length = strlen(idbuf[i]) - 1;
		idbuf[i][length] = '\0';
		ret = snprintf(proc_path, 256, "/proc/%s/task", idbuf[i]);
		if (ret < 0 || ret > 255) {
			lxcfs_error("%s\n", "snprintf() failed in refresh_load.");
			i = sum;
			sum = -1;
			goto err_out;
		}

		dp = opendir(proc_path);
		if (!dp) {
			lxcfs_error("%s\n", "Open proc_path failed in refresh_load.");
			continue;
		}
		/* Walk /proc/<pid>/task (each thread a process creates gets a
		 * directory there) looking for tasks in R or D state. */
		while ((file = readdir(dp)) != NULL) {
			if (strncmp(file->d_name, ".", 1) == 0)
				continue;
			if (strncmp(file->d_name, "..", 1) == 0)
				continue;
			total_pid++;
			/* We make the biggest pid become last_pid. */
			ret = atof(file->d_name);
			last_pid = (ret > last_pid) ? ret : last_pid;

			ret = snprintf(proc_path, 256, "/proc/%s/task/%s/status",
				       idbuf[i], file->d_name);
			if (ret < 0 || ret > 255) {
				lxcfs_error("%s\n", "snprintf() failed in refresh_load.");
				i = sum;
				sum = -1;
				closedir(dp);
				goto err_out;
			}

			f = fopen(proc_path, "r");
			if (f != NULL) {
				while (getline(&line, &linelen, f) != -1) {
					/* Find State */
					if ((line[0] == 'S') && (line[1] == 't'))
						break;
				}
				if ((line[7] == 'R') || (line[7] == 'D'))
					run_pid++;
				fclose(f);
			}
		}
		closedir(dp);
	}

	/* With the number of active tasks in hand, it is time for the real
	 * work: computing the load average. The formula is the same as the
	 * kernel's:
	 *   load(t) = load(t-1) * e^(-5/60) + n * (1 - e^(-5/60))
	 * For details see:
	 * https://www.helpsystems.com/resources/guides/unix-load-average-part-1-how-it-works
	 */
	p->avenrun[0] = calc_load(p->avenrun[0], EXP_1, run_pid);
	p->avenrun[1] = calc_load(p->avenrun[1], EXP_5, run_pid);
	p->avenrun[2] = calc_load(p->avenrun[2], EXP_15, run_pid);
	p->run_pid = run_pid;
	p->total_pid = total_pid;
	p->last_pid = last_pid;

	free(line);
err_out:
	for (; i > 0; i--)
		free(idbuf[i - 1]);
out:
	free(idbuf);
	return sum;
}

2. Reading loadavg

Once the load calculation is clear, reading is simple. One thing to note: an entry in the load_hash table is inserted the first time a container reads /proc/loadavg (after all, there is no way to know a container's cgroup in advance). A rough sketch of the node layout follows.
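Roughly, each hash node carries the per-container state used above (reconstructed here from the fields the code touches, not verbatim lxcfs source):

struct load_node {
	char *cg;                 /* cgroup path, used as the hash key */
	unsigned long avenrun[3]; /* fixed-point 1/5/15-minute averages */
	unsigned int run_pid;     /* tasks in R or D state */
	unsigned int total_pid;   /* all tasks in the cgroup */
	unsigned int last_pid;    /* highest pid seen */
	int cfd;                  /* fd of the mounted cpu controller dir */
	struct load_node *next;   /* bucket chain */
};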

static int proc_loadavg_read(char *buf, size_t size, off_t offset,
			     struct fuse_file_info *fi)
{
	struct fuse_context *fc = fuse_get_context();
	struct file_info *d = (struct file_info *)fi->fh;
	pid_t initpid;
	char *cg;
	size_t total_len = 0;
	char *cache = d->buf;
	struct load_node *n;
	int hash;
	int cfd, rv = 0;
	unsigned long a, b, c;

	if (offset) {
		if (offset > d->size)
			return -EINVAL;
		if (!d->cached)
			return 0;
		int left = d->size - offset;
		total_len = left > size ? size : left;
		memcpy(buf, cache + offset, total_len);
		return total_len;
	}
	if (!loadavg)
		return read_file("/proc/loadavg", buf, size, d);

	initpid = lookup_initpid_in_store(fc->pid);
	if (initpid <= 0)
		initpid = fc->pid;
	cg = get_pid_cgroup(initpid, "cpu");
	if (!cg)
		return read_file("/proc/loadavg", buf, size, d);

	prune_init_slice(cg);
	hash = calc_hash(cg) % LOAD_SIZE;
	/* Look up the node in the hash table by cgroup. */
	n = locate_node(cg, hash);

	/* First time: insert a new node into the hash table. */
	if (n == NULL) {
		if (!find_mounted_controller("cpu", &cfd)) {
			/*
			 * In locate_node() above, pthread_rwlock_unlock() isn't used
			 * because delete is not allowed before read has ended.
			 */
			pthread_rwlock_unlock(&load_hash[hash].rdlock);
			rv = 0;
			goto err;
		}
		do {
			n = malloc(sizeof(struct load_node));
		} while (!n);

		do {
			n->cg = malloc(strlen(cg) + 1);
		} while (!n->cg);
		strcpy(n->cg, cg);
		n->avenrun[0] = 0;
		n->avenrun[1] = 0;
		n->avenrun[2] = 0;
		n->run_pid = 0;
		n->total_pid = 1;
		n->last_pid = initpid;
		n->cfd = cfd;
		insert_node(&n, hash);
	}
	/* From the second read on, values come from the daemon's calculations. */
	a = n->avenrun[0] + (FIXED_1 / 200);
	b = n->avenrun[1] + (FIXED_1 / 200);
	c = n->avenrun[2] + (FIXED_1 / 200);
	total_len = snprintf(d->buf, d->buflen, "%lu.%02lu %lu.%02lu %lu.%02lu %d/%d %d\n",
			     LOAD_INT(a), LOAD_FRAC(a),
			     LOAD_INT(b), LOAD_FRAC(b),
			     LOAD_INT(c), LOAD_FRAC(c),
			     n->run_pid, n->total_pid, n->last_pid);
	pthread_rwlock_unlock(&load_hash[hash].rdlock);
	if (total_len < 0 || total_len >= d->buflen) {
		lxcfs_error("%s\n", "Failed to write to cache");
		rv = 0;
		goto err;
	}
	d->size = (int)total_len;
	d->cached = 1;

	if (total_len > size)
		total_len = size;
	memcpy(buf, d->buf, total_len);
	rv = total_len;

err:
	free(cg);
	return rv;
}
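For reference, the snprintf above emits the same five-field layout as the kernel's /proc/loadavg: three averages, run_pid/total_pid, and last_pid, so a container might see something like 0.12 0.08 0.05 1/24 4321 (values illustrative). The FIXED_1/200 added to each average is the usual 0.005 rounding term, applied in fixed point before the integer/fraction split.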



