make build过程中忽然出现错误:

1
2
3
4
5
6
7
8
9
10
Sending build context to Docker daemon   220 MB
Step 1 : FROM warpdrive:tos-release-1-5
---> 769306738d96
Step 2 : COPY . /go/src/github.com/transwarp/warpdrive/
---> 07c99697b16e
Removing intermediate container 127c0e71a84b
Successfully built 07c99697b16e
/usr/bin/docker: Error response from daemon: invalid header field value "oci runtime error: container_linux.go:247: starting container process caused \"process_linux.go:245: running exec setns process for init caused \\\"exit status 6\\\"\"\n".
FATA[0301] exit status 125
make: *** [build] Error 1

尝试run 07c99697b16e报同样的错, 查看docker日志, 出现"oci runtime error: container_linux.go:247: starting container process caused \"process_linux .go:245: running exec setns process for init caused \\\"exit status 6\\\"\"\n"

1
2
3
4
5
6
8月 21 15:29:01 zhenghc-tos dockerd[2517]: time="2018-08-21T15:29:01.778212288+08:00" level=error msg="Handler for POST /v1.24/containers/79a0906242e0180dfd7aeff30e9fb179f7b7c37f0ba20533f83b4cd40b409d2a/start returned error: invali
d header field value \"oci runtime error: container_linux.go:247: starting container process caused \\\"process_linux.go:245: running exec setns process for init caused \\\\\\\"exit status 6\\\\\\\"\\\"\\n\""
8月 21 15:51:08 zhenghc-tos dockerd[2517]: time="2018-08-21T15:51:08.075631868+08:00" level=error msg="containerd: start container" error="oci runtime error: container_linux.go:247: starting container process caused \"process_linux
.go:245: running exec setns process for init caused \\\"exit status 6\\\"\"\n" id=b93539a02f27cc1ad0114552643d8e0c942fba053dc8cd3980cb8f24cf61ff25
8月 21 15:51:08 zhenghc-tos dockerd[2517]: time="2018-08-21T15:51:08.092002257+08:00" level=error msg="Create container failed with error: invalid header field value \"oci runtime error: container_linux.go:247: starting container p
rocess caused \\\"process_linux.go:245: running exec setns process for init caused \\\\\\\"exit status 6\\\\\\\"\\\"\\n\""

发现Docker比较卡, 于是清理了大量的images, 删掉了所有未运行的container, 再清理了系统缓存, 问题解决了, 但深层次的问题没弄清楚, 尽管清了缓存, 但是清完其实缓存差距不大, docker 也没有显示资源问题, 另外一个点就是, 只有这个image是进不去的, 其他的Images是可以run -it的, 而且在做COPY . /go/src/github.com/transwarp/warpdrive/这一步时间等的比较长.

https://github.com/opencontainers/runc/issues/1130 记录了相关的问题, 说runc的问题.

Go排序实现在/src/go4.org/sort/sort.go中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func Sort(data Interface) {
n := data.Len()
if fs, ok := data.(*funcs); ok {
quickSort_func(fs.lessSwap, 0, n, maxDepth(n))
} else {
quickSort(data, 0, n, maxDepth(n))
}
}

func quickSort(data Interface, a, b, maxDepth int) {
for b-a > 12 { // Use ShellSort for slices <= 12 elements
if maxDepth == 0 {
heapSort(data, a, b)
return
}
maxDepth--
mlo, mhi := doPivot(data, a, b)
// Avoiding recursion on the larger subproblem guarantees
// a stack depth of at most lg(b-a).
if mlo-a < b-mhi {
quickSort(data, a, mlo, maxDepth)
a = mhi // i.e., quickSort(data, mhi, b)
} else {
quickSort(data, mhi, b, maxDepth)
b = mlo // i.e., quickSort(data, a, mlo)
}
}
if b-a > 1 {
// Do ShellSort pass with gap 6
// It could be written in this simplified form cause b-a <= 12
for i := a + 6; i < b; i++ {
if data.Less(i, i-6) {
data.Swap(i, i-6)
}
}
insertionSort(data, a, b)
}
}
// Insertion sort
func insertionSort(data Interface, a, b int) {
for i := a + 1; i < b; i++ {
for j := i; j > a && data.Less(j, j-1); j-- {
data.Swap(j, j-1)
}
}
}

quickSort和quickSort_func基本相同, 子不过自己定义的比较方法, 这里不在赘述.

可以看到主体是quickSort, 所以不保证稳定排序, 当数据量小于等于12时. 退化为希尔排序, 但希尔排序只做了一次gap为6, 之后再次退化为了简单插入排序. 这部分实现非常简单, i从前往后, j从i往前, 每次比较j-1和j, 将小的换到前面.

当数据量大于12时, 根据maxDepth是否为0决定使用对排序还是使用快速排序, maxDepth计算:

1
2
3
4
5
6
7
8
9
// maxDepth returns a threshold at which quicksort should switch
// to heapsort. It returns 2*ceil(lg(n+1)).
func maxDepth(n int) int {
var depth int
for i := n; i > 0; i >>= 1 {
depth++
}
return depth * 2
}

返回的是目前树高度的2倍. 每进行一轮快速排序的partition(doPivot), 这个数值就减1, 当减为0, 此时认为数据已经比较有序, 如果数据长度依然大于12, 改为堆排序效率更高.

这里先看快速排序的主体部分, 再来看堆排序的实现. partition是快排的思想核心, 他实现在doPivot中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
func doPivot(data Interface, lo, hi int) (midlo, midhi int) {
m := lo + (hi-lo)/2 // 避免整数溢出
if hi-lo > 40 { 如果长度大于40, 使用Tukey ninther法
// Tukey's ``Ninther,'' median of three medians of three.
s := (hi - lo) / 8
medianOfThree(data, lo, lo+s, lo+2*s)
medianOfThree(data, m, m-s, m+s)
medianOfThree(data, hi-1, hi-1-s, hi-1-2*s)
}
// medianOfThree 保证lo, m hi-1三个值是从小到大的
medianOfThree(data, lo, m, hi-1)

// Invariants are:
// data[lo] = pivot (set up by ChoosePivot)
// data[lo < i < a] < pivot
// data[a <= i < b] <= pivot
// data[b <= i < c] unexamined
// data[c <= i < hi-1] > pivot
// data[hi-1] >= pivot
pivot := lo
a, c := lo+1, hi-1

// 找到第一个比起始值点大的Index
for ; a < c && data.Less(a, pivot); a++ {
}
b := a
for {
for ; b < c && !data.Less(pivot, b); b++ { // data[b] <= pivot
}
for ; b < c && data.Less(pivot, c-1); c-- { // data[c-1] > pivot
}
if b >= c {
break
}
// data[b] > pivot; data[c-1] <= pivot
data.Swap(b, c-1)
b++
c--
}
// If hi-c<3 then there are duplicates (by property of median of nine).
// Let be a bit more conservative, and set border to 5.
protect := hi-c < 5
if !protect && hi-c < (hi-lo)/4 {
// Lets test some points for equality to pivot
dups := 0
if !data.Less(pivot, hi-1) { // data[hi-1] = pivot
data.Swap(c, hi-1)
c++
dups++
}
if !data.Less(b-1, pivot) { // data[b-1] = pivot
b--
dups++
}
// m-lo = (hi-lo)/2 > 6
// b-lo > (hi-lo)*3/4-1 > 8
// ==> m < b ==> data[m] <= pivot
if !data.Less(m, pivot) { // data[m] = pivot
data.Swap(m, b-1)
b--
dups++
}
// if at least 2 points are equal to pivot, assume skewed distribution
protect = dups > 1
}
if protect {
// Protect against a lot of duplicates
// Add invariant:
// data[a <= i < b] unexamined
// data[b <= i < c] = pivot
for {
for ; a < b && !data.Less(b-1, pivot); b-- { // data[b] == pivot
}
for ; a < b && data.Less(a, pivot); a++ { // data[a] < pivot
}
if a >= b {
break
}
// data[a] == pivot; data[b-1] < pivot
data.Swap(a, b-1)
a++
b--
}
}
// Swap pivot into middle
data.Swap(pivot, b-1)
return b - 1, c
}
// medianOfThree moves the median of the three values data[m0], data[m1], data[m2] into data[m1].
func medianOfThree(data Interface, m1, m0, m2 int) {
// sort 3 elements
if data.Less(m1, m0) {
data.Swap(m1, m0)
}
// data[m0] <= data[m1]
if data.Less(m2, m1) {
data.Swap(m2, m1)
// data[m0] <= data[m2] && data[m1] < data[m2]
if data.Less(m1, m0) {
data.Swap(m1, m0)
}
}
// 保证 data[m0] <= data[m1] <= data[m2]
}

问题:

  1. k8s中cpu的request和limit中设置”0.5”和”100m”表示什么意思?
  2. 一个4核8线程的cpu, 设置cpu为”1”的limit实际的限制多少?
  3. cpu中request和limit使用上有什么区别?
  4. cgroup中是如何限制的?

