HTTP 超时浅见

前言

  最近业务调用方反馈接收到服务连接中断的错误(python requests 请求抛出异常 raise ConnectionError(err, request=request) \n ConnectionError: ('Connection aborted.', BadStatusLine("''",))),但从 golang 服务日志中看,服务应该是正常处理完成并返回了,且抛出异常的时间也基本和服务返回数据的时间一致,即表明在服务响应返回数据的那一刻,请求方同时抛出异常。

  这个问题很奇怪,起初拿到一个 case 还无法稳定复现,最初怀疑是网络抖动问题,但后续一直会偶发性出现,直到拿到了一个能稳定复现的 case,深入跟踪排查后才发现与网络问题无关,是服务端框架应用设置不合理的问题。

前言

  最近业务调用方反馈接收到服务连接中断的错误(python requests 请求抛出异常 raise ConnectionError(err, request=request) \n ConnectionError: ('Connection aborted.', BadStatusLine("''",))),但从 golang 服务日志中看,服务应该是正常处理完成并返回了,且抛出异常的时间也基本和服务返回数据的时间一致,即表明在服务响应返回数据的那一刻,请求方同时抛出异常。

  这个问题很奇怪,起初拿到一个 case 还无法稳定复现,最初怀疑是网络抖动问题,但后续一直会偶发性出现,直到拿到了一个能稳定复现的 case,深入跟踪排查后才发现与网络问题无关,是服务端框架应用设置不合理的问题。

问题篇

  从网上搜索 python ConnectionError: ('Connection aborted.'),错误种类非常多,有网络问题,服务端问题(关闭连接,拒绝服务,响应错误等),客户端关闭连接,超时设置不合理,请求参数/协议错误等等,但若带上 BadStatusLine("''",) ,错误就相对比较明确了(BadStatusLine Error in using Python, RequestsPython Requests getting ('Connection aborted.', BadStatusLine("''",)) error),主要是由于收到了一个空响应(header/body),空响应可以明确是服务端返回的问题,一般可能有以下几个原因:1. 服务端反爬;2. 服务端超时(比如 nginx 默认 60s 超时);3. 网络错误。

  由于是内部服务,所以反爬策略是没有的,而反馈的 case 都带有明显的特征(请求数据量大,处理耗时长),没有网络抖动那种随机性,所以应该也不是网络问题,剩下的只能是超时问题,由于业务方在前置策略上已经识别该 case 数据量大,所以不经过 nginx 网关,直连服务请求,所以也不会有 nginx 超时问题,只能是服务端自己超时。于是直接在代码中查找 timeout 关键字,发现在服务启动时设置了 ReadTimeout 和 WriteTimeout,进一步深挖之后,才对 go 服务的超时有了浅显的认识。

超时篇

参考资料:1. 你真的了解 timeout 吗?,2. i/o timeout , 希望你不要踩到这个net/http包的坑,3. net/http完全超时手册

  由于 HTTP 协议规范并未提及超时标准,而为保证服务稳定性,一般的 HTTP 服务请求都会设置超时时间,各 HTTP 服务端/客户端对于超时的理解大同小异,而这次的问题又起源与 go 服务,所以以 go 为例,分析一下超时。

客户端超时

http.Client.Timeout

  客户端超时,即 GET/POST 请求超时,这个很好理解,就是客户端发送请求到客户端接收到服务器返回数据的时间,算是开发的一般性常识,控制参数一般也特别简单,就是一个 timeout,当然 go 服务客户端支持设置更精细化的超时时间,一般也没啥必要。当客户端感知到超时时,会正常发起 TCP 断开连接的“四次挥手”过程。

服务端超时

http.Server Timeouts

  服务端超时,这才是引发问题的根本原因,go 服务端的超时,主要有两个参数,ReadTimeout 和 WriteTimeout,从上图可以看出,ReadTimeout 主要是设置服务端接收请求到读取客户端请求数据的时间(读请求的时间),WriteTimeout 是服务端处理请求数据以及返回数据的时间(写响应的时间)。GoFrame 框架的 ReadTimeout 默认值是 60s,在请求数据正常的情况下 ReadTimeout 也不可能超时,这次的问题主要出在 WriteTimeout,GoFrame 的默认值是 0s,代表不控制超时,但之前的开发者也同样设置为了 60s,导致服务端在处理大量数据时,发生了超时现象。

  更深挖之后,才发现 WriteTimeout 的诡异之处,当 WriteTimeout 发生之后,服务端不会即时返回超时消息,而是需要等服务端真正处理完之后,返回数据时,才会返回一个空数据,即使服务端正常写入返回数据,但都会强制为空数据返回,导致请求客户端报错。这种表现,看起来就像是 WriteTimeout 不仅没有起到应有的作用,在错误设置的情况下,还会起到反作用,使服务响应错误。WriteTimeout 无法即时生效的问题,也同样有其他人反馈了:1. Diving into Go's HTTP server timeouts;2. net/http: Request context is not canceled when Server.WriteTimeout is reached。可能是网上反馈的人多了,go 官方推出了一个 TimeoutHandler,通过这个设置服务端超时,即可即时返回超时消息。仿照官方的 TimeoutHandler ,即可在 GoFrame 框架中也实现自己的超时中间件。

  至于 WriteTimeout 为啥不起作用,个人猜测主要原因在于 go 服务每接收到一个请求,都是另开一个协程进行处理,而 goroutine 无法被强制 kill,只能自己退出,通常是要等到 goroutine 正常处理完之后才能返回数据,WriteTimeout 只是先强制写一个空数据占位,返回还是得等 goroutine 正常处理完。

  所以正常的 go 服务,在使用类似于 TimeoutHandler 中间件的时候,也最好让 goroutine 尽可能快的退出,一种简单的方法是:1. 设置请求的 context 为 context.WithTimeout;2. 分步处理数据,每一步开始前都先检查请求传入的 context 是否已经超时;3. 若已经超时,则直接 return,不进行下一步处理,快速退出 goroutine。

后记

  这次问题排查,碰到的最大障碍在于,前几次反馈的 case 难以复现,客户端请求报错和服务器返回的时间一致也不会让人往超时的角度去想,在拿到一个能稳定复现的 case 之后,才死马当活马医,先调一下超时参数试试。

  关于 go 服务超时的文章,其实之前也看过,但没碰到具体问题,名词也就仅仅只是名词,很难理解背后的含义和其中的坑点,实践才能出真知 ╮(~▽~)╭。

附录

长连接超时

  关于超时问题,也曾看到过有人碰到一个长链接服务的问题,现象是这样的:后端服务宕机之后,客户端可能需要很久才会感知到,原因在于 tcp 的超时重传机制,在 linux 中,默认会重传 tcp_retries2=15 次(即 16 次才会断开连接),而 TCP 最大超时时间为 TCP_RTO_MAX=2min,最小超时时间为 TCP_RTO_MIN=200ms。即在 linux 中,一个典型的 TCP 超时重传表现为:

重传次数发送时间超时时间
-1(原始数据发送)0s0.2s
0 (第 0 次重传)0.2s0.2s
10.4s0.4s
20.8s0.8s
31.6s1.6s
43.2s3.2s
56.4s6.4s
612.8s12.8s
725.6s25.6s
851.2s51.2s
9102.4s102.4s
10204.8s120s
11324.8s120s
12444.8s120s
13564.8s120s
14684.8s120s
15804.8s120s
断开连接924.8s(≈15min)

所以客户端需要在 15 分钟之后才能感知到服务端不可用,如此,仅靠 TCP 自身的超时机制,很难发现服务端是否宕机/不可用,长链接不释放,进而可能导致客户端不可用且无感知,所以在长链接服务中,需要有其他的手段来保障服务稳定/可用性(eg:心跳探活)。

服务端 context canceled

Refer to: context canceled,谁是罪魁祸首

  从官方的 net/http 包中可以知道,go 服务在接收请求时,会同时生成一个协程监控连接状态,当发现连接有问题(eg:客户端设置请求超时主动断开)时,会将该请求对应的 context cancel 掉,这时服务端如果再继续使用该 context 时,就会报错「context canceled」。当然,如果服务端发生错误,也同样会导致请求对应的 context cancel 掉。

  服务端主动 cancel context 的好处在于可以快速释放资源,避免无效的请求继续执行(当然也得业务代码上主动去感知 context 是否 cancel,从而及时退出);坏处在于,如果服务端需要上报这个请求发生的错误(一般在后置中间件中进行错误上报),这个时候上报错误的请求需要另外生成一个新的 context,绝不能直接使用现有的 context,因为已有的这个 context 已经 cancel 掉了,继续使用会导致上报错误的请求发送失败,达不到上报的目的。

VNSWRR 算法浅解

前言

  最近偶然在公司内网看到一篇文章「负载均衡算法vnswrr改进——从指定位置生成调度序列」。正好 Shaun 一直觉得调度类算法很有意思,就认真看了下,顺便写下自己的一些理解。

前言

  最近偶然在公司内网看到一篇文章「负载均衡算法vnswrr改进——从指定位置生成调度序列」。正好 Shaun 一直觉得调度类算法很有意思,就认真看了下,顺便写下自己的一些理解。

预备篇

  通俗来讲负载均衡解决的是「在避免机器过载的前提下,多个请求如何分发到多台机器上」的问题,本质上是一个分布式任务调度的问题,在机器性能相同的情况下,最简单的策略就是轮询,多个机器依次轮流处理请求。Nginx 官方的 SWRR 算法解决的是「在机器性能不同的情况下,如何使请求分布更均匀,更平滑,避免短时间大量请求造成局部热点」的问题。

SWRR篇

  在 SWRR 算法中,有两个权重,一个是初始实际权重(effective weight, ew),一个是算法迭代过程中的当前权重(current weight,cw),在负载均衡过程中,每次请求分发都选择当前权重最大的机器,同时更新每台机器的当前权重,当前权重更新策略如下:

  1. 若设定 n 台机器各自的初始权重为 \((ew_1,ew_2,...,ew_n)\),同时 \(ew_1 \le ew_2 \le ... \le ew_n\) ,且 \(W_{total}=\sum_{i=1}^n ew_i\)

  2. 第一个请求来时,n 台机器各自的当前权重 \(cw_i=ew_i, 1 \le i \le n\) ,由于此时 \(cw_{max}=\max(cw_i)=cw_n\) ,则请求分发给第 n 台机器处理,同时更新机器各自的当前权重 \(cw_1=cw_1+ew_1, cw_2=cw_2+ew_2,...,cw_{n-1}=cw_{n-1}+ew_{n-1},cw_n=cw_n+ew_n-W_{total}\),记为 \((2*ew_1,2*ew_2,...,2*ew_{n-1},2*ew_n-W_{total})\)

  3. 第二个请求来时,此时 n 台机器的各自权重为 \((2*ew_1,2*ew_2,...,2*ew_{n-1},2*ew_n-W_{total})\) ,选取权重值对应的机器进行处理,假设为第 n-1 台,则更新后权重为 \((3*ew_1,3*ew_2,...,3*ew_{n-1}-W_{total},3*ew_n-W_{total})\)

  4. \(W_{total}\) 个请求来时,此时 n 台机器的各自权重应该为 \[ (W_{total}*ew_1-m_1*W_{total},W_{total}*ew_2-m_2*W_{total},...,W_{total}*ew_{n-1}-m_{n-1}*W_{total},W_{total}*ew_n-m_n*W_{total}) \\ \text{s.t.} \quad \sum_{i=1}^n m_i=W_{total}-1 \\ \quad 0 <= m_i <= ew_i \] 由于每次调度都是权重最大值减权重和,重新分配权重后权重和无变化,所以理论上此时除第 k 台机器外,每台机器的权重都为 0,第 k 台机器的权重为 \(W_{total}\) ,所以这次调度处理之后,每台机器的权重又会重新回到初始权重。

VNSWRR 篇

  VNSWRR 算法是阿里针对 Nginx 官方的 SWRR 算法实际运行中对于部分场景下(瞬时流量大,权重更新等)均衡效果不太理想的改进算法,其最大的改进点在于预生成调度序列,以空间换时间减少调度时间,同时在权重更新后随机选取调度序列的起点,使初次请求就调度在不同的机器上,减少高权重机器的局部热点问题。具体流程如下:

  1. 首先使用 SWRR 算法生成前 n 个调度序列;
  2. 再随机选取一个位置作为调度起点,后续的请求依次从调度序列中选取;
  3. 若调度序列用完,则继续用 SWRR 算法生成后 n 个调度序列;
  4. 如此循环,直到调度序列的长度为 \(W_{total}\),即一个周期内的全部调度序列,用完后,从头开始调度即可;
  5. 若有权重更新,则从 1 开始重新生成调度序列;

