cd ../

Java原子循环计数器

最近在折腾JDBC工具类,自己写了一个DataSourceRoundRobin负载均衡器,其中就碰到了一个原子循环计数器的问题。

我们知道,在JDK的concurrent包中包含了一系列的原子操作的工具类,如AtomicIntegerAtomicLongAtomicReference等。通过使用这些已有的工具类,我们可以十分方便地实现无锁的替换、递增和递减等操作。

例如,原本如下线程不安全的计数器实现,

public class NotThreadSafeCounter implements Counter {

    private int counterVal = 0;

    @Override
    public int next() {
        return counterVal++;
    }
}

可以替换为

public class ThreadSafeCounter implements Counter {

    private final AtomicInteger counterVal = new AtomicInteger(0);

    @Override
    public int next() {
        return counterVal.getAndIncrement();
    }
}

从而适合于多线程并发环境下使用。那如何通过利用这些已有工具类来实现循环计数器呢?我们可以先看一下concurrent包中的AtomicInteger.getAndIncrement()是如何实现的。

以下是来自JDK 1.6的源码:

/**
 * Atomically increments by one the current value.
 *
 * @return the previous value
 */
public final int getAndIncrement() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return current;
    }
}

其中get()的实现十分简单,直接返回了内部的private volatile int value变量。而compareAndSet(current, next)方法直接调用了底层的Compare-and-swap指令操作,

/**
 * Atomically sets the value to the given updated value
 * if the current value {@code ==} the expected value.
 *
 * @param expect the expected value
 * @param update the new value
 * @return true if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

也就是从机器指令层面上实现了原子替换操作,即仅在比较成立的情况下作替换操作。关于CAS可以详见上面的那个链接,这里就不展开了。根据上面的javadoc,compareAndSet方法的返回表明了这次操作是否成功执行。getAndIncrement方法就是通过这个返回是否成功的信息,在一个死循环中不断尝试,直到成功为止。现在我们就可以模仿这种不断尝试替换的模式来实现我们的循环计数器。

public class RoundRobinCounter implements Counter {

    private final int maxIndex;
    private AtomicInteger counter = new AtomicInteger(0);

    public RoundRobinCounter(int maxIndex) {
        Check.argumentIsPositive(maxIndex,
                "maxIndex must be a positive number");
        this.maxIndex = maxIndex;
    }

    @Override
    public int next() {
        while (true) {
            int current = counter.get();
            int next = (current + 1) % maxIndex;
            if (counter.compareAndSet(current, next)) {
                return current;
            }
        }
    }
}

通过比较AtomicInteger.getAndIncrement()的实现不难发现,其实就是把原本int next = current + 1;替换成了int next = (current + 1) % maxIndex;,使得每次当要递增达到最大上限时,变为0(此时maxIndex % maxIndex = 0)。