Linux Cgroup

众所周知, kubernetes和docker中对cpu, memory进行了使用限制, 用于内存和cpu的资源使用隔离. 而底层使用的是linux cgroup技术. 内存比较简单, 本文主要讲cpu的cgroup.

在cgroup里面,跟CPU相关的子系统有cpusetscpuacctcpu

其中cpuset主要用于设置CPU的亲和性,可以限制cgroup中的进程只能在指定的CPU上运行,或者不能在指定的CPU上运行,同时cpuset还能设置内存的亲和性。设置亲和性一般只在比较特殊的情况才用得着,所以这里不做介绍。

cpuacct包含当前cgroup所使用的CPU的统计信息,信息量较少,有兴趣可以去看看它的文档,这里不做介绍。

本篇只介绍cpu子系统,包括怎么限制cgroup的CPU使用上限及相对于其它cgroup的相对值。

创建子cgroup

在ubuntu下,systemd已经帮我们mount好了cpu子系统,我们只需要在相应的目录下创建子目录就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
#从这里的输出可以看到,cpuset被挂载在了/sys/fs/cgroup/cpuset,
#而cpu和cpuacct一起挂载到了/sys/fs/cgroup/cpu,cpuacct下面
dev@ubuntu:~$ mount|grep cpu
cgroup on /sys/fs/cgroup/cpuset type cgroup (rw,nosuid,nodev,noexec,relatime,cpuset)
cgroup on /sys/fs/cgroup/cpu,cpuacct type cgroup (rw,nosuid,nodev,noexec,relatime,cpu,cpuacct)

#进入/sys/fs/cgroup/cpu,cpuacct并创建子cgroup
dev@ubuntu:~$ cd /sys/fs/cgroup/cpu,cpuacct
dev@ubuntu:/sys/fs/cgroup/cpu,cpuacct$ sudo mkdir test
dev@ubuntu:/sys/fs/cgroup/cpu,cpuacct$ cd test
dev@ubuntu:/sys/fs/cgroup/cpu,cpuacct/test$ ls
cgroup.clone_children cpuacct.stat cpuacct.usage_percpu cpu.cfs_quota_us cpu.stat tasks
cgroup.procs cpuacct.usage cpu.cfs_period_us cpu.shares notify_on_release

我们只需要关注cpu.开头的文件

cpu subsystem

cpu子系统调度cpu到cgroups中, 目前有两种调度策略:

  • Completely Fair Scheduler (CFS) —-将cpu时间划分成合适的份额, 按比例和权重分配给cgroup.cfs可以设置相对权重和绝对权重, 目前k8s用的是这个调度策略.
  • Real-Time scheduler (RT) —RT调度器与CFS中的绝对权重控制相似, 不过仅用于实时任务. 可以在运行时进行实时调整参数.
  1. Completely Fair Scheduler (CFS):
  • 强制绝对控制参数:

    cpu.cfs_period_us & cpu.cfs_quota_us

cfs_period_us用来配置时间周期长度,cfs_quota_us用来配置当前cgroup在设置的周期长度内所能使用的CPU时间数,两个文件配合起来设置CPU的使用上限。两个文件的单位都是微秒(us),cfs_period_us的取值范围为1毫秒(ms)到1秒(s),cfs_quota_us的取值大于1ms即可,如果cfs_quota_us的值为-1(默认值),表示不受cpu时间的限制。下面是几个例子:

1
2
3
4
5
6
7
8
9
10
11
1.限制只能使用1个CPU(每250ms能使用250ms的CPU时间)
# echo 250000 > cpu.cfs_quota_us /* quota = 250ms */
# echo 250000 > cpu.cfs_period_us /* period = 250ms */

2.限制使用2个CPU(内核)(每500ms能使用1000ms的CPU时间,即使用两个内核)
# echo 1000000 > cpu.cfs_quota_us /* quota = 1000ms */
# echo 500000 > cpu.cfs_period_us /* period = 500ms */

3.限制使用1个CPU的20%(每50ms能使用10ms的CPU时间,即使用一个CPU核心的20%)
# echo 10000 > cpu.cfs_quota_us /* quota = 10ms */
# echo 50000 > cpu.cfs_period_us /* period = 50ms */

cpu.stat:

包含了下面三项统计结果

  • nr_periods: 表示过去了多少个cpu.cfs_period_us里面配置的时间周期
  • nr_throttled: 在上面的这些周期中,有多少次是受到了限制(即cgroup中的进程在指定的时间周期中用光了它的配额)
  • throttled_time: cgroup中的进程被限制使用CPU持续了多长时间(纳秒)
  • 相对控制参数: cpu.shares

shares用来设置CPU的相对值,并且是针对所有的CPU(内核),默认值是1024,假如系统中有两个cgroup,分别是A和B,A的shares值是1024,B的shares值是512,那么A将获得1024/(1204+512)=66%的CPU资源,而B将获得33%的CPU资源。shares有两个特点:

  • 如果A不忙,没有使用到66%的CPU时间,那么剩余的CPU时间将会被系统分配给B,即B的CPU使用率可以超过33%
  • 如果添加了一个新的cgroup C,且它的shares值是1024,那么A的限额变成了1024/(1204+512+1024)=40%,B的变成了20%

从上面两个特点可以看出:

  • 在闲的时候,shares基本上不起作用,只有在CPU忙的时候起作用,这是一个优点。
  • 由于shares是一个绝对值,需要和其它cgroup的值进行比较才能得到自己的相对限额,而在一个部署很多容器的机器上,cgroup的数量是变化的,所以这个限额也是变化的,自己设置了一个高的值,但别人可能设置了一个更高的值,所以这个功能没法精确的控制CPU使用率。
  1. Real-Time scheduler (RT):

cpu.rt_period_us: 周期时间, us, 同cfs

cpu.rt_runtime_us: 最长持续周期, us. 例如设置rt_runtime_us = 200000, rt_period_us = 1000000, 这就是说如果node有2cpu, 那么每秒钟占用时间就是2*0.2 = 0.4s. 这个也是绝对时间.

运行事例:

以cfs为例.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#继续使用上面创建的子cgroup: test
#设置只能使用1个cpu的20%的时间
dev@ubuntu:/sys/fs/cgroup/cpu,cpuacct/test$ sudo sh -c "echo 50000 > cpu.cfs_period_us"
dev@ubuntu:/sys/fs/cgroup/cpu,cpuacct/test$ sudo sh -c "echo 10000 > cpu.cfs_quota_us"

#将当前bash加入到该cgroup
dev@ubuntu:/sys/fs/cgroup/cpu,cpuacct/test$ echo $$
5456
dev@ubuntu:/sys/fs/cgroup/cpu,cpuacct/test$ sudo sh -c "echo 5456 > cgroup.procs"

#在bash中启动一个死循环来消耗cpu,正常情况下应该使用100%的cpu(即消耗一个内核)
dev@ubuntu:/sys/fs/cgroup/cpu,cpuacct/test$ while :; do echo test > /dev/null; done

#--------------------------重新打开一个shell窗口----------------------
#通过top命令可以看到5456的CPU使用率为20%左右,说明被限制住了
#不过这时系统的%us+%sy在10%左右,那是因为我测试的机器上cpu是双核的,
#所以系统整体的cpu使用率为10%左右
dev@ubuntu:~$ top
Tasks: 139 total, 2 running, 137 sleeping, 0 stopped, 0 zombie
%Cpu(s): 5.6 us, 6.2 sy, 0.0 ni, 88.2 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem : 499984 total, 15472 free, 81488 used, 403024 buff/cache
KiB Swap: 0 total, 0 free, 0 used. 383332 avail Mem

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
5456 dev 20 0 22640 5472 3524 R 20.3 1.1 0:04.62 bash

#这时可以看到被限制的统计结果
dev@ubuntu:~$ cat /sys/fs/cgroup/cpu,cpuacct/test/cpu.stat
nr_periods 1436
nr_throttled 1304
throttled_time 51542291833

kubernetes 资源控制机制

kubernetes使用runc作为runtime, runc中通过cpuGroup对cpu子系统的的调度进行设置. 其中Set()用于初始设置, Apply()方法用于动态更改设置, . 原理就是往上述的指定文件写入相应条目和数字. 没有什么可说的.