正文

  从上面的逻辑中,可看出 SWRR 算法调度序列是以 \(W_{total}\) 为周期的一个循环序列,只需要知道一个周期内的调度序列,就可以推算出后续的调度机器(除非权重有变更或者有机器增删)。计算一个周期内的调度序列也比较简单,取当前调度权重中最大值对应机器,同时更新每台机器的当前权重,作为下次调度的权重,简而言之,就是从上次调度结果推出下次调度结果,是一个递推式。那有没有办法不从上次结果推下次结果,直接计算当前的调度结果,简化 VNSWRR 的第一步每次都从头开始预生成前 n 个调度序列,直接从任意位置开始生成调度序列,内网中这篇文章就给出了一个看似“可行的”解决方案,直接计算第 q 个请求的调度结果,具体方案如下:

在 SWRR 算法中,第 q 个请求时,全部机器的当前权重序列应该为 \[ (q*ew_1-m_1*W_{total},q*ew_2-m_2*W_{total},...,q*ew_{n-1}-m_{n-1}*W_{total},q*ew_n-m_n*W_{total}) \\ \text{s.t.} \quad \sum_{i=1}^n m_i=q-1 \\ \quad 0 <= m_i <= ew_i \] 即权重序列中共减去了 \(q-1\)\(W_{total}\) ,平均上 \(m_i=ew_i/W_{total}*(q-1)\),区分 \(m_i\) 的整数部分 \(mz_i\) 和小数部分 \(mx_i\)\(\sum_{i=1}^n m z_i\) 代表减去的 \(W_{total}\) 个数,计算差值 \(d=q-1-\sum_{i=1}^n mz_i\),即还剩 d 个 \(W_{total}\) 待减,对小数部分 \(mx_i\) 从大到小排序,取前 d 个对应的机器再减 \(W_{total}\),即可得到第 q 个请求时的当前权重序列,取最大权重对应的机器即为调度结果,后续调度结果可通过递推式得出。


  初次看到这个方案的时候,就想动手实现一下,因为思路也比较清晰简单,实现完之后,简单测试一下,也确实没啥问题,后面再深度测试了一下,就发现该方案确实有点小小的问题,在大部分情况下,该方案确实能得到很正确的结果,但还是存在一些错误结果,就因为有少量错误结果,所以该方案不要在生产环境下应用。该方案错在了将 \(q*ew_i\) 看成最后一个整体进行处理排序,忽略了分步执行结果,导致小部分场景下的错误排序结果,进而生成错误调度权重,调度错误。

  现在再回到初始问题「如何生成 SWRR 算法中指定轮次的调度结果?」,抽象来看,该问题是个数学问题「如何从数列的递推式计算数列求通项公式」, 但 SWRR 的递推式相对复杂,中间还有取最大值这个不稳定变量,实际很难得到通项公式,直接计算指定调度解果,Shaun 问了 ChatGPT,也自己想了很久,搜了很久,但都没有答案,内网中的这个方案算是最接近的一个答案。

后记

  在内网中看到这个方案的思路很有意思,将整数和小数部分拆开,再单独对小数部分排序,所以就自己测试了一下,顺便学习了下负载均衡 SWRR 算法,虽然问题依旧还在,但总归是有点收获。

附录

  附代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import random


def ouput_schedule(rs_arr, schedule_num):
all_rs_weight_str = ";\t".join(["rs:%s,cw:%s" % (rs["rs_name"], rs["cw"]) for rs in rs_arr])
schedule_rs = max(rs_arr, key=lambda x:x["cw"])
print("%s:\t%s\t===>\trs:%s,cw:%s" % (schedule_num, all_rs_weight_str, schedule_rs["rs_name"], schedule_rs["cw"]))

return schedule_rs

def swrr(rs_arr, weight_total):
schedule_rs = rs_arr[0]
max_weight = schedule_rs["cw"]
for rs in rs_arr:
if rs["cw"] > max_weight:
schedule_rs = rs
max_weight = rs["cw"]

rs["cw"] += rs["ew"]

schedule_rs["cw"] -= weight_total

return schedule_rs

def swrr_test():
real_servers = [{"rs_name": chr(i+64), "ew": i, "cw": i} for i in range(1, 6)]
weight_total = sum([rs["ew"] for rs in real_servers])
schedule_count = weight_total
swrr_seq = []
for i in range(1, schedule_count+1):
ouput_schedule(real_servers, i)
schedule_rs = swrr(real_servers, weight_total)

swrr_seq.append(schedule_rs["rs_name"])

print(swrr_seq)

# swrr_test()
# print("---------")

def swrr_n(rs_arr, weight_total, schedule_num):
ms = [(rs["ew"] / float(weight_total)) * (schedule_num-1) for rs in rs_arr]
mzs = [int(m) for m in ms]
mxs = [(i, m-int(m)) for i, m in enumerate(ms)]
mxs = sorted(mxs, key=lambda x:x[1], reverse=True)
for i, rs in enumerate(rs_arr):
rs["cw"] = schedule_num * rs["ew"]
rs["cw"] -= mzs[i] * weight_total

d = (schedule_num-1) - sum(mzs)
for i in range(d):
rs_arr[mxs[i][0]]["cw"] -= weight_total

schedule_rs = ouput_schedule(rs_arr, schedule_num)

return schedule_rs

def swrr_n_test():
real_servers = [{"rs_name": chr(i+64), "ew": i, "cw": i} for i in range(1, 6)]
weight_total = sum([rs["ew"] for rs in real_servers])

schedule_rs_seq = []
for i in range(1, weight_total+1):
schedule_rs = swrr_n(real_servers, weight_total, i)

schedule_rs_seq.append(schedule_rs["rs_name"])
# swrr_n(real_servers, weight_total, 9) # err schedule rs
print(schedule_rs_seq)

# swrr_n_test()

def vnswrr_preschedule(rs_arr, weight_total, N, schedule_rs_seq):
for i in range(1, N+1):
schedule_rs = swrr(rs_arr, weight_total)
if len(schedule_rs_seq) >= weight_total:
break
schedule_rs_seq.append(schedule_rs)

def vnswrr(rs_arr, rs_count, weight_total, prev_schedule_idx, schedule_rs_seq):
N = min(rs_count, weight_total)

schedule_idx = prev_schedule_idx + 1
schedule_idx %= weight_total

if schedule_idx >= len(schedule_rs_seq)-1:
vnswrr_preschedule(rs_arr, weight_total, N, schedule_rs_seq)

return schedule_idx

def vnswrr_test():
all_schedule_rs_seq = []
real_servers = [{"rs_name": chr(i+64), "ew": i, "cw": i} for i in range(1, 6)]
rs_count = len(real_servers)
weight_total = sum([rs["ew"] for rs in real_servers])

N = min(rs_count, weight_total)
schedule_rs_seq = []
# 预生成调度序列
vnswrr_preschedule(real_servers, weight_total, N, schedule_rs_seq)
# 随机取调度结果
prev_schedule_idx = random.randint(0, N-1)-1

for i in range(1, 2*weight_total+1):
schedule_idx = vnswrr(real_servers, rs_count, weight_total, prev_schedule_idx, schedule_rs_seq)
all_schedule_rs_seq.append(schedule_rs_seq[schedule_idx]["rs_name"])
prev_schedule_idx = schedule_idx

print([rs["rs_name"] for rs in schedule_rs_seq])
print(all_schedule_rs_seq)

vnswrr_test()

参考资料

1、QPS 提升60%,揭秘阿里巴巴轻量级开源 Web 服务器 Tengine 负载均衡算法

2、Nginx SWRR 算法解读

记一次资源不释放的问题

前言

  最近发现一个 GoFrame 服务即使空载 CPU 使用率也很高,每次接受请求后资源没有被释放,一直累积,直到达到报警阈值,人工介入重启服务,于是压测排查了一下。

前言

  最近发现一个 GoFrame 服务即使空载 CPU 使用率也很高,每次接受请求后资源没有被释放,一直累积,直到达到报警阈值,人工介入重启服务,于是压测排查了一下。

问题篇

  先新增代码启动 go 自带的 pprof 服务器:

1
2
3
4
5
6
7
8
9
10
11
12
package main

import (
"net/http"
_ "net/http/pprof"
)

func Pprof(pprof_port string) {
go func(pprof_port string) {
http.ListenAndServe("0.0.0.0:"+pprof_port, nil)
}(pprof_port)
}

压测以及 profile 命令:

1
2
3
4
5
6
7
8
9
10
11
12
# 压测命令
wrk -t8 -c1000 -d60s --latency --timeout 10s -s post_script.lua http://host:[srv_port]/post

# profile 整体分析
go tool pprof -http=:8081 http://host:[pprof_port]/debug/pprof/profile?seconds=30

# 查看函数堆栈调用
curl http://host:[pprof_port]/debug/pprof/trace?seconds=30 > ./pprof/trace01
go tool trace -http=:8081 ./pprof/trace01

# 查看内存堆栈
go tool pprof -http=:8081 http://host:[pprof_port]/debug/pprof/heap?seconds=30

  在压测 30 次后,即使服务空载 CPU 也被打满了,查看服务此时的 profile,发现 goroutine 的数目到了百万级别,查看 cpu 堆栈发现集中调用在 gtimer 上,但遍寻服务代码,没有直接用到 GoFrame 的定时器,问题出在哪也还是没想太明白。吃完饭后偶然灵光一现,既然 CPU 看不出啥,那再看看内存,查看内存发现,内存对象最多的是 glog.Logger,看代码也正好有对应的对象,可算是找到问题真正的元凶了。

  log 对象一般都是全生命周期的,不主动销毁就会一直伴随着服务运行,所以 log 对象一般都是程序启动时初始化一次,后续调用,都是用这一个对象实例。而这次这个问题就是因为在代码中用 glog 记录了数据库执行日志,每次请求都会重新生成一个 glog 对象,又没有主动释放造成的。

  知道问题的真正所在,解决问题就相对很简单了,只在程序启动时初始化一个 glog 对象,后续打印日志就用这一个实例,其实更好的方式是生产环境不打印数据库日志,毕竟影响性能。

后记

  CPU 资源的占用往往伴随着内存资源的占用,当从调用堆栈以及线程资源上看不出问题的时候,可以转过头来看看内存堆栈,毕竟内存堆栈更能指示有问题的对象出在哪,知道内存对象是谁,也相当于提供了排查问题代码的方向。

附录

  在排查过程中发现 goroutine 数目异常的高,于是想限制一下 goroutine 数目,在网上搜索的时候发现当用容器部署 go 服务时,go 默认最大的 goroutine 数目为宿主机 cpu 核数,而不是容器的 cpu 核数,从而并发时 goroutine 数目可能比容器 cpu 核数高很多,造成资源争抢,导致并发性能下降,可以通过设置环境变量 GOMAXPROCS 指定 goroutine 最大数目,也可以使用 go.uber.org/automaxprocs 库自动修正最大核数为容器 cpu 核数。

自适应设置 GOMAXPROCS 上下限代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
_ "go.uber.org/automaxprocs"

"runtime"
)

func main() {
procsNum := runtime.GOMAXPROCS(-1)
if procsNum < 4 {
procsNum = 4
} else if procsNum > 16 {
procsNum = 16
}

runtime.GOMAXPROCS(procsNum)

// todo something...

}

python 内存泄漏排查

※注:python 的默认参数是全局变量,若默认参数为一个引用类型(eg:字典对象),且函数中会对该参数进行写操作,就极有可能发生内存泄漏,所以 python 默认参数最好是值类型。

方法一是线上程序直接排查,通过 pyrasite 和 guppy 直接对应 python 程序:

step1:绑定 python 程序 pid,开启 pyrasite shell 窗口,执行 pyrasite-shell <pid>

step2:使用 guppy 查看 python 程序内存情况,

1
2
3
>>> from guppy import hpy
>>> h = hpy()
>>> h.heap()

step3:间隔一定时间后,再次使用 h.heap(),对比两次内存变化

该方法一般只能粗略查看内存泄露的数据对象,可能无法精确定位到指定位置,这时需要用方法二,手动插入代码查看程序运行日志:

Python标准库的gc、sys模块提供了检测的能力

