多线程的执行方式有两种:并发(Concurrent)和并行(Parallel),简单来说,并发就是两个线程轮流在一个 CPU 核上执行,而并行则是两个线程分别在两个 CPU 核上运行。一般而言,程序员无法直接控制线程是并发执行还是并行执行,线程的执行一般由操作系统直接控制,当然程序运行时也可以做简单调度。所以对于一般程序员来说,只需要熟练使用相关语言的多线程编程库即可,至于是并发执行还是并行执行,可能并不是那么重要,只要能达到预期效果就行。
前言
多线程的执行方式有两种:并发(Concurrent)和并行(Parallel),简单来说,并发就是两个线程轮流在一个 CPU 核上执行,而并行则是两个线程分别在两个 CPU 核上运行。一般而言,程序员无法直接控制线程是并发执行还是并行执行,线程的执行一般由操作系统直接控制,当然程序运行时也可以做简单调度。所以对于一般程序员来说,只需要熟练使用相关语言的多线程编程库即可,至于是并发执行还是并行执行,可能并不是那么重要,只要能达到预期效果就行。
Scala 提供了一个默认的 ExecutionContext:scala.concurrent.ExecutionContext.Implicits.global,其本质也是一个 ForkJoinPool,并行度默认设置为当前可用 CPU 数,当然也会根据需要(比如当前全部线程被阻塞)额外创建更多线程。一般做计算密集型任务就用默认线程池即可,特殊情况也可以自己创建 ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8)),下面的代码就可以创建一个同步阻塞的 ExecutionContext:
1 2 3 4 5
val currentThreadExecutionContext = ExecutionContext.fromExecutor( newExecutor { // Do not do this! defexecute(runnable: Runnable) { runnable.run() } })
////import scala.concurrent.ExecutionContext.Implicits.global //val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors()) val pool = Executors.newWorkStealingPool() implicitval ec = ExecutionContext.fromExecutorService(pool)
val futures = Array.range(0, 10000).map(i => Future { println(i) Thread.sleep(100) i })
关键字为 par,调用该方法即可轻松进行并发计算,不过需要注意的是并发操作的副作用(side-effects)和“乱序”(out of order)语义,副作用就是去写函数外的变量,不仅仅只读写并发操作函数内部声明的变量,乱序语义是指并发操作不会严格按照数组顺序执行,所以如果并发操作会同时操作两个数组元素(eg:reduce),则需要慎重使用,有的操作结果不变,而有的操作会导致结果不唯一。
val poolProducer = Executors.newWorkStealingPool() implicitval ecProducer = ExecutionContext.fromExecutorService(poolProducer) val poolConsumer = Executors.newSingleThreadExecutor() val ecConsumer = ExecutionContext.fromExecutorService(poolConsumer)
val futures = Array.range(0, 1000).map(i => Future { val x = produce(i) // produce something... x }(ecProducer).andThen { caseSuccess(x) => consume(x) // consume something... }(ecConsumer))
val futureSequence = Future.sequence(futures) futureSequence.onComplete({ caseSuccess(results) => { println("Success.")
Google S2 Geometry(以下简称 S2) 是 Google 发明的基于单位球的一种地图投影和空间索引算法,该算法可快速进行覆盖以及邻域计算。更多详见 S2Geometry,Google’s S2, geometry on the sphere, cells and Hilbert curve,halfrost 的空间索引系列文章。虽然使用 S2 已有一年的时间,但确实没有比较系统的看过其源码,这次借着这段空闲时间,将 Shaun 常用的功能系统的看看其具体实现,下文将结合 S2 的 C++,Java,Go 的版本一起看,由于 Java 和 Go 的都算是 C++ 的衍生版,所以以 C++ 为主,捎带写写这三种语言实现上的一些区别,Java 版本时隔 10 年更新了 2.0 版本,喜大普奔。
前言
Google S2 Geometry(以下简称 S2) 是 Google 发明的基于单位球的一种地图投影和空间索引算法,该算法可快速进行覆盖以及邻域计算。更多详见 S2Geometry,Google’s S2, geometry on the sphere, cells and Hilbert curve,halfrost 的空间索引系列文章。虽然使用 S2 已有一年的时间,但确实没有比较系统的看过其源码,这次借着这段空闲时间,将 Shaun 常用的功能系统的看看其具体实现,下文将结合 S2 的 C++,Java,Go 的版本一起看,由于 Java 和 Go 的都算是 C++ 的衍生版,所以以 C++ 为主,捎带写写这三种语言实现上的一些区别,Java 版本时隔 10 年更新了 2.0 版本,喜大普奔。
// validFaceXYZToUV given a valid face for the given point r (meaning that // dot product of r with the face normal is positive), returns // the corresponding u and v values, which may lie outside the range [-1,1]. funcvalidFaceXYZToUV(face int, r r3.Vector)(float64, float64) { switch face { case0: return r.Y / r.X, r.Z / r.X case1: return -r.X / r.Y, r.Z / r.Y case2: return -r.X / r.Z, -r.Y / r.Z case3: return r.Z / r.X, r.Y / r.X case4: return r.Z / r.Y, -r.X / r.Y } return -r.Y / r.Z, -r.X / r.Z }
之所以引入 ST 坐标是因为同样的球面面积映射到 UV 坐标面积大小不一,大小差距比较大(离坐标轴越近越小,越远越大),所以再做一次 ST 变换,将面积大的变小,小的变大,使面积更均匀,利于后面在立方体面上取均匀格网(cell)时,每个 cell 对应球面面积差距不大。S2 的 ST 变换有三种:1、线性变换,基本没做任何变形,只是简单将 ST 坐标的值域变换为 [0, 1],cell 对应面积最大与最小比大约为 5.2;2、二次变换,一种非线性变换,能起到使 ST 空间面积更均匀的作用,cell 对应面积最大与最小比大约为 2.1;3、正切变换,同样能使 ST 空间面积更均匀,且 cell 对应面积最大与最小比大约为 1.4,不过其计算速度相较于二次变换要慢 3 倍,所以 S2 权衡考虑,最终采用了二次变换作为默认的 UV 到 ST 之间的变换。二次变换公式为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
publicdoublestToUV(double s){ if (s >= 0.5) { return (1 / 3.) * (4 * s * s - 1); } else { return (1 / 3.) * (1 - 4 * (1 - s) * (1 - s)); } }
这个 id 其实是个一维坐标,而是利用希尔伯特空间填充曲线将 IJ 坐标从二维变换为一维,该 id 用一个 64 位整型表示,高 3 位用来表示 face(0~5),后面 61 位来保存不同的 level(0~30) 对应的希尔伯特曲线位置,每增加一个 level 增加两位,后面紧跟一个 1,最后的位数都补 0。注:Java 版本的 id 是有符号 64 位整型,而 C++ 和 Go 的是无符号 64 位整型,所以在跨语言传递 id 的时候,在南极洲所属的最后一个面(即 face = 5)需要小心处理。
HilbertCurve
hilbert_curve_subdivision_ruleshilbert_curve
上面两张图很明了的展示了希尔伯特曲线的构造过程,该曲线的构造基本元素由 ABCD 4 种“U”形构成,而 BCD 又可由 A 依次逆时针旋转 90 度得到,所以也可以认为只有一种“U”形,每个 U 占 4 个格子,以特定方式进行 1 分 4 得到下一阶曲线形状。
这个是 S2 内部计算使用的坐标,一般用来计算 cell 的中心坐标,以及根据当前 s 和 t 坐标的精度(小数点后几位)判断对应的级别(level)。由于 S2 本身并不显式存储 ST 坐标(有存 UV 坐标),所以 ST 坐标只能计算出来,每个 cell 的中心点同样如此。计算公式为 \(Si=s*2^{31};Ti=t*2^{31}\)。至于为啥是 \(2^{31}\),是因为该坐标是用来描述从 0~ 31 阶希尔伯特曲线网格的中心坐标,0 阶中心以 \(1/2^1\) 递增,1 阶中心以 \(1/2^2\) 递增,2 阶中心以 \(1/2^3\) 递增,……,30 阶中心以 \(1/2^{31}\) 递增。S2 计算 id 对应的格子中心坐标,首先就会计算 SiTi 坐标,再将 SiTi 转成 ST 坐标。
算法篇
邻域算法
S2 计算邻域,最关键的是计算不同面相邻的 leaf cell id,即:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
S2CellId S2CellId::FromFaceIJWrap(int face, int i, int j){ // 限制 IJ 最大最小取值为 -1~2^30, 刚好能超出 IJ 正常表示范围 0~2^30-1 i = max(-1, min(kMaxSize, i)); j = max(-1, min(kMaxSize, j));
face = S2::XYZtoFaceUV(S2::FaceUVtoXYZ(face, u, v), &u, &v); returnFromFaceIJ(face, S2::STtoIJ(0.5*(u+1)), S2::STtoIJ(0.5*(v+1))); }
这个算法主要用来计算超出范围(0~2^30-1)的 IJ 对应的 id,核心思想是先将 FaceIJ 转为 XYZ,再使用 XYZ 反算得到正常的 FaceIJ,进而得到正常的 id。中间 IJ -> UV 中坐标实际经过了 3 步,对于 leaf cell,IJ -> SiTi 的公式为 \(Si=2×I+1\),而对于 ST -> UV,这里没有采用二次变换,就是线性变换 \(u=2*s-1\),官方注释上说明用哪个变换效果都一样,所以采用最简单的就行。
边邻域
边邻域代码很简单,也很好理解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
voidS2CellId::GetEdgeNeighbors(S2CellId neighbors[4])const{ int i, j; int level = this->level(); // 计算当前 level 一行或一列对应多少个 30 级的 cell(leaf cell) 2^(30-level) int size = GetSizeIJ(level); int face = ToFaceIJOrientation(&i, &j, nullptr);
// Edges 0, 1, 2, 3 are in the down, right, up, left directions. neighbors[0] = FromFaceIJSame(face, i, j - size, j - size >= 0) .parent(level); neighbors[1] = FromFaceIJSame(face, i + size, j, i + size < kMaxSize) .parent(level); neighbors[2] = FromFaceIJSame(face, i, j + size, j + size < kMaxSize) .parent(level); neighbors[3] = FromFaceIJSame(face, i - size, j, i - size >= 0) .parent(level); }
voidS2CellId::AppendVertexNeighbors(int level, vector<S2CellId>* output)const{ // level < this->level() S2_DCHECK_LT(level, this->level()); int i, j; int face = ToFaceIJOrientation(&i, &j, nullptr);
// 判断 IJ 落在 level 对应 cell 的哪个方位?(左下左上右上右下,对应上文的(0,0),(0,1),(1,1),(1,0)坐标) int halfsize = GetSizeIJ(level + 1); int size = halfsize << 1; bool isame, jsame; int ioffset, joffset; if (i & halfsize) { ioffset = size; isame = (i + size) < kMaxSize; } else { ioffset = -size; isame = (i - size) >= 0; } if (j & halfsize) { joffset = size; jsame = (j + size) < kMaxSize; } else { joffset = -size; jsame = (j - size) >= 0; }
output->push_back(parent(level)); output->push_back(FromFaceIJSame(face, i + ioffset, j, isame).parent(level)); output->push_back(FromFaceIJSame(face, i, j + joffset, jsame).parent(level)); // 则邻域的 IJ 与当前 cell 都不在同一个面,则说明只有三个点邻域 if (isame || jsame) { output->push_back(FromFaceIJSame(face, i + ioffset, j + joffset, isame && jsame).parent(level)); } }
上面的代码算是比较清晰了,3 个点邻域的情况一般出现在当前 id 位于立方体 6 个面的角落,该方法的参数 level 必须比当前 id 的 level 要小。
全邻域
所谓全邻域,即为当前 id 对应 cell 周围一圈 cell 对应的 id,若周围一圈 cell 的 level 与 当前 id 的 level 一样,则所求即为正常的 9 邻域。具体代码如下:
voidS2CellId::AppendAllNeighbors(int nbr_level, vector<S2CellId>* output)const{ // nbr_level >= level S2_DCHECK_GE(nbr_level, level()); int i, j; int face = ToFaceIJOrientation(&i, &j, nullptr);
// 先归一 IJ 坐标,将 IJ 坐标调整为当前 cell 左下角 leaf cell 的坐标 int size = GetSizeIJ(); i &= -size; j &= -size;
int nbr_size = GetSizeIJ(nbr_level); S2_DCHECK_LE(nbr_size, size);
for (int k = -nbr_size; ; k += nbr_size) { bool same_face; if (k < 0) { same_face = (j + k >= 0); } elseif (k >= size) { same_face = (j + k < kMaxSize); } else { same_face = true; // 生成外包围圈下上两边的 id, 顺序为从左往右 output->push_back(FromFaceIJSame(face, i + k, j - nbr_size, j - size >= 0).parent(nbr_level)); output->push_back(FromFaceIJSame(face, i + k, j + size, j + size < kMaxSize).parent(nbr_level)); } // 生成外包围圈左右两边以及四个边角的 id, 顺序为从下往上 output->push_back(FromFaceIJSame(face, i - nbr_size, j + k, same_face && i - size >= 0) .parent(nbr_level)); output->push_back(FromFaceIJSame(face, i + size, j + k, same_face && i + size < kMaxSize) .parent(nbr_level)); if (k >= size) break; } }
知道这个函数的作用,再看代码就很明了了,这个方法的参数 nbr_level 必须大于或等于当前 id 的 level,因为一旦外包围圈的 cell 面积比当前 cell 还大,就无法得到正确的外包围圈。
覆盖算法
S2 的覆盖,是指给定一块区域,能用多少 id 对应的 cell 完全覆盖该区域(GetCovering),当然也有尽量覆盖的算法(GetInteriorCovering),下面主要解析 GetCovering,因为 GetInteriorCovering 也差不多,就是覆盖策略略有不同。
dir += rot; hilbert(dir, -rot, order - 1); step(dir);
dir -= rot; hilbert(dir, rot, order - 1); step(dir);
hilbert(dir, rot, order - 1);
dir -= rot; step(dir); hilbert(dir, -rot, order - 1); } }
// https://en.wikipedia.org/wiki/Hilbert_curve getPath_V3() { let path: number[][] = [];
// for (let i = 0; i < this.totalSize; i++) { // path.push(hilbertToXY(this.size, i)); // }
for (let y = 0; y < this.size; y++) { for (let x = 0; x < this.size; x++) { path[xyToHilbert(this.size, x, y)] = [x, y]; } }
return path;
functionrot(N: number, rx: number, ry: number, xy: number[]) { if (ry === 0) { if (rx === 1) { xy[0] = N - 1 - xy[0]; xy[1] = N - 1 - xy[1]; }
let t = xy[0]; xy[0] = xy[1]; xy[1] = t; } }
functionhilbertToXY(N: number, h: number) { let t = h; let xy = [0, 0]; for (let s = 1; s < N; s *= 2) { let rx = 1 & (t / 2); let ry = 1 & (t ^ rx); rot(s, rx, ry, xy); xy[0] += s * rx; xy[1] += s * ry; t /= 4; }
return xy; }
functionxyToHilbert(N: number, x: number, y: number) { let h = 0; let xy = [x, y]; for (let s = N / 2; s > 0; s /= 2) { let rx = (xy[0] & s) > 0 ? 1 : 0; let ry = (xy[1] & s) > 0 ? 1 : 0; h += s * s * ((3 * rx) ^ ry); rot(N, rx, ry, xy); }
return h; } }
draw() { let lineGeometry = new THREE.Geometry(); this.getPath_V3().forEach((vertice) => { let vecot = new THREE.Vector3().fromArray(vertice); vecot.setZ(0); lineGeometry.vertices.push(vecot); }); let lineMaterial = new THREE.LineBasicMaterial({ color: 0x00ffff, linewidth: 1 }); let line = new THREE.Line(lineGeometry, lineMaterial);
K8S 调度的基本单元是 Pod,Pod 也是 K8S 自带的一个资源对象,其可以简单理解为是一个容器集合体,程序员可控的容器有两类(Pause 容器除外),一类是 InitContainer,另一类是普通业务容器,InitContainer 按数组顺序创建,顺序执行,若一个失败,则整个 Pod 创建失败,普通业务容器同样按数组顺序创建,但异步执行,所以执行顺序不可控(可以通过 postStart Hook 简单控制一下)。由于 InitContainer 先于 Pod 其他容器执行,所以一般用来做普通业务容器执行前置条件的一些事情,比如:下载文件,初始化配置,状态消息通知等。
同一 Pod 中存储卷和网络可以共享。存储卷共享是指 Pod 内各容器可以挂载相同存储卷,从而数据共享。K8S 目前支持的存储卷共有三种:第一种是 emptyDir,这种存储是临时的,只能在 Pod 内使用,当 Pod 被销毁时,该存储的内容也会消失,只能在同一 Pod 内共享数据;第二种是 hostPath,这种存储会直接和集群中物理机存储相关联,是一种跨 Pod 持久化存储,但仅限该物理机,当 pod 被调度到其他物理机时就无法实现跨 Pod 共享数据;最后一种是外部存储(NFS,Ceph,GlusterFS,AWS EBS 等),这种方式可以真正实现数据持久化并共享,而且可以支持存储与计算分离,对系统会更友好一些,当然运维的成本也会更大。当然除了 K8S 自身提供的存储卷挂载可以实现数据共享,从程序的角度上,使用传统的方式一样也能数据共享,如数据库,DFS,OSS 等。
而网络共享是指 Pod 内各容器直接可以使用 localhost 以及容器暴露的端口进行相互通信,K8S 的端口有三种,分别为:容器端口(containerPort,容器中对外暴露的端口),集群内端口(port,集群内 pod 相互通信的端口),集群外端口(nodePort,集群外请求集群内的端口),其中容器端口和集群内是正常的动态端口,取值范围为 [1024, 65535],集群外端口只能设置为 [30000, 32767],若集群中服务不与集群外通信,则只需要设置集群内端口就行。K8S 中 IP 也同样有三种,分别为:Pod IP(两不同 Pod 资源对象相互通信的地址,集群外不可访问),Cluster IP(Service 资源对象的通信地址,集群外不可访问),Node IP(K8S 物理节点的 IP 地址,是真实的物理网络,集群外配合 nodePort 即可访问)。集群内端口和集群外端口由 K8S 的 Service 资源提供设置。在创建 Service 时需要注意,一个 Pod 资源对应一个 Service 资源,不要想着一个 Service 管理两个 Pod 暴露的端口,这样做会使 Service 提供服务的能力异常,经常会接口超时。
K8S 的调度可简单分为两个层面上的调度,最底层的调度自然是 K8S 自身的调度策略,根据不同的资源用度和调度策略将 Pod 分配到不同的物理节点之上执行,根据指定的重启或恢复策略启动相应的 Pod,这个层面上的调度,K8S 有一套默认的调度器,对于特殊的调度需求,K8S 也支持自定义调度器,使用外部调度器代替默认调度器,这个层面的调度器 Shaun 没做太多研究,所以在这篇里对这层面的调度器不做过多描述。Shaun 接触过的是更上层的调度器,业务层面的调度服务,业务调度服务一般与业务紧密相关,但最核心的一点就是能够从业务入手,负责 Pod 的创建和销毁,并能掌握其运行状态,就算是完成了一个基础的业务调度服务器。
在设计业务调度服务时,有一种通用的模式,可以称之为 master-worker 模式,与同名的并发模式细节上有所不同,这里的 master 是指调度服务本体,只负责对外服务,资源监控,以及任务分发,任务状态感知等,不负责做具体的任务,一般也不关心任务的输入输出。在部署 master 时,一般会创建一个 Service 资源对象,毕竟其主要功能就是对外服务,master 一般由运维进行部署创建销毁。而 worker 是指真正做任务的 Pod,该 Pod 中可能会有多个容器,主容器负责真正执行任务,其他一些容器可能会负责保障任务的前置条件(输入,配置等),以及向 master 汇报任务执行状态信息(执行任务的主容器可能并不知道 master 的存在)等。worker 对应的 Pod 一般由 master 进行创建销毁,worker 的一些配置信息则可能会由运维管理。
由于 K8S 并没有在整个集群物理资源之上抽象出一层集群资源,所以 K8S 分配的节点实际还是在物理机上,若所有物理机剩余资源(是单个剩余资源,而不是所有剩余资源之和)都不满足 Pod 所需资源,则该 Pod 无法调度,类比内存碎片化,可以称之为资源碎片化。所以在创建 Pod 时,所需资源最好不要太多,以免调度失败。
let top = near * Math.tan( _Math.DEG2RAD * 0.5 * this.fov ) / this.zoom; let height = 2 * top; let width = this.aspect * height; let left = - 0.5 * width; let right = left + width; let bottom = top - height;
对于透视投影,由于计算出的齐次坐标 w 分量显然不为 1.0,所以必须进行透视除法(x,y,z 各分量分别除以 w),得到真正的 3 维坐标。
正射投影一般用来模拟 2D 空间,透视投影用来模拟 3D 空间,当透视投影 near 和 far 设置的相差太大时,很容易引发 z-fighting 现象,原因是离近平面越远时,计算出的深度精度越低,three.js 中为解决这一问题,引入了一个 logarithmicDepthBuffer 参数来决定是否开启使用对数函数优化深度计算,具体可看源码中的 logdepthbuf_vertex.glsl.js 和 logdepthbuf_fragment.glsl.js 文件,开启该参数会造成渲染性能下降。
defeval(t: Tree, env: Environment): Int = t match { caseSum(l, r) => eval(l, env) + eval(r, env) caseVar(n) => env(n) caseConst(v) => v }
defderive(t: Tree, v: String): Tree = t match { caseSum(l, r) => Sum(derive(l, v), derive(r, v)) caseVar(n) if (v == n) => Const(1) case _ => Const(0) }
defmain(args: Array[String]) { val exp: Tree = Sum(Sum(Var("x"), Var("x")), Sum(Const(7), Var("y"))) val env: Environment = {case"x" => 5case"y" => 7} println("Expression: " + exp) println("Evalution with x=5, y=7: " + eval(exp, env)) println("Derivative relative to x:\n" + derive(exp, "x")) println("Derivative relative to y:\n" + derive(exp, "y")) } }
该示例主要用来说明两种 case 关键字,分别为:case class 和 ... match case ...,前者可认为是一个结构体,实例化时可以省略 new 关键字,参数有默认的 getter 函数,整个 case class 有默认的 equals 和 hashCode 方法实现,通过这两个方式可实现根据值判断类的两个实例是否相等,而不是通过引用,条件类同样有默认的 toString 方法实现;后者可认为是一种特殊的 switch case ,只不过 case 的判定和执行是函数式的,case class 可直接参与 match case 的判定(判定是不是属于该类)。第 7 行中有个 type 关键字,可认为是定义了一种新的类型(不是数据类型),示例中是函数类型,通过这个 type ,可直接将字符串映射为整型,23 行中将这个 type 与 case 结合使用,定义多个字符串映射多个整型的变量。第 18 行中有个 _ ,这是 scala 中的通配符,不同的语义下表示的含义不同,这里的含义是指,当上面的模式都不匹配时,将执行这个,相当于 switch case 中的 default。
objectQuickSort{ defqSort(xs: Array[Int]) { defswap(i: Int, j: Int) { val t = xs(i); xs(i) = xs(j); xs(j) = t; }
defsort(l: Int, r: Int) { val pivot = xs(l); var i = l+1; var j = r; while (i < j) { while (i <= r && xs(i) < pivot) i += 1; while (j > l && xs(j) > pivot) j -= 1;
if (i < j) { swap(i, j); i += 1; j -= 1; }
if (i > j) { i = j; } } while (i > l && xs(i) > pivot) { i -= 1; j -= 1; } swap(i, l);
if (l < j-1) sort(l, j-1); if (j+1 < r) sort(j+1, r); }
classHelloActorextendsActor{ defact() { while (true) { receive { case name: String => println("Hello, " + name) } } } }
objectAuctionService{ defmain(args: Array[String]) { val seller: Actor = newHelloActor val client: Actor = newHelloActor val minBid = 10 val closing = newDate()
val helloActor = newHelloActor helloActor.start() helloActor ! "leo" } }
通过重写 Actor 中的 act 方法即可简单的实现多线程编程,Actor 中有个特殊的标识符 !,该符号其实是是一种缩写,即可将 helloActor.!("leo") 缩写为 helloActor ! "leo",代表将数据传递给 Actor,由 Actor 内部的 receive case 接受数据并处理,当然也可通过 receiveWithin 控制数据传递时间,若超时,则默认触发 TIMEOUT 处理模式。
val n = xs.length / 2 if (n == 0) xs else merge(mergeSort(less)(xs take n), mergeSort(less)(xs drop n)) }
defmain(args: Array[String]) { val xs = List(4, 1, 5, 7,7,7,7, 2, 6); // val xs = 3::2::1::1::Nil; println(xs(0), xs(1), xs(xs.length-1)) // (4,1,6) // val ys = insertSort(xs); val ys = mergeSort((x: Int, y: Int) => x > y)(xs); println(ys.mkString(" ")) } }
List 中有两个操作符非常类似,即 :: 和 :::, 前者用于 List 中的元素和 List 连接,即创建一个新 List,新 List 为原 List 头插入元素后的 List,后者用于连接两个 List,即创建一个新 List ,新 List 为将第二个 List 的元素全部放入第一个 List 尾部的 List。字符 Nil 代表空 List 和 List() 等效,head 方法返回 List 的第一个元素,tail 方法返回除第一个元素之外的其它所有元素,还是一个 List,isEmpty 方法当 List 为空时返回 true。List 的 case-match 方法中,case y :: ys 其中 y 代表 xs.head,ys 代表 xs.tail。(xs take n) 表示取 List 前 n 个元素,(xs drop n) 表示取 List 前 n 个元素之外的元素,即与 (xs take n) 取得元素正好互补,而 (xs split n) 返回一个元组,元组中第一个元素为 (xs take n),第二个元素为 (xs drop n)。关于 List 还有些更高阶得方法:filter,map, flatMap, reduceRight, foldRight 等方法就不继续写了。至于动态 List 可用 ListBuffer 结构,当然 Scala 中直接用 Seq 作为返回值和参数一般会更好些。
yum install -y vim lrzsz tree wget gcc gcc-c++ readline-devel zlib-devel
进入/usr/local/目录下:cd /usr/local
下载 tar 包:curl -O https://ftp.postgresql.org/pub/source/v16.2/postgresql-16.2.tar.gz
解压:tar -xzvf postgresql-16.2.tar.gz
编译安装:
1 2 3 4 5
cd /usr/local/postgresql-16.2 ./configure --prefix=/usr/local/pgsql-16.2 # /usr/local/pgsql-16.2 为安装目录 make && make install # Two thousand years later,出现「PostgreSQL installation complete.」代表安装成功
# This is an example of a start/stop script for SysV-style init, such # as is used on Linux systems. You should edit some of the variables # and maybe the 'echo' commands. # # Place this file at /etc/init.d/postgresql (or # /etc/rc.d/init.d/postgresql) and make symlinks to # /etc/rc.d/rc0.d/K02postgresql # /etc/rc.d/rc1.d/K02postgresql # /etc/rc.d/rc2.d/K02postgresql # /etc/rc.d/rc3.d/S98postgresql # /etc/rc.d/rc4.d/S98postgresql # /etc/rc.d/rc5.d/S98postgresql # Or, if you have chkconfig, simply: # chkconfig --add postgresql # # Proper init scripts on Linux systems normally require setting lock # and pid files under /var/run as well as reacting to network # settings, so you should treat this with care.
# Original author: Ryan Kirkpatrick <pgsql@rkirkpat.net>
# Data directory PGDATA="/home/postgres/db_data" ###### 下面不改 #####################
# Who to run postgres as, usually "postgres". (NOT "root") PGUSER=postgres
# Where to keep a log file PGLOG="$PGDATA/serverlog"
# It's often a good idea to protect the postmaster from being killed by the # OOM killer (which will tend to preferentially kill the postmaster because # of the way it accounts for shared memory). To do that, uncomment these # three lines: #PG_OOM_ADJUST_FILE=/proc/self/oom_score_adj #PG_MASTER_OOM_SCORE_ADJ=-1000 #PG_CHILD_OOM_SCORE_ADJ=0 # Older Linux kernels may not have /proc/self/oom_score_adj, but instead # /proc/self/oom_adj, which works similarly except for having a different # range of scores. For such a system, uncomment these three lines instead: #PG_OOM_ADJUST_FILE=/proc/self/oom_adj #PG_MASTER_OOM_SCORE_ADJ=-17 #PG_CHILD_OOM_SCORE_ADJ=0
## STOP EDITING HERE
# The path that is to be used for the script PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
# What to use to start up postgres. (If you want the script to wait # until the server has started, you could use "pg_ctl start" here.) DAEMON="$prefix/bin/postgres"
# What to use to shut down postgres PGCTL="$prefix/bin/pg_ctl"
set -e
# Only start if we can find postgres. test -x $DAEMON || { echo"$DAEMON not found" if [ "$1" = "stop" ] thenexit 0 elseexit 5 fi }
# If we want to tell child processes to adjust their OOM scores, set up the # necessary environment variables. Can't just export them through the "su". if [ -e "$PG_OOM_ADJUST_FILE" -a -n "$PG_CHILD_OOM_SCORE_ADJ" ] then DAEMON_ENV="PG_OOM_ADJUST_FILE=$PG_OOM_ADJUST_FILE PG_OOM_ADJUST_VALUE=$PG_CHILD_OOM_SCORE_ADJ" fi
#!/bin/bash # THIS FILE IS ADDED FOR COMPATIBILITY PURPOSES # # It is highly advisable to create own systemd services or udev rules # to run scripts during boot instead of using this file. # # In contrast to previous versions due to parallel execution during boot # this script will NOT be run after all other services. # # Please note that you must run 'chmod +x /etc/rc.d/rc.local' to ensure # that this script will be executed during boot. exec 2> /tmp/rc.local.log # send stderr from rc.local to a log file exec 1>&2 # send stdout to the same log file echo"rc.local starting..."# show start of execution set -x
touch /var/lock/subsys/local
cd /etc/rc.d/init.d/ sudo sh postgresql start & # 以root执行,不然可能会出现权限错误,&表示后台执行 # 脚本执行完后也给个日志 echo"rc.local completed"
declare begin case i_mode when'INVALID'thenreturn0; when'AccessShareLock'thenreturn1; when'RowShareLock'thenreturn2; when'RowExclusiveLock'thenreturn3; when'ShareUpdateExclusiveLock'thenreturn4; when'ShareLock'thenreturn5; when'ShareRowExclusiveLock'thenreturn6; when'ExclusiveLock'thenreturn7; when'AccessExclusiveLock'thenreturn8; elsereturn0; endcase; end;
$$ language plpgsql strict;
-- 2. 修改查询语句,按锁级别排序: with t_wait as (select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid, a.pid,a.virtualtransaction,a.virtualxid,a,transactionid,b.query,b.xact_start,b.query_start, b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid andnot a.granted), t_run as (select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid, a.pid,a.virtualtransaction,a.virtualxid,a,transactionid,b.query,b.xact_start,b.query_start, b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and a.granted) select r.locktype,r.mode r_mode,r.usename r_user,r.datname r_db,r.relation::regclass,r.pid r_pid, r.page r_page,r.tuple r_tuple,r.xact_start r_xact_start,r.query_start r_query_start, now()-r.query_start r_locktime,r.query r_query,w.mode w_mode,w.pid w_pid,w.page w_page, w.tuple w_tuple,w.xact_start w_xact_start,w.query_start w_query_start, now()-w.query_start w_locktime,w.query w_query from t_wait w,t_run r where r.locktype isnotdistinctfrom w.locktype and r.database isnotdistinctfrom w.database and r.relation isnotdistinctfrom w.relation and r.page isnotdistinctfrom w.page and r.tuple isnotdistinctfrom w.tuple and r.classid isnotdistinctfrom w.classid and r.objid isnotdistinctfrom w.objid and r.objsubid isnotdistinctfrom w.objsubid and r.transactionid isnotdistinctfrom w.transactionid and r.pid <> w.pid orderby f_lock_level(w.mode)+f_lock_level(r.mode) desc,r.xact_start;
现在可以排在前面的就是锁级别高的等待,优先干掉这个。
-[ RECORD 1 ]-+----------------------------------------------------------
-- 按从大到小排序输出数据库每个索引大小 select indexrelname, pg_size_pretty(pg_relation_size(indexrelid)) as size from pg_stat_user_indexes where schemaname='public'orderby pg_relation_size('public'||'.'||indexrelname) desc;
-- [PostgreSQL中查询 每个表的总大小、索引大小和数据大小,并按总大小降序排序](https://blog.csdn.net/sunny_day_day/article/details/131455635) SELECT pg_size_pretty(pg_total_relation_size(c.oid)) AS total_size, pg_size_pretty(pg_indexes_size(c.oid)) AS index_size, pg_size_pretty(pg_total_relation_size(c.oid) - pg_indexes_size(c.oid)) AS data_size, nspname AS schema_name, relname AS table_name FROM pg_class c LEFTJOIN pg_namespace n ON n.oid = c.relnamespace WHERE relkind ='r' AND nspname NOTLIKE'pg_%' AND nspname !='information_schema' ORDERBY pg_total_relation_size(c.oid) DESC;
-- 查找超过1小时的长事务 selectcount(*) from pg_stat_activity where state <>'idle'and (backend_xid isnotnullor backend_xmin isnotnull) and now()-xact_start >interval'3600 sec'::interval;
-- 查看处于等待锁状态 select*from pg_locks wherenot granted; -- 查看等待锁的关系(表,索引,序列等) select*from pg_class where oid=[上面查出来的relation]; -- 查看等待锁的数据库 select*from pg_database where oid=[上面查出来的database]; -- 锁表状态 select oid from pg_class where relname='可能锁表了的表'; -- 查询出结果则被锁 select pid from pg_locks where relation='上面查出的oid';
-- 输出删除全部表的sql \COPY (SELECT'DROP TABLE IF EXISTS "'|| tablename ||'" CASCADE;'from pg_tables WHERE schemaname ='public') TO'/tmp/sql_output.sql';
-- 添加部分索引(满足条件才建立索引), where 和 select 语句的一致 create index [XXX] where [XXX]
-- 查看当前连接事务执行超时时间 show statement_timeout; -- 设置数据库事务执行超时时间为 60 秒 AlTER DATABASE mydatabse SET statement_timeout='60s'; -- 设置用户事务执行超时时间为 5 分钟 ALTER ROLE guest SET statement_timeout='5min';
子查询优化
PG 的子查询实际有两种,分为子连接(Sublink)和子查询(SubQuery),按子句的位置不同,出现在 from 关键字后的是子查询,出现在 where/on 等约束条件中或投影中的子句是子连接。
子查询:select a.* from table_a a, (select a_id from table_b where id=1) b where b.a_id = a.id;
子连接:select * from table_a where id in(select a_id from table_b where id=1);
在简单的子连接查询下,PG 数据库查询优化器一般会将其转化为内连接的方式:select a.* from table_a a, table_b b where a.id=b.a_id and b.id=1;,正常索引没问题情况下这两种方式都能得一样的结果,最终执行的都是索引内连接结果。但在某些情况下,PG 查询优化器在子连接的 SQL 下,子连接的查询会走索引,而主查询会顺序扫描(Seq Scan),原因是当 table_a 的数据量很大时,索引值又有很多重复的,同时查询优化器也不知道子连接返回的具体数据,这时查询优化器可能会认为顺序扫描更快,从而不走索引,导致耗时增加,所以为减少查询优化器的不确定性,最好是直接使用内连接的方式代替 in 语句。 当然,对于特别复杂的查询业务,还是开启事务,分多次查询,在代码层做一些业务逻辑处理更合适,别让数据库把事情全做了,这也能减轻数据库的压力。 PG 查询计划执行路径可以看看: PostgreSQL 查询语句优化,postgresql通过索引优化查询速度操作
-- 创建只读组 create role readonly_group; -- 设置只读模式 ALTER ROLE readonly_group SET default_transaction_read_only TO'on'; -- 创建只读用户继承只读组 createuser reader with password 'reader'in role readonly_group; -- 删除用户 dropuser reader; -- 将只读组权限赋给只读用户 grant readonly_group to reader;
-- 读权限 GRANTSELECTONALL TABLES IN SCHEMA public TO readonly_group; GRANTSELECTONALL SEQUENCES IN SCHEMA public TO readonly_group; GRANTEXECUTEONALL FUNCTIONS IN SCHEMA public TO readonly_group; -- 写权限 GRANTINSERT, UPDATE, DELETEONALL TABLES IN SCHEMA public TO write_group; GRANT USAGE ONALL SEQUENCES IN SCHEMA public TO write_group;
# 主备机不同步时,re_wind恢复结点 wal_log_hints = on # 设置最大流复制数(从库数) max_wal_senders = 3 wal_keep_segments = 64 # 支持从库读,以及从库再拉从库 hot_standby = on
设置主库:pg_hba.conf
1 2 3 4 5 6
# Allow replication connections from localhost, by a user with the # replication privilege. local replication all trust host replication all 127.0.0.1/32 trust host replication all ::1/128 trust host replication all 0.0.0.0/0 md5
psql -d postgresql://owner_user:pswd@host:port/db_name -t -A -F"," -c " SELECT DISTINCT 'ALTER TABLE ' || quote_ident(nsp.nspname) || '.' || quote_ident(cls.relname) || ' ADD CONSTRAINT ' || quote_ident(con.conname) || ' FOREIGN KEY (' || array_to_string(ARRAY( SELECT quote_ident(att.attname) FROM pg_attribute att WHERE att.attnum = ANY(con.conkey) AND att.attrelid = cls.oid), ', ') || ') REFERENCES ' || quote_ident(f_nsp.nspname) || '.' || quote_ident(f_cls.relname) || ' (' || array_to_string(ARRAY( SELECT quote_ident(att.attname) FROM pg_attribute att WHERE att.attnum = ANY(con.confkey) AND att.attrelid = f_cls.oid), ', ') || ') ON DELETE ' || CASE con.confdeltype WHEN 'a' THEN 'NO ACTION' WHEN 'r' THEN 'RESTRICT' WHEN 'c' THEN 'CASCADE' WHEN 'n' THEN 'SET NULL' WHEN 'd' THEN 'SET DEFAULT' END || ' ON UPDATE ' || CASE con.confupdtype WHEN 'a' THEN 'NO ACTION' WHEN 'r' THEN 'RESTRICT' WHEN 'c' THEN 'CASCADE' WHEN 'n' THEN 'SET NULL' WHEN 'd' THEN 'SET DEFAULT' END || ';' FROM pg_constraint con JOIN pg_class cls ON con.conrelid = cls.oid JOIN pg_namespace nsp ON cls.relnamespace = nsp.oid JOIN pg_class f_cls ON con.confrelid = f_cls.oid JOIN pg_namespace f_nsp ON f_cls.relnamespace = f_nsp.oid WHERE con.contype = 'f';" > db_name_fkeys.sql
pg_dump -d postgresql://user:pswd@host:port/db_name --data-only -F d -j 4 -f ./db_name_data_dir
新建数据库实例
1
pg_ctl init -D ~/new_db_data
导入数据库全局用户/权限
1
psql -U superuser -p port -f db_name_user.sql
新建数据库
1
create database new_db_name owner owner_user
导入数据库全部表结构
1
psql -U superuser -p port -f db_name_schema.sql
移除新库外键约束
1 2 3 4 5 6 7 8 9 10 11 12
psql -d postgresql://owner_user:pswd@host:port/db_name <<EOF DO \$\$ DECLARE r RECORD; BEGIN FOR r IN (SELECT conname, conrelid::regclass FROM pg_constraint WHERE contype = 'f') LOOP EXECUTE 'ALTER TABLE ' || r.conrelid || ' DROP CONSTRAINT ' || r.conname; END LOOP; END \$\$; EOF
当自增主键报 duplicate key value violates unique constraint 主键冲突时,一般是因为存在手动分配 id 的数据(复制表或着手动插入分配了 id),自增主键 seqence TABLE_COLUMN_seq 没有更新,新插入一个值自增 id 和数据库已插入的分配 id 冲突,此时需要执行 SELECT setval('TABLE_COLUMN_seq', (SELECT max(COLUMN) FROM "TABLE")) 更新自增主键;
val spec = "name:String,dtg:Date,*geom:Point:srid=4326"
val spec = "name:String,dtg:Date,*geom:Point:srid=4326;geomesa.indices.enabled='z2,id,z3'"
val spec = "name:String:index=true,tags:String:json=true,dtg:Date:default=true,*geom:Point:srid=4326;geomesa.indices.enabled='z2,id,z3'"
val spec = "userId:String,trackId:String,altitude:Double,dtg:Date,*geom:Point:srid=4326;geomesa.index.dtg='dtg',geomesa.table.sharing='true',geomesa.indices='z3:4:3,z2:3:3,id:2:3',geomesa.table.sharing.prefix='\\u0001'"
String spec = "name:String,dtg:Date,*geom:Point:srid=4326"; SimpleFeatureType sft = SimpleFeatureTypes.createType("mySft", spec); // enable a default z3 and a default attribute index sft.getUserData().put("geomesa.indices.enabled", "z3,attr:name"); // or, enable a default z3 and an attribute index with a Z2 secondary index sft.getUserData().put("geomesa.indices.enabled", "z3,attr:name:geom"); // or, enable a default z3 and an attribute index with a temporal secondary index sft.getUserData().put("geomesa.indices.enabled", "z3,attr:name:dtg");
IDEA 以前基本没用过,只是简单用过 Android Studio,还基本都忘记了 ( ╯□╰ ),以后应该会用 Scala 做一些大数据方面的东西,而大数据的环境都是 Linux 下的,而 Shaun 日常都是在 Windows 下开发,所以需要用日前做的容器环境来测试调试运行程序,简单记录一下 IDEA 在这方面的使用方法。
前言
IDEA 以前基本没用过,只是简单用过 Android Studio,还基本都忘记了 ( ╯□╰ ),以后应该会用 Scala 做一些大数据方面的东西,而大数据的环境都是 Linux 下的,而 Shaun 日常都是在 Windows 下开发,所以需要用日前做的容器环境来测试调试运行程序,简单记录一下 IDEA 在这方面的使用方法。
FROM stc:2.0 COPY ./target/classes/ /tmp WORKDIR /tmp ENTRYPOINT ["scala","HelloWorld"]
点击左上角绿色双箭头,可编辑 Dockerfile(Edit 'Dockerfile') ,指定当前上下文目录(Context folder),Contaier name 等容器启动选项。直接运行 Dockerfile(Run 'Dockerfile'),IDEA 即可自动创建容器,并在容器中运行程序,程序运行完则容器自动停止,若需要运行存在外部依赖的程序,则只能以 jar 包的方式运行。
设置 IDEA 生成 jar 包如下:在最上面的菜单栏中 File =》Project Structure =》Artifacts =》+ =》JAR =》From modules with dependencies,选择 Main Class,点击右边的文件夹图标即可选择相应类,由于存在外部依赖,所以不能直接用默认的 extract to the target JAR,而是应该选择下面的 copy to the output directory and link via manifest,点击 OK 后,自动或手动选择导出的依赖 jar 包,点击 OK。在最上面的菜单栏中 Build =》Build Artifacts...,可在 out/artifacts/HelloWorld_jar 文件夹中生成所有 jar 包。之后编辑 Dockerfile, 更改 Dockerfile 上下文目录为 out/artifacts/HelloWorld_jar ,指定容器名,在 Command 中输入 java -jar HelloWorld.jar 修改 Dockerfile 中第 2 行命令为 COPY . /tmp,修改第 4 行命令为 CMD ["java", "-jar", "HelloWorld.jar"]。之后运行 Dockerfile 即可在下面 Services 栏对应 Docker 容器 Attached Console 中看到程序运行结果。
调试篇
除了使用 IDEA 生成 jar 包外,还需要使用 IDEA 的远程调试功能,设置 IDEA 远程调试功能如下:在最上面的菜单栏中 Run =》Edit Configurations... =》+ =》Remote JVM Debug,上方的 Debugger mode 中使用默认的 Attach to remote JVM, 在下面的 Before launch 添加 Launch Docker before debug。在弹窗中选择相应 Dockerfile,在下方的 Custom command 中输入 java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -jar HelloWorld.jar, 完成后即可使用该配置在 IDEA 调试容器中运行的程序。
后记
用这种方式使用 IDEA 确实达到了 Shaun 理想的结果,Windows 下开发,Docker 中调试和运行,应付简单的代码调试和运行确实是没问题,但是在复杂的分布式环境下总会碰到一些莫名奇妙的问题,这些问题就是纯粹的经验了。