内存的Set():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (s *MemoryGroup) Set(path string, cgroup *configs.Cgroup) error {
if err := setMemoryAndSwap(path, cgroup); err != nil {
return err
}

if cgroup.Resources.KernelMemory != 0 {
if err := setKernelMemory(path, cgroup.Resources.KernelMemory); err != nil {
return err
}
}

if cgroup.Resources.MemoryReservation != 0 {
if err := writeFile(path, "memory.soft_limit_in_bytes", strconv.FormatInt(cgroup.Resources.MemoryReservation, 10)); err != nil {
return err
}
}

if cgroup.Resources.KernelMemoryTCP != 0 {
if err := writeFile(path, "memory.kmem.tcp.limit_in_bytes", strconv.FormatInt(cgroup.Resources.KernelMemoryTCP, 10)); err != nil {
return err
}
}
if cgroup.Resources.OomKillDisable {
if err := writeFile(path, "memory.oom_control", "1"); err != nil {
return err
}
}
if cgroup.Resources.MemorySwappiness == nil || int64(*cgroup.Resources.MemorySwappiness) == -1 {
return nil
} else if *cgroup.Resources.MemorySwappiness <= 100 {
if err := writeFile(path, "memory.swappiness", strconv.FormatUint(*cgroup.Resources.MemorySwappiness, 10)); err != nil {
return err
}
} else {
return fmt.Errorf("invalid value:%d. valid memory swappiness range is 0-100", *cgroup.Resources.MemorySwappiness)
}

return nil
}

内存Apply():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (s *MemoryGroup) Apply(d *cgroupData) (err error) {
path, err := d.path("memory")
if err != nil && !cgroups.IsNotFound(err) {
return err
} else if path == "" {
return nil
}
if memoryAssigned(d.config) {
if _, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0755); err != nil {
return err
}
// 只有内核内存可以动态设置
if err := EnableKernelMemoryAccounting(path); err != nil {
return err
}
}
}
...
}

func EnableKernelMemoryAccounting(path string) error {
// Check if kernel memory is enabled
// We have to limit the kernel memory here as it won't be accounted at all
// until a limit is set on the cgroup and limit cannot be set once the
// cgroup has children, or if there are already tasks in the cgroup.
for _, i := range []int64{1, -1} {
if err := setKernelMemory(path, i); err != nil {
return err
}
}
return nil
}

cpu的Set():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (s *CpuGroup) Set(path string, cgroup *configs.Cgroup) error {
// 设置CFS
if cgroup.Resources.CpuShares != 0 {
if err := writeFile(path, "cpu.shares", strconv.FormatUint(cgroup.Resources.CpuShares, 10)); err != nil {
return err
}
}
if cgroup.Resources.CpuPeriod != 0 {
if err := writeFile(path, "cpu.cfs_period_us", strconv.FormatUint(cgroup.Resources.CpuPeriod, 10)); err != nil {
return err
}
}
if cgroup.Resources.CpuQuota != 0 {
if err := writeFile(path, "cpu.cfs_quota_us", strconv.FormatInt(cgroup.Resources.CpuQuota, 10)); err != nil {
return err
}
}
// 设置RT
if err := s.SetRtSched(path, cgroup); err != nil {
return err
}

return nil
}

func (s *CpuGroup) SetRtSched(path string, cgroup *configs.Cgroup) error {
if cgroup.Resources.CpuRtPeriod != 0 {
if err := writeFile(path, "cpu.rt_period_us", strconv.FormatUint(cgroup.Resources.CpuRtPeriod, 10)); err != nil {
return err
}
}
if cgroup.Resources.CpuRtRuntime != 0 {
if err := writeFile(path, "cpu.rt_runtime_us", strconv.FormatInt(cgroup.Resources.CpuRtRuntime, 10)); err != nil {
return err
}
}
return nil
}

cpu的Apply() 只能设置RT:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (s *CpuGroup) Apply(d *cgroupData) error {
// We always want to join the cpu group, to allow fair cpu scheduling
// on a container basis
path, err := d.path("cpu")
if err != nil && !cgroups.IsNotFound(err) {
return err
}
return s.ApplyDir(path, d.config, d.pid)
}

func (s *CpuGroup) ApplyDir(path string, cgroup *configs.Cgroup, pid int) error {
// This might happen if we have no cpu cgroup mounted.
// Just do nothing and don't fail.
if path == "" {
return nil
}
if err := os.MkdirAll(path, 0755); err != nil {
return err
}
// We should set the real-Time group scheduling settings before moving
// in the process because if the process is already in SCHED_RR mode
// and no RT bandwidth is set, adding it will fail.
if err := s.SetRtSched(path, cgroup); err != nil {
return err
}
// because we are not using d.join we need to place the pid into the procs file
// unlike the other subsystems
if err := cgroups.WriteCgroupProc(path, pid); err != nil {
return err
}

return nil
}
K8s Limits & Request 代码分析

k8s中管理cgroup的结构体在k8s.io/kubernetes/pkg/kubelet/cm/cgroup_manager_linux.go中:

1
2
3
4
5
6
7
8
9
10
11
// cgroupManagerImpl implements the CgroupManager interface.
// Its a stateless object which can be used to
// update,create or delete any number of cgroups
// It uses the Libcontainer raw fs cgroup manager for cgroup management.
type cgroupManagerImpl struct {
// subsystems holds information about all the
// mounted cgroup subsystems on the node
subsystems *CgroupSubsystems
// simplifies interaction with libcontainer and its cgroup managers
adapter *libcontainerAdapter
}

来看下Update方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Update updates the cgroup with the specified Cgroup Configuration
func (m *cgroupManagerImpl) Update(cgroupConfig *CgroupConfig) error {
...
// 提取cgroup资源参数
resourceConfig := cgroupConfig.ResourceParameters
resources := m.toResources(resourceConfig)

cgroupPaths := m.buildCgroupPaths(cgroupConfig.Name)

// 获得cgroupfs的位置.
abstractCgroupFsName := string(cgroupConfig.Name)
abstractParent := CgroupName(path.Dir(abstractCgroupFsName))
abstractName := CgroupName(path.Base(abstractCgroupFsName))

driverParent := m.adapter.adaptName(abstractParent, false)
driverName := m.adapter.adaptName(abstractName, false)

// 获取systemd的绝对位置
if m.adapter.cgroupManagerType == libcontainerSystemd {
driverName = m.adapter.adaptName(cgroupConfig.Name, false)
}

// 初始化cgroup配置
libcontainerCgroupConfig := &libcontainerconfigs.Cgroup{
Name: driverName,
Parent: driverParent,
Resources: resources,
Paths: cgroupPaths,
}

// 设置cgroup配置
if err := setSupportedSubsystems(libcontainerCgroupConfig); err != nil {
return fmt.Errorf("failed to set supported cgroup subsystems for cgroup %v: %v", cgroupConfig.Name, err)
}
return nil
}

先看下参数是如何提取的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (m *cgroupManagerImpl) toResources(resourceConfig *ResourceConfig) *libcontainerconfigs.Resources {
resources := &libcontainerconfigs.Resources{}
if resourceConfig == nil {
return resources
}
if resourceConfig.Memory != nil {
resources.Memory = *resourceConfig.Memory
}
if resourceConfig.CpuShares != nil {
resources.CpuShares = *resourceConfig.CpuShares
}
if resourceConfig.CpuQuota != nil {
resources.CpuQuota = *resourceConfig.CpuQuota
}
if resourceConfig.CpuPeriod != nil {
resources.CpuPeriod = *resourceConfig.CpuPeriod
}

// huge page参数提取, 略过
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.HugePages) {
...
return resources
}

在来看看CpuShare, CpuQuota, 和CpuPeriod都来字哪里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// ResourceConfigForPod takes the input pod and outputs the cgroup resource config.
func ResourceConfigForPod(pod *v1.Pod) *ResourceConfig {
// sum requests and limits.
reqs, limits := resource.PodRequestsAndLimits(pod)

cpuRequests := int64(0)
cpuLimits := int64(0)
memoryLimits := int64(0)
if request, found := reqs[v1.ResourceCPU]; found {
cpuRequests = request.MilliValue()
}
if limit, found := limits[v1.ResourceCPU]; found {
cpuLimits = limit.MilliValue()
}
if limit, found := limits[v1.ResourceMemory]; found {
memoryLimits = limit.Value()
}

// convert to CFS values
cpuShares := MilliCPUToShares(cpuRequests)
cpuQuota, cpuPeriod := MilliCPUToQuota(cpuLimits)
...
}

// MilliCPUToShares converts the milliCPU to CFS shares.
func MilliCPUToShares(milliCPU int64) uint64 {
if milliCPU == 0 {
// Docker converts zero milliCPU to unset, which maps to kernel default
// for unset: 1024. Return 2 here to really match kernel default for
// zero milliCPU.
return MinShares
}
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
if shares < MinShares {
return MinShares
}
return uint64(shares)
}

// MilliCPUToQuota converts milliCPU to CFS quota and period values.
func MilliCPUToQuota(milliCPU int64) (quota int64, period uint64) {
// CFS quota is measured in two values:
// - cfs_period_us=100ms (the amount of time to measure usage across)
// - cfs_quota=20ms (the amount of cpu time allowed to be used across a period)
// so in the above example, you are limited to 20% of a single CPU
// for multi-cpu environments, you just scale equivalent amounts

if milliCPU == 0 {
return
}

// we set the period to 100ms by default
period = QuotaPeriod

// we then convert your milliCPU to a value normalized over a period
quota = (milliCPU * QuotaPeriod) / MilliCPUToCPU

// quota needs to be a minimum of 1ms.
if quota < MinQuotaPeriod {
quota = MinQuotaPeriod
}

return
}