1
2
3
4
5
6
import gc
import sys

gc.get_objects() # 返回一个收集器所跟踪的所有对象的列表
gc.get_referrers(*objs) # 返回直接引用任意一个 ojbs 的对象列表
sys.getsizeof() # 返回对象的大小(以字节为单位)。只计算直接分配给对象的内存消耗,不计算它所引用的对象的内存消耗。

基于这些函数,先把进程中所有的对象引用拿到,得到对象大小,然后从大到小排序,打印出来,代码如下:

1
2
3
4
5
6
7
8
9
10
11
import gc
import sys

def show_memory():
print("*" * 60)
objects_list = []
for obj in gc.get_objects():
size = sys.getsizeof(obj)
objects_list.append((obj, size))
for obj, size in sorted(objects_list, key=lambda x: x[1], reverse=True)[:10]:
print(f"OBJ: {id(obj)}, TYPE: {type(obj)} SIZE: {size/1024/1024:.2f}MB {str(obj)[:100]}")

找到内存占用稳定增长的对象,调用 gc.get_referrers(*objs),查看该对象的引用信息,即可快速定位泄漏位置

该方法更加灵活精确,不好的地方是有侵入性,需要修改代码后重新上线,同时获取这些信息并打印,对性能有一定的影响,排查完之后,需要将该段代码下线。

参考资料

1、python内存泄露问题定位:附带解决pyrasite timed out

2、技术 · 一次Python程序内存泄露故障的排查过程

M1 个人配置

前言

  记录一下 Shaun 个人的 Mac 装机配置。

前言

  记录一下 Shaun 个人的 Mac 装机配置。

必备

AlDente:Mac 电池健康保护神器,默认 80% 就行,想充满就设置为 100%,需要禁掉自带的优化电池充电

LuLu:防火墙,控制应用联网权限。

BetterDisplay:使外接显示器更清晰,需设置与笔记本同宽高比/同分辨率的 Dummy 以及将 Dummy 屏幕镜像到外接显示器。

MacZip:解压缩。

Mac 黑魔法

  有时 Mac 系统抽风,部分设置在界面上无法修改,需要通过终端命令强制修改。现记录部分命令:

1
2
3
4
5
# 允许安装任何来源的 app
sudo spctl --master-disable

# 设置时区为中国标准时间
sudo systemsetup -settimezone Asia/Shanghai

iTerm2

  下载安装 iTerm2,默认 shell 就是 zsh,所以不需要安装。

  安装 Oh My Zsh,github 上的命令在国内可能无法顺利执行,先 clone 下来,手动执行 sh tools/install.sh

  安装 Powerlevel10k 之前,先安装 nerd font 字体,Shaun 个人还是比价喜欢 Fira Code 字体,所以就选择下载 Fira Code Nerd Font 字体,只需要安装 Fira Code Retina Nerd Font Complete.ttf 即可。设置 iTerm2 字体为 FiraCode Nerd Font。

  随后开始安装 Powerlevel10k,安装完之后重启 iTerm2,会有 Powerlevel10k 的配置提问,依次回答(有推荐按推荐)完成即可配置好 Powerlevel10k,若后续想修改配置,可直接编辑 ~/.p10k.zsh 文件或使用 p10k configure 命令重新回答配置提问。最后在 zsh 的配置文件 ~/.zshrc 中设置 ZSH_THEME=powerlevel10k/powerlevel10k

  推荐安装 zsh 插件 zsh-syntax-highlightingzsh-autosuggestions,在执行完

1
2
3
git clone https://github.com/zsh-users/zsh-syntax-highlighting.git ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-syntax-highlighting

git clone https://github.com/zsh-users/zsh-autosuggestions ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-autosuggestions

后修改 ~/.zshrc 的 plugins 值,

1
2
3
4
5
6
plugins=( 
git
zsh-syntax-highlighting
zsh-autosuggestions
# other plugins...
)

VSCode

  VSCode 同样需要设置终端字体为 FiraCode Nerd Font,在终端中进入 Downloads 目录执行 mv Visual\ Studio\ Code.app /Applications 命令,将 VSCode 放进 应用程序 中,再执行 sudo ln -s "/Applications/Visual Studio Code.app/Contents/Resources/app/bin/code" /usr/local/bin/code,之后可在终端使用命令(code .)直接打开 VSCode。若无法自动更新,需执行:

1
2
sudo chown -R $USER ~/Library/Caches/com.microsoft.VSCode.ShipIt
xattr -dr com.apple.quarantine /Applications/Visual\ Studio\ Code.app

Homebrew

20241112 更新:

可一键直接 安装 Homebrew中科大源清华大学源

以下命令无需执行。


  直接执行:

1
2
3
4
/bin/bash -c "$(curl -fsSL https://cdn.jsdelivr.net/gh/ineo6/homebrew-install/install.sh)"

echo 'eval "$(/opt/homebrew/bin/brew shellenv)"' >> ~/.zprofile
eval "$(/opt/homebrew/bin/brew shellenv)"

安装完成后先检查目录 /opt/homebrew/Library/Taps/homebrew/homebrew-cask 是否存在,若不存在,则执行:

1
2
cd /opt/homebrew/Library/Taps/homebrew/
git clone https://mirrors.ustc.edu.cn/homebrew-cask.git

  最后设置中科大源:

1
2
3
4
5
6
7
git -C "$(brew --repo)" remote set-url origin https://mirrors.ustc.edu.cn/brew.git
git -C "$(brew --repo homebrew/core)" remote set-url origin https://mirrors.ustc.edu.cn/homebrew-core.git
git -C "$(brew --repo homebrew/cask)" remote set-url origin https://mirrors.ustc.edu.cn/homebrew-cask.git
brew update

echo 'export HOMEBREW_BOTTLE_DOMAIN=https://mirrors.ustc.edu.cn/homebrew-bottles/bottles' >> ~/.zprofile
source ~/.zprofile

Aria2

  直接使用命令 brew install aria2 安装,生成配置文件:

1
2
3
4
cd ~
mkdir .aria2
cd .aria2
touch aria2.conf

  打开 Finder,通过 Shift+Cmd+G 进入路径:~/.aria2/,编辑文件 aria2.conf,添加以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#用户名
#rpc-user=user
#密码
#rpc-passwd=passwd
#上面的认证方式不建议使用,建议使用下面的token方式
#设置加密的密钥
#rpc-secret=token
#允许rpc
enable-rpc=true
#允许所有来源, web界面跨域权限需要
rpc-allow-origin-all=true
#允许外部访问,false的话只监听本地端口
rpc-listen-all=true
#RPC端口, 仅当默认端口被占用时修改
#rpc-listen-port=6800
#最大同时下载数(任务数), 路由建议值: 3
max-concurrent-downloads=5
#断点续传
continue=true
#同服务器连接数
max-connection-per-server=5
#最小文件分片大小, 下载线程数上限取决于能分出多少片, 对于小文件重要
min-split-size=10M
#单文件最大线程数, 路由建议值: 5
split=10
#下载速度限制
max-overall-download-limit=0
#单文件速度限制
max-download-limit=0
#上传速度限制
max-overall-upload-limit=0
#单文件速度限制
max-upload-limit=0
#断开速度过慢的连接
#lowest-speed-limit=0
#验证用,需要1.16.1之后的release版本
#referer=*
#文件保存路径, 默认为当前启动位置
dir=/Users/yuanxu/Downloads
#文件缓存, 使用内置的文件缓存, 如果你不相信Linux内核文件缓存和磁盘内置缓存时使用, 需要1.16及以上版本
#disk-cache=0
#另一种Linux文件缓存方式, 使用前确保您使用的内核支持此选项, 需要1.15及以上版本(?)
#enable-mmap=true
#文件预分配, 能有效降低文件碎片, 提高磁盘性能. 缺点是预分配时间较长
#所需时间 none < falloc ? trunc << prealloc, falloc和trunc需要文件系统和内核支持
file-allocation=prealloc
bt-tracker=udp://tracker.opentrackr.org:1337/announce,udp://open.tracker.cl:1337/announce,udp://9.rarbg.com:2810/announce,udp://tracker.openbittorrent.com:6969/announce,udp://exodus.desync.com:6969/announce,udp://www.torrent.eu.org:451/announce,udp://vibe.sleepyinternetfun.xyz:1738/announce,udp://tracker1.bt.moack.co.kr:80/announce,udp://tracker.zerobytes.xyz:1337/announce,udp://tracker.torrent.eu.org:451/announce,udp://tracker.theoks.net:6969/announce,udp://tracker.srv00.com:6969/announce,udp://tracker.pomf.se:80/announce,udp://tracker.ololosh.space:6969/announce,udp://tracker.monitorit4.me:6969/announce,udp://tracker.moeking.me:6969/announce,udp://tracker.lelux.fi:6969/announce,udp://tracker.leech.ie:1337/announce,udp://tracker.jordan.im:6969/announce,udp://tracker.blacksparrowmedia.net:6969/announce

最后的 bt-tracker 可以从 trackerslist 获取,只用最好的 20 个即可(trackers_best (20 trackers) => link / mirror / mirror 2)。

  接着启动 aria2:aria2c --conf-path="/Users/xxx/.aria2/aria2.conf" -D (xxx 为电脑用户名),在 ~/.zshrc 中加入

1
2
alias start-aria2='aria2c --conf-path="/Users/xxx/.aria2/aria2.conf" -D'
start-aria2

将 start-aria2c 作为启动 aria2 的命令别名,顺便开机自启。

  最后从 Aria2中文网 安装 Chrome 插件,打开 aria2 的 WebUI 界面。

expect

  经常需要使用 ssh 远程登陆堡垒机再到远程服务器,输密码选机器都很麻烦,可以用 expect 写些脚本,自动填充密码和机器,一键直接进到远程服务器。首先安装 expect:brew install expect。在 /usr/local/bin 目录中新建脚本:sudo vi mysl.sh,填充相应内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/usr/bin/expect -f

set USER [用户名]
set PWD [密码]
set TERMSERVIP [堡垒机服务器ip]

# 全部的远程服务器([remote_server_name] 需要修改为对应的服务器名
set RS1 [remote_server_name]
set RS2 [remote_server_name]

# help 命令,查看所有需要登录的远程服务器
if {[lindex $argv 0] == "help"} {
puts "1: $RS1 [说明]"
puts "2: $RS2 [说明]"
send "exit\r"
exit
}

# ===== 脚本正文 =====
# 默认登陆远程服务器1
set RS $RS1
set timeout 10

# 输入命令 1,则登陆第一台服务器
if {[lindex $argv 0] == "1"} {
set RS $RS1
}
if {[lindex $argv 0] == "2"} {
set RS $RS2
}

spawn ssh ${USER}@${TERMSERVIP} -p 22
expect {
"yes/no" { send "yes\r"; exp_continue; }
"*assword*" { send "$PWD\n"}
}

# 选择几号跳板机
expect "*num*" { send "0\n" }

# 登陆远程服务器
expect "${USER}@" { send "ssh $RS\n" }

# 退出 expect(保持在远程服务器终端
interact

# 退出 expect(回到本地终端
# expect eof

为新建的脚本增加可执行权限:sudo chmod 777 mysl.sh,之后可直接使用 mysl.sh 1 登录到对应的远程服务器。

lrzsz

  与 FTP 和 NFS 相比,使用 lrzsz 与远程 linux 服务器做文件上传和下载是最简单的,在 iTerm2 中使用 rzsz 命令进行上传和下载文件需要一定的配置。※注使用 expect 自动登录的远程环境可能无法使用 sz rz 命令

  首先安装 lrzsz:brew install lrzsz。再跳转目录:cd /usr/local/bin,新建文件:sudo vi iterm2-recv-zmodem.sh,添加内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/bin/bash
# Author: Matt Mastracci (matthew@mastracci.com)
# AppleScript from http://stackoverflow.com/questions/4309087/cancel-button-on-osascript-in-a-bash-script
# licensed under cc-wiki with attribution required
# Remainder of script public domain

osascript -e 'tell application "iTerm2" to version' > /dev/null 2>&1 && NAME=iTerm2 || NAME=iTerm
if [[ $NAME = "iTerm" ]]; then
FILE=`osascript -e 'tell application "iTerm" to activate' -e 'tell application "iTerm" to set thefile to choose folder with prompt "Choose a folder to place received files in"' -e "do shell script (\"echo \"&(quoted form of POSIX path of thefile as Unicode text)&\"\")"`
else
FILE=`osascript -e 'tell application "iTerm2" to activate' -e 'tell application "iTerm2" to set thefile to choose folder with prompt "Choose a folder to place received files in"' -e "do shell script (\"echo \"&(quoted form of POSIX path of thefile as Unicode text)&\"\")"`
fi

