数据缓冲区和编解码器

Java NIO 提供了 ByteBuffer,但许多库在其之上构建了自己的字节缓冲区 API, 特别是对于网络操作,其中重用缓冲区和/或使用直接缓冲区有利于性能。例如,Netty 使用 ByteBuf 结构, Undertow 使用 XNIO,Jetty 使用带有释放回调的池化字节缓冲区,等等。 today-core 模块提供了一组抽象,以处理各种字节缓冲区 API,如下所示:

DataBufferFactory

DataBufferFactory 用于以以下两种方式之一创建数据缓冲区:

  1. 分配新的数据缓冲区,可选择地提前指定容量(如果已知),这种方式更高效,即使 DataBuffer 的实现可以根据需求增长和收缩。

  2. 包装现有的 byte[]java.nio.ByteBuffer,它使用 DataBuffer 实现装饰给定的数据,而不涉及分配。

DataBuffer

DataBuffer 接口提供了与 java.nio.ByteBuffer 类似的操作,但还带来了一些额外的好处, 其中一些受到了 Netty ByteBuf 的启发。以下是部分好处的列表:

  • 独立的读和写位置,即不需要调用 flip() 来在读和写之间切换。

  • 根据需要扩展容量,与 java.lang.StringBuilder 类似。

  • 通过 Pooled DataBuffer 实现的缓冲池和引用计数。

  • 将缓冲区视为 java.nio.ByteBufferInputStreamOutputStream

  • 确定给定字节的索引或最后一个索引。

Pooled DataBuffer

ByteBuffer 的 Javadoc 所述,字节缓冲区可以是直接的或非直接的。直接缓冲区可能驻留在 Java 堆之外,这消除了原生 I/O 操作的复制需求。 这使得直接缓冲区特别适用于通过套接字接收和发送数据,但它们创建和释放的成本也更高,因此引入了缓冲区池的概念。

Pooled DataBufferDataBuffer 的扩展,它有助于引用计数,这对字节缓冲区池是至关重要的。 它是如何工作的呢?当分配一个 Pooled DataBuffer 时,引用计数为 1。 调用 retain() 方法会增加计数,而调用 release() 方法会减少计数。 只要计数大于 0,就保证不会释放缓冲区。 当计数减少到 0 时,池化缓冲区可以被释放,实际上这可能意味着为缓冲区保留的内存被返回到内存池中。

需要注意的是,在大多数情况下,不直接操作 Pooled DataBuffer,而是使用 DataBufferUtils 中的便利方法更好, 这些方法仅在 DataBufferPooled DataBuffer 的实例时应用释放或保留。

DataBufferUtils

DataBufferUtils 提供了多个实用方法来操作数据缓冲区:

  • 将数据缓冲区流合并成一个单一的缓冲区,可能通过复合缓冲区实现零拷贝,例如,如果底层字节缓冲区 API 支持的话。

  • InputStream 或 NIO Channel 转换为 Flux<DataBuffer>,反之亦然,将 Publisher<DataBuffer> 转换为 OutputStream 或 NIO Channel

  • 如果缓冲区是 Pooled DataBuffer 的实例,则释放或保留数据缓冲区的方法。

  • 从字节流中跳过或取出特定字节计数。

编解码器

infra.core.codec 包提供了以下策略接口:

  1. Encoder 用于将 Publisher<T> 编码为数据缓冲区流。

  2. Decoder 用于将 Publisher<DataBuffer> 解码为更高级别对象流。

today-core 模块提供了 byte[]ByteBufferDataBufferResourceString 的编码器和解码器实现。today-web 模块添加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 和其他编码器和解码器。

使用 DataBuffer

在处理数据缓冲区时,必须特别注意确保释放缓冲区,因为它们可能是 池化的 的。我们将使用编解码器来说明这是如何工作的,但这些概念通常适用。让我们看看编解码器在内部如何管理数据缓冲区。

Decoder 是最后一个读取输入数据缓冲区的对象,在创建更高级别对象之前,因此它必须按以下方式释放它们:

  1. 如果 Decoder 只是简单地读取每个输入缓冲区并准备立即释放它,它可以通过 dataBuffer.release() 来释放。

  2. 如果 Decoder 使用 FluxMono 操作符,如 flatMapreduce 和其他在内部预取和缓存数据项的操作符,或者是使用诸如 filterskip 和其他排除项的操作符,则必须将 doOnDiscard(DataBuffer.class, DataBuffer::release) 添加到组合链中,以确保在被丢弃之前释放这些缓冲区,可能还包括作为错误或取消信号的结果而被丢弃。

  3. 如果 Decoder 以任何其他方式持有一个或多个数据缓冲区,它必须确保在完全读取时释放它们,或者在缓存的数据缓冲区被读取和释放之前发生错误或取消信号的情况下释放它们。

请注意,DataBufferUtils#join 提供了一种安全高效的方式将数据缓冲流聚合成单个数据缓冲区。同样, skipUntilByteCounttakeUntilByteCount 是解码器使用的额外安全方法。

Encoder 分配其他对象必须读取(和释放)的数据缓冲区。因此,Encoder 没有太多要做的事情。但是, 如果在使用数据填充缓冲区时发生序列化错误,则 Encoder 必须确保释放数据缓冲区。例如:

DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
  // serialize and populate buffer..
  release = false;
}
finally {
  if (release) {
    buffer.release();
  }
}
return buffer;

Encoder 的消费者负责释放其接收到的数据缓冲区。在 非阻塞 应用程序中, Encoder 的输出用于写入HTTP服务器响应,或者写入客户端 HTTP 请求,这种情况下释放数据缓冲区是代码写入服务器响应或客户端请求的责任。

请注意,当在Netty上运行时,有一些调试选项可用于 troubleshooting buffer leaks.