const (
// Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
MinShares = 2
SharesPerCPU = 1024
MilliCPUToCPU = 1000

// 100000 is equivalent to 100ms
QuotaPeriod = 100000
MinQuotaPeriod = 1000
)

设置的代码:

1
2
3
4
5
6
7
8
9
10
11
12
func setSupportedSubsystems(cgroupConfig *libcontainerconfigs.Cgroup) error {
for _, sys := range getSupportedSubsystems() {
if _, ok := cgroupConfig.Paths[sys.Name()]; !ok {
return fmt.Errorf("Failed to find subsystem mount for subsystem: %v", sys.Name())
}
// 调用前面的cgroup.Set()方法
if err := sys.Set(cgroupConfig.Paths[sys.Name()], cgroupConfig); err != nil {
return fmt.Errorf("Failed to set config for supported subsystems : %v", err)
}
}
return nil
}

代码很明显了:

Request -> cpu.shares

Limits -> cpu.quota(CFS或者RT)

QuotaPeriod 为100ms

所以当Request设置0.5(0.5 1024=512), 等价于设置500m(500 1024/1000=512), 也就是512.

1
2
[root@tdc-tester04 ~]# cat /sys/fs/cgroup/cpu/kubepods/burstable/pod4de91174-9002-11e8-a663-ac1f6b83dd66/1cbc69fd7756efdcba38d11a57acab5cdbe0b9b2e5eef2a9e86e3c6c8850b1a1/cpu.shares 
512

当Limit设置1(1 1000 100000/1000=100000), 等价于1000m(1000 * 100000 / 1000 = 100000 ), 也就是quota=100000, period=100000.

1
2
3
4
[root@tdc-tester04 ~]# cat /sys/fs/cgroup/cpu/kubepods/burstable/pod4de91174-9002-11e8-a663-ac1f6b83dd66/1cbc69fd7756efdcba38d11a57acab5cdbe0b9b2e5eef2a9e86e3c6c8850b1a1/cpu.cfs_quota_us 
100000
[root@tdc-tester04 ~]# cat /sys/fs/cgroup/cpu/kubepods/burstable/pod4de91174-9002-11e8-a663-ac1f6b83dd66/1cbc69fd7756efdcba38d11a57acab5cdbe0b9b2e5eef2a9e86e3c6c8850b1a1/cpu.cfs_period_us
100000

至此所有路走通.

值得注意的点

1
2
3
4
5
6
The CPU resource is measured in cpu units. One cpu, in Kubernetes, is equivalent to:

1 AWS vCPU
1 GCP Core
1 Azure vCore
1 Hyperthread on a bare-metal Intel processor with Hyperthreading

从文档来看所以4核8线程对k8s来讲就是8核.

如果只设置了request没有设置limit, 意味着一个pod可以任意使用node资源, 如果没有其他pod创建. 如果集群管理员设置了LimitRange, 那么当pod没有设置limit的时候就会使用LimitRange里设置的值作为默认.

参考文献

  1. https://kubernetes.io/docs/tasks/administer-cluster/manage-resources/memory-default-namespace/#what-if-you-specify-a-container-s-request-but-not-its-limit
  2. https://segmentfault.com/a/1190000008323952
  3. https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/sec-cpu

准备用CRD做一个数据迁移的operator, 将不同结点的pvc迁移. 于是决定使用operator-sdk.

operator-sdk能够自动生成operator相关代码, 并能够直接部署到k8s集群中, 使用起来还是相当方便的.

operator-sdk需要搭配operator-lifecycle-manager来使用, 安装配置方法这里不做赘述, 可以参考官网,或者这篇文章:Developing Kubernetes Operator is now easy with Operator Framework

operator的大体框架如下所示:

1
2
3
4
5
6
graph LR
A[operaotr] -->|Watch|B[custom resource]
A[operaotr] -->|handler|F[Handler]
F --> E(All Resources)
C(CRD) --> A
D(Deployment) --> A

通过operator-sdk可以自动生成 crd.yaml, operator.yaml, cr.yaml, rbac.yaml. 这些都需要手动部署到k8s.

operator.yaml是最重要的一个. 里面包含handle逻辑和type结构, 它是你需要去实现和填充的业务逻辑, 理论上handle可以实现任何逻辑, 比如对修改apiserver中的某些资源内容, 或者创建新的资源, 或者起一个pod和job做一些hack的事情.

做完这些就可以生成operator了, 他会build一个image, operator是一个deployment的, 它去拉这个image然后watch你定义的cr, 当你创建或者删除cr, 就会触发handle操作.

map数据结构

golang的map是hashmap实现的, 代码在/src/runtime/hashmap.go. 对比C++用红黑树实现的map,Go的map是unordered map,即无法对key值排序遍历。跟传统的hashmap的实现方法一样,它通过一个buckets数组实现,所有元素被hash到数组的bucket中,buckets就是指向了这个内存连续分配的数组.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
type maptype struct {
typ _type
key *_type
elem *_type
bucket *_type // internal type representing a hash bucket
hmap *_type // internal type representing a hmap
keysize uint8 // size of key slot
indirectkey bool // store ptr to key instead of key itself
valuesize uint8 // size of value slot
indirectvalue bool // store ptr to value instead of value itself
bucketsize uint16 // size of bucket
reflexivekey bool // true if k==k for all keys
needkeyupdate bool // true if we need to update key on an overwrite
}

// A header for a Go map.
type hmap struct {
count int // len()返回的map的大小 即有多少kv对
flags uint8
B uint8 // 表示hash table总共有2^B个buckets
hash0 uint32 // hash seed

buckets unsafe.Pointer // 一系列桶的头指针, 按照low hash值可查找的连续分配的数组,初始时为16个Buckets.
oldbuckets unsafe.Pointer
nevacuate uintptr

overflow *[2]*[]*bmap //溢出链 当初始buckets都满了之后会使用overflow
}

// A bucket for a Go map.
type bmap struct {
tophash [bucketCnt]uint8
// Followed by bucketCnt keys and then bucketCnt values.
// NOTE: packing all the keys together and then all the values together makes the
// code a bit more complicated than alternating key/value/key/value/... but it allows
// us to eliminate padding which would be needed for, e.g., map[int64]int8.
// Followed by an overflow pointer.
}

如上所示, maptype表示map类型, 其中的hmap表示hashmap, 它的指针就是map的实体. 桶一个连续分配的数组组成, 而buckets是这个数组的头指针(起始地址), bmap是每个bucket的实体. 每个bucket有一个长度为8的数组叫做tophash, 他存储了8个key的高八位的值, 这样当我们找的时候, 先用key hash的低八位找到对应的桶, 再匹配key高8位值找到对应的tophash, 如果正确了再去找对应的key是否相等. 在对key/value对增删查的时候,先比较key的hash值高八位是否相等,然后再比较具体的key值。根据官方注释在tophash数组之后跟着8个key/value对,每一对都对应tophash当中的一条记录。最后bucket中还包含指向链表下一个bucket的指针。内存布局如下图。

image

1
之所以把所有k1k2放一起而不是k1v1是因为key和value的数据类型内存大小可能差距很大,比如map[int64]int8,考虑到字节对齐,kv存在一起会浪费很多空间。

map创建和初始化

