异步请求
DeferredResult
只要 HTTP 处理器返回了 DeferredResult
对象就会进入异步处理状态。例如:
@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<>();
// 其他地方引用 deferredResult 变量.
return deferredResult;
}
// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {
@Nullable
@Override
public DeferredResult<String> handleRequest(RequestContext request) throws Throwable {
DeferredResult<String> deferredResult = new DeferredResult<>();
// 其他地方引用 deferredResult 变量.
return deferredResult;
}
}
// 从其他线程设置返回结果...
deferredResult.setResult(result);
任何类型的处理器都可处理这种类型 例如 HttpRequestHandler
。
控制器可以异步地产生返回值,从不同的线程 — 例如,响应外部事件(JMS消息)、计划任务或其他事件。
Callable
HTTP 处理器也可以使用 java.util.concurrent.Callable
包装任何支持的返回值,如下例所示:
@PostMapping
public Callable<String> processUpload(MultipartFile file) {
return () -> "someView";
}
// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {
@Nullable
@Override
public Callable<String> handleRequest(RequestContext request) throws Throwable {
MultipartFile file = request.getMultipartRequest().getFile("file");
// ...
return () -> "someView";
}
}
返回值可以通过运行给定任务通过
配置好的 AsyncTaskExecutor
来获取。
异步请求处理
DeferredResult
处理流程如下:
-
HTTP 处理器返回一个
DeferredResult
,并将其保存在一些内存队列或列表中,以便可以访问。 -
Web MVC 调用
AsyncWebRequest.startAsync()
。 -
与此同时,
DispatcherHandler
退出当前IO线程,但响应通道保持打开状态。 -
应用程序从某个线程设置
DeferredResult
,Web MVC 将请求重新派发到底层 HTTP 引擎。 -
DispatcherHandler#handleConcurrentResult
将会被调用,并且处理异步产生的返回值。
Callable
处理流程如下:
-
HTTP 处理器返回一个
Callable
。 -
Web MVC 调用
AsyncWebRequest.startAsync()
并提交Callable
到一个业务线程池AsyncTaskExecutor
-
与此同时,
DispatcherHandler
退出当前IO线程,但响应通道保持打开状态。 -
最终
Callable
产生一个结果(返回值)DispatcherHandler#handleConcurrentResult
将会被调用,并且处理异步产生的返回值。
异常处理
当您使用 DeferredResult
时,您可以选择调用 setResult
或使用异常调用 setErrorResult
。
在这两种情况下,Web MVC 都会继续处理该请求,它被视为 HTTP 处理器返回了给定的值,或者被视为它产生了给定的异常。
然后,该异常会通过常规的异常处理机制(例如,调用 @ExceptionHandler
方法)进行处理。
当您使用 Callable
时,也会有类似的处理逻辑,主要的区别在于结果是由 Callable
返回的,或者由它引发的异常。
异步拦截
HandlerInterceptor
实现还可以注册一个 CallableProcessingInterceptor
或一个
DeferredResultProcessingInterceptor
,以便更深入地集成到异步请求的生命周期中(例如,处理超时事件)。
DeferredResult
提供了 onTimeout(Runnable)
和 onCompletion(Runnable)
回调。
详见 javadoc of DeferredResult
以获取更多详细信息。
Callable
可以替换为 WebAsyncTask
,后者公开了额外的超时和完成回调方法。
HTTP Streaming
您可以使用 DeferredResult
和 Callable
来获取单个异步返回值。
如果您想要生成多个异步结果,并将这些值写入响应中,该怎么办呢?本节描述了如何实现这一点。
流对象
你可以使用 ResponseBodyEmitter
产生流式响应,每个对象都会被一个
HttpMessageConverter
序列化,然后
写入 HTTP 连接。示例如下:
@GetMapping("/events")
public ResponseBodyEmitter handle() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// 其他地方引用 emitter 变量.
return emitter;
}
// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {
@Nullable
@Override
public ResponseBodyEmitter handleRequest(RequestContext request) throws Throwable {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// 其他地方引用 emitter 变量.
return emitter;
}
}
// 在其他线程(业务线程)
emitter.send("Hello 1");
// 下一次响应
emitter.send("Hello again");
// 任务处理完成
emitter.complete();
也可以使用 ResponseBodyEmitter
作为 ResponseEntity
的 body,允许自定义响应的状态和响应头信息。
当一个 emitter
抛出一个 IOException
(例如,如果远程客户端断开了连接),应用程序不需要负责清理连接,
也不应该调用 emitter.complete
或 emitter.completeWithError
。底层会自动处理。
SSE
SseEmitter
是 ResponseBodyEmitter
的子类,提供了 Server-Sent Events
支持。服务器发出来的数据格式遵循 W3C SSE 标准。
要通过 HTTP 处理器 产生一个 SSE 流,您可以返回一个 SseEmitter
,如下例所示:
@GetMapping(path="/events", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamEvents() {
SseEmitter emitter = new SseEmitter();
// 在此处配置并使用 emitter
发送事件
return emitter;
}
// 或者其他方式的处理器
public class MyHttpRequestHandler implements HttpRequestHandler {
@Nullable
@Override
public SseEmitter handleRequest(RequestContext request) throws Throwable {
SseEmitter emitter = new SseEmitter();
// 其他地方引用 emitter 变量.
return emitter;
}
}
// 在其他线程(业务线程)
emitter.send("Hello 1");
// 下一次响应
emitter.send("Hello again");
// 任务处理完成
emitter.complete();
虽然 SSE 是向浏览器流式传输的主要选项,请注意有些浏览器并不支持(Internet Explorer)Server-Sent Events。 考虑使用 WebSocket 消息传递。
另见 上一节 有关异常处理的说明。
Raw Data
有些场景需要直接写入 OutputStream
(文件下载) 你就可以使用 StreamingResponseBody
。如下例所示:
@GetMapping("/download")
public StreamingResponseBody handle() {
return new StreamingResponseBody() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
// write...
}
};
}
也可以使用 StreamingResponseBody
作为 ResponseEntity
的 body,允许自定义响应的状态和响应头信息。
Reactive Types
Web MVC 支持在控制器中使用响应式客户端库。
响应式返回值的处理方式如下:
-
单值,类似于使用
DeferredResult
。示例包括Mono
(Reactor)或Single
(RxJava)。 -
多值流与流媒体类型(例如
application/x-ndjson
或text/event-stream
), 类似于使用ResponseBodyEmitter
或SseEmitter
。示例包括Flux
(Reactor)或Observable
(RxJava)。 应用程序还可以返回Flux<ServerSentEvent>
或Observable<ServerSentEvent>
。 -
任何其他媒体类型的多值流(例如
application/json
)被适配,类似于使用DeferredResult<List<?>>
。
Web MVC 通过 today-core 中的 ReactiveAdapterRegistry
支持 Reactor 和 RxJava,它允许从多种响应式库进行适配。
|
对于响应式背压的流式传输到响应中,虽然支持,但写入响应仍然是阻塞的,并且是通过
配置的
AsyncTaskExecutor
在单独的线程上运行,以避免阻塞上游源,例如从 WebClient
返回的 Flux
。
配置
Web MVC 为异步请求公开了几个选项。
Web MVC
MVC 配置为异步请求处理提供了以下选项:
-
Java 配置:使用
WebMvcConfigurer
上的configureAsyncSupport
回调。
您可以配置以下内容:
-
异步请求没有默认超时值,除非它被明确设置。
-
用于阻塞写入时的
AsyncTaskExecutor
,当使用 响应式类型 流式传输和执行来自控制器方法的Callable
实例。默认使用的不适用于负载下的生产环境。 -
DeferredResultProcessingInterceptor
实现和CallableProcessingInterceptor
实现。
请注意,您也可以在 DeferredResult
、ResponseBodyEmitter
和 SseEmitter
上设置默认超时值。
对于 Callable
,您可以使用 WebAsyncTask
提供超时值。