异步请求

Web MVC 异步请求集成,如下:

DeferredResult

只要 HTTP 处理器返回了 DeferredResult 对象就会进入异步处理状态。例如:

@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
  DeferredResult<String> deferredResult = new DeferredResult<>();
  // 其他地方引用 deferredResult 变量.
  return deferredResult;
}

// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {

  @Nullable
  @Override
  public DeferredResult<String> handleRequest(RequestContext request) throws Throwable {
    DeferredResult<String> deferredResult = new DeferredResult<>();
    // 其他地方引用 deferredResult 变量.
    return deferredResult;
  }

}

// 从其他线程设置返回结果...
deferredResult.setResult(result);

任何类型的处理器都可处理这种类型 例如 HttpRequestHandler

控制器可以异步地产生返回值,从不同的线程 — 例如,响应外部事件(JMS消息)、计划任务或其他事件。

Callable

HTTP 处理器也可以使用 java.util.concurrent.Callable 包装任何支持的返回值,如下例所示:

@PostMapping
public Callable<String> processUpload(MultipartFile file) {
  return () -> "someView";
}

// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {

  @Nullable
  @Override
  public Callable<String> handleRequest(RequestContext request) throws Throwable {
    MultipartFile file = request.getMultipartRequest().getFile("file");
    // ...
    return () -> "someView";
  }

}

返回值可以通过运行给定任务通过 配置好的 AsyncTaskExecutor 来获取。

异步请求处理

DeferredResult 处理流程如下:

  • HTTP 处理器返回一个 DeferredResult,并将其保存在一些内存队列或列表中,以便可以访问。

  • Web MVC 调用 AsyncWebRequest.startAsync()

  • 与此同时,DispatcherHandler 退出当前IO线程,但响应通道保持打开状态。

  • 应用程序从某个线程设置 DeferredResult,Web MVC 将请求重新派发到底层 HTTP 引擎。

  • DispatcherHandler#handleConcurrentResult 将会被调用,并且处理异步产生的返回值。

Callable 处理流程如下:

  • HTTP 处理器返回一个 Callable

  • Web MVC 调用 AsyncWebRequest.startAsync() 并提交 Callable 到一个业务线程池 AsyncTaskExecutor

  • 与此同时,DispatcherHandler 退出当前IO线程,但响应通道保持打开状态。

  • 最终 Callable 产生一个结果(返回值)DispatcherHandler#handleConcurrentResult 将会被调用,并且处理异步产生的返回值。

异常处理

当您使用 DeferredResult 时,您可以选择调用 setResult 或使用异常调用 setErrorResult。 在这两种情况下,Web MVC 都会继续处理该请求,它被视为 HTTP 处理器返回了给定的值,或者被视为它产生了给定的异常。 然后,该异常会通过常规的异常处理机制(例如,调用 @ExceptionHandler 方法)进行处理。

当您使用 Callable 时,也会有类似的处理逻辑,主要的区别在于结果是由 Callable 返回的,或者由它引发的异常。

异步拦截

HandlerInterceptor 实现还可以注册一个 CallableProcessingInterceptor 或一个 DeferredResultProcessingInterceptor,以便更深入地集成到异步请求的生命周期中(例如,处理超时事件)。

DeferredResult 提供了 onTimeout(Runnable)onCompletion(Runnable) 回调。 详见 javadoc of DeferredResult 以获取更多详细信息。 Callable 可以替换为 WebAsyncTask,后者公开了额外的超时和完成回调方法。

HTTP Streaming

您可以使用 DeferredResultCallable 来获取单个异步返回值。 如果您想要生成多个异步结果,并将这些值写入响应中,该怎么办呢?本节描述了如何实现这一点。

流对象

你可以使用 ResponseBodyEmitter 产生流式响应,每个对象都会被一个 HttpMessageConverter 序列化,然后 写入 HTTP 连接。示例如下:

@GetMapping("/events")
public ResponseBodyEmitter handle() {
  ResponseBodyEmitter emitter = new ResponseBodyEmitter();
  // 其他地方引用 emitter 变量.
  return emitter;
}

// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {

  @Nullable
  @Override
  public ResponseBodyEmitter handleRequest(RequestContext request) throws Throwable {
    ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    // 其他地方引用 emitter 变量.
    return emitter;
  }

}

// 在其他线程(业务线程)
emitter.send("Hello 1");

// 下一次响应
emitter.send("Hello again");

// 任务处理完成
emitter.complete();

也可以使用 ResponseBodyEmitter 作为 ResponseEntity 的 body,允许自定义响应的状态和响应头信息。

当一个 emitter 抛出一个 IOException(例如,如果远程客户端断开了连接),应用程序不需要负责清理连接, 也不应该调用 emitter.completeemitter.completeWithError。底层会自动处理。

SSE

SseEmitterResponseBodyEmitter 的子类,提供了 Server-Sent Events 支持。服务器发出来的数据格式遵循 W3C SSE 标准。

要通过 HTTP 处理器 产生一个 SSE 流,您可以返回一个 SseEmitter,如下例所示:

@GetMapping(path="/events", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamEvents() {
  SseEmitter emitter = new SseEmitter();
  // 在此处配置并使用 emitter 发送事件
  return emitter;
}

// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {

  @Nullable
  @Override
  public SseEmitter handleRequest(RequestContext request) throws Throwable {
    SseEmitter emitter = new SseEmitter();
    // 其他地方引用 emitter 变量.
    return emitter;
  }
}

// 在其他线程(业务线程)
emitter.send("Hello 1");

// 下一次响应
emitter.send("Hello again");

// 任务处理完成
emitter.complete();

虽然 SSE 是向浏览器流式传输的主要选项,请注意有些浏览器并不支持(Internet Explorer)Server-Sent Events。 考虑使用 WebSocket 消息传递

另见 上一节 有关异常处理的说明。

Raw Data

有些场景需要直接写入 OutputStream (文件下载) 你就可以使用 StreamingResponseBody。如下例所示:

@GetMapping("/download")
public StreamingResponseBody handle() {
  return new StreamingResponseBody() {
    @Override
    public void writeTo(OutputStream outputStream) throws IOException {
      // write...
    }
  };
}

也可以使用 StreamingResponseBody 作为 ResponseEntity 的 body,允许自定义响应的状态和响应头信息。

Reactive Types

Web MVC 支持在控制器中使用响应式客户端库。

响应式返回值的处理方式如下:

  • 单值,类似于使用 DeferredResult。示例包括 Mono(Reactor)或 Single(RxJava)。

  • 多值流与流媒体类型(例如 application/x-ndjsontext/event-stream), 类似于使用 ResponseBodyEmitterSseEmitter。示例包括 Flux(Reactor)或 Observable(RxJava)。 应用程序还可以返回 Flux<ServerSentEvent>Observable<ServerSentEvent>

  • 任何其他媒体类型的多值流(例如 application/json)被适配,类似于使用 DeferredResult<List<?>>

Web MVC 通过 today-core 中的 ReactiveAdapterRegistry 支持 Reactor 和 RxJava,它允许从多种响应式库进行适配。

对于响应式背压的流式传输到响应中,虽然支持,但写入响应仍然是阻塞的,并且是通过 配置的 AsyncTaskExecutor 在单独的线程上运行,以避免阻塞上游源,例如从 WebClient 返回的 Flux

配置

Web MVC 为异步请求公开了几个选项。

Web MVC

MVC 配置为异步请求处理提供了以下选项:

  • Java 配置:使用 WebMvcConfigurer 上的 configureAsyncSupport 回调。

您可以配置以下内容:

  • 异步请求没有默认超时值,除非它被明确设置。

  • 用于阻塞写入时的 AsyncTaskExecutor,当使用 响应式类型 流式传输和执行来自控制器方法的 Callable 实例。默认使用的不适用于负载下的生产环境。

  • DeferredResultProcessingInterceptor 实现和 CallableProcessingInterceptor 实现。

请注意,您也可以在 DeferredResultResponseBodyEmitterSseEmitter 上设置默认超时值。 对于 Callable,您可以使用 WebAsyncTask 提供超时值。