我们首先来看make过程, 看一个map是如何创建的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// makemap implements a Go map creation make(map[k]v, hint)
// If the compiler has determined that the map or the first bucket
// can be created on the stack, h and/or bucket may be non-nil.
// If h != nil, the map can be created directly in h.
// If bucket != nil, bucket can be used as the first bucket.
// hint是map大小, h过不等于空就直接使用这个hmap, bucket不为空就当做第一个bucket.
func makemap(t *maptype, hint int64, h *hmap, bucket unsafe.Pointer) *hmap {
if sz := unsafe.Sizeof(hmap{}); sz > 48 || sz != t.hmap.size {
println("runtime: sizeof(hmap) =", sz, ", t.hmap.size =", t.hmap.size)
throw("bad hmap size")
}

// hint值不符合规范, 置0
if hint < 0 || hint > int64(maxSliceCap(t.bucket.size)) {
hint = 0
}
// 按照是否实现hash()来判断是否支持map类型
if !ismapkey(t.key) {
throw("runtime.makemap: unsupported map key type")
}

// 检查key和value大小
if t.key.size > maxKeySize && (!t.indirectkey || t.keysize != uint8(sys.PtrSize)) ||
t.key.size <= maxKeySize && (t.indirectkey || t.keysize != uint8(t.key.size)) {
throw("key size wrong")
}
if t.elem.size > maxValueSize && (!t.indirectvalue || t.valuesize != uint8(sys.PtrSize)) ||
t.elem.size <= maxValueSize && (t.indirectvalue || t.valuesize != uint8(t.elem.size)) {
throw("value size wrong")
}
// 检查各种编译的规范, 跳过
// invariants we depend on. We should probably check these at compile time
// somewhere, but for now we'll do it here.
if t.key.align > bucketCnt {
throw("key align too big")
}
if t.elem.align > bucketCnt {
throw("value align too big")
}
if t.key.size%uintptr(t.key.align) != 0 {
throw("key size not a multiple of key align")
}
if t.elem.size%uintptr(t.elem.align) != 0 {
throw("value size not a multiple of value align")
}
if bucketCnt < 8 {
throw("bucketsize too small for proper alignment")
}
if dataOffset%uintptr(t.key.align) != 0 {
throw("need padding in bucket (key)")
}
if dataOffset%uintptr(t.elem.align) != 0 {
throw("need padding in bucket (value)")
}

// 把hint参数对应成2进制的上确界那个数. 例如size=6, B就是3, 因为2^2 < 6 < 2^3
B := uint8(0)
for ; overLoadFactor(hint, B); B++ {
}

// 使用malloc分配2^B个buckets给h.
// allocate initial hash table
// if B == 0, the buckets field is allocated lazily later (in mapassign)
// If hint is large zeroing this memory could take a while.
buckets := bucket
var extra *mapextra
if B != 0 {
var nextOverflow *bmap
buckets, nextOverflow = makeBucketArray(t, B)
if nextOverflow != nil {
extra = new(mapextra)
extra.nextOverflow = nextOverflow
}
}

// initialize Hmap
if h == nil {
h = (*hmap)(newobject(t.hmap))
}
h.count = 0
h.B = B
h.extra = extra
h.flags = 0
h.hash0 = fastrand()
h.buckets = buckets
h.oldbuckets = nil
h.nevacuate = 0
h.noverflow = 0

return h
}

map存值

存储的步骤和第一部分的分析一致。首先用key的hash值低8位找到bucket,然后在bucket内部比对tophash和高8位与其对应的key值与入参key是否相等,若找到则更新这个值。若key不存在,则key优先存入在查找的过程中遇到的空的tophash数组位置。若当前的bucket已满则需要另外分配空间给这个key,新分配的bucket将挂在overflow链表后。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Like mapaccess, but allocates a slot for the key if it is not present in the map.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
if raceenabled {
callerpc := getcallerpc(unsafe.Pointer(&t))
pc := funcPC(mapassign)
racewritepc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled {
msanread(key, t.key.size)
}
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
alg := t.key.alg
hash := alg.hash(key, uintptr(h.hash0))

// Set hashWriting after calling alg.hash, since alg.hash may panic,
// in which case we have not actually done a write.
h.flags |= hashWriting

if h.buckets == nil {
h.buckets = newarray(t.bucket, 1)
}

again:
// 用hash值的低八位找到bucket
bucket := hash & (uintptr(1)<<h.B - 1)
if h.growing() {
growWork(t, h, bucket)
}
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
// 拿到高八位的hash值用于比对tophash
top := uint8(hash >> (sys.PtrSize*8 - 8))
if top < minTopHash {
top += minTopHash
}

var inserti *uint8
var insertk unsafe.Pointer
var val unsafe.Pointer
for {
for i := uintptr(0); i < bucketCnt; i++ {
// 遍历tophash, 找到对应的key
if b.tophash[i] != top {
// 如果没有找到key, 说明是第一次, 将这个key插入到insertk(第i+1个key所在的)的位置, 将value插入到val(第i+1个value所在)的位置, 并且将tophash的当前地址赋值给inserti, 用于记录是否插入以及插入的位置.
if b.tophash[i] == empty && inserti == nil {
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
}
continue
}
// 如果找到了key的tophash, 就将拿对应的key跟现在的key对比, 看是否相等, 如果不相等就跳过继续找key, 如果相等就更新它的value的值.
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey {
k = *((*unsafe.Pointer)(k))
}
if !alg.equal(key, k) {
continue
}
// 如果需要更新key, 就覆盖key的值.
if t.needkeyupdate {
typedmemmove(t.key, k, key)
}
// 返回val的地址, 这个函数并不真正更新value, 只是找到value所在的地址.
val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
goto done
}
// 如这个bucket没找到, 找他的overflow链表, 拿下一个bucket循环操作.
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}

// 如果都没有找到对应的值, 就可能做两件事:
// 1) 建立新的overflow, 然后把值加到这个overflow的bucket中.
// 2) 如果此时map的len()超过了overLoadFactor(6.5默认值)*桶的数量(2^B, 每个桶最多8个kv), 或者overflow的bucket太多了, golang就会扩大map的容量.
// Did not find mapping for key. Allocate new cell & add entry.

// If we hit the max load factor or we have too many overflow buckets,
// and we're not already in the middle of growing, start growing.
if !h.growing() && (overLoadFactor(int64(h.count), h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}

if inserti == nil {
// all current buckets are full, allocate a new one.
newb := h.newoverflow(t, b)
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
val = add(insertk, bucketCnt*uintptr(t.keysize))
}

// store new key/value at insert position
if t.indirectkey {
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectvalue {
vmem := newobject(t.elem)
*(*unsafe.Pointer)(val) = vmem
}
typedmemmove(t.key, insertk, key)
*inserti = top
h.count++

done:
// 不支持并发map的写.
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
h.flags &^= hashWriting
if t.indirectvalue {
val = *((*unsafe.Pointer)(val))
}
return val
}

在往map中存值时若所有的bucket已满,需要在堆中new新的空间时需要计算是否需要扩容。扩容的时机是count > loadFactor(2^B)。这里的loadfactor选择为6.5。扩容时机的物理意义的理解 在没有溢出时hashmap总共可以存储8*(2^B)个KV对,当hashmap已经存储到6.5*(2^B)个KV对时表示hashmap已经趋于溢出,即很有可能在存值时用到overflow链表,这样会增加hitprobe和missprobe。为了使hashmap保持读取和超找的高性能,在hashmap快满时需要在新分配的bucket中重新hash元素并拷贝,源码中称之为evacuate。

overflow溢出率是指平均一个bucket有多少个kv的时候会溢出。bytes/entry是指平均存一个kv需要额外存储多少字节的数据。hitprobe是指找一个存在的key平均需要找多少次。missprobe是指找一个不存在的key平均需要找多少次。选取6.5是为了平衡这组数据。

loadFactor %overflow bytes/entry hitprobe missprobe
4.00 2.13 20.77 3.00 4.00
4.50 4.05 17.30 3.25 4.50
5.00 6.85 14.77 3.50 5.00
5.50 10.55 12.94 3.75 5.50
6.00 15.27 11.67 4.00 6.00
6.50 20.90 10.79 4.25 6.50
7.00 27.14 10.15 4.50 7.00
7.50 34.03 9.73 4.75 7.50
8.00 41.10 9.40 5.00 8.00

但这个迁移并没有在扩容之后一次性完成,而是逐步完成的,每一次insert或remove时迁移1到2个pair,即增量扩容。增量扩容的原因 主要是缩短map容器的响应时间。若hashmap很大扩容时很容易导致系统停顿无响应。增量扩容本质上就是将总的扩容时间分摊到了每一次hash操作上。由于这个工作是逐渐完成的,导致数据一部分在old table中一部分在new table中。old的bucket不会删除,只是加上一个已删除的标记。只有当所有的bucket都从old table里迁移后才会将其释放掉。

Map的取值

map取值和存值前面的过程差不多, 代码在mapaccess1中, 这里不做赘述.

Map 的删除

删除的前面部分也是找到对应的key和value, 此处省略.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
// 查找位置, 省略
if t.indirectkey {
// 如果存的是key的地址, 将地址变为nil
*(*unsafe.Pointer)(k) = nil
} else {
// 如果存的是key的值, 清空key的值
typedmemclr(t.key, k)
}
v := unsafe.Pointer(uintptr(unsafe.Pointer(b)) + dataOffset + bucketCnt*uintptr(t.keysize) + i*uintptr(t.valuesize))
// 同理
if t.indirectvalue {
*(*unsafe.Pointer)(v) = nil
} else {
typedmemclr(t.elem, v)
}
//将tophash相应位置标记为空, kv数量-1.
b.tophash[i] = empty
h.count--
goto done
}
}

ceph相当庞大, 准备用一个系列来学习, 先来看看基础的架构和基本概念.

基本架构

ceph_arcihitecture

ceph提供对象, 块,和文件系统存储, 它也支持直接使用librados库做存储开发. 在底层都要通过RADOS服务, 它是可靠的分布式对象存储服务. 也就是说无论上层对外提供的是何种类型的服务, ceph最终都将它转换成二进制对象进行存储.

可扩展性和高可用性