if [[ $FILE = "" ]]; then
echo Cancelled.
# Send ZModem cancel
echo -e \\x18\\x18\\x18\\x18\\x18
sleep 1
echo
echo \# Cancelled transfer
else
cd "$FILE"
/usr/local/bin/rz -E -e -b
sleep 1
echo
echo
echo \# Sent \-\> $FILE
fi

再新建文件:sudo vi iterm2-send-zmodem.sh,添加内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/bin/bash
# Author: Matt Mastracci (matthew@mastracci.com)
# AppleScript from http://stackoverflow.com/questions/4309087/cancel-button-on-osascript-in-a-bash-script
# licensed under cc-wiki with attribution required
# Remainder of script public domain

osascript -e 'tell application "iTerm2" to version' > /dev/null 2>&1 && NAME=iTerm2 || NAME=iTerm
if [[ $NAME = "iTerm" ]]; then
FILE=`osascript -e 'tell application "iTerm" to activate' -e 'tell application "iTerm" to set thefile to choose file with prompt "Choose a file to send"' -e "do shell script (\"echo \"&(quoted form of POSIX path of thefile as Unicode text)&\"\")"`
else
FILE=`osascript -e 'tell application "iTerm2" to activate' -e 'tell application "iTerm2" to set thefile to choose file with prompt "Choose a file to send"' -e "do shell script (\"echo \"&(quoted form of POSIX path of thefile as Unicode text)&\"\")"`
fi
if [[ $FILE = "" ]]; then
echo Cancelled.
# Send ZModem cancel
echo -e \\x18\\x18\\x18\\x18\\x18
sleep 1
echo
echo \# Cancelled transfer
else
/usr/local/bin/sz "$FILE" -e -b
sleep 1
echo
echo \# Received $FILE
fi

  为新建的两文件添加可执行权限:sudo chmod 777 iterm2-*。之后添加 rz sz 命令的软连接:

1
2
sudo ln -s /opt/homebrew/bin/rz /usr/local/bin/rz
sudo ln -s /opt/homebrew/bin/sz /usr/local/bin/sz

  最后配置 iTerm2,选择 Preference... -> Profiles -> Default -> Advanced -> Edit (in Triggers),添加下载触发器:

1
2
3
4
5
6
7
8
9
10
11
# 1. Regular expression 中填写
rz waiting to receive.\*\*B0100

# 2. Action 选择
Run Silent Coprocess...

# 3. Parameters 中填写
/usr/local/bin/iterm2-send-zmodem.sh

# 4. Instant 不勾选
# 5. Enabled 勾选

再添加上传触发器:

1
2
3
4
5
6
7
8
9
10
11
# 1. Regular expression 中填写
\*\*B00000000000000

# 2. Action 选择
Run Silent Coprocess...

# 3. Parameters 中填写
/usr/local/bin/iterm2-recv-zmodem.sh

# 4. Instant 不勾选
# 5. Enabled 勾选

  至此 M1 中 iTerm2 rz sz 命令配置完成。

参考资料

iTerm2 + zsh + Oh My Zsh + Powerlevel10k 打造 Mac 下最强终端

Mac M1 iTerm2 配置rz sz 上传下载文件

Scala 多线程编程小结

前言

  多线程的执行方式有两种:并发(Concurrent)和并行(Parallel),简单来说,并发就是两个线程轮流在一个 CPU 核上执行,而并行则是两个线程分别在两个 CPU 核上运行。一般而言,程序员无法直接控制线程是并发执行还是并行执行,线程的执行一般由操作系统直接控制,当然程序运行时也可以做简单调度。所以对于一般程序员来说,只需要熟练使用相关语言的多线程编程库即可,至于是并发执行还是并行执行,可能并不是那么重要,只要能达到预期效果就行。

前言

  多线程的执行方式有两种:并发(Concurrent)和并行(Parallel),简单来说,并发就是两个线程轮流在一个 CPU 核上执行,而并行则是两个线程分别在两个 CPU 核上运行。一般而言,程序员无法直接控制线程是并发执行还是并行执行,线程的执行一般由操作系统直接控制,当然程序运行时也可以做简单调度。所以对于一般程序员来说,只需要熟练使用相关语言的多线程编程库即可,至于是并发执行还是并行执行,可能并不是那么重要,只要能达到预期效果就行。

  Shaun 目前接触的 Scala 原生多线程编程语法就两个:Future 和 Parallel Collections。其中 Future 用的的最多,并且 Parallel Collections 语法非常简单,所以主要介绍 Future,附带提一下 Parallel Collections。

ExecutionContext 篇

  ExecutionContext 是 Future 的执行上下文,相当于是 Java 的线程池,Java 的线程池主要有以下两类:

  • ThreadPool:所有线程共用一个任务队列,当线程空闲时,从队列中取一个任务执行。
  • ForkJoinPool:每个线程各有一个任务队列,当线程空闲时,从其他线程的任务队列中取一批任务放进自己的队列中执行。

  对于少量任务,这两个池子没啥区别,只是 ThreadPool 在某些情况下会死锁,比如在一个并行度为 2 (最多两个线程)的 ThreadPool 中执行两个线程,两个线程又分别提交一个子任务,并等到子任务执行完才退出,这时会触发相互等待的死锁条件,因为没有多余的空闲线程来执行子任务,而 ForkJoinPool 中每个线程产生的子任务会放在自己的任务队列中,ForkJoinPool 可以在线程耗尽时额外创建线程,也可以挂起当前任务,执行子任务,从而防止死锁。对于大量任务,ForkJoinPool 中的空闲线程会从其他线程的任务队列中一批一批的取任务执行,所以一般会更快,当然若各个任务执行时间比较均衡,则 ThreadPool 会更快。

  根据线程池创建的参数不同,Executors 中提供了 5 种线程池:newSingleThreadExecutor(单线程线程池,可保证任务执行顺序),newFixedThreadPool(固定大小线程池,限制并行度),newCachedThreadPool(无限大小线程池,任务执行时间小采用),newScheduledThreadPool(同样无限大小,用来处理延时或定时任务),newWorkStealingPool(ForkJoinPool 线程池)。前四种都属于 ThreadPool,根据阿里的 Java 的编程规范,不推荐直接使用 Executors 创建线程池,不过对于计算密集型任务,一般使用 newFixedThreadPool 或 newWorkStealingPool 即可,线程数设置当前 CPU 数即可(Runtime.getRuntime.availableProcessors()),多了反而增加线程上下文切换次数,对CPU 的利用率不增反减。

  Scala 提供了一个默认的 ExecutionContext:scala.concurrent.ExecutionContext.Implicits.global,其本质也是一个 ForkJoinPool,并行度默认设置为当前可用 CPU 数,当然也会根据需要(比如当前全部线程被阻塞)额外创建更多线程。一般做计算密集型任务就用默认线程池即可,特殊情况也可以自己创建 ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8)),下面的代码就可以创建一个同步阻塞的 ExecutionContext:

1
2
3
4
5
val currentThreadExecutionContext = ExecutionContext.fromExecutor(
new Executor {
// Do not do this!
def execute(runnable: Runnable) { runnable.run() }
})

原因是 runnable.run() 并不会新开一个线程,而是直接在主线程上执行,和调用普通函数一样。

Future 篇

  先上一个简单的 Future 并发编程 Demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
////import scala.concurrent.ExecutionContext.Implicits.global
//val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
val pool = Executors.newWorkStealingPool()
implicit val ec = ExecutionContext.fromExecutorService(pool)

val futures = Array.range(0, 10000).map(i => Future {
println(i)
Thread.sleep(100)
i
})

val futureSequence = Future.sequence(futures)
futureSequence.onComplete({
case Success(results) => {
println(results.mkString("Array(", ", ", ")"))
println(s"Success")

ec.shutdown()
pool.shutdownNow()
}
case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
})
Await.result(futureSequence, Duration.Inf)

  如果计算机 CPU 核数为 8 核,则程序运行成功后将会从 VisualVM 中看到有 8 个线程数在运行,控制台中会每次打印 8 条记录,最后打印出完整数组。

  onComplete 是 Future 的回调函数,可对 Success 和 Failure 分别处理,Await 是为了阻塞主线程,当 futureSequence 执行完成后,才继续执行下面的任务。当然,主线程的阻塞也可以使用 Java 中的 CountDownLatch 来实现,只需要在每个 Future 执行完成后调用一次 countDown() 即可,或者直接在 onComplete 的回调函数中调用一次也行。(题外话:CountDownLatch 和 Golang 中的 sync.WaitGroup 感觉区别不大)。

  如果不想让程序并发执行,则将 Future.sequence(futures) 改为 Future.traverse(futures)(x => x) 即可,此时就会一条条打印,但不保证打印顺序与数组一致。

  如果使用 ExecutionContext.Implicits.global,并将上面创建 futures 的代码改为:

1
2
3
4
5
6
7
val futures = Array.range(0, 10000).map(i => Future {
blocking {
println(i)
Thread.sleep(100)
i
}
})

  则控制台会马上将数组全部打印出来,从 VisualVM 中看会有非常多的线程在运行,远远超过 8 个,这是因为 ForkJoinPool 检测到当前线程以全部阻塞,所以需要另开线程继续执行,如果将线程池改为 Executors.newFixedThreadPool(8),则不会马上将数组全部打印,而是恢复原样,每次打印 8 条。blocking 需要慎用,如果 ForkJoinPool 中线程数太多,同样会 OOM,一般在大量运行时间短内存小的并发任务中使用。


  Parallel Collections 并发编程就很简单了,demo 如下:

1
2
3
4
Array.range(0, 10000).par.foreach(i => {
println(i)
Thread.sleep(100)
})

  关键字为 par,调用该方法即可轻松进行并发计算,不过需要注意的是并发操作的副作用(side-effects)和“乱序”(out of order)语义,副作用就是去写函数外的变量,不仅仅只读写并发操作函数内部声明的变量,乱序语义是指并发操作不会严格按照数组顺序执行,所以如果并发操作会同时操作两个数组元素(eg:reduce),则需要慎重使用,有的操作结果不变,而有的操作会导致结果不唯一。

经验篇

  Shaun 目前使用 Scala 进行多线程编程主要碰到过以下几个问题:

  • 数据竞争问题
  • 任务拆分问题
  • 内存占用问题

  数据竞争问题算是多线程编程中最常见的问题,简单来说就是两个线程同时写同一个变量,导致变量值不确定,引发后续问题,解决该问题有很多方法,性能由高到底有:Atomic,volatile,线程安全数据结构(eg:ConcurrentHashMap),Lock,synchronized,前两个方法性能最高,但局限性也很大,如果有现成的线程安全对象使用是最好的,没有的只能用 Lock 和 synchronized,这两种各有优缺点,synchronized 用法简单,能应付绝大部分问题,但对读也会加锁并且无法中断等待线程,Lock 是个接口,有比较多的派生对象(ReentrantLock,ReadWriteLock,ReentrantReadWriteLock 等),能更灵活的控制锁,不过使用起来相对复杂,需要显式地加锁解锁。

  任务拆分问题,这个问题发生在任务量非常多(千万级以上)的时候,当需要对千万级数据进行并发处理时,单纯的生成相应的千万级 Future 在默认的 ExecutionContext 中执行会比较慢,甚至出现程序运行一段时间卡一段时间的现象(可能是内存不足,GC 卡了),此时需要人为对千万级任务进行合并。Shaun 这里有两种方案:一种是使用 grouped 将千万级任务划分为 16 组,从而降级为 16 个任务,生成 16 个Future,这时执行速度会快很多,且不会有卡的现象出现;另一种方案就是,每次只生成 10 万个 Future 放进 ExecutionContext 中执行,如此将千万级任务拆分成每次 10 万并发执行,同样能解决问题。

  内存占用问题,这个问题发生在单个任务需要占用大量内存(1G 以上)的时候,当单个任务需要 1G 以上内存,8 个任务并行则需要 8G 以上内存,内存占用过高,提高 JVM 的内存,但也只是治标不治本。Shaun 的解决方案是对单个任务进行进一步拆分,将单个任务继续拆分为 16 个子任务,再将 16 个子任务的结果进行合并,作为单个大任务的结果,8 个大任务串行执行,如此内存占用极大减少,只需要单个任务的内存即可完成全部任务,且 CPU 利用率不变,执行速度甚至会更快(Full GC 次数变少)。


  Shaun 在写大文件的时候会用到 newSingleThreadExecutor 和 Future.traverse,将写文件的操作放在 Future 里面,每次只写一个大文件(不用多线程写是因为机械硬盘的顺序读写肯定比随机读写快),而生产大文件内容的操作由默认的 ExecutionContext 执行,从而使生产与消费互不干扰,写大文件操作不会阻塞生产操作。

  一个用 Future 实现的生产者消费者 demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
