Java 异步编程:Loom 项目介绍
发布于 1 个月前 作者 yan 31 次浏览

(给ImportNew加星标,提高Java技能)

编译:ImportNew/覃佑桦

www.javacodegeeks.com/2019/12/project-loom.html

为什么会启动 Loom 项目?

Java 8 Stream 一个主要目标是并发编程。在 Stream Pipeline 上指定要完成的工作,任务会自动分配给可用处理器:

var result = myData
 .parallelStream()
 .map(someBusyOperation)
 .reduce(someAssociativeBinOp)
 .orElse(someDefault); 

如果数据结构可以拆分为多个部分,并行流会让处理器一直忙碌表现优异。这就是它的设计目标。

但如果工作负载中大多数是阻塞任务,那么也会无能为力。典型的比如 Web 应用程序,可以处理许多请求,每个请求都将耗费大量时间等待 REST 服务、数据库查询结果等等。

1998年,Sun Java Web Server(Tomcat的前身)实现了在单独的线程而非操作系统进程中处理请求,令人赞叹。这样就能为数以千计的并发请求服务!时至今日,这种技术已司空见惯。每个线程会占用大量内存,通常服务器上无法承载数百万个线程。

这就是为什么如今服务器端编程的口号是:“永不阻塞!”与之相反,你需要做的就是指定数据一旦可用时应该做什么。

这种异步编程风格非常适合服务器,可以轻松支持百万级并发请求。但这对于程序员而言并不是那么友好。

下面展示了 HttpClient API 异步请求:

HttpClient.newBuilder()
  .build()
  .sendAsync(request, HttpResponse.BodyHandlers.ofString())
  .thenAccept(response -> ...);
  .thenApply(...);
  .exceptionally(...); 

之前用语句实现的功能现在被封装成了方法调用。假如你喜欢这种编程风格就不会用 Loom,直接用 Lisp 愉快地开发就好了。

像 JavaScript 和 Kotlin 这样的语言提供了“async”方法。可以在这些方法中编写语句,然后转换成刚才看到的方法调用。虽然这样很好,但意味着现在有两种方法:常规方法和转换方法。而且无法混合使用(“红药丸与蓝药丸”的分界线)。

Loom 项目从 Erlang 和 Go 语言中得到了启发。对这些编程语言阻塞并不是什么大问题。可以在“纤程(fiber)”、“轻量级线程”或者“虚拟线程”中执行任务。虽然命名尚在讨论中,但我更喜欢“纤程”。因为它很好地体现了多个纤程在一个负载线程中执行的事实。当发生阻塞操作时,纤程会发生阻塞(park),例如等待锁释放(lock)或者 I/O 完成。阻塞的开销相对较小。如果纤程大多数情况下处于阻塞状态,那么一个负载线程可以支持上千个纤程。

请记住,Loom 项目并不能解决所有并发问题。如果有大量计算任务并且让所有处理器内核忙碌起来,Loom 就无能为力了。对于单个线程的用户界面也没有什么帮助,比如序列化非线程安全的数据结构。这种情况应该继续使用 AsyncTask、SwingWorker、JavaFX Task。如果有许多任务大部分时间处于阻塞时,Loom 项目很有用。

注意: 如果你很早以前就开始 Java 开发,可能还记得早期 Java 版本提供了映射操作系统线程的“绿色线程(Green Thread)”。但这里有一个关键区别。当绿色线程阻塞时,其负载线程也发生阻塞,进而同一个负载线程中所有其他绿色线程无法继续工作。

理出头绪

在这一点上,Loom 项目仍处于探索阶段。API 会持续修改。开发时,请准备用最新的 API 版本。

可以从http://jdk.java.net/loom/上下载二进制文件,这些文件不会频繁更新。在 Linux 机器或虚拟机上,可以很方便地编译最新版本:

git clone https://github.com/openjdk/loom
cd loom
git checkout fibers
sh configure
make images 

根据安装内容,可能会在 configure 过程中报告错误,可以根据提示安装必要的软件包。

在当前版本的 API 中,纤程被称为虚拟线程(Virtual Thread)用 Thread对象表示。有三种方法可以创建纤程。首先,提供了一个新的工厂方法创建操作系统线程或虚拟线程:

Thread thread = Thread.newThread(taskname, Thread.VIRTUAL, runnable); 

如果需要自定义,可以用 builder API:

Thread thread = Thread.builder()
  .name(taskname)
  .virtual()
  .priority(Thread.MAX_PRIORITY)
  .task(runnable)
  .build(); 

然而手工创建线程一直被认为是不好的实践,因此不推荐这么做。可以将 executor 与线程工厂一起使用:

ThreadFactory factory = Thread.builder().virtual().factory();
ExecutorService exec = Executors.newFixedThreadPool(NTASKS, factory); 

这样,固定线程池(FixedThreadPool)会按照熟悉的方式通过 factory 调度虚拟线程。当然,会由操作系统级得负载线程负责运行这些虚拟线程,但这是虚拟线程的内部实现。

固定线程池会限制并发虚拟线程的总数。默认通过 ForkJoinPool 将虚拟线程映射到负载线程,利用的内核数由系统属性 jdk.defaultScheduler.parallelism 或者 Runtime.getRuntime().availableProcessors() 指定。可以在线程工厂中自行提供调度程序(scheduler):

factory = Thread.builder().virtual().scheduler(myExecutor).factory(); 

我不知道这是不是人们想要的,为什么负载线程比 CPU 核心多?

回到 executor 服务。在虚拟线程上可以像操作系统线程一样执行任务:

for (int i = 1; i <= NTASKS; i++) {
  String taskname = "task-" + i;
  exec.submit(() -> run(taskname));
}
exec.shutdown();
exec.awaitTermination(delay, TimeUnit.MILLISECONDS); 

在每个任务中休眠,执行简单的测试。

public static int DELAY = 10_000;

 public static void run(Object obj) {
    try {
       Thread.sleep((int) (DELAY * Math.random()));
    } catch (InterruptedException ex) {
       ex.printStackTrace();
    }
    System.out.println(obj);
 } 

如果把将 NTASKS 设为 1_000_000 并在 factory builder 中把 .virtual()注释掉,程序会报告内存不足错误然后退出。一百万个操作系统线程会占用大量内存。但是使用虚拟线程,可以正常运行。

在我的电脑上,用之前构建的 Loom 版本可以正常工作。不幸的是,2019年12月5日下载构建版本发生了 core dump。使用 Loom 时,这种情况时有发生。希望在你使用时这个问题已经得到解决。

现在考虑更复杂的工作。Heinz Kabutz 最近发布了一个益智游戏,该程序可加载数千个呆伯特卡通图片。在网站上,每天都有一个页面,比如https://dilbert.com/strip/2011-06-05。程序会读取这些页面,在每个页面中找到卡通图像的 URL,然后加载这幅图像。整个工作是一个Completable Future,类似:

CompletableFuture
.completedFuture(getUrlForDate(date))
.thenComposeAsync(this::readPage, executor)
.thenApply(this::getImageUrl)
.thenComposeAsync(this::readPage)
.thenAccept(this::process); 

使用纤程后,代码更清晰:

exec.submit(() -> {
  String page = new String(readPage(getUrlForDate(date)));
  byte[] image = readPage(getImageUrl(page));
  process(image);
}); 

每个 readPage 都是阻塞调用,但使用纤程不会有问题。

接下来在你关心的工作上进行尝试。读取大量网页、处理、进行多个阻塞式读取,享受纤程阻塞处理开销低这一优点。

结构化并发

Loom 项目最初的动机是实现纤程,但2019年早些时候,项目着手开发一套针对结构化并发的实验性 API。在这篇文章中,Nathaniel Smith 提出了结构化并发的形式。这是他的核心论点。在新线程中启动任务实际上并不比用 GOTO 好,实际上反而有害:

new Thread(runnable).start(); 

假如多个线程没有协调就会出现面条式代码。1960年代,结构化编程用循环和函数取代了 goto:

现在是时候实现结构化并发了。启动并发任务时,通过阅读程序代码,我们应该知道何时能全部完成。

这样就能控制任务使用的资源。

到2019年夏天,Loom 项目实现了一个用于结构化并发的 API。不幸的是,由于最近正在统一线程和纤程 API,目前还处于混乱状态。可以尝试一下原型实现 http://jdk.java.net/loom/。

下面的代码调度了许多任务:

FiberScope scope = FiberScope.open();
for (int i = 0; i < NTASKS; i++) {
  scope.schedule(() -> run(i));
}
scope.close(); 

Scope.close() 会一直阻塞直到所有纤程执行结束。请记住,纤程阻塞不会有问题。一旦 scope 结束,就可以确定纤程执行完毕。

FiberScope 是 AutoCloseable,因此可以使try-with-resources 语句:

try (var scope = FiberScope.open()) {
  ...
} 

但如果其中一项任务没有完成怎么办?

可以创建带有截止期(Instant)或超时(Duration) scope:

try (var scope = FiberScope.open(Instant.now().plusSeconds(30))) {
  for (...)
     scope.schedule(...);
} 

到达截止期或发生超时后,尚未完成的所有纤程都会被取消。该怎么做?请继续阅读。

取消(Cancellation)