一个ceph存储集群包括两种类型的Daemon:

  1. ceph monitor: 维护一个集群map的副本. ceph monitor是高可用的.
  2. ceph osd daemon: 检查自己的状态和其他osd的状态, 然后发送给monitor. ceph client和ceph osd daemon都使用CRUSH算法有效计算数据位置信息, 而不是依赖于一个中心化的查询表.

Ceph OSD Daemon将所有数据以对象的形式保存在一个扁平的命名空间(而不是树形结构). 一个对象有一个全局唯一的ID号, 二进制数据和元数据(key/value键值对组成). 对象的语意完全由ceph客户端决定.

数据结构

ceph消除了中心化的gateway, 让Ceph client直接与ceph osd daemon交互, ceph osd daemon将数据在不同的ceph 节点 创建对象的副本, 保证数据的可用性.

集群map
  1. monitor map: 包含集群fsid, 位置, 命名地址和每个monitor的端口. epoch表示版本, 通过ceph mon dump查看

    1
    2
    3
    4
    5
    6
    7
    [root@ceph-1 ~]# ceph mon dump
    dumped monmap epoch 1
    epoch 1
    fsid a6af0fa0-0e86-4685-920a-6e999c83ba8f
    last_changed 2018-07-09 15:07:50.144554
    created 2018-07-09 15:07:50.144554
    0: 10.0.11.9:6789/0 mon.ceph-1
  2. OSD map: 包含集群fsid. a list of pools, replica sizes, PG numbers, a list of OSDs and their status (e.g., up, in).

  3. The PG map: 包含PG version, its time stamp, the last OSD map epoch, the full ratios, and details on each placement group such as the PG ID, the Up Set, the Acting Set, the state of the PG (e.g., active + clean), and data usage statistics for each pool

  4. The CRUSH map: 存储设备列表, 失败的Domain, 已经写入数据的水平路由规则.

  5. The MDS Map: Contains the current MDS map epoch, when the map was created, and the last time it changed

Ceph Crush

那么问题来了,把一份数据存到一群Server中分几步?

Ceph的答案是:两步。

  1. 计算PG
  2. 计算OSD
计算PG

在Ceph中,一切皆对象。

那么用什么来区分两个对象呢?对象名。也就是说,每个不同的对象都有不一样的对象名。于是,开篇的问题就变成了:

把一个对象存到一群Server中分几步?

这里的一群Server,由Ceph组织成一个集群,这个集群由若干的磁盘组成,也就是由若干的OSD组成。于是,继续简化问题:

把一个对象存到一堆OSD中分几步?

Ceph中的逻辑层

Ceph为了保存一个对象,对上构建了一个逻辑层,也就是池(pool),用于保存对象.

Pool再一次进行了细分,即将一个pool划分为若干的PG(归置组 Placement Group),这类似于棋盘上的方格,所有的方格构成了整个棋盘,也就是说所有的PG构成了一个pool。

现在需要解决的问题是,对象怎么知道要保存到哪个PG上,假定这里我们的pool名叫rbd,共有256个PG,给每个PG编个号分别叫做0x0, 0x1, ...0xF, 0x10, 0x11... 0xFE, 0xFF

要解决这个问题,我们先看看我们拥有什么,1,不同的对象名。2,不同的PG编号。这里就可以引入Ceph的计算方法了 : HASH。

对于对象名分别为barfoo的两个对象,对他们的对象名进行计算即:

  • HASH(‘bar’) = 0x3E0A4162
  • HASH(‘foo’) = 0x7FE391A0
  • HASH(‘bar’) = 0x3E0A4162

对于一个同样的对象名,计算出来的结果永远都是一样的.

有了这个输出,我们使用小学就会的方法:求余数!用随机数除以PG的总数256,得到的余数一定会落在[0x0, 0xFF]之间,也就是这256个PG中的某一个:

  • 0x3E0A4162 % 0xFF ===> 0x62
  • 0x7FE391A0 % 0xFF ===> 0xA0

于是乎,对象bar保存到编号为0x62的PG中,对象foo保存到编号为0xA0的PG中。

所以每个对象自有名字开始,他们要保存到的PG就已经确定了。那么爱思考的小明同学就会提出一个问题,难道不管对象的高矮胖瘦都是一样的使用这种方法计算PG吗,答案是,YES! 也就是说Ceph不区分对象的真实大小内容以及任何形式的格式,只认对象名。毕竟当对象数达到百万级时,对象的分布从宏观上来看还是平均的。

这里给出更Ceph一点的说明,实际上在Ceph中,存在着多个pool,每个pool里面存在着若干的PG,如果两个pool里面的PG编号相同,Ceph怎么区分呢? 于是乎,Ceph对每个pool进行了编号,比如刚刚的rbd池,给予编号0,再建一个pool就给予编号1,那么在Ceph里,PG的实际编号是由pool_id+.+PG_id组成的,也就是说,刚刚的bar对象会保存在0.62这个PG里,foo这个对象会保存在0.A0这个PG里。其他池里的PG名称可能为1.12f, 2.aa1,10.aa1等。

Ceph中的物理层

理解了刚刚的逻辑层,我们再看一下Ceph里的物理层,对下,也就是我们若干的服务器上的磁盘,通常,Ceph将一个磁盘看作一个OSD(实际上,OSD是管理一个磁盘的程序),于是物理层由若干的OSD组成,我们的最终目标是将对象保存到磁盘上,在逻辑层里,对象是保存到PG里面的,那么现在的任务就是打通PG和OSD之间的隧道。PG相当于一堆余数相同的对象的组合,PG把这一部分对象打了个包,现在我们需要把很多的包平均的安放在各个OSD上,这就是CRUSH算法所要做的事情:CRUSH计算PG->OSD的映射关系

加上刚刚的对象映射到PG的方法,我们将开篇的两步表示成如下的两个计算公式:

  • 池ID + HASH(‘对象名’) % pg_num ===> PG_ID
  • CRUSH(PG_ID) ===> OSD

在讨论CRUSH算法之前,我们来做一点思考,可以发现,上面两个计算公式有点类似,为何我们不把

  • CRUSH(PG_ID) ===> OSD
    改为
  • HASH(PG_ID) %OSD_num ===> OSD

我可以如下几个由此假设带来的副作用:

  • 如果挂掉一个OSD,OSD_num-1,于是所有的PG % OSD_num的余数都会变化,也就是说这个PG保存的磁盘发生了变化,对这最简单的解释就是,这个PG上的数据要从一个磁盘全部迁移到另一个磁盘上去,一个优秀的存储架构应当在磁盘损坏时使得数据迁移量降到最低,CRUSH可以做到。(这一点一致性哈希也能做到)
  • 如果保存多个副本,我们希望得到多个OSD结果的输出,HASH只能获得一个,但是CRUSH可以获得任意多个。
  • 如果增加OSD的数量,OSD_num增大了,同样会导致PG在OSD之间的胡乱迁移,但是CRUSH可以保证数据向新增机器均匀的扩散。

所以HASH只适用于一对一的映射关系计算,并且两个映射组合(对象名和PG总数)不能变化,因此这里的假设不适用于PG->OSD的映射计算。因此,这里开始引入CRUSH算法。

CRUSH算法

首先来看我们要做什么:

  • 把已有的PG_ID映射到OSD上,有了映射关系就可以把一个PG保存到一个磁盘上。
  • 如果我们想保存三个副本,可以把一个PG映射到三个不同的OSD上,这三个OSD上保存着一模一样的PG内容。

再来看我们有了什么:

  • 互不相同的PG_ID。
  • 如果给OSD也编个号,那么就有了互不相同的OSD_ID。
  • 每个OSD最大的不同的就是它们的容量,即4T还是800G的容量,我们将每个OSD的容量又称为OSD的权重(weight),规定4T权重为4,800G为0.8,也就是以T为单位的值。

现在问题转化为:如何将PG_ID映射到有各自权重的OSD上。这里我直接使用CRUSH里面采取的Straw算法,翻译过来就是抽签,说白了就是挑个最长的签,这里的签指的是OSD的权重。

  • CRUSH_HASH( PG_ID, OSD_ID, r ) ===> draw
  • ( draw &0xffff ) * osd_weight ===> osd_straw
  • pick up high_osd_straw

第一行,我们姑且把r当做一个常数,第一行实际上就做了搓一搓的事情:将PG_ID, OSD_ID和r一起当做CRUSH_HASH的输入,求出一个十六进制输出,这和HASH(对象名)完全类似,只是多了两个输入。所以需要强调的是,对于相同的三个输入,计算得出的draw的值是一定相同的。

这个draw到底有啥用?其实,CRUSH希望得到一个随机数,也就是这里的draw,然后拿这个随机数去乘以OSD的权重,这样把随机数和OSD的权重搓在一起,就得到了每个OSD的实际签长,而且每个签都不一样长(极大概率),就很容易从中挑一个最长的。

