ConcurrentBag是HikariCP中实现的一个池化资源的并发管理类。它是一个高性能的生产者-消费者队列。

前言

ConcurrentBag是HikariCP中实现的一个池化资源的并发管理类。它是一个高性能的生产者-消费者队列。

ConcurrentBag的并发性能优于LinkedBlockingQueue和LinkedTransferQueue

LinkedBlockingQueue 阻塞队列

LinkedTransferQueue 数据传送队列

TransferQueue继承自BlockingQueue接口 TransferQueue的改进:

保留与完成

保留是指消费者线程在消费时如果发现队列为空,就生成一个空元素入队,然后该消费者线程在这个资源的数据字段上旋转等待。

完成是当生产者线程要放入一个新资源时,如果发现首位元素的数据字段为空,就把数据直接填充到这个元素中。

保留加完成,共同称为数据的传送。

为了提升并发效率,ConcurrentBag

  1. 优先使用ThreadLocal里的资源,如果ThreadLocal的List里没有可用的资源了,再使用公共集合(资源池)里的资源。
  2. 无论ThreadLocal还是公共集合,都使用CAS代替加锁

IConcurrentBagEntry接口

ConcurrentBag中定义了一个public的成员接口IConcurrentBagEntry,并作为这个类的泛型,要求所有要接受ConcurrentBag管理的池化资源都要实现这个接口

public interface IConcurrentBagEntry
{
    int STATE_NOT_IN_USE = 0;
    int STATE_IN_USE = 1;
    int STATE_REMOVED = -1;
    int STATE_RESERVED = -2;

    boolean compareAndSet(int expectState, int newState);
    void setState(int newState);
    int getState();
}

两个重要的方法

1. 借用池化资源

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
  • 参数:
    • timeout: 超时时长
    • timeunit: 时长单位

首先尝试请求ThreadLocal的资源

final List<Object> list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
    final Object entry = list.remove(i);
    @SuppressWarnings("unchecked")
    final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
    if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
        return bagEntry;
    }
}

ThreadLocal里没有请求到资源,就去请求公共集合里的资源

final int waiting = waiters.incrementAndGet(); // waiters是个Atomic Integer,表示等待的消费者数量
try {
    // 遍历请求资源
    for (T bagEntry : sharedList) {
        if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            // 当前消费者线程获取到的资源可能是别的消费者在等待的,为了不让其他消费者线程因为抢占而阻塞,调用创建新资源的线程,给在等待的消费者们
            if (waiting > 1) {
                listener.addBagItem(waiting - 1);
            }
            return bagEntry;
        }
    }

    // 另起一个创建新资源的线程,创建资源
    listener.addBagItem(waiting);

    // 等待获取资源,超时控制此时才开始
    timeout = timeUnit.toNanos(timeout);
    do {
        final long start = currentTime();
        final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
        // 返回null表示超时
        if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
        }

        timeout -= elapsedNanos(start);
    } while (timeout > 10_000);

    return null;
}
finally {
    waiters.decrementAndGet(); // 释放
}

实际上,创建新资源的消费者不会马上就创建一个新资源,而是会先判断当前是否还有在等待的消费者,这是因为在高并发下,可能有资源抢先被其他线程归还,在等待的消费者就可以直接使用这个空闲的资源。

这种调用新线程创建资源的方法,比起其它线程池如果获取不到资源直接当前线程创建一个新资源的方式,因为多了一次等待中的消费者的数量的判断,所以既节省了创建资源的时间,提高了并发性能,又节省了内存占用,还节省了线程池空间的占用,可谓一举三得。

2. 归还借来的资源

public void requite(final T bagEntry)
  • 参数
    • bagEntry: 要归还的资源

如果有线程正在等待,尝试将资源归还到handoffQueue,使用定期parkNanos和yield防止当前操作占用了过多的CPU资源

for (int i = 0; waiters.get() > 0; i++) {
    if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
        return;
    }
    else if ((i & 0xff) == 0xff) { // 每尝试256次,就阻塞10ms
        parkNanos(MICROSECONDS.toNanos(10));
    }
    else {
        Thread.yield(); // 让出CPU调度
    }
}

如果没有线程在等待,把资源归还到当前线程的ThreadLocal,因为同一次操作里很可能多次获取连接,要提高一次操作的效率。

可以看出,与常规的生产者-消费者模型不同,每次借用完一定要归还,因为borrow操作中没有删除资源的动作,GC是不可能去回收资源的,不及时归还的话可能导致内存泄露。

综合以上代码,对并发编程有以下启发:

  1. 高并发场景下,尽量避免使用synchronized这种重量级的锁,而是用Atomic、CopyOnWrite、CAS等轻量级的方式保证并发安全。
  2. 不能让一个线程长时间占用资源,要适当地给其它线程让行。
  3. 要尽量用低消耗的操作替代高消耗的操作,如这里的尽量不创建新资源。
  4. 常用的资源尽量放到ThreadLocal中。

版权声明:本文为wjhmrc原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/wjhmrc/p/14802034.html