在任何并发性应用程序中,异步事件处理都至关重要。无论事件的来源是什么(不同的计算任务、I/O 操作或与外部系统的交互),您的代码都必须跟踪事件,协调为响应它们而执行的操作。应用程序可以采用两种基本方法之一来实现异步事件处理:

  • 阻塞:一个等待事件的协调线程。
  • 非阻塞:事件向应用程序生成某种形式的通知,而没有线程显式等待它。

scala.concurrent.Promise 和 scala.concurrent.Future 类为 Scala 开发人员提供了一些与 Java 8 开发人员的 CompletableFuture 使用方式类似的选项。具体地讲,Future 同时提供了阻塞和非阻塞的事件完成方式。但是,尽管在此级别上很相似,但用于处理两种 future 的技术是不同的。

我们先来看一个并发任务设置:

在一个特定操作中,应用程序通常必须执行多个处理步骤。例如,在向用户返回结果之前,Web 应用程序可能需要:

  1. 在一个数据库中查找用户的信息
  2. 使用查找到的信息来执行 Web 服务调用,并执行另一次数据库查询。
  3. 根据从前两个操作中获得的结果来执行数据库更新。
    图 1 演示了这种结构类型。

图 1. 应用程序任务流

图 1 将处理过程分解为 4 个不同的任务,它们通过表示顺序依赖关系的箭头相连接。任务 1 可以直接执行,任务 2 和任务 3 都在任务 1 完成后执行,任务 4 在任务 2 和任务 3 都完成后执行。

在真实的系统中,异步事件的来源一般是并行计算或一种形式的 I/O 操作。但是,使用简单的时间延迟来建模这种系统会更容易一些,这也是这里所采用的方法。清单 1 显示了我用于生成事件的基本的赋时事件 (timed-event) 代码,这些事件采用了已完成的 Future 格式。

