异步请求
DeferredResult
只要 HTTP 处理器返回了 DeferredResult 对象就会进入异步处理状态。例如:
@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<>();
// 其他地方引用 deferredResult 变量.
return deferredResult;
}
// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {
@Nullable
@Override
public DeferredResult<String> handleRequest(RequestContext request) throws Throwable {
DeferredResult<String> deferredResult = new DeferredResult<>();
// 其他地方引用 deferredResult 变量.
return deferredResult;
}
}
// 从其他线程设置返回结果...
deferredResult.setResult(result);
任何类型的处理器都可处理这种类型 例如 HttpRequestHandler。
控制器可以异步地产生返回值,从不同的线程 — 例如,响应外部事件(JMS消息)、计划任务或其他事件。
Callable
HTTP 处理器也可以使用 java.util.concurrent.Callable 包装任何支持的返回值,如下例所示:
@PostMapping
public Callable<String> processUpload(MultipartFile file) {
return () -> "someView";
}
// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {
@Nullable
@Override
public Callable<String> handleRequest(RequestContext request) throws Throwable {
MultipartFile file = request.getMultipartRequest().getFile("file");
// ...
return () -> "someView";
}
}
返回值可以通过运行给定任务通过
配置好的 AsyncTaskExecutor 来获取。
异步请求处理
DeferredResult 处理流程如下:
-
HTTP 处理器返回一个
DeferredResult,并将其保存在一些内存队列或列表中,以便可以访问。 -
Web MVC 调用
AsyncWebRequest.startAsync()。 -
与此同时,
DispatcherHandler退出当前IO线程,但响应通道保持打开状态。 -
应用程序从某个线程设置
DeferredResult,Web MVC 将请求重新派发到底层 HTTP 引擎。 -
DispatcherHandler#handleConcurrentResult将会被调用,并且处理异步产生的返回值。
Callable 处理流程如下:
-
HTTP 处理器返回一个
Callable。 -
Web MVC 调用
AsyncWebRequest.startAsync()并提交Callable到一个业务线程池AsyncTaskExecutor -
与此同时,
DispatcherHandler退出当前IO线程,但响应通道保持打开状态。 -
最终
Callable产生一个结果(返回值)DispatcherHandler#handleConcurrentResult将会被调用,并且处理异步产生的返回值。
异常处理
当您使用 DeferredResult 时,您可以选择调用 setResult 或使用异常调用 setErrorResult。
在这两种情况下,Web MVC 都会继续处理该请求,它被视为 HTTP 处理器返回了给定的值,或者被视为它产生了给定的异常。
然后,该异常会通过常规的异常处理机制(例如,调用 @ExceptionHandler 方法)进行处理。
当您使用 Callable 时,也会有类似的处理逻辑,主要的区别在于结果是由 Callable 返回的,或者由它引发的异常。
异步拦截
HandlerInterceptor 实现还可以注册一个 CallableProcessingInterceptor 或一个
DeferredResultProcessingInterceptor,以便更深入地集成到异步请求的生命周期中(例如,处理超时事件)。
DeferredResult 提供了 onTimeout(Runnable) 和 onCompletion(Runnable) 回调。
详见 javadoc of DeferredResult 以获取更多详细信息。
Callable 可以替换为 WebAsyncTask,后者公开了额外的超时和完成回调方法。
HTTP Streaming
您可以使用 DeferredResult 和 Callable 来获取单个异步返回值。
如果您想要生成多个异步结果,并将这些值写入响应中,该怎么办呢?本节描述了如何实现这一点。
流对象
你可以使用 ResponseBodyEmitter 产生流式响应,每个对象都会被一个
HttpMessageConverter 序列化,然后
写入 HTTP 连接。示例如下:
@GetMapping("/events")
public ResponseBodyEmitter handle() {
ResponseBodyEmitter emitter = ResponseBodyEmitter.forChunkedTransferEncoding();
// 其他地方引用 emitter 变量.
return emitter;
}
// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {
@Nullable
@Override
public ResponseBodyEmitter handleRequest(RequestContext request) throws Throwable {
ResponseBodyEmitter emitter = ResponseBodyEmitter.forChunkedTransferEncoding();
// 其他地方引用 emitter 变量.
return emitter;
}
}
// 在其他线程(业务线程)
emitter.send("Hello 1");
// 下一次响应
emitter.send("Hello again");
// 任务处理完成
emitter.complete();
也可以使用 ResponseBodyEmitter 作为 ResponseEntity 的 body,允许自定义响应的状态和响应头信息。
当一个 emitter 抛出一个 IOException(例如,如果远程客户端断开了连接),应用程序不需要负责清理连接,
也不应该调用 emitter.complete 或 emitter.completeWithError。底层会自动处理。
SSE
SseEmitter 是 ResponseBodyEmitter 的子类,提供了 Server-Sent Events
支持。服务器发出来的数据格式遵循 W3C SSE 标准。
要通过 HTTP 处理器 产生一个 SSE 流,您可以返回一个 SseEmitter,如下例所示:
@GetMapping(path="/events", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamEvents() {
SseEmitter emitter = forServerSentEvents();
// 在此处配置并使用 emitter 发送事件
return emitter;
}
// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {
@Nullable
@Override
public SseEmitter handleRequest(RequestContext request) throws Throwable {
SseEmitter emitter = forServerSentEvents();
// 其他地方引用 emitter 变量.
return emitter;
}
}
// 在其他线程(业务线程)
emitter.send("Hello 1");
// 下一次响应
emitter.send("Hello again");
// 任务处理完成
emitter.complete();
虽然 SSE 是向浏览器流式传输的主要选项,请注意有些浏览器并不支持(Internet Explorer)Server-Sent Events。 考虑使用 WebSocket 消息传递。
另见 上一节 有关异常处理的说明。
Raw Data
有些场景需要直接写入 OutputStream (文件下载) 你就可以使用 StreamingResponseBody。如下例所示:
@GetMapping("/download")
public StreamingResponseBody handle() {
return new StreamingResponseBody() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
// write...
}
};
}
也可以使用 StreamingResponseBody 作为 ResponseEntity 的 body,允许自定义响应的状态和响应头信息。
Reactive Types
Web MVC 支持在控制器中使用响应式客户端库。
响应式返回值的处理方式如下:
-
单值,类似于使用
DeferredResult。示例包括Mono(Reactor)或Single(RxJava)。 -
多值流与流媒体类型(例如
application/x-ndjson或text/event-stream), 类似于使用ResponseBodyEmitter或SseEmitter。示例包括Flux(Reactor)或Observable(RxJava)。 应用程序还可以返回Flux<ServerSentEvent>或Observable<ServerSentEvent>。 -
任何其他媒体类型的多值流(例如
application/json)被适配,类似于使用DeferredResult<List<?>>。
Web MVC 通过 today-core 中的 ReactiveAdapterRegistry
支持 Reactor 和 RxJava,它允许从多种响应式库进行适配。
|
对于响应式背压的流式传输到响应中,虽然支持,但写入响应仍然是阻塞的,并且是通过
配置的
AsyncTaskExecutor 在单独的线程上运行,以避免阻塞上游源,例如从 WebClient 返回的 Flux。
配置
Web MVC 为异步请求公开了几个选项。
Web MVC
MVC 配置为异步请求处理提供了以下选项:
-
Java 配置:使用
WebMvcConfigurer上的configureAsyncSupport回调。
您可以配置以下内容:
-
异步请求没有默认超时值,除非它被明确设置。
-
用于阻塞写入时的
AsyncTaskExecutor,当使用 响应式类型 流式传输和执行来自控制器方法的Callable实例。默认使用的不适用于负载下的生产环境。 -
DeferredResultProcessingInterceptor实现和CallableProcessingInterceptor实现。
请注意,您也可以在 DeferredResult、ResponseBodyEmitter 和 SseEmitter 上设置默认超时值。
对于 Callable,您可以使用 WebAsyncTask 提供超时值。