val poolProducer = Executors.newWorkStealingPool()
implicit val ecProducer = ExecutionContext.fromExecutorService(poolProducer)
val poolConsumer = Executors.newSingleThreadExecutor()
val ecConsumer = ExecutionContext.fromExecutorService(poolConsumer)

val futures = Array.range(0, 1000).map(i => Future {
val x = produce(i) // produce something...
x
}(ecProducer).andThen { case Success(x) =>
consume(x) // consume something...
}(ecConsumer))

val futureSequence = Future.sequence(futures)
futureSequence.onComplete({
case Success(results) => {
println("Success.")

ecProducer.shutdown()
poolProducer.shutdownNow()
ecConsumer.shutdown()
poolConsumer.shutdownNow()
}
case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
})
Await.result(futureSequence, Duration.Inf)

后记

  Shaun 这里写的 Scala 多线程编程主要是针对计算密集型任务,而 IO 密集型任务一般会用专门的一些框架,计算密集型考虑的是如何最大化利用 CPU,加快任务执行速度,线程数一般比较固定。Scala 的 Future 多线程编程相比 Java 的多线程编程要简洁了很多,唯一需要控制的就是并行度和任务拆分,Shaun 自己在用时也对 Future 做了简单封装,进一步简化了 Scala 的多线程编程,对 Iterable 的并发计算会更方便。

参考资料

[1] Futures and Promises

[2] scala.concurrent.blocking - what does it actually do?

[3] Parallel Collections

[4] Java并发编程:Lock

Google S2 Geometry 浅解

前言

  Google S2 Geometry(以下简称 S2) 是 Google 发明的基于单位球的一种地图投影和空间索引算法,该算法可快速进行覆盖以及邻域计算。更多详见 S2GeometryGoogle’s S2, geometry on the sphere, cells and Hilbert curvehalfrost 的空间索引系列文章。虽然使用 S2 已有一年的时间,但确实没有比较系统的看过其源码,这次借着这段空闲时间,将 Shaun 常用的功能系统的看看其具体实现,下文将结合 S2 的 C++,Java,Go 的版本一起看,由于 Java 和 Go 的都算是 C++ 的衍生版,所以以 C++ 为主,捎带写写这三种语言实现上的一些区别,Java 版本时隔 10 年更新了 2.0 版本,喜大普奔。

前言

  Google S2 Geometry(以下简称 S2) 是 Google 发明的基于单位球的一种地图投影和空间索引算法,该算法可快速进行覆盖以及邻域计算。更多详见 S2GeometryGoogle’s S2, geometry on the sphere, cells and Hilbert curvehalfrost 的空间索引系列文章。虽然使用 S2 已有一年的时间,但确实没有比较系统的看过其源码,这次借着这段空闲时间,将 Shaun 常用的功能系统的看看其具体实现,下文将结合 S2 的 C++,Java,Go 的版本一起看,由于 Java 和 Go 的都算是 C++ 的衍生版,所以以 C++ 为主,捎带写写这三种语言实现上的一些区别,Java 版本时隔 10 年更新了 2.0 版本,喜大普奔。

坐标篇

s2 projection

  S2 的投影方式可简单想象为一个单位球外接一个立方体,从球心发出一条射线得到球面上的点到立方体上 6 个面的投影,即将球面投影为立方体,当然中间为了使面积分布更为均匀,还做了些其他坐标变换。

S2LatLng 坐标

  首先是经纬度坐标,默认用弧度(Radians)构造,取值范围为经度 [-π,+π],纬度 [-π/2,+π/2],当然也可使用 S1Angle 将角度(Degrees)转成弧度来构造。

S2Point 坐标

  然后球面笛卡尔坐标,这是个三维坐标,由 S2LatLng 到 S2Point 相当于将单位球的极坐标表示法转换为笛卡尔坐标表示法,具体公式为 \(x=\cos(lat)cos(lng); y=cos(lat)sin(lng); z=sin(lat)\)

FaceUV 坐标

  这个坐标并没实际的类与其对应,face 指的是立方体的面,值域为 [0,5],而 uv 坐标是指面上的点,值域为 [-1,1]。首先需要知道 S2Point 会投影到哪个面上,可以知道 S2 的笛卡尔坐标 X 轴正向指向 0 面,Y 轴正向指向 1 面,Z 轴正向指向 2 面,X 轴负向指向 3 面,Y 轴负向指向 4 面,Z 轴负向指向 5 面,所以 S2Point xyz 哪个分量的绝对值最大,就会投影到哪个轴指向的面,若该分量为正值,则取正向指的面,若该分量为负值,则取负向指的面。至于 uv 的计算方式就是直线与平面的交点了,之前的一篇「计算几何基础」中写过,但这里的平面和直线都比较特殊,所以有快速算法,就直接贴 Go 的代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// validFaceXYZToUV given a valid face for the given point r (meaning that
// dot product of r with the face normal is positive), returns
// the corresponding u and v values, which may lie outside the range [-1,1].
func validFaceXYZToUV(face int, r r3.Vector) (float64, float64) {
switch face {
case 0:
return r.Y / r.X, r.Z / r.X
case 1:
return -r.X / r.Y, r.Z / r.Y
case 2:
return -r.X / r.Z, -r.Y / r.Z
case 3:
return r.Z / r.X, r.Y / r.X
case 4:
return r.Z / r.Y, -r.X / r.Y
}
return -r.Y / r.Z, -r.X / r.Z
}

  这里需要注意的是 S2Point xyz 三分量构成的向量与平面法向量的点积必须是正数时 uv 才算正确有效,Go 在计算时没做校验,C++ 和 Java 都有校验,使用时需要注意。

FaceST 坐标

  之所以引入 ST 坐标是因为同样的球面面积映射到 UV 坐标面积大小不一,大小差距比较大(离坐标轴越近越小,越远越大),所以再做一次 ST 变换,将面积大的变小,小的变大,使面积更均匀,利于后面在立方体面上取均匀格网(cell)时,每个 cell 对应球面面积差距不大。S2 的 ST 变换有三种:1、线性变换,基本没做任何变形,只是简单将 ST 坐标的值域变换为 [0, 1],cell 对应面积最大与最小比大约为 5.2;2、二次变换,一种非线性变换,能起到使 ST 空间面积更均匀的作用,cell 对应面积最大与最小比大约为 2.1;3、正切变换,同样能使 ST 空间面积更均匀,且 cell 对应面积最大与最小比大约为 1.4,不过其计算速度相较于二次变换要慢 3 倍,所以 S2 权衡考虑,最终采用了二次变换作为默认的 UV 到 ST 之间的变换。二次变换公式为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public double stToUV(double s) {
if (s >= 0.5) {
return (1 / 3.) * (4 * s * s - 1);
} else {
return (1 / 3.) * (1 - 4 * (1 - s) * (1 - s));
}
}

public double uvToST(double u) {
if (u >= 0) {
return 0.5 * Math.sqrt(1 + 3 * u);
} else {
return 1 - 0.5 * Math.sqrt(1 - 3 * u);
}
}

FaceIJ 坐标

  IJ 坐标是离散化后的 ST 坐标,将 ST 空间的平面划分为 \(2^{30}×2^{30}\) 个网格,取网格所在的横纵坐标得到 IJ 坐标,所以由 ST 到 IJ 坐标的变换就比较简单了:

1
2
3
4
5
public static int stToIj(double s) {
return Math.max(
0, Math.min(1073741824 - 1, (int) Math.round(1073741824 * s - 0.5))
);
}

S2CellId

  这个 id 其实是个一维坐标,而是利用希尔伯特空间填充曲线将 IJ 坐标从二维变换为一维,该 id 用一个 64 位整型表示,高 3 位用来表示 face(0~5),后面 61 位来保存不同的 level(0~30) 对应的希尔伯特曲线位置,每增加一个 level 增加两位,后面紧跟一个 1,最后的位数都补 0。注:Java 版本的 id 是有符号 64 位整型,而 C++ 和 Go 的是无符号 64 位整型,所以在跨语言传递 id 的时候,在南极洲所属的最后一个面(即 face = 5)需要小心处理。

HilbertCurve

hilbert_curve_subdivision_rules
hilbert_curve

  上面两张图很明了的展示了希尔伯特曲线的构造过程,该曲线的构造基本元素由 ABCD 4 种“U”形构成,而 BCD 又可由 A 依次逆时针旋转 90 度得到,所以也可以认为只有一种“U”形,每个 U 占 4 个格子,以特定方式进行 1 分 4 得到下一阶曲线形状。

每个 U 坐标与希尔伯特位置(用二进制表示)对应关系如下:

  • A:00 -> (0,0); 01 -> (0,1); 10 -> (1,1); 11 -> (1,0);
  • B:00 -> (1,1); 01 -> (0,1); 10 -> (0,0); 11 -> (1,0);
  • C:00 -> (1,1); 01 -> (1,0); 10 -> (0,0); 11 -> (0,1);
  • D:00 -> (0,0); 01 -> (1,0); 10 -> (1,1); 11 -> (0,1);

每个 U 一分四对应关系如下:

  • A:D -> A -> A -> B
  • B:C -> B -> B -> A
  • C:B -> C -> C -> D
  • D:A -> D -> D -> C

  根据以上两个对应关系就能找到右手坐标系任意阶数的希尔伯特位置及坐标对应关系。以初始 1 阶曲线 A 为例,占据四个格子,然后进行一分四操作,四个格子分成 16 个格子,A 分为 DAAB 四个“U”形,连接起来即为 2 阶曲线,位置与坐标对应关系为(都用二进制表示):

0000 -> (00, 00); 0001 -> (01, 00); 0010 -> (01, 01); 0011 -> (00, 01)

0100 -> (00, 10); 0101 -> (00, 11); 0110 -> (01, 11); 0111 -> (01, 10)

1000 -> (10, 10); 1001 -> (10, 11); 1010 -> (11, 11); 1011 -> (11, 10)

1100 -> (11, 01); 1101 -> (10, 01); 1110 -> (10, 00); 1111 -> (11, 00)

  从二进制中很容易看出随着阶数的增加,位置与坐标的对应关系:每增加一阶,位置往后增加两位,坐标分量各增加一位,位置增加的两位根据一分四对应关系拼接,坐标各分量增加的一位需先找到一分四对应关系,再找对应位置与坐标对应关系,将得到的坐标分量对应拼接。以一阶的 01 -> (0,1) 到二阶的 0110 -> (01, 11) 为例,首先根据 01 得到当前所属一阶第二块,查找一分四对应关系知道,下一阶这块还是 A,根据 0110 后两位 10 可知这块属于 A 的第三个位置,查找坐标得到是 (1,1),结合一阶的 (0,1),对应分量拼接得到坐标 (01,11),即 (1, 3),同理可根据第二阶的坐标反查第二阶的位置。有了这些关系,就能生成希尔伯特曲线了,下面就看看 S2 是怎么生成 id 的。

S2Id

  首先 S2 中用了两个二维数组分别保存位置到坐标以及坐标到位置的对应的关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// kIJtoPos[orientation][ij] -> pos
const int kIJtoPos[4][4] = {
// (0,0) (0,1) (1,0) (1,1)
{ 0, 1, 3, 2 }, // canonical order
{ 0, 3, 1, 2 }, // axes swapped
{ 2, 3, 1, 0 }, // bits inverted
{ 2, 1, 3, 0 }, // swapped & inverted
};

// kPosToIJ[orientation][pos] -> ij
const int kPosToIJ[4][4] = {
// 0 1 2 3
{ 0, 1, 3, 2 }, // canonical order: (0,0), (0,1), (1,1), (1,0)
{ 0, 2, 3, 1 }, // axes swapped: (0,0), (1,0), (1,1), (0,1)
{ 3, 2, 0, 1 }, // bits inverted: (1,1), (1,0), (0,0), (0,1)
{ 3, 1, 0, 2 }, // swapped & inverted: (1,1), (0,1), (0,0), (1,0)
};

