Scala 多线程编程小结

前言

  多线程的执行方式有两种:并发(Concurrent)和并行(Parallel),简单来说,并发就是两个线程轮流在一个 CPU 核上执行,而并行则是两个线程分别在两个 CPU 核上运行。一般而言,程序员无法直接控制线程是并发执行还是并行执行,线程的执行一般由操作系统直接控制,当然程序运行时也可以做简单调度。所以对于一般程序员来说,只需要熟练使用相关语言的多线程编程库即可,至于是并发执行还是并行执行,可能并不是那么重要,只要能达到预期效果就行。

  Shaun 目前接触的 Scala 原生多线程编程语法就两个:Future 和 Parallel Collections。其中 Future 用的的最多,并且 Parallel Collections 语法非常简单,所以主要介绍 Future,附带提一下 Parallel Collections。

ExecutionContext 篇

  ExecutionContext 是 Future 的执行上下文,相当于是 Java 的线程池,Java 的线程池主要有以下两类:

  • ThreadPool:所有线程共用一个任务队列,当线程空闲时,从队列中取一个任务执行。
  • ForkJoinPool:每个线程各有一个任务队列,当线程空闲时,从其他线程的任务队列中取一批任务放进自己的队列中执行。

  对于少量任务,这两个池子没啥区别,只是 ThreadPool 在某些情况下会死锁,比如在一个并行度为 2 (最多两个线程)的 ThreadPool 中执行两个线程,两个线程又分别提交一个子任务,并等到子任务执行完才退出,这时会触发相互等待的死锁条件,因为没有多余的空闲线程来执行子任务,而 ForkJoinPool 中每个线程产生的子任务会放在自己的任务队列中,ForkJoinPool 可以在线程耗尽时额外创建线程,也可以挂起当前任务,执行子任务,从而防止死锁。对于大量任务,ForkJoinPool 中的空闲线程会从其他线程的任务队列中一批一批的取任务执行,所以一般会更快,当然若各个任务执行时间比较均衡,则 ThreadPool 会更快。

  根据线程池创建的参数不同,Executors 中提供了 5 种线程池:newSingleThreadExecutor(单线程线程池,可保证任务执行顺序),newFixedThreadPool(固定大小线程池,限制并行度),newCachedThreadPool(无限大小线程池,任务执行时间小采用),newScheduledThreadPool(同样无限大小,用来处理延时或定时任务),newWorkStealingPool(ForkJoinPool 线程池)。前四种都属于 ThreadPool,根据阿里的 Java 的编程规范,不推荐直接使用 Executors 创建线程池,不过对于计算密集型任务,一般使用 newFixedThreadPool 或 newWorkStealingPool 即可,线程数设置当前 CPU 数即可(Runtime.getRuntime.availableProcessors()),多了反而增加线程上下文切换次数,对CPU 的利用率不增反减。

  Scala 提供了一个默认的 ExecutionContext:scala.concurrent.ExecutionContext.Implicits.global,其本质也是一个 ForkJoinPool,并行度默认设置为当前可用 CPU 数,当然也会根据需要(比如当前全部线程被阻塞)额外创建更多线程。一般做计算密集型任务就用默认线程池即可,特殊情况也可以自己创建 ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8)),下面的代码就可以创建一个同步阻塞的 ExecutionContext:

1
2
3
4
5
val currentThreadExecutionContext = ExecutionContext.fromExecutor(
new Executor {
// Do not do this!
def execute(runnable: Runnable) { runnable.run() }
})

原因是 runnable.run() 并不会新开一个线程,而是直接在主线程上执行,和调用普通函数一样。

Future 篇

  先上一个简单的 Future 并发编程 Demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
////import scala.concurrent.ExecutionContext.Implicits.global
//val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
val pool = Executors.newWorkStealingPool()
implicit val ec = ExecutionContext.fromExecutorService(pool)

val futures = Array.range(0, 10000).map(i => Future {
println(i)
Thread.sleep(100)
i
})

val futureSequence = Future.sequence(futures)
futureSequence.onComplete({
case Success(results) => {
println(results.mkString("Array(", ", ", ")"))
println(s"Success")

ec.shutdown()
pool.shutdownNow()
}
case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
})
Await.result(futureSequence, Duration.Inf)

  如果计算机 CPU 核数为 8 核,则程序运行成功后将会从 VisualVM 中看到有 8 个线程数在运行,控制台中会每次打印 8 条记录,最后打印出完整数组。

  onComplete 是 Future 的回调函数,可对 Success 和 Failure 分别处理,Await 是为了阻塞主线程,当 futureSequence 执行完成后,才继续执行下面的任务。当然,主线程的阻塞也可以使用 Java 中的 CountDownLatch 来实现,只需要在每个 Future 执行完成后调用一次 countDown() 即可,或者直接在 onComplete 的回调函数中调用一次也行。(题外话:CountDownLatch 和 Golang 中的 sync.WaitGroup 感觉区别不大)。

  如果不想让程序并发执行,则将 Future.sequence(futures) 改为 Future.traverse(futures)(x => x) 即可,此时就会一条条打印,但不保证打印顺序与数组一致。

  如果使用 ExecutionContext.Implicits.global,并将上面创建 futures 的代码改为:

1
2
3
4
5
6
7
val futures = Array.range(0, 10000).map(i => Future {
blocking {
println(i)
Thread.sleep(100)
i
}
})

  则控制台会马上将数组全部打印出来,从 VisualVM 中看会有非常多的线程在运行,远远超过 8 个,这是因为 ForkJoinPool 检测到当前线程以全部阻塞,所以需要另开线程继续执行,如果将线程池改为 Executors.newFixedThreadPool(8),则不会马上将数组全部打印,而是恢复原样,每次打印 8 条。blocking 需要慎用,如果 ForkJoinPool 中线程数太多,同样会 OOM,一般在大量运行时间短内存小的并发任务中使用。


  Parallel Collections 并发编程就很简单了,demo 如下:

1
2
3
4
Array.range(0, 10000).par.foreach(i => {
println(i)
Thread.sleep(100)
})

  关键字为 par,调用该方法即可轻松进行并发计算,不过需要注意的是并发操作的副作用(side-effects)和“乱序”(out of order)语义,副作用就是去写函数外的变量,不仅仅只读写并发操作函数内部声明的变量,乱序语义是指并发操作不会严格按照数组顺序执行,所以如果并发操作会同时操作两个数组元素(eg:reduce),则需要慎重使用,有的操作结果不变,而有的操作会导致结果不唯一。

经验篇

  Shaun 目前使用 Scala 进行多线程编程主要碰到过以下几个问题:

  • 数据竞争问题
  • 任务拆分问题
  • 内存占用问题

  数据竞争问题算是多线程编程中最常见的问题,简单来说就是两个线程同时写同一个变量,导致变量值不确定,引发后续问题,解决该问题有很多方法,性能由高到底有:Atomic,volatile,线程安全数据结构(eg:ConcurrentHashMap),Lock,synchronized,前两个方法性能最高,但局限性也很大,如果有现成的线程安全对象使用是最好的,没有的只能用 Lock 和 synchronized,这两种各有优缺点,synchronized 用法简单,能应付绝大部分问题,但对读也会加锁并且无法中断等待线程,Lock 是个接口,有比较多的派生对象(ReentrantLock,ReadWriteLock,ReentrantReadWriteLock 等),能更灵活的控制锁,不过使用起来相对复杂,需要显式地加锁解锁。

  任务拆分问题,这个问题发生在任务量非常多(千万级以上)的时候,当需要对千万级数据进行并发处理时,单纯的生成相应的千万级 Future 在默认的 ExecutionContext 中执行会比较慢,甚至出现程序运行一段时间卡一段时间的现象(可能是内存不足,GC 卡了),此时需要人为对千万级任务进行合并。Shaun 这里有两种方案:一种是使用 grouped 将千万级任务划分为 16 组,从而降级为 16 个任务,生成 16 个Future,这时执行速度会快很多,且不会有卡的现象出现;另一种方案就是,每次只生成 10 万个 Future 放进 ExecutionContext 中执行,如此将千万级任务拆分成每次 10 万并发执行,同样能解决问题。

  内存占用问题,这个问题发生在单个任务需要占用大量内存(1G 以上)的时候,当单个任务需要 1G 以上内存,8 个任务并行则需要 8G 以上内存,内存占用过高,提高 JVM 的内存,但也只是治标不治本。Shaun 的解决方案是对单个任务进行进一步拆分,将单个任务继续拆分为 16 个子任务,再将 16 个子任务的结果进行合并,作为单个大任务的结果,8 个大任务串行执行,如此内存占用极大减少,只需要单个任务的内存即可完成全部任务,且 CPU 利用率不变,执行速度甚至会更快(Full GC 次数变少)。


  Shaun 在写大文件的时候会用到 newSingleThreadExecutor 和 Future.traverse,将写文件的操作放在 Future 里面,每次只写一个大文件(不用多线程写是因为机械硬盘的顺序读写肯定比随机读写快),而生产大文件内容的操作由默认的 ExecutionContext 执行,从而使生产与消费互不干扰,写大文件操作不会阻塞生产操作。

  一个用 Future 实现的生产者消费者 demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
val poolProducer = Executors.newWorkStealingPool()
implicit val ecProducer = ExecutionContext.fromExecutorService(poolProducer)
val poolConsumer = Executors.newSingleThreadExecutor()
val ecConsumer = ExecutionContext.fromExecutorService(poolConsumer)

val futures = Array.range(0, 1000).map(i => Future {
val x = produce(i) // produce something...
x
}(ecProducer).andThen { case Success(x) =>
consume(x) // consume something...
}(ecConsumer))

val futureSequence = Future.sequence(futures)
futureSequence.onComplete({
case Success(results) => {
println("Success.")

ecProducer.shutdown()
poolProducer.shutdownNow()
ecConsumer.shutdown()
poolConsumer.shutdownNow()
}
case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
})
Await.result(futureSequence, Duration.Inf)

后记

  Shaun 这里写的 Scala 多线程编程主要是针对计算密集型任务,而 IO 密集型任务一般会用专门的一些框架,计算密集型考虑的是如何最大化利用 CPU,加快任务执行速度,线程数一般比较固定。Scala 的 Future 多线程编程相比 Java 的多线程编程要简洁了很多,唯一需要控制的就是并行度和任务拆分,Shaun 自己在用时也对 Future 做了简单封装,进一步简化了 Scala 的多线程编程,对 Iterable 的并发计算会更方便。

参考资料

[1] Futures and Promises

[2] scala.concurrent.blocking - what does it actually do?

[3] Parallel Collections

[4] Java并发编程:Lock