说白了,CRUSH希望随机挑一个OSD出来,但是还要满足权重越大的OSD被挑中的概率越大,为了达到随机的目的,它在挑之前让每个OSD都拿着自己的权重乘以一个随机数,再取乘积最大的那个。那么这里我们再定个小目标:挑个一亿次!从宏观来看,同样是乘以一个随机数,在样本容量足够大之后,这个随机数对挑中的结果不再有影响,起决定性影响的是OSD的权重,也就是说,OSD的权重越大,宏观来看被挑中的概率越大。

如果看到这里你已经被搅晕了,那让我再简单梳理下PG选择一个OSD时做的事情:

  • 给出一个PG_ID,作为CRUSH_HASH的输入。
  • CRUSH_HASH(PG_ID, OSD_ID, r) 得出一个随机数(重点是随机数,不是HASH)。
  • 对于所有的OSD用他们的权重乘以每个OSD_ID对应的随机数,得到乘积。
  • 选出乘积最大的OSD。
  • 这个PG就会保存到这个OSD上。

现在趁热打铁,解决一个PG映射到多个OSD的问题,还记得那个常量r吗?我们把r+1,再求一遍随机数,再去乘以每个OSD的权重,再去选出乘积最大的OSD,如果和之前的OSD编号不一样,那么就选中它,如果和之前的OSD编号一样的话,那么再把r+2,再次选一次,直到选出我们需要的三个不一样编号的OSD为止!

当然实际选择过程还要稍微复杂一点,我这里只是用最简单的方法来解释CRUSH在选择OSD的时候所做的事情。

基于这样的结构选择OSD,我们提出了新的要求:

  • 一共选出三个OSD。
  • 这三个OSD需要都位于一个row下面。
  • 每个cabinet内至多有一个OSD。

这样的要求,如果用上一节的CRUSH选OSD的方法,不能满足二三两个要求,因为OSD的分布是随机的。

Labels and Selectors

  1. label
    label是k8s中所有资源都有的一个域, 他是一个=对, 表示这个资源具有某种特定属性, key必须不能重复. 这样user和其他资源可以通过label来选择具有这种属性的资源集合.

    1
    2
    3
    4
    5
    6
    "metadata": {
    "labels": {
    "key1" : "value1",
    "key2" : "value2"
    }
    }

    label的key有两个段,分别是前缀和名称, 通过’/‘分开. 名称字段必须少于63个字符, 必须以’[a-z0-9A-Z]’, ‘-‘, ‘.’, ‘_’和数字 开头和结尾.前缀是可选的, 一旦指定, 不能超过253个字符. label的value指和key中的名称要求一样.

  2. Selectors的使用
    通过api中使用 -l 来选择特定集合的objects: $ kubectl get pods -l 'environment in (production, qa)'

一些k8s资源, 比如Service and ReplicationController 会有selector字段, 用来选择pod.

1
2
selector:
component: redis

基于相等的匹配和基于集合的匹配:

1
2
3
4
5
6
selector:
matchLabels:
component: redis
matchExpressions:
- {key: tier, operator: In, values: [cache]}
- {key: environment, operator: NotIn, values: [dev]}

注意区分概念, label永远都是key=value, 多个label在一起能组成一个集合, 类似map. 只有selector有集合的操作, label本身就是一个<k,v> 对, 没有这样的label.
集合方式有四种operator: In, NotIn, Exist, DoesNotExist. {key: tier, operator: In, values: [cache]}这条表达式等价于matchLabels中的: tier: cache.
必须满足所有的selector表达式, 才算匹配.
Job, Deployment, Replica Set, and Daemon Set都是支持基于集合的匹配.

  1. nodeSelector
    nodeSelector用于选择某个node, 再次之前, 需要在node上添加label: kubectl label nodes <node-name> <label-key>=<label-value>. 然后在pod中配置相应的nodeSelector, 就能保证pod调度到符合语义的node:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    apiVersion: v1
    kind: Pod
    metadata:
    name: nginx
    labels:
    env: test
    spec:
    containers:
    - name: nginx
    image: nginx
    imagePullPolicy: IfNotPresent
    nodeSelector:
    disktype: ssd
    k8s node不同版本会有一些内置labels:
    1
    2
    3
    4
    5
    6
    kubernetes.io/hostname
    failure-domain.beta.kubernetes.io/zone
    failure-domain.beta.kubernetes.io/region
    beta.kubernetes.io/instance-type
    beta.kubernetes.io/os
    beta.kubernetes.io/arch

Affinity

nodeSelector是比较简单的pod选择节点的方式, k8s提供了affinity/anti-affinity用来更复杂的提供节点选择方案. 他有两种: node affinity 和 inter-pod affinity/anti-affinity:

  • node affinity
    nodeAffinity与nodeSelector相似, 也是基于label, 包括两种类型, 可以分别理解为’hard’和’soft’, 一个强制要求, 一个尽可能要求.

    • requiredDuringSchedulingIgnoredDuringExecution: hard类型
    • preferredDuringSchedulingIgnoredDuringExecution: soft类型
      内容和label selector基本一致:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      apiVersion: v1
      kind: Pod
      metadata:
      name: with-node-affinity
      spec:
      affinity:
      nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
      - matchExpressions:
      - key: kubernetes.io/e2e-az-name
      operator: In
      values:
      - e2e-az1
      - e2e-az2
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 1
      preference:
      matchExpressions:
      - key: another-node-label-key
      operator: In
      values:
      - another-node-label-value
      containers:
      - name: with-node-affinity
      image: k8s.gcr.io/pause:2.0
      weight 是1-100.
  • pod affinity
    podAffinity/podAnti-Affinity是比较已经调度到节点上的pod而不是看node本身. 他也有两种类型hard和soft:

    • requiredDuringSchedulingIgnoredDuringExecution: hard
    • preferredDuringSchedulingIgnoredDuringExecution: soft
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
apiVersion: v1
kind: Pod
metadata:
name: with-pod-affinity
spec:
affinity:
podAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: security
operator: In
values:
- S1
topologyKey: failure-domain.beta.kubernetes.io/zone
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: security
operator: In
values:
- S2
topologyKey: kubernetes.io/hostname
containers:
- name: with-pod-affinity
image: k8s.gcr.io/pause:2.0

Taints and Tolerations

affinity是站在pod的角度, 而taints是站在node的角度,

1
kubectl taint nodes node1 key=value:NoSchedule

这个命令将node1加了一个taint, 表示无法调度, 除非你有相应的toleration:

1
2
3
4
5
tolerations:
- key: "key"
operator: "Equal"
value: "value"
effect: "NoSchedule"
1
2
3
4
tolerations:
- key: "key"
operator: "Exists"
effect: "NoSchedule"

这两个tolerations都能匹配那个taint. operator默认是Equal
一个空的tolerations匹配所有的taint:

1
2
tolerations:
- operator: "Exists"

以下这个tolerations匹配所有key为key的taints:

1
2
3
tolerations:
- key: "key"
operator: "Exists"

effect有两种: NoExecute和NoSchedule, 如果加了NoExecute, 那么所有不匹配的pod将会立即被evicted掉, 如果加了Noschedule, 那么只是Pod无法被调度, 已经调度的Pod不受影响。NoExecute还可以指定一个可选的域tolerationSeconds, 表示尽管匹配了taint可以不被立即evicted掉, 但在一定时间之后就会被evicted掉。

通过kubectl taint nodes node1 key:NoSchedule- 取消taint

更有意思的是多个taint和多个tolerations的情况.

例如创建三个taint:

1
2
3
kubectl taint nodes node1 key1=value1:NoSchedule
kubectl taint nodes node1 key1=value1:NoExecute
kubectl taint nodes node1 key2=value2:NoSchedule

一个pod拥有两个tolerations:

1
2
3
4
5
6
7
8
9
tolerations:
- key: "key1"
operator: "Equal"
value: "value1"
effect: "NoSchedule"
- key: "key1"
operator: "Equal"
value: "value1"
effect: "NoExecute"

该pod没有匹配所有的taint, 因此他不能被调度到这个节点, 但如果调度了,他不会被移除, 因为他只有第三个NoSchedule的taint不匹配。

注意

在1.6之后, 官方加了一些内置的taint, 他会将pod evctied掉:

1
2
3
4
5
6
7
node.kubernetes.io/not-ready: notready.
node.alpha.kubernetes.io/unreachable: Unknown
node.kubernetes.io/out-of-disk: Node becomes out of disk.
node.kubernetes.io/memory-pressure: Node has memory pressure.
node.kubernetes.io/disk-pressure: Node has disk pressure.
node.kubernetes.io/network-unavailable: Node’s network is unavailable.
node.cloudprovider.kubernetes.io/uninitialized: When kubelet is started with “external” cloud provider, it sets this taint on a node to mark it as unusable. When a controller from the cloud-controller-manager initializes this node, kubelet removes this taint.

docker run 的时候如果出现类似:

1
/usr/bin/docker: Error response from daemon: linux mounts: path /tmp is mounted on / but it is not a shared mount.

之类的错误, 可以通过修改Docker启动参数解决, 注释掉mountFlags或者改为shared:

1
2
$vi /usr/lib/systemd/system/docker.service
MountFlags=slave

修改后发现新的错误:

1
2
/usr/bin/docker: Error response from daemon: OCI runtime create failed: container_linux.go:348: starting container process caused "process_linux.go:402: container init caused \"open /dev/console: input/output error\"": unknown.
FATA[0007] exit status 125

网上查有如下回复:

1
I met this problem while I suspend my computer, then I restart my computer, this error was solved. I guess it was because the docker daemon missed driver library path.

所以重启大法好…

但是真正原因没搞清楚, 待更新.

最近在调查一个kubernetes中发现Kubelet的pods目录:

1
/var/lib/kubelet/pods/xxx/volumes/

下出现了大量的包含”deleting~” 的目录:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~859156558
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~912994645
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~096627888
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~361944655
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~827756898
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~850958169
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~435144420
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~573873907
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~817019830
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~300298653
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~414447192
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~453118423
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~634999626
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~329196065
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~705907980
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~060876539
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~371568670
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~473777381
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~852926720
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~911951455
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~221614642
/var/lib/kubelet/pods/612f2e76-3ce1-11e8-b2c9-0cc47ae2b22c/volumes/transwarp.io~tosdisk/pvc-f39e86b9-019b-11e8-b2c9-0cc47ae2b22c.deleting~643761641

导致每次reconciler将这些多余的”deleting~”加入到”ActualOfWorld”中, 然后触发大量的Umount操作, 使得reconciler很久才Loop一次, 现象就是pod create和delete都变得非常得慢.
一. 一开始, 我发现自己写的plugin中使用了pkg/volume/volume.go中的RenameDirectory函数, 函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func RenameDirectory(oldPath, newName string) (string, error) {
newPath, err := ioutil.TempDir(filepath.Dir(oldPath), newName)
if err != nil {
return "", err
}

// os.Rename call fails on windows (https://github.com/golang/go/issues/14527)
// Replacing with copyFolder to the newPath and deleting the oldPath directory
if runtime.GOOS == "windows" {
err = copyFolder(oldPath, newPath)
if err != nil {
glog.Errorf("Error copying folder from: %s to: %s with error: %v", oldPath, newPath, err)
return "", err
}
os.RemoveAll(oldPath)
return newPath, nil
}

err = os.Rename(oldPath, newPath)
if err != nil {
return "", err
}
return newPath, nil
}

在每次删除目录时, 并不是直接删除, 而是先创建一个随机的空目录, 然后将原目录rename到随机目录, 最后再将这个随机目录删除掉.
看似没有什么问题, 但不巧的是, 在golang1.8之后, os.Rename的实现发生了变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func rename(oldname, newname string) error {
fi, err := Lstat(newname)
if err == nil && fi.IsDir() {
// There are two independent errors this function can return:
// one for a bad oldname, and one for a bad newname.
// At this point we've determined the newname is bad.
// But just in case oldname is also bad, prioritize returning
// the oldname error because that's what we did historically.
if _, err := Lstat(oldname); err != nil {
if pe, ok := err.(*PathError); ok {
err = pe.Err
}
return &LinkError{"rename", oldname, newname, err}
}
return &LinkError{"rename", oldname, newname, syscall.EEXIST}
}
//------------------------版本分割线1.8
err = syscall.Rename(oldname, newname)
if err != nil {
return &LinkError{"rename", oldname, newname, err}
}
return nil
}

在虚线以上是go1.8之后新加的内容, 如果rename之后的目录存在, 就会打印”File Exits”错误, 这样就会创建大量的”deleting~”目录. 相关修改和讨论在Bugs in EmptyDir Teardown path.

以为问题就这样解决了, 然而不是, 我检查了版本, 使用的是1.7.4, 同时查看了日志, 并没有打印”FILE Exist”的log, 而是打印了”device or resource busy”. 通过多种测试, 发现rename操作无论是进程占用还是进程对文件的读写, 都不会导致device busy问题. 这说明我找到了出问题的地方, 却没找到背后的原因.

二. 如果rename没有返回错误, 那么只能是之后的remove操作返回的错误. 然而却有几个疑点:

  1. 查看rename出来的deleting目录中都是空目录, 里面并没有数据
  2. 手动可以remove掉这些目录
  3. 原目录并没有消失, 而是存在且有大量的正在更新的数据.

​ 显然问题比我想象的更加复杂. 有一个可能的解释是在我操作delete之后由于太长时间没有删掉导致Kubelet直接暴力删除了这个pod, statefulset又重启了新的pod, 原来的container虽然删掉但相应的namespaces中依然有应用程序在读写, 这种情况导致remove busy, 而且数据在更新.

这些问题要调查起来都很费力. 显然直接删掉rename逻辑可以解决这个问题, 于是不再追究.

​ 一直感觉对kubernetes中的qos是一个盲点, 借着复习下scheduler的一些资源调度策略来学习下k8s中的qos策略, 然后再辐射到一般性的qos策略.

动机:

k8s 用非常简单的方式分配资源. 用户能够指定容器的资源限制. 比如一个用户能够指定某个container 只能使用1GB的内存. scheduler通过资源限制去调度容器. 如果某个节点的内存只有4GB, 那个一个有5Gb请求的容器就将不能调度到这个节点. 目前, k8s不能保证容器在一个超卖的系统中运行稳定.

目前的实现中, 如果用户指定limts给所有的container, 那么集群资源的利用率将会非常低下. 因为容器往往无法充分使用用户指定的那些资源. 一个可能的方式是不指定limits, 这样container就可以无限制的使用, 但是如果这些container无限制的使用资源, 就可能使得指定了limits的容器由于机器资源不够而被杀掉. 这是用户不愿意看到的, 他们希望自己指定了某个大小, 那么启动之后系统就应该保证这个容器的顺利运行.

qos机制就是在节点资源超卖的环境下, 通过提供不同级别的保证来满足资源的需求. 容器可以用request请求一个最小资源, request与limit不同, container可以使用超过request的值. best-effort级别的container相当于request为0的container. Best-effort container只使用那些其他container没有使用的资源, 可以用于资源清理(这个没看懂).

Request and limits

对于每种资源, container可以指定request和limits, 0 <= request <= limit <= infinity. 如果container成功调度, 意味着container能够保证至少有request的值的资源. container不能超过limit的值. 而request 和 limit 如何执行要看资源是compressible还是incompressible.

Compressible Resource Guarantees

  • 目前只有cpu
  • container至少得到request的cpu请求
  • 剩余的cpu会按照cpu request比例分配给container. 比如container A request 60%, container B request 30%. 假设两个container都尝试拿到更多的cpu, 那么剩余的10%将按照2:1的比例分配.
  • 如果超过limits, Containers 会被节流(不是被杀死).

Incompressible Resouce Guarantees

  • 目前只有内存. (我认为本地存储也在这一类)
  • Containers能够得到request的大小, 如果超过这个大小, 它们可能会被杀掉(如果其他container需要内存), 但是如果containers消耗的少于request值, 他们不会被删除(除非系统任务或者daemonset需要更多的内存)
  • Containers会被杀掉如果他们使用了超过limit的内存.

Kubelet admission 策略

kubelet通过统计containers的request来确保系统资源不会被超载.

QoS 分级

概述

k8s Qos 分成三个等级:

  • Guaranteed
  • Burstable
  • BestEffort

理论上QoS与limit, request应该是互不干扰的, 但实际上他们的联系非常紧密. QoS class不是自己设置的, 而是在创建pod的时候, 根据limit和request系统自动确认的.

Guaranteed

这是级别最高的, 他的触发条件是:

  • 每个pod中container必须有内存的limit和request, 而且必须相同
  • 每个pod中cpu必须有内存的limit和request, 而且必须相同

效果: 这些containers是最高优先级

注意: 如果设置了limit没有设置request, 系统将会自动填充request跟limit相同.

####Burstable

这个的触发条件是:

  • pod不满足Guaranteed
  • pod中至少一个container有内存或者cpu的请求.

效果: 能够保证request的请求, 但是不保证limit, 如果超出request请求的内存大小, 发生oom时可能会被杀死.

BestEffort

触发条件: 很明显, 没有指定任何request和limit或者值都是0就是BestEffort级别.

效果: 这些containers没有请求资源保障, 会被认为是最低优先级的, 如果系统发生oom, 他们会被首先杀死. 他们只会使用集群中没有被使用的那部分资源.

总结

所以k8s的qos主要是通过两方面: request,limit 和 qos class来实现的. 这两方面又有很多交叉的地方. 而且在面对不同的资源的时候, 他们的策略是不一样的.

Storage QoS

由于k8s中没有关于存储的qos机制, 于是关于存储的qos, 查看了一些文档.

0%