文章13 | 阅读 5456 | 点赞0
可以先看Netty源码分析-缓冲区Buffer体系结构设计再看这篇分析。
// 泛型T控制底层底层存放数据的实现
// 如字节数组byte[],Java NIO的ByteBuffer
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
// 核心字段recyclerHandle
// 每个对象实例包含一个recyclerHandle字段,
// 该字段作为中介,将该对象实例放到该对象实例所在的类的对象池(类级别字段)中
// Handle类的value字段指向该对象
private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;
protected PoolChunk<T> chunk;
protected long handle;
protected T memory;
protected int offset;
protected int length;
int maxLength;
PoolThreadCache cache;
private ByteBuffer tmpNioBuf;
private ByteBufAllocator allocator;
// 中间省略其他方法
...
// 该对象引用计数为0时,释放该对象
// 调用recycle方法,将该对象实例放到其所在类的对象缓存池中
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
tmpNioBuf = null;
chunk.arena.free(chunk, handle, maxLength, cache);
chunk = null;
recycle();
}
}
private void recycle() {
recyclerHandle.recycle(this);
}
}
(1)RECYCLER为PooledDirectByteBuf类的对象实例缓存池;
(2)newInstance方法,从缓存池RECYCLER获取一个DirectByteBuf的对象实例,然后调用reuse重置该buf,然后返回给调用方。
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
@Override
protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
return new PooledDirectByteBuf(handle, 0);
}
};
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
...
}
PooledDirectByteBuf的reuse实现:
/** * Method must be called before reuse this {@link PooledByteBufAllocator} */
final void reuse(int maxCapacity) {
maxCapacity(maxCapacity);
setRefCnt(1);
setIndex0(0, 0);
discardMarks();
}
class PooledHeapByteBuf extends PooledByteBuf<byte[]> {
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
@Override
protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
return new PooledHeapByteBuf(handle, 0);
}
};
static PooledHeapByteBuf newInstance(int maxCapacity) {
PooledHeapByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
...
}
public abstract class Recycler<T>
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
@Override
protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
return new PooledHeapByteBuf(handle, 0);
}
};
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
ratioMask, maxDelayedQueuesPerThread);
}
@Override
protected void onRemoval(Stack<T> value) {
// Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
if (value.threadRef.get() == Thread.currentThread()) {
if (DELAYED_RECYCLED.isSet()) {
DELAYED_RECYCLED.get().remove(value);
}
}
}
};
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
@Override
protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
return new PooledHeapByteBuf(handle, 0);
}
};
static final class DefaultHandle<T> implements Handle<T> {
private int lastRecycledId;
private int recycleId;
boolean hasBeenRecycled;
private Stack<?> stack;
private Object value;
DefaultHandle(Stack<?> stack) {
this.stack = stack;
}
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this);
}
}
每个线程所绑定的Stack最多可以容纳该类型对象实例的个数,主要通过maxCapacityPerThread和ratioMask两个参数来控制,即控制缓存池的大小,避免一直创建并缓存新对象实例,导致内存爆炸。
小于0,则使用默认大小4k个,这个是默认值;
等于0,则不缓存,即每次都是新建一个该类型对象实例,相当于不缓存;
大于0,则当Stack大小等于maxCapacityPerThread时,push入栈时,丢弃新的对象实例;
ratioMask:控制可以放到对象缓存池,即入栈成功的对象实例的比例,默认为第一个可以,之后每8次尝试,有一个不同的对象实例(可以是一个新的对象实例尝试8次,或者多个新的对象实例一起尝试8次,其中有一个可以入栈)可以放到缓存池中缓存,ratioMask默认值为7。
当缓冲池该类型对象少于maxCapacityPerThread时,调用push入栈时,也不一定就一直缓存新的对象实例,要看当前是第几次尝试,默认为每8次成功一个。
ratioMask的值也可以通过构造函数传入,则netty会调整为2的次方减一大小,这个设计是参照了hashmap的,2的次方减一的数字,转为二进制就全部都是1了,这样方便进行位运算,如判断逻辑:++handleRecycleCount & ratioMask != 0,其中handleRecycleCount默认值为-1,如ratioMask为7,则只有第1次,第8次,第16次,这个才会返回false,才能入栈。
Stack的核心源码实现如下:
1. maxCapacityPerThread的默认大小:
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
2. maxCapacityPerThread为0时,每次新建对象实例Recycle的get方法:
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
3. ratioMask的默认大小:-1,则第一个入栈的实例可以成功。
// By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
// This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
// bursts.
RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
4. ratioMask调整为2的次方减一:Recycler的构造函数
protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
int ratio, int maxDelayedQueuesPerThread) {
// ratio为的2的次方,如传入ratio为11,则调整后ratioMask等于15
ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
if (maxCapacityPerThread <= 0) {
this.maxCapacityPerThread = 0;
this.maxSharedCapacityFactor = 1;
this.maxDelayedQueuesPerThread = 0;
} else {
this.maxCapacityPerThread = maxCapacityPerThread;
this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
}
}
5. Stack入栈实例对象判断是否入栈缓存该对象的逻辑:
(1)Stack的入栈push:size >= maxCapacity || dropHandle(item)判断
private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
if (size == elements.length) {
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
elements[size] = item;
this.size = size + 1;
}
(2)dropHandle的实现:
boolean dropHandle(DefaultHandle<?> handle) {
if (!handle.hasBeenRecycled) {
// handleRecycleCount默认值为-1,如ratioMask为7,
// 只有第1次,第8次,第16次,这个才会返回false,才能入栈。
if ((++handleRecycleCount & ratioMask) != 0) {
// Drop the object.
return true;
}
handle.hasBeenRecycled = true;
}
return false;
}
第一个对象缓冲区实例
class PooledHeapByteBuf extends PooledByteBuf<byte[]> {
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
@Override
protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
return new PooledHeapByteBuf(handle, 0);
}
};
...
}
第二个对象缓冲池实例
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
@Override
protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
return new PooledDirectByteBuf(handle, 0);
}
};
...
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://xieyizun.blog.csdn.net/article/details/85549276
内容来源于网络,如有侵权,请联系作者删除!