defswrr_n(rs_arr, weight_total, schedule_num): ms = [(rs["ew"] / float(weight_total)) * (schedule_num-1) for rs in rs_arr] mzs = [int(m) for m in ms] mxs = [(i, m-int(m)) for i, m inenumerate(ms)] mxs = sorted(mxs, key=lambda x:x[1], reverse=True) for i, rs inenumerate(rs_arr): rs["cw"] = schedule_num * rs["ew"] rs["cw"] -= mzs[i] * weight_total
d = (schedule_num-1) - sum(mzs) for i inrange(d): rs_arr[mxs[i][0]]["cw"] -= weight_total
#!/bin/bash # Author: Matt Mastracci (matthew@mastracci.com) # AppleScript from http://stackoverflow.com/questions/4309087/cancel-button-on-osascript-in-a-bash-script # licensed under cc-wiki with attribution required # Remainder of script public domain
osascript -e 'tell application "iTerm2" to version' > /dev/null 2>&1 && NAME=iTerm2 || NAME=iTerm if [[ $NAME = "iTerm" ]]; then FILE=`osascript -e 'tell application "iTerm" to activate' -e 'tell application "iTerm" to set thefile to choose folder with prompt "Choose a folder to place received files in"' -e "do shell script (\"echo \"&(quoted form of POSIX path of thefile as Unicode text)&\"\")"` else FILE=`osascript -e 'tell application "iTerm2" to activate' -e 'tell application "iTerm2" to set thefile to choose folder with prompt "Choose a folder to place received files in"' -e "do shell script (\"echo \"&(quoted form of POSIX path of thefile as Unicode text)&\"\")"` fi
if [[ $FILE = "" ]]; then echo Cancelled. # Send ZModem cancel echo -e \\x18\\x18\\x18\\x18\\x18 sleep 1 echo echo \# Cancelled transfer else cd"$FILE" /usr/local/bin/rz -E -e -b sleep 1 echo echo echo \# Sent \-\> $FILE fi
#!/bin/bash # Author: Matt Mastracci (matthew@mastracci.com) # AppleScript from http://stackoverflow.com/questions/4309087/cancel-button-on-osascript-in-a-bash-script # licensed under cc-wiki with attribution required # Remainder of script public domain
osascript -e 'tell application "iTerm2" to version' > /dev/null 2>&1 && NAME=iTerm2 || NAME=iTerm if [[ $NAME = "iTerm" ]]; then FILE=`osascript -e 'tell application "iTerm" to activate' -e 'tell application "iTerm" to set thefile to choose file with prompt "Choose a file to send"' -e "do shell script (\"echo \"&(quoted form of POSIX path of thefile as Unicode text)&\"\")"` else FILE=`osascript -e 'tell application "iTerm2" to activate' -e 'tell application "iTerm2" to set thefile to choose file with prompt "Choose a file to send"' -e "do shell script (\"echo \"&(quoted form of POSIX path of thefile as Unicode text)&\"\")"` fi if [[ $FILE = "" ]]; then echo Cancelled. # Send ZModem cancel echo -e \\x18\\x18\\x18\\x18\\x18 sleep 1 echo echo \# Cancelled transfer else /usr/local/bin/sz "$FILE" -e -b sleep 1 echo echo \# Received $FILE fi
多线程的执行方式有两种:并发(Concurrent)和并行(Parallel),简单来说,并发就是两个线程轮流在一个 CPU 核上执行,而并行则是两个线程分别在两个 CPU 核上运行。一般而言,程序员无法直接控制线程是并发执行还是并行执行,线程的执行一般由操作系统直接控制,当然程序运行时也可以做简单调度。所以对于一般程序员来说,只需要熟练使用相关语言的多线程编程库即可,至于是并发执行还是并行执行,可能并不是那么重要,只要能达到预期效果就行。
前言
多线程的执行方式有两种:并发(Concurrent)和并行(Parallel),简单来说,并发就是两个线程轮流在一个 CPU 核上执行,而并行则是两个线程分别在两个 CPU 核上运行。一般而言,程序员无法直接控制线程是并发执行还是并行执行,线程的执行一般由操作系统直接控制,当然程序运行时也可以做简单调度。所以对于一般程序员来说,只需要熟练使用相关语言的多线程编程库即可,至于是并发执行还是并行执行,可能并不是那么重要,只要能达到预期效果就行。
Scala 提供了一个默认的 ExecutionContext:scala.concurrent.ExecutionContext.Implicits.global,其本质也是一个 ForkJoinPool,并行度默认设置为当前可用 CPU 数,当然也会根据需要(比如当前全部线程被阻塞)额外创建更多线程。一般做计算密集型任务就用默认线程池即可,特殊情况也可以自己创建 ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8)),下面的代码就可以创建一个同步阻塞的 ExecutionContext:
1 2 3 4 5
val currentThreadExecutionContext = ExecutionContext.fromExecutor( newExecutor { // Do not do this! defexecute(runnable: Runnable) { runnable.run() } })
////import scala.concurrent.ExecutionContext.Implicits.global //val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors()) val pool = Executors.newWorkStealingPool() implicitval ec = ExecutionContext.fromExecutorService(pool)
val futures = Array.range(0, 10000).map(i => Future { println(i) Thread.sleep(100) i })
关键字为 par,调用该方法即可轻松进行并发计算,不过需要注意的是并发操作的副作用(side-effects)和“乱序”(out of order)语义,副作用就是去写函数外的变量,不仅仅只读写并发操作函数内部声明的变量,乱序语义是指并发操作不会严格按照数组顺序执行,所以如果并发操作会同时操作两个数组元素(eg:reduce),则需要慎重使用,有的操作结果不变,而有的操作会导致结果不唯一。
val poolProducer = Executors.newWorkStealingPool() implicitval ecProducer = ExecutionContext.fromExecutorService(poolProducer) val poolConsumer = Executors.newSingleThreadExecutor() val ecConsumer = ExecutionContext.fromExecutorService(poolConsumer)
val futures = Array.range(0, 1000).map(i => Future { val x = produce(i) // produce something... x }(ecProducer).andThen { caseSuccess(x) => consume(x) // consume something... }(ecConsumer))
val futureSequence = Future.sequence(futures) futureSequence.onComplete({ caseSuccess(results) => { println("Success.")
Google S2 Geometry(以下简称 S2) 是 Google 发明的基于单位球的一种地图投影和空间索引算法,该算法可快速进行覆盖以及邻域计算。更多详见 S2Geometry,Google’s S2, geometry on the sphere, cells and Hilbert curve,halfrost 的空间索引系列文章。虽然使用 S2 已有一年的时间,但确实没有比较系统的看过其源码,这次借着这段空闲时间,将 Shaun 常用的功能系统的看看其具体实现,下文将结合 S2 的 C++,Java,Go 的版本一起看,由于 Java 和 Go 的都算是 C++ 的衍生版,所以以 C++ 为主,捎带写写这三种语言实现上的一些区别,Java 版本时隔 10 年更新了 2.0 版本,喜大普奔。
前言
Google S2 Geometry(以下简称 S2) 是 Google 发明的基于单位球的一种地图投影和空间索引算法,该算法可快速进行覆盖以及邻域计算。更多详见 S2Geometry,Google’s S2, geometry on the sphere, cells and Hilbert curve,halfrost 的空间索引系列文章。虽然使用 S2 已有一年的时间,但确实没有比较系统的看过其源码,这次借着这段空闲时间,将 Shaun 常用的功能系统的看看其具体实现,下文将结合 S2 的 C++,Java,Go 的版本一起看,由于 Java 和 Go 的都算是 C++ 的衍生版,所以以 C++ 为主,捎带写写这三种语言实现上的一些区别,Java 版本时隔 10 年更新了 2.0 版本,喜大普奔。
// validFaceXYZToUV given a valid face for the given point r (meaning that // dot product of r with the face normal is positive), returns // the corresponding u and v values, which may lie outside the range [-1,1]. funcvalidFaceXYZToUV(face int, r r3.Vector)(float64, float64) { switch face { case0: return r.Y / r.X, r.Z / r.X case1: return -r.X / r.Y, r.Z / r.Y case2: return -r.X / r.Z, -r.Y / r.Z case3: return r.Z / r.X, r.Y / r.X case4: return r.Z / r.Y, -r.X / r.Y } return -r.Y / r.Z, -r.X / r.Z }
之所以引入 ST 坐标是因为同样的球面面积映射到 UV 坐标面积大小不一,大小差距比较大(离坐标轴越近越小,越远越大),所以再做一次 ST 变换,将面积大的变小,小的变大,使面积更均匀,利于后面在立方体面上取均匀格网(cell)时,每个 cell 对应球面面积差距不大。S2 的 ST 变换有三种:1、线性变换,基本没做任何变形,只是简单将 ST 坐标的值域变换为 [0, 1],cell 对应面积最大与最小比大约为 5.2;2、二次变换,一种非线性变换,能起到使 ST 空间面积更均匀的作用,cell 对应面积最大与最小比大约为 2.1;3、正切变换,同样能使 ST 空间面积更均匀,且 cell 对应面积最大与最小比大约为 1.4,不过其计算速度相较于二次变换要慢 3 倍,所以 S2 权衡考虑,最终采用了二次变换作为默认的 UV 到 ST 之间的变换。二次变换公式为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
publicdoublestToUV(double s){ if (s >= 0.5) { return (1 / 3.) * (4 * s * s - 1); } else { return (1 / 3.) * (1 - 4 * (1 - s) * (1 - s)); } }
这个 id 其实是个一维坐标,而是利用希尔伯特空间填充曲线将 IJ 坐标从二维变换为一维,该 id 用一个 64 位整型表示,高 3 位用来表示 face(0~5),后面 61 位来保存不同的 level(0~30) 对应的希尔伯特曲线位置,每增加一个 level 增加两位,后面紧跟一个 1,最后的位数都补 0。注:Java 版本的 id 是有符号 64 位整型,而 C++ 和 Go 的是无符号 64 位整型,所以在跨语言传递 id 的时候,在南极洲所属的最后一个面(即 face = 5)需要小心处理。
HilbertCurve
上面两张图很明了的展示了希尔伯特曲线的构造过程,该曲线的构造基本元素由 ABCD 4 种“U”形构成,而 BCD 又可由 A 依次逆时针旋转 90 度得到,所以也可以认为只有一种“U”形,每个 U 占 4 个格子,以特定方式进行 1 分 4 得到下一阶曲线形状。
这个是 S2 内部计算使用的坐标,一般用来计算 cell 的中心坐标,以及根据当前 s 和 t 坐标的精度(小数点后几位)判断对应的级别(level)。由于 S2 本身并不显式存储 ST 坐标(有存 UV 坐标),所以 ST 坐标只能计算出来,每个 cell 的中心点同样如此。计算公式为 \(Si=s*2^{31};Ti=t*2^{31}\)。至于为啥是 \(2^{31}\),是因为该坐标是用来描述从 0~ 31 阶希尔伯特曲线网格的中心坐标,0 阶中心以 \(1/2^1\) 递增,1 阶中心以 \(1/2^2\) 递增,2 阶中心以 \(1/2^3\) 递增,……,30 阶中心以 \(1/2^{31}\) 递增。S2 计算 id 对应的格子中心坐标,首先就会计算 SiTi 坐标,再将 SiTi 转成 ST 坐标。
算法篇
邻域算法
S2 计算邻域,最关键的是计算不同面相邻的 leaf cell id,即:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
S2CellId S2CellId::FromFaceIJWrap(int face, int i, int j){ // 限制 IJ 最大最小取值为 -1~2^30, 刚好能超出 IJ 正常表示范围 0~2^30-1 i = max(-1, min(kMaxSize, i)); j = max(-1, min(kMaxSize, j));
face = S2::XYZtoFaceUV(S2::FaceUVtoXYZ(face, u, v), &u, &v); returnFromFaceIJ(face, S2::STtoIJ(0.5*(u+1)), S2::STtoIJ(0.5*(v+1))); }
这个算法主要用来计算超出范围(0~2^30-1)的 IJ 对应的 id,核心思想是先将 FaceIJ 转为 XYZ,再使用 XYZ 反算得到正常的 FaceIJ,进而得到正常的 id。中间 IJ -> UV 中坐标实际经过了 3 步,对于 leaf cell,IJ -> SiTi 的公式为 \(Si=2×I+1\),而对于 ST -> UV,这里没有采用二次变换,就是线性变换 \(u=2*s-1\),官方注释上说明用哪个变换效果都一样,所以采用最简单的就行。
边邻域
边邻域代码很简单,也很好理解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
voidS2CellId::GetEdgeNeighbors(S2CellId neighbors[4])const{ int i, j; int level = this->level(); // 计算当前 level 一行或一列对应多少个 30 级的 cell(leaf cell) 2^(30-level) int size = GetSizeIJ(level); int face = ToFaceIJOrientation(&i, &j, nullptr);
// Edges 0, 1, 2, 3 are in the down, right, up, left directions. neighbors[0] = FromFaceIJSame(face, i, j - size, j - size >= 0) .parent(level); neighbors[1] = FromFaceIJSame(face, i + size, j, i + size < kMaxSize) .parent(level); neighbors[2] = FromFaceIJSame(face, i, j + size, j + size < kMaxSize) .parent(level); neighbors[3] = FromFaceIJSame(face, i - size, j, i - size >= 0) .parent(level); }
voidS2CellId::AppendVertexNeighbors(int level, vector<S2CellId>* output)const{ // level < this->level() S2_DCHECK_LT(level, this->level()); int i, j; int face = ToFaceIJOrientation(&i, &j, nullptr);
// 判断 IJ 落在 level 对应 cell 的哪个方位?(左下左上右上右下,对应上文的(0,0),(0,1),(1,1),(1,0)坐标) int halfsize = GetSizeIJ(level + 1); int size = halfsize << 1; bool isame, jsame; int ioffset, joffset; if (i & halfsize) { ioffset = size; isame = (i + size) < kMaxSize; } else { ioffset = -size; isame = (i - size) >= 0; } if (j & halfsize) { joffset = size; jsame = (j + size) < kMaxSize; } else { joffset = -size; jsame = (j - size) >= 0; }
output->push_back(parent(level)); output->push_back(FromFaceIJSame(face, i + ioffset, j, isame).parent(level)); output->push_back(FromFaceIJSame(face, i, j + joffset, jsame).parent(level)); // 则邻域的 IJ 与当前 cell 都不在同一个面,则说明只有三个点邻域 if (isame || jsame) { output->push_back(FromFaceIJSame(face, i + ioffset, j + joffset, isame && jsame).parent(level)); } }
上面的代码算是比较清晰了,3 个点邻域的情况一般出现在当前 id 位于立方体 6 个面的角落,该方法的参数 level 必须比当前 id 的 level 要小。
全邻域
所谓全邻域,即为当前 id 对应 cell 周围一圈 cell 对应的 id,若周围一圈 cell 的 level 与 当前 id 的 level 一样,则所求即为正常的 9 邻域。具体代码如下:
voidS2CellId::AppendAllNeighbors(int nbr_level, vector<S2CellId>* output)const{ // nbr_level >= level S2_DCHECK_GE(nbr_level, level()); int i, j; int face = ToFaceIJOrientation(&i, &j, nullptr);
// 先归一 IJ 坐标,将 IJ 坐标调整为当前 cell 左下角 leaf cell 的坐标 int size = GetSizeIJ(); i &= -size; j &= -size;
int nbr_size = GetSizeIJ(nbr_level); S2_DCHECK_LE(nbr_size, size);
for (int k = -nbr_size; ; k += nbr_size) { bool same_face; if (k < 0) { same_face = (j + k >= 0); } elseif (k >= size) { same_face = (j + k < kMaxSize); } else { same_face = true; // 生成外包围圈下上两边的 id, 顺序为从左往右 output->push_back(FromFaceIJSame(face, i + k, j - nbr_size, j - size >= 0).parent(nbr_level)); output->push_back(FromFaceIJSame(face, i + k, j + size, j + size < kMaxSize).parent(nbr_level)); } // 生成外包围圈左右两边以及四个边角的 id, 顺序为从下往上 output->push_back(FromFaceIJSame(face, i - nbr_size, j + k, same_face && i - size >= 0) .parent(nbr_level)); output->push_back(FromFaceIJSame(face, i + size, j + k, same_face && i + size < kMaxSize) .parent(nbr_level)); if (k >= size) break; } }
知道这个函数的作用,再看代码就很明了了,这个方法的参数 nbr_level 必须大于或等于当前 id 的 level,因为一旦外包围圈的 cell 面积比当前 cell 还大,就无法得到正确的外包围圈。
覆盖算法
S2 的覆盖,是指给定一块区域,能用多少 id 对应的 cell 完全覆盖该区域(GetCovering),当然也有尽量覆盖的算法(GetInteriorCovering),下面主要解析 GetCovering,因为 GetInteriorCovering 也差不多,就是覆盖策略略有不同。
dir += rot; hilbert(dir, -rot, order - 1); step(dir);
dir -= rot; hilbert(dir, rot, order - 1); step(dir);
hilbert(dir, rot, order - 1);
dir -= rot; step(dir); hilbert(dir, -rot, order - 1); } }
// https://en.wikipedia.org/wiki/Hilbert_curve getPath_V3() { let path: number[][] = [];
// for (let i = 0; i < this.totalSize; i++) { // path.push(hilbertToXY(this.size, i)); // }
for (let y = 0; y < this.size; y++) { for (let x = 0; x < this.size; x++) { path[xyToHilbert(this.size, x, y)] = [x, y]; } }
return path;
functionrot(N: number, rx: number, ry: number, xy: number[]) { if (ry === 0) { if (rx === 1) { xy[0] = N - 1 - xy[0]; xy[1] = N - 1 - xy[1]; }
let t = xy[0]; xy[0] = xy[1]; xy[1] = t; } }
functionhilbertToXY(N: number, h: number) { let t = h; let xy = [0, 0]; for (let s = 1; s < N; s *= 2) { let rx = 1 & (t / 2); let ry = 1 & (t ^ rx); rot(s, rx, ry, xy); xy[0] += s * rx; xy[1] += s * ry; t /= 4; }
return xy; }
functionxyToHilbert(N: number, x: number, y: number) { let h = 0; let xy = [x, y]; for (let s = N / 2; s > 0; s /= 2) { let rx = (xy[0] & s) > 0 ? 1 : 0; let ry = (xy[1] & s) > 0 ? 1 : 0; h += s * s * ((3 * rx) ^ ry); rot(N, rx, ry, xy); }
return h; } }
draw() { let lineGeometry = new THREE.Geometry(); this.getPath_V3().forEach((vertice) => { let vecot = new THREE.Vector3().fromArray(vertice); vecot.setZ(0); lineGeometry.vertices.push(vecot); }); let lineMaterial = new THREE.LineBasicMaterial({ color: 0x00ffff, linewidth: 1 }); let line = new THREE.Line(lineGeometry, lineMaterial);
K8S 调度的基本单元是 Pod,Pod 也是 K8S 自带的一个资源对象,其可以简单理解为是一个容器集合体,程序员可控的容器有两类(Pause 容器除外),一类是 InitContainer,另一类是普通业务容器,InitContainer 按数组顺序创建,顺序执行,若一个失败,则整个 Pod 创建失败,普通业务容器同样按数组顺序创建,但异步执行,所以执行顺序不可控(可以通过 postStart Hook 简单控制一下)。由于 InitContainer 先于 Pod 其他容器执行,所以一般用来做普通业务容器执行前置条件的一些事情,比如:下载文件,初始化配置,状态消息通知等。
同一 Pod 中存储卷和网络可以共享。存储卷共享是指 Pod 内各容器可以挂载相同存储卷,从而数据共享。K8S 目前支持的存储卷共有三种:第一种是 emptyDir,这种存储是临时的,只能在 Pod 内使用,当 Pod 被销毁时,该存储的内容也会消失,只能在同一 Pod 内共享数据;第二种是 hostPath,这种存储会直接和集群中物理机存储相关联,是一种跨 Pod 持久化存储,但仅限该物理机,当 pod 被调度到其他物理机时就无法实现跨 Pod 共享数据;最后一种是外部存储(NFS,Ceph,GlusterFS,AWS EBS 等),这种方式可以真正实现数据持久化并共享,而且可以支持存储与计算分离,对系统会更友好一些,当然运维的成本也会更大。当然除了 K8S 自身提供的存储卷挂载可以实现数据共享,从程序的角度上,使用传统的方式一样也能数据共享,如数据库,DFS,OSS 等。
而网络共享是指 Pod 内各容器直接可以使用 localhost 以及容器暴露的端口进行相互通信,K8S 的端口有三种,分别为:容器端口(containerPort,容器中对外暴露的端口),集群内端口(port,集群内 pod 相互通信的端口),集群外端口(nodePort,集群外请求集群内的端口),其中容器端口和集群内是正常的动态端口,取值范围为 [1024, 65535],集群外端口只能设置为 [30000, 32767],若集群中服务不与集群外通信,则只需要设置集群内端口就行。K8S 中 IP 也同样有三种,分别为:Pod IP(两不同 Pod 资源对象相互通信的地址,集群外不可访问),Cluster IP(Service 资源对象的通信地址,集群外不可访问),Node IP(K8S 物理节点的 IP 地址,是真实的物理网络,集群外配合 nodePort 即可访问)。集群内端口和集群外端口由 K8S 的 Service 资源提供设置。在创建 Service 时需要注意,一个 Pod 资源对应一个 Service 资源,不要想着一个 Service 管理两个 Pod 暴露的端口,这样做会使 Service 提供服务的能力异常,经常会接口超时。
K8S 的调度可简单分为两个层面上的调度,最底层的调度自然是 K8S 自身的调度策略,根据不同的资源用度和调度策略将 Pod 分配到不同的物理节点之上执行,根据指定的重启或恢复策略启动相应的 Pod,这个层面上的调度,K8S 有一套默认的调度器,对于特殊的调度需求,K8S 也支持自定义调度器,使用外部调度器代替默认调度器,这个层面的调度器 Shaun 没做太多研究,所以在这篇里对这层面的调度器不做过多描述。Shaun 接触过的是更上层的调度器,业务层面的调度服务,业务调度服务一般与业务紧密相关,但最核心的一点就是能够从业务入手,负责 Pod 的创建和销毁,并能掌握其运行状态,就算是完成了一个基础的业务调度服务器。
在设计业务调度服务时,有一种通用的模式,可以称之为 master-worker 模式,与同名的并发模式细节上有所不同,这里的 master 是指调度服务本体,只负责对外服务,资源监控,以及任务分发,任务状态感知等,不负责做具体的任务,一般也不关心任务的输入输出。在部署 master 时,一般会创建一个 Service 资源对象,毕竟其主要功能就是对外服务,master 一般由运维进行部署创建销毁。而 worker 是指真正做任务的 Pod,该 Pod 中可能会有多个容器,主容器负责真正执行任务,其他一些容器可能会负责保障任务的前置条件(输入,配置等),以及向 master 汇报任务执行状态信息(执行任务的主容器可能并不知道 master 的存在)等。worker 对应的 Pod 一般由 master 进行创建销毁,worker 的一些配置信息则可能会由运维管理。
由于 K8S 并没有在整个集群物理资源之上抽象出一层集群资源,所以 K8S 分配的节点实际还是在物理机上,若所有物理机剩余资源(是单个剩余资源,而不是所有剩余资源之和)都不满足 Pod 所需资源,则该 Pod 无法调度,类比内存碎片化,可以称之为资源碎片化。所以在创建 Pod 时,所需资源最好不要太多,以免调度失败。