取消操作一直是 Java 的痛点。按惯例,可以通过中断(Interrupt)取消线程。如果线程阻塞,那么阻塞的操作会抛出 InterruptedException 结束执行。否则,会设置中断状态标志。要做到检查无误通常是一件很枯燥的事情。重置中断状态,或者利用 InterruptedException 受检异常并没有什么帮助。

Java.util.concurrent 中的取消处理一直没有做到一致。考虑 ExecutorService.invokeAny 是否可以。如果某个任务得到执行结果,则其他任务将被取消。但 CompletableFuture.anyOf 允许所有任务执行完成,尽管结果会被忽略。

Loom API 在2019年夏天解决了取消问题。在这个版本中,纤程提供了类似 interrupt 的 cancel 操作。取消操作不可撤销。如果当前纤程已被取消,Fiber.cancelled 静态方法返回 true。

Scope 发生超时,其纤程也会取消。

在 FiberScope 构造函数中使用下列选项可以控制取消操作:

  • CANCEL_AT_CLOSE:关闭 scope 将取消所有加入调度的纤程
  • PROPAGATE_CANCEL:如果取消当前拥有的纤程,那么所有新调度的纤程都会自动取消
  • IGNORE_CANCEL:已调度的纤程不能取消

所有这些选项都可以在顶层重置。PROPAGATE_CANCEL 和 IGNORE_CANCEL 选项从父 scope 继承。

正如你看到的,这里的可调整性很大。重新审视这个问题,再看看会有什么结果。对于结构化并发,当 scope 超时或被强制关闭时,必须自动取消 scope 中的所有纤程。

Thread Local

让我吃惊的是,Loom 实现者的痛点之一是 ThreadLocal 变量以及更加深奥的 AccessControlContext 上下文类加载器。我不知道在线程上还有这么多相关实现。

如果数据结构不适合并发访问,可以每个线程使用一个实例。SimpleDateFormat 就是一个经典示例。当然可以不断创建新的 formatter 对象,但这并不高效。所以最好是共享一个对象。但一个全局对象

public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); 

无法正常工作。如果两个线程同时访问,可能造成格式混乱。

因此,应该每个线程包含一个对象:

public static final ThreadLocal<SimpleDateFormat> dateFormat
  = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd")); 

要访问实际的 formatter,可以调用

String dateStamp = dateFormat.get().format(new Date()); 

在线程中第一次调用 get 时,将调用构造函数中的 lambda。接着,get 方法会返回当前线程的 formatter 实例。

对于线程,这是一种公认的做法。但是,如果真的有一百万个纤程,是否真的要提供一百万个实例?

对我来说不是问题,使用 java.time formatter 这样线程安全的结构更容易。但是 Loom 项目一直在评估“scope local”对象:即那些被 FiberScope 重新激活的对象。

当线程与处理器数量一样多时,ThreadLocal 也可以近似看做处理器局部变量。用模拟用户实际意图的 API 可以支持此功能。

项目状态

想要使用 Loom 项目的开发者会重点关注 API。但正如你看到的,API尚未完工。许多工作都在 API 背后进行。

一个关键是在操作阻塞时让纤程阻塞(park)。网络连接相关工作已经完成,因此可以在纤程中连接网站、数据库等。本地文件操作阻塞时,尚不支持阻塞。

实际上,JDK 11、12 和 13 中已经重新实现了这些库,可以认为这时频繁发布新版本来带的礼物。

目前尚不支持在 monitor 阻塞(synchronized 代码块和方法),但最终版本会支持。ReentrantLock 已支持。

如果纤程调用本机方法(native method)时阻塞,则会“钉住(pin)”线程,所有纤程都会受到影响。Loom 项目对此无能为力。

Method.invoke 需要更多的工作才能实现支持。

支持 debug 和 monitor 的相关工作正在进行中。

正如前面讨论的那样,稳定性仍然是一个问题。

最重要的是,在性能方面还有很长一段路要走。纤程的阻塞与取消阻塞也不是免费午餐。每次操作都需要替换运行时中的一部分堆栈。

所有这些方面都取得了很大进展,让我们回顾一下开发人员关心的 API。现在是查看 Loom 项目并考虑如何利用它的好时机。

用同一个类表示线程与纤程对你有价值吗?还是宁愿在将来把 Thread 上的一些包袱丢掉?你是否认同结构化并发的承诺?

尝试一下 Loom 项目,看看如何与应用程序和框架一起工作,并为勇敢无畏的开发团队提供反馈!

推荐阅读  点击标题可跳转 Java 异步编程最佳实践

你可能不知道的 IDEA 高级调试技巧

比 Redis 快 5 倍的中间件,为啥这么快?

看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

好文章,我在看❤️

回到顶部