// kPosToOrientation[pos] -> orientation_modifier
const int kPosToOrientation[4] = {1, 0, 0, 3};

  方向 0(canonical order)相当于上文中 A,方向 1(axes swapped)相当于上文中 D,方向 2(bits inverted)相当于上文中 C,方向 3(swapped & inverted)相当于上文中 B,kPosToOrientation 代表 S2 中方向 0 一分四的对应关系,而 方向 1,2,3 的对应关系可由该值推出,计算公式为 orientation ^ kPosToOrientation,eg:1 -> 1^kPosToOrientation=[0, 1, 1, 2]; 3 -> 3^kPosToOrientation=[2, 3, 3, 0],与上文中一分四对应关系一致。

  随后 S2 初始化了一个 4 阶希尔伯特曲线位置与坐标的对应关系查找表,见 C++ 版的 MaybeInit() 方法,

1
2
3
int ij = (i << 4) + j;
lookup_pos[(ij << 2) + orig_orientation] = (pos << 2) + orientation;
lookup_ij[(pos << 2) + orig_orientation] = (ij << 2) + orientation;

  orig_orientation 代表 4 个初始方向,orientation 代表该位置或坐标下一阶一分四的方向,数组中每个元素是 16 位数,2 个字节,一个四阶希尔伯特曲线是 \(2^4×2^4=256\) 个位置,一个初始方向对应一个四阶希尔伯特曲线,所以一个查找表共占内存 \(2×256×4=2048=2KB\),正好一级缓存能放下,再大的话,一级缓存可能放不下,反而会降低查找速度。这两个查找表就相当于 4 个超“U”形的位置与坐标对应关系,同时一分四对应关系保持不变,以超“U”作为基本元素做下一阶希尔伯特曲线,每增加一阶位置往后增加 8 位,IJ 坐标各往后增加 4 位,如此,以更快的速度迭代到 S2 想要的 30 阶希尔伯特曲线。C++ 的这份代码就很精妙了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
S2CellId S2CellId::FromFaceIJ(int face, int i, int j) {
// 初始化超“U”形查找表
MaybeInit();

// face 向左移 60 位
uint64 n = absl::implicit_cast<uint64>(face) << (kPosBits - 1);

// 确定每个面的初始“U”形方向,使每个面都保持相同的右手坐标系,6 个面生成的希尔伯特曲线可以依次相连
uint64 bits = (face & kSwapMask);

// 基于超“U”形得到 30 阶希尔伯特曲线 IJ 坐标对应位置
#define GET_BITS(k) do { \
const int mask = (1 << kLookupBits) - 1; \
bits += ((i >> (k * kLookupBits)) & mask) << (kLookupBits + 2); \
bits += ((j >> (k * kLookupBits)) & mask) << 2; \
bits = lookup_pos[bits]; \
n |= (bits >> 2) << (k * 2 * kLookupBits); \
bits &= (kSwapMask | kInvertMask); \
} while (0)

// IJ 只有 30 位,7 这个调用只会导致位置移 4 位,后续调用都移 8 位,得到 4 + 8 * 7 = 60 位
GET_BITS(7);
GET_BITS(6);
GET_BITS(5);
GET_BITS(4);
GET_BITS(3);
GET_BITS(2);
GET_BITS(1);
GET_BITS(0);
#undef GET_BITS

// 整个 n 向右移一位,再以 1 结尾
return S2CellId(n * 2 + 1);
}

再来看看根据 id 反算 IJ 坐标:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
int S2CellId::ToFaceIJOrientation(int* pi, int* pj, int* orientation) const {
// 与上面一样
MaybeInit();

int i = 0, j = 0;
int face = this->face();
int bits = (face & kSwapMask);

// 反算 IJ 坐标,k == 7 时,取希尔伯特曲线位置高 4 位,IJ 各前 2 位,其余依次取位置 8 位, IJ 各 4 位
#define GET_BITS(k) do { \
const int nbits = (k == 7) ? (kMaxLevel - 7 * kLookupBits) : kLookupBits; \
bits += (static_cast<int>(id_ >> (k * 2 * kLookupBits + 1)) \
& ((1 << (2 * nbits)) - 1)) << 2; \
bits = lookup_ij[bits]; \
i += (bits >> (kLookupBits + 2)) << (k * kLookupBits); \
j += ((bits >> 2) & ((1 << kLookupBits) - 1)) << (k * kLookupBits); \
bits &= (kSwapMask | kInvertMask); \
} while (0)

GET_BITS(7);
GET_BITS(6);
GET_BITS(5);
GET_BITS(4);
GET_BITS(3);
GET_BITS(2);
GET_BITS(1);
GET_BITS(0);
#undef GET_BITS

*pi = i;
*pj = j;

if (orientation != nullptr) {
S2_DCHECK_EQ(0, kPosToOrientation[2]);
S2_DCHECK_EQ(kSwapMask, kPosToOrientation[0]);
// 0x1111111111111111ULL may be better?
if (lsb() & 0x1111111111111110ULL) {
bits ^= kSwapMask;
}
*orientation = bits;
}
return face;
}

  这里的 orientation 实际是指当前位置的方向,即其周围必有 3 个位置与其方向相同,最后一行注释 Shaun 之所以认为应该是 0x1111111111111111ULL,是因为第 30 阶希尔伯特曲线位置(leaf cell)按理说同样需要做异或操作得到方向,不过整个 S2 库都没有需要用到 leaf cell 的方向,所以这就倒无关紧要了。之所以需要做异或操作,是因为 bits 是该位置下一阶一分四的方向,而对于同一个希尔伯特曲线位置,奇数阶与奇数阶下一阶一分四方向相同,偶数阶与偶数阶下一阶一分四方向相同,lsb() 表示二进制 id 从右往左数第一个 1 所代表的数, 所以有 0x1111111111111110ULL 这一魔术数,而异或操作正好能将下一阶一分四方向调整为当前阶方向。

  如此 S2 的坐标以及 id 的生成以及反算就很明了了,下面就是 S2 如何使用 id 做计算了。


FaceSiTi 坐标

  这个是 S2 内部计算使用的坐标,一般用来计算 cell 的中心坐标,以及根据当前 s 和 t 坐标的精度(小数点后几位)判断对应的级别(level)。由于 S2 本身并不显式存储 ST 坐标(有存 UV 坐标),所以 ST 坐标只能计算出来,每个 cell 的中心点同样如此。计算公式为 \(Si=s*2^{31};Ti=t*2^{31}\)。至于为啥是 \(2^{31}\),是因为该坐标是用来描述从 0~ 31 阶希尔伯特曲线网格的中心坐标,0 阶中心以 \(1/2^1\) 递增,1 阶中心以 \(1/2^2\) 递增,2 阶中心以 \(1/2^3\) 递增,……,30 阶中心以 \(1/2^{31}\) 递增。S2 计算 id 对应的格子中心坐标,首先就会计算 SiTi 坐标,再将 SiTi 转成 ST 坐标。

算法篇

邻域算法

  S2 计算邻域,最关键的是计算不同面相邻的 leaf cell id,即:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
S2CellId S2CellId::FromFaceIJWrap(int face, int i, int j) {
// 限制 IJ 最大最小取值为 -1~2^30, 刚好能超出 IJ 正常表示范围 0~2^30-1
i = max(-1, min(kMaxSize, i));
j = max(-1, min(kMaxSize, j));

static const double kScale = 1.0 / kMaxSize;
static const double kLimit = 1.0 + DBL_EPSILON;
S2_DCHECK_EQ(0, kMaxSize % 2);
// IJ -> SiTi -> ST -> UV
double u = max(-kLimit, min(kLimit, kScale * (2 * (i - kMaxSize / 2) + 1)));
double v = max(-kLimit, min(kLimit, kScale * (2 * (j - kMaxSize / 2) + 1)));

face = S2::XYZtoFaceUV(S2::FaceUVtoXYZ(face, u, v), &u, &v);
return FromFaceIJ(face, S2::STtoIJ(0.5*(u+1)), S2::STtoIJ(0.5*(v+1)));
}

  这个算法主要用来计算超出范围(0~2^30-1)的 IJ 对应的 id,核心思想是先将 FaceIJ 转为 XYZ,再使用 XYZ 反算得到正常的 FaceIJ,进而得到正常的 id。中间 IJ -> UV 中坐标实际经过了 3 步,对于 leaf cell,IJ -> SiTi 的公式为 \(Si=2×I+1\),而对于 ST -> UV,这里没有采用二次变换,就是线性变换 \(u=2*s-1\),官方注释上说明用哪个变换效果都一样,所以采用最简单的就行。

边邻域

  边邻域代码很简单,也很好理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void S2CellId::GetEdgeNeighbors(S2CellId neighbors[4]) const {
int i, j;
int level = this->level();
// 计算当前 level 一行或一列对应多少个 30 级的 cell(leaf cell) 2^(30-level)
int size = GetSizeIJ(level);
int face = ToFaceIJOrientation(&i, &j, nullptr);

// Edges 0, 1, 2, 3 are in the down, right, up, left directions.
neighbors[0] = FromFaceIJSame(face, i, j - size, j - size >= 0)
.parent(level);
neighbors[1] = FromFaceIJSame(face, i + size, j, i + size < kMaxSize)
.parent(level);
neighbors[2] = FromFaceIJSame(face, i, j + size, j + size < kMaxSize)
.parent(level);
neighbors[3] = FromFaceIJSame(face, i - size, j, i - size >= 0)
.parent(level);
}

  分别计算当前 IJ 坐标下右上左坐标对应 id,FromFaceIJSame 表示若邻域在相同面,则走 FromFaceIJ,否则走 FromFaceIJWrap,由于这两个函数得到都是 leaf cell,要上升到指定 level,需要用到 parent 方法,即将希尔伯特曲线位置去掉右 \(2*(30-level)\) 位,再组合成新的 id,位运算也很有意思:

1
2
3
4
5
6
7
8
9
static uint64 lsb_for_level(int level) {
return uint64{1} << (2 * (kMaxLevel - level));
}

inline S2CellId S2CellId::parent(int level) const {
uint64 new_lsb = lsb_for_level(level);
// 取反加一实际是取负数
return S2CellId((id_ & (~new_lsb + 1)) | new_lsb);
}

点邻域

  S2 的点邻域并不是指常规意义上 4 个顶点相邻左上右上右下左下的 id,而是一种比较特殊的相邻关系,以直角坐标系 (0,0),(0,1),(1,1),(1,0) 为例,(0,0) 的点邻域为 (0,0),(0,-1),(-1,-1),(-1,0),(0,1) 的点邻域为 (0,1),(0,2),(-1,2),(-1,1),(1,1) 的点邻域为 (1,1),(1,2),(2,2),(2,1),(1,0) 的点邻域为 (1,0),(1,-1),(2,-1),(2,0)。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void S2CellId::AppendVertexNeighbors(int level,
vector<S2CellId>* output) const {
// level < this->level()
S2_DCHECK_LT(level, this->level());
int i, j;
int face = ToFaceIJOrientation(&i, &j, nullptr);

// 判断 IJ 落在 level 对应 cell 的哪个方位?(左下左上右上右下,对应上文的(0,0),(0,1),(1,1),(1,0)坐标)
int halfsize = GetSizeIJ(level + 1);
int size = halfsize << 1;
bool isame, jsame;
int ioffset, joffset;
if (i & halfsize) {
ioffset = size;
isame = (i + size) < kMaxSize;
} else {
ioffset = -size;
isame = (i - size) >= 0;
}
if (j & halfsize) {
joffset = size;
jsame = (j + size) < kMaxSize;
} else {
joffset = -size;
jsame = (j - size) >= 0;
}

output->push_back(parent(level));
output->push_back(FromFaceIJSame(face, i + ioffset, j, isame).parent(level));
output->push_back(FromFaceIJSame(face, i, j + joffset, jsame).parent(level));
// 则邻域的 IJ 与当前 cell 都不在同一个面,则说明只有三个点邻域
if (isame || jsame) {
output->push_back(FromFaceIJSame(face, i + ioffset, j + joffset,
isame && jsame).parent(level));
}
}

  上面的代码算是比较清晰了,3 个点邻域的情况一般出现在当前 id 位于立方体 6 个面的角落,该方法的参数 level 必须比当前 id 的 level 要小

