数据缓冲区和编解码器
Java NIO 提供了 ByteBuffer
,但许多库在其之上构建了自己的字节缓冲区 API,
特别是对于网络操作,其中重用缓冲区和/或使用直接缓冲区有利于性能。例如,Netty 使用 ByteBuf
结构,
Undertow 使用 XNIO,Jetty 使用带有释放回调的池化字节缓冲区,等等。
today-core
模块提供了一组抽象,以处理各种字节缓冲区 API,如下所示:
-
DataBufferFactory
抽象了数据缓冲区的创建。 -
DataBuffer
表示一个字节缓冲区,可能是 池化、 -
DataBufferUtils
提供了数据缓冲区的实用方法。 -
编解码器
将数据缓冲区流解码或编码为更高级别的对象。
DataBufferFactory
DataBufferFactory
用于以以下两种方式之一创建数据缓冲区:
-
分配新的数据缓冲区,可选择地提前指定容量(如果已知),这种方式更高效,即使
DataBuffer
的实现可以根据需求增长和收缩。 -
包装现有的
byte[]
或java.nio.ByteBuffer
,它使用DataBuffer
实现装饰给定的数据,而不涉及分配。
DataBuffer
DataBuffer
接口提供了与 java.nio.ByteBuffer
类似的操作,但还带来了一些额外的好处,
其中一些受到了 Netty ByteBuf
的启发。以下是部分好处的列表:
-
独立的读和写位置,即不需要调用
flip()
来在读和写之间切换。 -
根据需要扩展容量,与
java.lang.StringBuilder
类似。 -
通过
Pooled DataBuffer
实现的缓冲池和引用计数。 -
将缓冲区视为
java.nio.ByteBuffer
、InputStream
或OutputStream
。 -
确定给定字节的索引或最后一个索引。
Pooled DataBuffer
如 ByteBuffer 的 Javadoc 所述,字节缓冲区可以是直接的或非直接的。直接缓冲区可能驻留在 Java 堆之外,这消除了原生 I/O 操作的复制需求。 这使得直接缓冲区特别适用于通过套接字接收和发送数据,但它们创建和释放的成本也更高,因此引入了缓冲区池的概念。
Pooled DataBuffer
是 DataBuffer
的扩展,它有助于引用计数,这对字节缓冲区池是至关重要的。
它是如何工作的呢?当分配一个 Pooled DataBuffer
时,引用计数为 1。
调用 retain()
方法会增加计数,而调用 release()
方法会减少计数。
只要计数大于 0,就保证不会释放缓冲区。
当计数减少到 0 时,池化缓冲区可以被释放,实际上这可能意味着为缓冲区保留的内存被返回到内存池中。
需要注意的是,在大多数情况下,不直接操作 Pooled DataBuffer
,而是使用 DataBufferUtils
中的便利方法更好,
这些方法仅在 DataBuffer
是 Pooled DataBuffer
的实例时应用释放或保留。
DataBufferUtils
DataBufferUtils
提供了多个实用方法来操作数据缓冲区:
-
将数据缓冲区流合并成一个单一的缓冲区,可能通过复合缓冲区实现零拷贝,例如,如果底层字节缓冲区 API 支持的话。
-
将
InputStream
或 NIOChannel
转换为Flux<DataBuffer>
,反之亦然,将Publisher<DataBuffer>
转换为OutputStream
或 NIOChannel
。 -
如果缓冲区是
Pooled DataBuffer
的实例,则释放或保留数据缓冲区的方法。 -
从字节流中跳过或取出特定字节计数。
编解码器
cn.taketoday.core.codec
包提供了以下策略接口:
-
Encoder
用于将Publisher<T>
编码为数据缓冲区流。 -
Decoder
用于将Publisher<DataBuffer>
解码为更高级别对象流。
today-core
模块提供了 byte[]
、ByteBuffer
、DataBuffer
、Resource
和 String
的编码器和解码器实现。today-web
模块添加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 和其他编码器和解码器。
使用 DataBuffer
在处理数据缓冲区时,必须特别注意确保释放缓冲区,因为它们可能是 池化的 的。我们将使用编解码器来说明这是如何工作的,但这些概念通常适用。让我们看看编解码器在内部如何管理数据缓冲区。
Decoder
是最后一个读取输入数据缓冲区的对象,在创建更高级别对象之前,因此它必须按以下方式释放它们:
-
如果
Decoder
只是简单地读取每个输入缓冲区并准备立即释放它,它可以通过DataBufferUtils.release(dataBuffer)
来释放。 -
如果
Decoder
使用Flux
或Mono
操作符,如flatMap
、reduce
和其他在内部预取和缓存数据项的操作符,或者是使用诸如filter
、skip
和其他排除项的操作符,则必须将doOnDiscard(DataBuffer.class, DataBufferUtils::release)
添加到组合链中,以确保在被丢弃之前释放这些缓冲区,可能还包括作为错误或取消信号的结果而被丢弃。 -
如果
Decoder
以任何其他方式持有一个或多个数据缓冲区,它必须确保在完全读取时释放它们,或者在缓存的数据缓冲区被读取和释放之前发生错误或取消信号的情况下释放它们。
请注意,DataBufferUtils#join
提供了一种安全高效的方式将数据缓冲流聚合成单个数据缓冲区。同样,
skipUntilByteCount
和 takeUntilByteCount
是解码器使用的额外安全方法。
Encoder
分配其他对象必须读取(和释放)的数据缓冲区。因此,Encoder
没有太多要做的事情。但是,
如果在使用数据填充缓冲区时发生序列化错误,则 Encoder
必须确保释放数据缓冲区。例如:
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
Encoder
的消费者负责释放其接收到的数据缓冲区。在 非阻塞 应用程序中,
Encoder
的输出用于写入HTTP服务器响应,或者写入客户端 HTTP 请求,这种情况下释放数据缓冲区是代码写入服务器响应或客户端请求的责任。
请注意,当在Netty上运行时,有一些调试选项可用于 troubleshooting buffer leaks.