清单 1. 赋时事件代码

  1. import java.util.Timer
  2. import java.util.TimerTask
  3. import scala.concurrent._
  4. object TimedEvent {
  5. val timer = new Timer
  6. /** Return a Future which completes successfully with the supplied value after secs seconds. */
  7. def delayedSuccess[T](secs: Int, value: T): Future[T] = {
  8. val result = Promise[T]
  9. timer.schedule(new TimerTask() {
  10. def run() = {
  11. result.success(value)
  12. }
  13. }, secs * 1000)
  14. result.future
  15. }
  16. /** Return a Future which completes failing with an IllegalArgumentException after secs
  17. * seconds. */
  18. def delayedFailure(secs: Int, msg: String): Future[Int] = {
  19. val result = Promise[Int]
  20. timer.schedule(new TimerTask() {
  21. def run() = {
  22. result.failure(new IllegalArgumentException(msg))
  23. }
  24. }, secs * 1000)
  25. result.future
  26. }

清单 1 中的 Scala 代码使用一个 java.util.Timer 来安排 java.util.TimerTask 在一个延迟之后执行。每个 TimerTask 在运行时完成一个有关联的 future。delayedSuccess 函数定制了一个任务,在运行时成功完成一个 Scala Future[T],然后将该 future 返回给调用方。delayedSuccess 函数返回相同类型的 future,但使用了一个在完成 future 时发生 IllegalArgumentException 异常的失败任务。

清单 2 展示了如何使用 清单 1 中的代码创建 Future[Int] 格式的事件,使之与 图 1 中的 4 个任务相匹配。(此代码来自示例代码中的 AsyncHappy 类。)

清单 2. 示例任务的事件

  1. // task definitions
  2. def task1(input: Int) = TimedEvent.delayedSuccess(1, input + 1)
  3. def task2(input: Int) = TimedEvent.delayedSuccess(2, input + 2)
  4. def task3(input: Int) = TimedEvent.delayedSuccess(3, input + 3)
  5. def task4(input: Int) = TimedEvent.delayedSuccess(1, input + 4)

清单 2 中 4 个任务方法中的每一个都为该任务的完成时刻使用了特定的延迟值:task1 为 1 秒,task2 为 2 秒,task3 为 3 秒,task4 重新变为 1 秒。每个任务还接受一个输入值,是该输入加上任务编号作为 future 的(最终)结果值。这些方法都使用了 future 的成功形式;稍后您会看到一些使用失败形式的例子。

这些任务要求您按 图 1 中所示的顺序运行它们,向每个任务传递上一个任务返回的结果值(或者对于 task4,传递前两个任务结果的和)。如果中间两个任务同时执行,总的执行时间大约为 5 秒(1 秒 + (2 秒、3 秒中的最大值)+ 1 秒。如果 task1 的输入为 1,那么结果为 2。如果该结果被传递给 task2 和 task3,那么结果将为 4 和 5。如果这两个结果的和 (9) 被作为输入传递给 task4,那么最终结果将为 13。

在设定好操作环境之后,是时候来查看 Scala 如何处理事件的完成情况了。与上一期的 Java 代码中一样,协调 4 个任务的执行的最简单的方法是使用阻塞等待:主要线程等待每个任务依次完成。清单 3(同样来自示例代码中的 AsyncHappy 类)给出了此方法。

清单 3. 阻塞等待任务执行

  1. def runBlocking() = {
  2. val v1 = Await.result(task1(1), Duration.Inf)
  3. val future2 = task2(v1)
  4. val future3 = task3(v1)
  5. val v2 = Await.result(future2, Duration.Inf)
  6. val v3 = Await.result(future3, Duration.Inf)
  7. val v4 = Await.result(task4(v2 + v3), Duration.Inf)
  8. val result = Promise[Int]
  9. result.success(v4)
  10. result.future
  11. }

清单 3 使用 Scala scala.concurrent.Await 对象的 result() 方法来完成阻塞等待。该代码首先等待 task1 的结果,然后同时创建 task2 和 task3 future,并等待两个任务依次返回 future,最后等待 task4 的结果。最后 3 行(创建和设置 result)使得该方法能够返回一个 Future[Int]。返回该 future,让此方法与我接下来展示的非阻塞形式一致,但该 future 将在该方法返回之前完成。

清单 4(同样来自示例代码中的 AsyncHappy 类)展示了一种将 future 联系在一起的方式,以便按正确顺序并使用正确的依赖关系执行任务,而不使用阻塞。

清单 4. 使用 onSuccess() 处理事件的完成

  1. def runOnSuccess() = {
  2. val result = Promise[Int]
  3. task1(1).onSuccess(v => v match {
  4. case v1 => {
  5. val a = task2(v1)
  6. val b = task3(v1)
  7. a.onSuccess(v => v match {
  8. case v2 =>
  9. b.onSuccess(v => v match {
  10. case v3 => task4(v2 + v3).onSuccess(v4 => v4 match {
  11. case x => result.success(x)
  12. })
  13. })
  14. })
  15. }
  16. })
  17. result.future
  18. }

清单 4 代码使用 onSuccess() 方法将一个函数(技术上讲是一个部分函数,因为它仅处理成功完成的情况)设置为在每个 future 完成时返回。因为 onSuccess() 调用是嵌套式的,所以它们将按顺序执行(即使 future 未完全按顺序完成)。

清单 4 的代码比较容易理解,但很冗长。清单 5 展示了一种使用 flatMap() 方法处理这种情况的更简单的方法。

清单 5. 使用 flatMap() 处理事件的完成

  1. def runFlatMap() = {
  2. task1(1) flatMap {v1 =>
  3. val a = task2(v1)
  4. val b = task3(v1)
  5. a flatMap { v2 =>
  6. b flatMap { v3 => task4(v2 + v3) }}
  7. }
  8. }

清单 5 中的代码实际上执行了与 清单 4 相同的事情,但 清单 5 使用了 flatMap() 方法从每个 future 中提取单一结果值。使用 flatMap() 消除了 清单 4 中所需的 match / case 结构,提供了一种更简洁的格式,但采用了同样的逐步执行路线。

示例代码使用了一个 Scala App 来依次运行事件代码的每个版本,并确保完成事件(约 5 秒)和结果 (13) 是正确的。您可以使用 Maven 从命令行运行此代码,如清单 6 所示(删除了无关的 Maven 输出):

清单 6. 运行事件代码

  1. dennis@linux-9qea:~/devworks/scala4/code> mvn scala:run -Dlauncher=happypath
  2. ...
  3. [INFO] launcher 'happypath' selected => com.sosnoski.concur.article4.AsyncHappy
  4. Starting runBlocking
  5. runBlocking returned 13 in 5029 ms.
  6. Starting runOnSuccess
  7. runOnSuccess returned 13 in 5011 ms.
  8. Starting runFlatMap
  9. runFlatMap returned 13 in 5002 ms.

目前为止,您看到了以 future 形式协调事件的代码,这些代码总是能够成功完成。在真实应用程序中,不能寄希望于事情总是这么顺利。处理任务过程中可能会出现问题,而且在 JVM 语言术语中,这些问题通常表示为 Throwable。

更改 清单 2 中的任务定义很容易,只需使用 delayedFailure() 代替 delayedSuccess() 方法,如这里的 task4 所示:

def task4(input: Int) = TimedEvent.delayedFailure(1, “This won’t work!”)

如果运行仅将 task4 修改为完成时抛出异常的 清单 3,那么您会得到 task4 上的 Await.result() 调用所抛出的预期的 IllegalArgumentException。如果在 runBlocking() 方法中没有捕获该问题,该异常会在调用链中一直传递,直到最终捕获问题(如果未捕获问题,则会终止线程)。幸运的是,修改该代码很容易,因此,如果任何任务完成时抛出异常,该异常会通过返回的 future 传递给调用方来处理。清单 7 展示了这一更改。

清单 7. 具有异常的阻塞等待

  1. def runBlocking() = {
  2. val result = Promise[Int]
  3. try {
  4. val v1 = Await.result(task1(1), Duration.Inf)
  5. val future2 = task2(v1)
  6. val future3 = task3(v1)
  7. val v2 = Await.result(future2, Duration.Inf)
  8. val v3 = Await.result(future3, Duration.Inf)
  9. val v4 = Await.result(task4(v2 + v3), Duration.Inf)
  10. result.success(v4)
  11. } catch {
  12. case t: Throwable => result.failure(t)
  13. }
  14. result.future
  15. }

清单 7 非常浅显易懂,最初的代码包装在一个 try/catch 中,catch 在返回的 future 完成时传回异常。此方法稍微复杂一些,但任何 Scala 开发人员应该仍然很容易理解它。

那么,清单 4 和清单 5 中的事件处理代码的非阻塞变形是怎样的?从名称可以看出,清单 4 中使用的 onSuccess() 方法仅 适用于 future 的成功完成类型。如果想要同时处理成功和失败完成类型,则必须使用 onComplete() 方法,检查哪种完成例行适用。清单 8 展示了此技术如何用在事件处理代码中。

清单 8. 成功和失败的 onComplete() 处理

  1. def runOnComplete() = {
  2. val result = Promise[Int]
  3. task1(1).onComplete(v => v match {
  4. case Success(v1) => {
  5. val a = task2(v1)
  6. val b = task3(v1)
  7. a.onComplete(v => v match {
  8. case Success(v2) =>
  9. b.onComplete(v => v match {
  10. case Success(v3) => task4(v2 + v3).onComplete(v4 => v4 match {
  11. case Success(x) => result.success(x)
  12. case Failure(t) => result.failure(t)
  13. })
  14. case Failure(t) => result.failure(t)
  15. })
  16. case Failure(t) => result.failure(t)
  17. })
  18. }
  19. case Failure(t) => result.failure(t)
  20. })
  21. result.future
  22. }

清单 8 看起来很凌乱,幸运的是还有一种简单得多的替代方法:使用 清单 5 中的 flatMap() 代码代替。flatMap() 方法同时处理成功和失败完成类型,无需执行任何更改。

最新的 Scala 版本包含在编译期间使用宏 转换代码的能力。目前实现的一个最有用的宏是 async,它在编译期间将使用 future 的看似顺序的代码转换为异步代码。清单 9 展示了 async 如何简化本教程中使用的任务代码。

清单 9. 结合使用 future 与 async

  1. def runAsync(): Future[Int] = {
  2. async {
  3. val v1 = await(task1(1))
  4. val a = task2(v1)
  5. val b = task3(v1)
  6. await(task4(await(a) + await(b)))
  7. }
  8. }

清单 9 中封装的 async 调用了 async 宏。此调用将该代码块声明为异步执行的代码,并在默认情况下异步执行它,然后返回一个 future 表示该代码块的执行结果。在该代码块中,await() 方法(实际上是该宏的一个关键字,而不是一个真正的方法)显示了何处需要一个 future 的结果。async 宏在编译期间修改了 Scala 程序的抽象语法树 (AST),以便将该代码块转换为使用回调的代码,这大体相当于 清单 4 的代码。

除了 async 包装器之外,清单 9 中的代码还与 清单 3 中最初的阻塞代码很相似。这主要是这个宏的成就,它抽象化了异步事件的所有复杂性,使它看起来像您在编写简单的线性代码。在幕后,这涉及到大量复杂性。

如果查看 Scala 编译器从源代码生成的类,就会看到一些具有类似 AsyncHappy$$anonfun$1.class 的名称的内部类。从名称可以猜到,这些类由编译器为异步函数而生成(比如传递给 onSuccess() 或 flatMap() 方法的语句。)

使用 Scala 2.11.1 编译器和 Async 0.9.2 实现,您还会看到一个名为 AsyncUnhappy$stateMachine$macro$1$1.class 的类。这是 async 宏生成的实际实现代码,采用状态机的形式来处理异步任务。清单 10 给出了此类的一个部分地方进行了反编译(decompiled)的视图。

清单 10. 反编译后的 AsyncUnhappy$stateMachine$macro$1$1.class

  1. public class AsyncUnhappy$stateMachine$macro$1$1
  2. implements Function1<Try<Object>, BoxedUnit>, Function0.mcV.sp
  3. {
  4. private int state;
  5. private final Promise<Object> result;
  6. private int await$macro$3$macro$13;
  7. private int await$macro$7$macro$14;
  8. private int await$macro$5$macro$15;
  9. private int await$macro$11$macro$16;
  10. ...
  11. public void resume() {
  12. ...
  13. }
  14. public void apply(Try<Object> tr) {
  15. int i = this.state;
  16. switch (i) {
  17. default:
  18. throw new MatchError(BoxesRunTime.boxToInteger(i));
  19. case 3:
  20. if (tr.isFailure()) {
  21. result().complete(tr);
  22. } else {
  23. this.await$macro$11$macro$16 = BoxesRunTime.unboxToInt(tr.get());
  24. this.state = 4;
  25. resume();
  26. }
  27. break;
  28. case 2:
  29. if (tr.isFailure()) {
  30. result().complete(tr);
  31. } else {
  32. this.await$macro$7$macro$14 = BoxesRunTime.unboxToInt(tr.get());
  33. this.state = 3;
  34. resume();
  35. }
  36. break;
  37. case 1:
  38. if (tr.isFailure()) {
  39. result().complete(tr);
  40. } else {
  41. this.await$macro$5$macro$15 = BoxesRunTime.unboxToInt(tr.get());
  42. this.state = 2;
  43. resume();
  44. }
  45. break;
  46. case 0:
  47. if (tr.isFailure()) {
  48. result().complete(tr);
  49. } else {
  50. this.await$macro$3$macro$13 = BoxesRunTime.unboxToInt(tr.get());
  51. this.state = 1;
  52. resume();
  53. }
  54. break;
  55. }
  56. }
  57. ...
  58. }

清单 10 中的 apply() 方法处理实际的状态更改,估算一个 future 的结果并将输出状态更改为匹配。输入状态会告诉该代码正在估算哪个 future;每个状态值对应于 async 代码块中一个特定的 future。从 清单 10 的部分代码很难了解这一点,但查看其他一些字节码,就可以看到状态代码是与任务匹配的,所以状态 0 表示 task1 的结果符合预期,状态 1 表示 task2 的结果符合预期,依此类推。

resume() 方法并未显示在 清单 10 中,因为反编译器无法确定如何将它转换为 Java 代码。我也不打算探讨这个过程,但通过查看字节码,可以确定 resume() 方法执行了与状态代码上的 Java switch 相似的工作。对于每个非最终状态,resume() 执行适当的代码段来设置下一个预期的 future,最终将 AsyncUnhappy$stateMachine$macro$1$1 实例设置为 future 的 onComplete() 方法的目标。对于最终状态,resume() 将会设置结果值并履行对最终结果的承诺。

您实际上并不需要深入分析生成的代码来理解 async(但它可能很有趣)。关于 async 工作原理的完整描述,请查阅 SIP-22 – Async 提案。

async 限制
由于 async 宏将代码转换为状态机类的方式,该宏的使用有一些限制。最明显的限制是,不能将 await() 嵌套在 async 代码块中的另一个对象或闭包内(包括一个函数定义)。也不能将 await() 嵌套在一个 try 或 catch 内。

除了这些使用限制之外,async 的最大问题是:在调试时,您同样会体验到一些通常与异步代码有关的问题回调,在这种情况下,需要尝试理解没有反映明显的代码结构的调用堆栈。不幸的是,目前的调试器设计无法解决这些问题。这是 Scala 中一个新的工作区域(请参阅 反思调试器。)与此同时,您可以禁用 async 代码块的异步执行,让调试变得更轻松(假设您尝试修复的问题在按顺序执行操作时仍然存在)。

最后,Scala 宏仍是一项我们正在开展的工作。async 有望在未来的版本中成为 Scala 语言的一个正式部分,但只有在 Scala 语言团队对宏的工作方式感到满意时,这种情况才会出现。到那时,无法确保 async 的格式不会发生改变。

一些处理异步事件的 Scala 方法与 Java 代码存在很大的区别。借助 flatMap() 和 async 宏,Scala 提供了整洁而且容易理解的技术。async 特别有趣,您可以编写看似正常的顺序的代码,但编译的代码会并发地执行。Scala 不是提供这种方法的惟一语言,但基于宏的实现为其他方法提供了极高的灵活性。

本文转自:https://www.ibm.com/developerworks/cn/java/j-jvmc4/index.html

版权声明:本文为listenfwind原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/listenfwind/p/9911934.html