全邻域

  所谓全邻域,即为当前 id 对应 cell 周围一圈 cell 对应的 id,若周围一圈 cell 的 level 与 当前 id 的 level 一样,则所求即为正常的 9 邻域。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void S2CellId::AppendAllNeighbors(int nbr_level,
vector<S2CellId>* output) const {
// nbr_level >= level
S2_DCHECK_GE(nbr_level, level());
int i, j;
int face = ToFaceIJOrientation(&i, &j, nullptr);

// 先归一 IJ 坐标,将 IJ 坐标调整为当前 cell 左下角 leaf cell 的坐标
int size = GetSizeIJ();
i &= -size;
j &= -size;

int nbr_size = GetSizeIJ(nbr_level);
S2_DCHECK_LE(nbr_size, size);

for (int k = -nbr_size; ; k += nbr_size) {
bool same_face;
if (k < 0) {
same_face = (j + k >= 0);
} else if (k >= size) {
same_face = (j + k < kMaxSize);
} else {
same_face = true;
// 生成外包围圈下上两边的 id, 顺序为从左往右
output->push_back(FromFaceIJSame(face, i + k, j - nbr_size,
j - size >= 0).parent(nbr_level));
output->push_back(FromFaceIJSame(face, i + k, j + size,
j + size < kMaxSize).parent(nbr_level));
}
// 生成外包围圈左右两边以及四个边角的 id, 顺序为从下往上
output->push_back(FromFaceIJSame(face, i - nbr_size, j + k,
same_face && i - size >= 0)
.parent(nbr_level));
output->push_back(FromFaceIJSame(face, i + size, j + k,
same_face && i + size < kMaxSize)
.parent(nbr_level));
if (k >= size) break;
}
}

  知道这个函数的作用,再看代码就很明了了,这个方法的参数 nbr_level 必须大于或等于当前 id 的 level,因为一旦外包围圈的 cell 面积比当前 cell 还大,就无法得到正确的外包围圈。

覆盖算法

  S2 的覆盖,是指给定一块区域,能用多少 id 对应的 cell 完全覆盖该区域(GetCovering),当然也有尽量覆盖的算法(GetInteriorCovering),下面主要解析 GetCovering,因为 GetInteriorCovering 也差不多,就是覆盖策略略有不同。

GetCovering 的区域入参是 S2Region,比较典型的 S2Region 有以下几种:

  • S2Cell:S2 id 对应的网格,会保存左下右上两个 UV 坐标,也是覆盖算法使用的基本元素;
  • S2CellUnion:多个 S2Cell 集合体,GetCovering 的返回值;
  • S2LatLngRect:经纬度矩形区域;
  • S2Cap:球帽区域,类比于二维圆的圆弧,球帽的构造比较奇怪,球帽的中心 S2Point 是需要,但另一个变量不是球帽的圆弧角,而是半个圆弧角(S2 代码库对应的 S1Angle 弧度,90 度代表半球,180 度代表全球)所对应弦长的平方,最大值为 4,之所以采用弦长的平方作为默认构造,是因为这就是 3 维中距离,在进行距离比较的场景时会更方便,比如测试是否包含一个 S2Point,计算覆盖多边形时,就不用再比较角度,毕竟角度计算代价比较大;
  • S2Loop:多边形的基本组成元素,第一个点与最后一个点隐式连接,逆时针代表封闭,顺时针代表开孔取外围区域,不允许自相交;
  • S2Polygon:非常正常的复杂多边形,由多个 S2Loop 构成,S2Loop 之间不能相交;
  • S2Polyline:一条折线,同样不能自相交;
  • 还有些其它不常用的:S2R2Rect(S2Point 矩形区域),S2RegionIntersection(集合相交区域),S2RegionUnion(集合合并区域),……等。

  S2 覆盖算法的本质是一种启发式算法,先取满足当前条件最基本的元素,再依照条件进行迭代优化,所以该算法得到的只是一个近似最优解。GetCovering 需要依次满足以下条件:

  1. 生成的 S2Cell level 不能比指定的 minLevel 小;(必须满足)
  2. 生成的 S2Cell 的个数不能比指定的 maxCells 多;(可以满足,当满足 1 时,数目已经 maxCells 多,迭代停止)
  3. 生成的 S2Cell level 不能比指定的 maxLevel 大;(必须满足)

  以上 3 个条件对应 GetCovering 的其他三个参数,当然还有一个参数是 levelModel,表示从 minLevel 向下分到 maxLevel 时,是 1 分 4,还是 1 分 16,还是 1 分 64,对应一次升 1 阶曲线,还是一次升 2 阶,或是一次升 3 阶。下面就来具体看看 GetCovering 的算法流程(代码就不贴了,太多了):

  1. 首先获取候选种子 S2Cell。先构造一个临时覆盖器,设置 maxCells 为 4,minLevel 为 0,以快速得到初始覆盖结果,做法为:先得到覆盖输入区域的 S2Cap,再用 S2CellUnion 覆盖该 S2Cap,根据 S2Cap 圆弧度计算 S2Cell 的 level,若最终 level < 0,则说明 S2Cap 非常大,需要取 6 个面对应的 S2Cell,否则只需要取 S2Cap 中心点对应 S2Cell 的 level 级的点邻域 4 个 S2Cell 作为初始候选 S2Cell。
  2. 然后标准化候选种子。第一步,如果候选 S2Cell level 比 maxLevel 大或者候选 S2Cell 的 level 不符合 levelModel,则调整候选 S2Cell 的 level,用指定父级 S2Cell 来代替;第二步,归一化候选 S2Cell,先对 S2Cell 按 id 排序,去除被包含的 id,以及对 id 剪枝(若连续 4 个 S2Cell 共有同一个 parent,则用 parent 代替这 4 个 S2Cell);第三步,反归一化候选 S2Cell,若候选 S2Cell level 比 minLevel 小或不满足 levelModel,则需要将 S2Cell 分裂,用指定级别的孩子来取代该 S2Cell;第四步,检查是否满足全部条件,若满足,则标准化完成,若不满足,则看候选 S2Cell 的数目是否足够多,若足够多,则需要迭代进行 GetCovering,这样会极大降低算法性能,若不是很多,则迭代合并相同祖先的两个 S2Cell(当然祖先的 level 不能比 minLevel 小),最后再次检查所有候选 S2Cell 是否达到标准化要求,并调整 S2Cell level。
  3. 构造优先级队列。将符合条件(与入参区域相交)的候选 S2Cell 放进一个优先级队列中,优先级会依次根据三个参数进行判断,1、S2Cell 的大小(level 越大,S2Cell 越小),越大的优先级越高;2、入参区域与候选 S2Cell 孩子相交(这里的相交是指相交但不完全包含)的个数,越少优先级越高;3、入参区域完全包含候选 S2Cell 孩子和与无法再细分的孩子的个数,同样是越少优先级越高。在构造这个优先级队列的同时,会输出一些候选 S2Cell 作为覆盖算法的正式结果,这些 S2Cell 满足任意以下条件:1、被入参区域完全覆盖;2、与入参区域相交但不可再细分;3、入参区域包含或相交全部孩子。如此留在优先级队列中的,就都是些与入参区域边界相交的 S2Cell,这些就是真正的候选 S2Cell。
  4. 最后,处理优先级队列中的 S2Cell。处理方式也比较简单粗暴,继续细分并入队,满足上面3个出队条件的任意一个,即可出队作为正式结果,当然,若分到后面可能正式的 S2Cell 太多,甚至超过 maxCells,这时不再细分强行出队作为正式结果。最后,再对正式结果做一次标准化处理,即进行第 2 步,得到最终的覆盖结果。

  以上就是 S2 覆盖算法的大致流程,更加细节的东西,还是得看代码,文字有些不是很好描述,代码里面计算候选 S2Cell 的优先级就很有意思。


  当然 S2 中还有很多其他算法(凸包,相交,距离),这里就不做太多介绍了,Shaun 平常用的最多的就是覆盖算法,之前一直没有细看,就简单用用 api,同时为了对一块大的 S2Cell 做多线程处理,需要了解 S2Cell 一分四的方向,经过这次对 S2 的了解,发现之前的用法存在一些问题,可见调包侠同样需要对包有一定的了解才能调好包 ╮(╯▽╰)╭。

后记

  正如许多经典的算法一样,看完之后总有种我上我也行的感觉,但实际完全不行,S2 全程看下来有些地方确实比较晦涩,而且这一连串的想法也很精妙(单位球立方体投影,ST 空间面积优化,64 位 id 生成等),Shaun 或许能有部分想法,但这么多奇思妙想组合起来,就完全不行。

附录

HilbertCurve 绘制

  在网上随便找了三种实现方式,并用 threejs 简单绘制了一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import * as THREE from "three";

export default class HilbertCurve {
order = 3; // 阶数
size = 1 << this.order; // 行列数
totalSize = this.size * this.size; // 总网格数,希尔伯特长度

// https://www.youtube.com/watch?v=dSK-MW-zuAc
getPath_V1() {
let path = [];
let origOrientation = [
[0, 0],
[0, 1],
[1, 1],
[1, 0],
]; // 倒 U 形

for (let i = 0; i < this.totalSize; i++) {
path.push(hilbertToXY(i, this.order));
}

return path;

function hilbertToXY(i: number, order: number) {
let index = i & 3;
let curCoord = origOrientation[index].slice();

for (let ord = 1; ord < order; ord++) {
i = i >>> 2;
index = i & 3;
let delta = 1 << ord;
if (index === 0) {
// 顺时针旋转 90°
let tmp = curCoord[0];
curCoord[0] = curCoord[1];
curCoord[1] = tmp;
} else if (index === 1) {
curCoord[1] += delta;
} else if (index === 2) {
curCoord[0] += delta;
curCoord[1] += delta;
} else if (index === 3) {
// 逆时针旋转 90°
let tmp = delta - 1 - curCoord[0];
curCoord[0] = delta - 1 - curCoord[1];
curCoord[1] = tmp;
curCoord[0] += delta;
}
}

return curCoord;
}
}

// Hacker's Delight
getPath_V2() {
let path: number[][] = [];
let x = -1,
y = 0;
let s = 0; // along the curve

step(0);
hilbert(0, 1, this.order);

return path;

function step(dir: number) {
switch (dir & 3) {
case 0:
x += 1;
break;
case 1:
y += 1;
break;
case 2:
x -= 1;
break;
case 3:
y -= 1;
break;
}

path[s] = [x, y];

s += 1;
}

function hilbert(dir: number, rot: number, order: number) {
if (order === 0) return;

dir += rot;
hilbert(dir, -rot, order - 1);
step(dir);

dir -= rot;
hilbert(dir, rot, order - 1);
step(dir);

hilbert(dir, rot, order - 1);

dir -= rot;
step(dir);
hilbert(dir, -rot, order - 1);
}
}

// https://en.wikipedia.org/wiki/Hilbert_curve
getPath_V3() {
let path: number[][] = [];

// for (let i = 0; i < this.totalSize; i++) {
// path.push(hilbertToXY(this.size, i));
// }

for (let y = 0; y < this.size; y++) {
for (let x = 0; x < this.size; x++) {
path[xyToHilbert(this.size, x, y)] = [x, y];
}
}

return path;

function rot(N: number, rx: number, ry: number, xy: number[]) {
if (ry === 0) {
if (rx === 1) {
xy[0] = N - 1 - xy[0];
xy[1] = N - 1 - xy[1];
}

let t = xy[0];
xy[0] = xy[1];
xy[1] = t;
}
}

function hilbertToXY(N: number, h: number) {
let t = h;
let xy = [0, 0];
for (let s = 1; s < N; s *= 2) {
let rx = 1 & (t / 2);
let ry = 1 & (t ^ rx);
rot(s, rx, ry, xy);
xy[0] += s * rx;
xy[1] += s * ry;
t /= 4;
}

return xy;
}

function xyToHilbert(N: number, x: number, y: number) {
let h = 0;
let xy = [x, y];
for (let s = N / 2; s > 0; s /= 2) {
let rx = (xy[0] & s) > 0 ? 1 : 0;
let ry = (xy[1] & s) > 0 ? 1 : 0;
h += s * s * ((3 * rx) ^ ry);
rot(N, rx, ry, xy);
}

return h;
}
}

draw() {
let lineGeometry = new THREE.Geometry();
this.getPath_V3().forEach((vertice) => {
let vecot = new THREE.Vector3().fromArray(vertice);
vecot.setZ(0);
lineGeometry.vertices.push(vecot);
});
let lineMaterial = new THREE.LineBasicMaterial({ color: 0x00ffff, linewidth: 1 });
let line = new THREE.Line(lineGeometry, lineMaterial);

return line;
}
}

