Netty学习篇⑥--ByteBuf源码分析
什么是ByteBuf?
ByteBuf在Netty中充当着非常重要的角色;它是在数据传输中负责装载字节数据的一个容器;其内部结构和数组类似,初始化默认长度为256,默认最大长度为Integer.MAX_VALUE。
ByteBuf数据结构
* <pre>
* +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* | | (CONTENT) | |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
* </pre>
ByteBuf字节缓冲区主要由discardable
、readable
、writable
三种类型的字节组成的;
ByteBuf字节缓冲区可以操控readerIndex
、writerIndex
二个下标;这两个下标都是单独维护
的
名词 | 解释 | 方法 |
---|---|---|
discardable bytes | 丢弃的字节;ByteBuf中已经读取的字节 | discardReadBytes(); |
readable bytes | 剩余的可读的字节 | |
writable bytes | 已经写入的字节 | |
readerIndex | 字节读指针(数组下标) | readerIndex() |
writerIndex | 字节写指针(数组下标) | writerIndex() |
ByteBuf中主要的类-UML图
ByteBuf怎么创建的?
ByteBuf是通过Unpooled来进行创建;默认长度为256,可自定义指定长度,最大长度为Integer.MAX_VALUE;
ByteBuf创建的类型有哪几种?
1. 基于内存管理分类
类型 | 解释 | 对应的字节缓冲区类 |
---|---|---|
Pooled | 池化; 简单的理解就是pooled拥有一个pool 池空间 (poolArea),凡是创建过的字节缓冲区都会被缓存进去, 有新的连接需要字节缓冲区会先从缓存中 get ,取不到则在进行创建; |
1.PooledDirectByteBuf 2.PooledHeapByteBuf 3.PooledUnsafeDirectByteBuf 4.PooledUnsafeHeapByteBuf |
Unpooled | 非池化; 每次都会创建一个字节缓冲区 |
1.UnpooledDirectByteBuf 2.UnpooledHeapByteBuf 3.UnpooledUnsafeDirectByteBuf 4.UnpooledUnsafeHeapByteBuf |
优缺点:
- 在频繁的创建申请字节缓冲区的情况下,池化要比非池化要好很多,池化减少了内存的创建和销毁,重复使用
- 在非频繁的情况下,非池化的性能要高于池化,不需要管理维护对象池,所以在不需要大量使用ByteBuf的情况下推荐使用非池化来创建字节缓冲区
2. 基于内存分类
类型 | 解释 | 特点 | 构造方法 |
---|---|---|---|
heapBuffer(常用) | 堆字节缓冲区; | 底层就是JVM的堆内存,只是IO读写需要从堆内存拷贝到内核中(类似之前学过的IO多路复用) | buffer(128) |
directBuffer(常用) | 直接内存字节缓冲区; | 直接存于操作系统内核空间(堆外内存) | directBuffer(256) |
优缺点:
- heapBuffer是在JVM的堆内存中分配一个空间,使用完毕后通过JVM回收机制进行回收,但是数据传输到Channel中需要从堆内存中拷贝到系统内核中
- directBuffer直接在堆外,系统内核中开辟一个空间,在数据传输上要比heapBuffer高(减少了内存拷贝),但是由于不受JVM管理在创建和回收上要比heapBuffer更加耗时耗能;
每一种都有自己优势的地方,我们要根据实际的业务来灵活的运用;如果涉及到大量的文件操作建议使用directBuffer(搬来搬去确实挺耗性能);大部分业务还是推荐使用heapBuffer(heapBuffer,普通的业务搬来搬去相比在内核申请一块内存和释放内存来说要更加优)。
ByteBuf是怎么样回收的
1. heapBuffer
heapBuffer是基于堆内存来进行创建的,回收自然而然通过JVM的回收机制进行回收
2. directBuffer回收内存的方法
可以通过DirectByteBuffer中的Cleaner来进行清除
或者依靠unsafe的释放内存(freeMemory方法)也可以进行回收
源码分析
ByteBuf内存分配
ByteBuf的内存分配主要分为heap(堆内存)和direct(堆外内存);
1. heap堆内存的分配:通过UnpooledByteBufAllocator类进行内存分配
1.1 创建堆缓冲区
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
// 是否支持unsafe
return PlatformDependent.hasUnsafe() ?
// 创建unsafe非池化堆字节缓冲区
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
// 创建非池化堆字节缓冲区
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
1.2 unsafe和非unsafe都是通过实例 UnpooledHeapByteBuf 来分配内存
// InstrumentedUnpooledUnsafeHeapByteBuf/InstrumentedUnpooledHeapByteBuf
// 最终都是通过这个方法来创建分配内存;后面会讲讲unsafe和普通的非unsafe的区别
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 设置最大容量
super(maxCapacity);
// 检查内存分配类是否为空
checkNotNull(alloc, "alloc");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
// allocateArray初始化一个initialCapacity长度的字节数组
setArray(allocateArray(initialCapacity));
// 初始化读写索引为0
setIndex(0, 0);
}
// 初始化一个initialCapacity长度的字节数组
byte[] allocateArray(int initialCapacity) {
return new byte[initialCapacity];
}
// 初始化读写索引为0
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
readerIndex, writerIndex, capacity()));
}
setIndex0(readerIndex, writerIndex);
return this;
}
final void setIndex0(int readerIndex, int writerIndex) {
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
}
从源码可以得知,堆内存ByteBuf通过判断系统环境是否支持unsafe来判断是创建UnsafeHeapByteBuf还是heapByteBuf; 如果支持unsafe则返回 InstrumentedUnpooledUnsafeHeapByteBuf
实例,反之则返回 InstrumentedUnpooledHeapByteBuf
实例;但它们都是分配一个byte数组来进行存储字节数据。
1.3 unsafe和非unsafe创建的ByteBuf有什么区别呢
unsafe和非unsafe创建的heapByteBuf区别在于获取数据;非unsafe获取数据直接是通过数组索引来进行获取的;而unsafe获取数据则是通过UNSAFE操控内存来获取;我们可以通过源码来看看
heapByteBuf获取数据
@Override
public byte getByte(int index) {
ensureAccessible();
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return HeapByteBufUtil.getByte(array, index);
}
// 直接返回数组对应索引的值
static byte getByte(byte[] memory, int index) {
return memory[index];
}
unsafeHeapByteBuf获取数据
@Override
public byte getByte(int index) {
checkIndex(index);
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(array, index);
}
static byte getByte(byte[] array, int index) {
return PlatformDependent.getByte(array, index);
}
public static byte getByte(byte[] data, int index) {
return PlatformDependent0.getByte(data, index);
}
static byte getByte(byte[] data, int index) {
// 通过unsafe来获取
return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
2. direct内存分配:unsafe创建和非unsafe创建
2.1 创建directBuffer
// PlatformDependent检测运行环境的变量属性,比如java环境,unsafe是否支持等
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
// 支持unsafe
if (PlatformDependent.hasUnsafe()) {
// 运行环境是否使用不清空的direct内存
buf = PlatformDependent.useDirectBufferNoCleaner() ?
new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
// 创建非unsafe实例ByteBuf
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
2.2 unsafe返回 UnpooledUnsafeDirectByteBuf
实例,非unsafe返回 UnpooledDirectByteBuf
实例
// 创建unsafe direct字节缓冲区
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 设置最大容量
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
}
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
// allocateDirect创建DirectByteBuffer(java nio)分配内存
setByteBuffer(allocateDirect(initialCapacity), false);
}
// 非unsafe创建direct内存
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
}
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}
2.3 分配 direct内存,返回 java nio ByteBuffer实例
protected ByteBuffer allocateDirect(int initialCapacity) {
return ByteBuffer.allocateDirect(initialCapacity);
}
/**
* Allocates a new direct byte buffer. 分配一个新的直接内存字节缓冲区
*
* <p> The new buffer's position will be zero, its limit will be its
* capacity, its mark will be undefined, and each of its elements will be
* initialized to zero. Whether or not it has a
* {@link #hasArray backing array} is unspecified.
*
* @param capacity
* The new buffer's capacity, in bytes
*
* @return The new byte buffer
*
* @throws IllegalArgumentException
* If the <tt>capacity</tt> is a negative integer
*/
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
// allocateDirect创建DirectByteBuffer(java nio)
DirectByteBuffer(int cap) { // package-private
// 设置文件描述, 位置等信息
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
Bits.reserveMemory(size, cap);
long base = 0;
try {
// 通过unsafe类来分配内存
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
// Round up to page boundary
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
// 实例化cleaner,用于后续回收
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
2.4 设置ByteBuffer的属性(unsafe和非unsafe)
unsafe设置ByteBuffer
/**
* buffer 数据字节
* tryFree 尝试释放,默认为false
*/
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
if (tryFree) {
// 全局buffer设置成旧buffer
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
// 释放旧缓冲区的内存
freeDirect(oldBuffer);
}
}
}
// 将当前传入的buffer设置成全局buffer
this.buffer = buffer;
// 记录内存地址
memoryAddress = PlatformDependent.directBufferAddress(buffer);
// 将临时buff设置为null
tmpNioBuf = null;
// 记录容量大小
capacity = buffer.remaining();
}
// 获取对象在内存中的地址
static long directBufferAddress(ByteBuffer buffer) {
return getLong(buffer, ADDRESS_FIELD_OFFSET);
}
// 通过unsafe操控系统内存,获取对象在内存中的地址
private static long getLong(Object object, long fieldOffset) {
return UNSAFE.getLong(object, fieldOffset);
}
非unsafe设置ByteBuffer
// 非unsafe设置属性
private void setByteBuffer(ByteBuffer buffer) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
this.buffer = buffer;
tmpNioBuf = null;
capacity = buffer.remaining();
}
根据源码,direct通过判断运行系统环境是否使用useDirectBufferNoCleaner
来实例不同的ByteBufferedReader(unsafe和非unsafe),但是他们最终都是通过ByteBuffer来分配内存,底层都是通过在不同的ByteBuf实例中构建一个ByteBuffer来进行存储字节数据的(具体可以看看UnpooledDirectByteBuf
的set方法)
2.5 unsafe和非unsafe创建的directByteBuf的区别
unsafe获取数据:UNSAFE通过索引的内存地址来获取对应的值
@Override
protected byte _getByte(int index) {
// UnsafeByteBufUtil unsafe工具类获取
return UnsafeByteBufUtil.getByte(addr(index));
}
// addr 获取索引的内存地址
long addr(int index) {
return memoryAddress + index;
}
static byte getByte(long address) {
return PlatformDependent.getByte(address);
}
public static byte getByte(long address) {
return PlatformDependent0.getByte(address);
}
// 通过UNSAFE获取内存地址的值
static byte getByte(long address) {
return UNSAFE.getByte(address);
}
非unsafe获取数据: 直接通过对应索引的ByteBuffer获取值
@Override
public byte getByte(int index) {
// 检查授权,及ByteBuffer对象是否还有引用
ensureAccessible();
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
// 通过索引获取值
return buffer.get(index);
}
3. ByteBuf扩容
每次我们再往字节缓冲区中写入数据的时候都会判断当前容量是否还能写入数据,当发现容量不够时,此时ByteBuf会总动进行扩容;当然我们也可以手动更改ByteBuf的容量;详细见代码分析。
public static void main(String[] args) {
// 利用非池化Unpooled类创建字节缓冲区
ByteBuf byteBuf = Unpooled.buffer(2);
System.out.println("initCapacity: " + byteBuf.capacity());
byteBuf.writeByte(66);
byteBuf.writeByte(67);
byteBuf.readBytes(1);
System.out.println("readerIndex: " + byteBuf.readerIndex());
System.out.println("writerIndex: " + byteBuf.writerIndex());
// 丢弃已经阅读的字节
byteBuf.discardReadBytes();
byteBuf.writeByte(68);
byteBuf.writeByte(69);
System.out.println("readerIndex: " + byteBuf.readerIndex());
System.out.println("writerIndex: " + byteBuf.writerIndex());
System.out.println("capacity: " + byteBuf.capacity());
}
// 运行结果
initCapacity: 2
readerIndex: 1
writerIndex: 2
readerIndex: 0
writerIndex: 3
capacity: 64
上面代码的操作步骤:初始化ByteBuf — 写入数据 — 读取数据 — 丢弃数据 — 再写入数据;
丢弃了一个字节数的数据又写入了2个字节数的数据,初始化容量的缓冲区明显不够发生了自动扩容,扩容后的容量:64;它是怎么进行扩容的呢?什么时候扩容的呢?看下源码
public ByteBuf writeByte(int value) {
// 确保可以写入(判断是否容量够不够写入)
ensureWritable0(1);
// 设置写索引、存值
_setByte(writerIndex++, value);
return this;
}
// minWritableBytes默认为1 因为writeByte每次只能写入一个字节数
final void ensureWritable0(int minWritableBytes) {
// 检查是否还有占有权和是否还有引用
ensureAccessible();
// writableBytes() = capacity - writerIndex 剩余可写容量
if (minWritableBytes <= writableBytes()) {
return;
}
// ByteBuf虽然支持自动扩容但是也有上限(Integer.MAX_VALUE)
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// 开始进行扩容 newCapacity = writerIndex + minWritableBytes
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
// 将新的容量写入到ByteBuf
capacity(newCapacity);
}
/**
* 计算新的容量
* minNewCapacity 写入的最小容量
* maxCapacity 最大容量及Integer.MAX_VALUE 2147483647
*/
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
if (minNewCapacity < 0) {
throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
}
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
// 4兆大小 4194304
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// 如果超过了4兆,
if (minNewCapacity > threshold) {
// 新的容量扩容为超过的倍数的容量
int newCapacity = minNewCapacity / threshold * threshold;
// 如果超过了最大的容量则直接设置为最大容量
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// 默认扩容大小为64
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
// 左移一位 newCapacity = newCapacity*2
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
从上面的源码可知,自动扩容在4兆
的范围内变化的话,每次扩容都是64 * 2的N字方
(N >= 1); 一旦超过了4兆
则递增倍数为(newCapacity / 4194304) * 4194304
即表示的是基于4兆增长的倍数。
4. ByteBuf和ByteBuffer的区别
- ByteBuf单独维护
读写
两个数组下标,ByteBuffer只有一个下标索引,读写的时候需要手动设置(调用flip和rewind) - ByteBuffer不支持动态扩容(final型),ByteBuf支持动态扩容上限为Integer.MAX_VALUE
- ByteBuf支持创建对外内存(direct内存)存储数据
- ByteBuf支持的Api更加丰富
虽然Netty中使用的ByteBuf来进行缓存字节数据,但是最后在Channel中还是以ByteBuffer(java nio)来进行传输
参考文献:
https://www.cnblogs.com/stateis0/p/9062152.html
https://www.jianshu.com/p/1585e32cf6b4
https://blog.csdn.net/ZBylant/article/details/83037421