K8S 应用开发指北

前言

  在周志明的『凤凰架构』中需要思考这样一个问题,如何用不可靠的部件来构造一个可靠的系统?对于程序员来说,写的代码从某种程度上来说都是不可靠的,但这些代码组成的一些系统却可以是可靠的。程序员对于错误的处理可以分为两派,一派是必须对错误进行处理,以保证系统的稳定行;另一派不对错误进行处理,任由程序 crash,只要有兜底方案,后面再不断完善。这两派并无孰优孰劣,只是两种不同的思维方式,甚至在同一个程序中,有些错误会处理,有些错误不会处理,这都是可能的。K8S 作为事实上的云原生操作系统,其目的就是为了将程序员写的各个程序组装成一个稳定的系统,并减少运维成本。

前言

  在周志明的『凤凰架构』中需要思考这样一个问题,如何用不可靠的部件来构造一个可靠的系统?对于程序员来说,写的代码从某种程度上来说都是不可靠的,但这些代码组成的一些系统却可以是可靠的。程序员对于错误的处理可以分为两派,一派是必须对错误进行处理,以保证系统的稳定行;另一派不对错误进行处理,任由程序 crash,只要有兜底方案,后面再不断完善。这两派并无孰优孰劣,只是两种不同的思维方式,甚至在同一个程序中,有些错误会处理,有些错误不会处理,这都是可能的。K8S 作为事实上的云原生操作系统,其目的就是为了将程序员写的各个程序组装成一个稳定的系统,并减少运维成本。

基础篇

  K8S 调度的基本单元是 Pod,Pod 也是 K8S 自带的一个资源对象,其可以简单理解为是一个容器集合体,程序员可控的容器有两类(Pause 容器除外),一类是 InitContainer,另一类是普通业务容器,InitContainer 按数组顺序创建,顺序执行,若一个失败,则整个 Pod 创建失败,普通业务容器同样按数组顺序创建,但异步执行,所以执行顺序不可控(可以通过 postStart Hook 简单控制一下)。由于 InitContainer 先于 Pod 其他容器执行,所以一般用来做普通业务容器执行前置条件的一些事情,比如:下载文件,初始化配置,状态消息通知等。

  同一 Pod 中存储卷和网络可以共享。存储卷共享是指 Pod 内各容器可以挂载相同存储卷,从而数据共享。K8S 目前支持的存储卷共有三种:第一种是 emptyDir,这种存储是临时的,只能在 Pod 内使用,当 Pod 被销毁时,该存储的内容也会消失,只能在同一 Pod 内共享数据;第二种是 hostPath,这种存储会直接和集群中物理机存储相关联,是一种跨 Pod 持久化存储,但仅限该物理机,当 pod 被调度到其他物理机时就无法实现跨 Pod 共享数据;最后一种是外部存储(NFS,Ceph,GlusterFS,AWS EBS 等),这种方式可以真正实现数据持久化并共享,而且可以支持存储与计算分离,对系统会更友好一些,当然运维的成本也会更大。当然除了 K8S 自身提供的存储卷挂载可以实现数据共享,从程序的角度上,使用传统的方式一样也能数据共享,如数据库,DFS,OSS 等。

  而网络共享是指 Pod 内各容器直接可以使用 localhost 以及容器暴露的端口进行相互通信,K8S 的端口有三种,分别为:容器端口(containerPort,容器中对外暴露的端口),集群内端口(port,集群内 pod 相互通信的端口),集群外端口(nodePort,集群外请求集群内的端口),其中容器端口和集群内是正常的动态端口,取值范围为 [1024, 65535],集群外端口只能设置为 [30000, 32767],若集群中服务不与集群外通信,则只需要设置集群内端口就行。K8S 中 IP 也同样有三种,分别为:Pod IP(两不同 Pod 资源对象相互通信的地址,集群外不可访问),Cluster IP(Service 资源对象的通信地址,集群外不可访问),Node IP(K8S 物理节点的 IP 地址,是真实的物理网络,集群外配合 nodePort 即可访问)。集群内端口和集群外端口由 K8S 的 Service 资源提供设置。在创建 Service 时需要注意,一个 Pod 资源对应一个 Service 资源,不要想着一个 Service 管理两个 Pod 暴露的端口,这样做会使 Service 提供服务的能力异常,经常会接口超时

  K8S 编程可以简单称之为面向 config 编程,一切需要动态变化的程序初始化变量,都应该以 config 的形式提供,然后交给运维就行,这样可以避免程序员频繁的修改程序,减少运维负担,K8S 的 config 有三种形式,第一种是程序启动参数,通过创建容器时的 args 参数配置;第二种是系统环境变量,通过创建容器时的 env 参数配置;最后一种是 K8S 提供的 ConfigMap 资源,该资源可以从文件,目录或 key-value 字符串创建,创建后的 ConfinMap 被全集群同命名空间所共享,可以通过 volumes 参数挂载到 pod 中,进而 mount 进容器中,被程序读取。前两种 config 方式对于配置变量少的可以使用,当配置变量很多或配置参数很长时,还是使用 ConfigMap 比较合适。

调度篇

  调度,广义上的调度可指一切管理安排,CPU 的指令执行就涉及到三级缓存的调度,程序运行时的 GC 可认为是运行时对内存资源的调度,操作系统的进程轮转可认为是系统对进程的调度,而 K8S 中的调度可简单理解为是对操作系统的调度。

  K8S 的调度可简单分为两个层面上的调度,最底层的调度自然是 K8S 自身的调度策略,根据不同的资源用度和调度策略将 Pod 分配到不同的物理节点之上执行,根据指定的重启或恢复策略启动相应的 Pod,这个层面上的调度,K8S 有一套默认的调度器,对于特殊的调度需求,K8S 也支持自定义调度器,使用外部调度器代替默认调度器,这个层面的调度器 Shaun 没做太多研究,所以在这篇里对这层面的调度器不做过多描述。Shaun 接触过的是更上层的调度器,业务层面的调度服务,业务调度服务一般与业务紧密相关,但最核心的一点就是能够从业务入手,负责 Pod 的创建和销毁,并能掌握其运行状态,就算是完成了一个基础的业务调度服务器。

  在设计业务调度服务时,有一种通用的模式,可以称之为 master-worker 模式,与同名的并发模式细节上有所不同,这里的 master 是指调度服务本体,只负责对外服务,资源监控,以及任务分发,任务状态感知等,不负责做具体的任务,一般也不关心任务的输入输出。在部署 master 时,一般会创建一个 Service 资源对象,毕竟其主要功能就是对外服务,master 一般由运维进行部署创建销毁。而 worker 是指真正做任务的 Pod,该 Pod 中可能会有多个容器,主容器负责真正执行任务,其他一些容器可能会负责保障任务的前置条件(输入,配置等),以及向 master 汇报任务执行状态信息(执行任务的主容器可能并不知道 master 的存在)等。worker 对应的 Pod 一般由 master 进行创建销毁,worker 的一些配置信息则可能会由运维管理。

  由于 K8S 并没有在整个集群物理资源之上抽象出一层集群资源,所以 K8S 分配的节点实际还是在物理机上,若所有物理机剩余资源(是单个剩余资源,而不是所有剩余资源之和)都不满足 Pod 所需资源,则该 Pod 无法调度,类比内存碎片化,可以称之为资源碎片化。所以在创建 Pod 时,所需资源最好不要太多,以免调度失败。

实践篇

  Shaun 目前在 K8S 上开发的主要就是重计算(单机计算时间以小时计)调度服务。这类调度服务其实也分两种,一种是并发调度,一种是流水线(pipeline)式的串行调度,当然也可以将这两种混合起来,串行中有并行。在设计这类调度服务时,需要考虑集群上的资源(内存,CPU)是否足够,若不足,则可以考虑加入一个简单的等待机制,将任务放进一个队列中,当然加入这样一个等待机制,又会增加系统复杂性,需要考虑队列容量,队列优先级等。所以可执行的最小任务消耗的资源越少约好,否则集群中可能完全无法执行相关任务。

  由于 Shaun 是独立开发,能完全控制 master 和 worker 的编写,所以 worker 设计的比较简单,一个主容器即完成了前置数据处理,主任务执行,执行状态汇报等全部事情,这是从时间和性能上以及系统复杂度上等多方面权衡的结果,当然在时间足够人手够的情况,是应该把现有的 worker 进一步分离的,而 master 就是比较通用的设计,资源监控,任务队列,任务 Pod 创建与销毁,任务状态信息保存,服务接口等,其中常规的服务接口应该有添加任务,开始任务,停止任务,恢复任务,删除任务,任务状态查询,任务日志查询,任务状态汇报等接口,如果任务是并行且无依赖的,还应该支持开始指定子任务等接口。

  在工作中,Shaun 也接触到一个 pipeline 式的任务调度服务,pipeline 式的工作流有个特点就是下一个子任务的输入必定依赖上一个子任务的输出,在这个任务调度服务中,其子任务的输入输出都是文件态,并且 master 不关心子任务的输入输出,子任务的执行程序也不知道 master 的存在,尽量低耦合。在云上,文件态的存储载体比较好的自然是 OSS,但原本的子任务执行程序只支持本地读取文件,而且在原来的程序中引入 OSS 的读写逻辑并不十分合适,所以在 K8S 中引入了 NFS,由 master 负责将 NFS 挂载到各子任务的 Pod 中,并在挂载到主容器时使用 SubPath 完成 pipeline 之间的资源隔离,使用 emptyDir 完成各子任务之间的资源隔离,每条 pipeline 开始的子任务是从 OSS 中拉取文件到 NFS 中对应的 SubPath 目录中,结束的子任务是将 NFS 中对应的 SubPath 目录中约定好的生成物上传到 OSS 中,并清空该 SubPath 目录,从而使原来的程序在 IO 这块完全不用改动。在监听任务运行状态方面,有两种方案:一种是利用 K8S 的 InitContainer,另一种是借助 K8S 的 shareProcessNamespace。InitContainer 的方案比较简单,InitContainer 第一个容器只做汇报子任务开始这一件事, 第二个容器则是真正执行子任务的容器,而业务容器只做汇报子任务结束这一件事,该方案利用 InitContainer 顺序且先于业务容器执行这两特点,并且若执行子任务的容器失败,则 Pod 也会创建失败,查询 Pod 状态即可知道子任务是否正常运行。而 shareProcessNamespace 的方案稍微复杂一些,同样使用一个 InitContainer 做汇报子任务开始这件事,而业务容器中放两个容器:一个主容器和一个 sidecar 容器(希望 K8S 原生支持的 SideCar 早日做好 ╯△╰),sidecar 容器中以轮询的方式监听主容器的运行状态(查询是否存在主进程)以及是否正常退出(获取容器退出码),并向 master 推送状态信息,该方案借助进程空间共享,使 sidecar 容器能直接查询主容器中的进程,从而达到监听主容器运行状态的目的,该方案的执行还需要一个小 trick,就是要让主容器先执行,由两种方案:一种是借助 postStart Hook,另一种是直接让 sidecar 容器先休眠个 10s 钟。关于 sidecar 容器的另外一种应用方案可参考 Nginx容器配置文件如何热更新?

  虽然分布式任务调度框架有很多,eg:AirflowLuigi 以及 DolphinScheduler 等,但目前与 K8S 联系最紧密的应该就是 Argo 了,其利用 K8S 的自定义资源对 K8S 已有功能进行扩展,仅使用 YAML 即可完成整个 pipeline 的任务调度和部署,虽然在并发任务调度时有一定的缺陷,但仅使用 YAML 表示其对 K8S 运维的足够友好性,对于常规 pipeline 式任务,Argo 已足以应付,除特殊需求外,程序员可少写很多代码。

附录

  对于 Spring 编写的程序,在 K8S 中运行,在导出日志时可参考 k8s:获取pod的ip,通过 valueFrom 使用 Pod 的 metadata 作为环境变量,以区分日志的来源,不过挂载存储时最好还是用外部存储,用 hostPath 的话就需要保证每个物理节点都有相同的日志存储目录。

后记

  K8S 作为云原生时代的操作系统,不要求人人都完全掌握,但至少需要了解,知道什么该开发干,什么该运维干,这样才能充分发挥各个角色(包括 K8S)的价值。