原子操作
原子操作,不可被中断的一个或一系列的操作;即该操作具有原子性;
例如为什么i++不是一个原子操作?
原因是执行i++是分为三步,read, inc, write。如果i=1,执行两次i++之后,那么i的最终结果可能不会是3,而是2。加入现在如果有一个多线程的程序对线程的共享变量i进行操作,每次执行一次i++操作,最终i的输出和执行次数是一致的吗?例如下面的代码:
static int i = 0;
public static void loop() throws InterruptedException {
Thread[] threads = new Thread[1000];
for (int j = 0; j < 1000; j++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
});
threads[j] = t;
t.start();
}
for (int j = 0; j < 1000; j++) {
threads[j].join();//等待所有线程执行完毕
}
System.out.println(i.get());
}
程序输出:999
原因分析
在《JVM原理简述》中,在栈的部分提到了下面的部分:每一个线程被创建的时候,都会得到自己的程序计数器和java栈。java栈以帧为单位保存调用信息。当线程调用一个方法,JVM会向栈中压入一个新的栈帧,当方法调用结束时,栈帧会被弹出。java栈帧是线程私有的,不存在多线程的安全性问题。
栈帧由3部分构成,局部变量区,操作数栈,帧数据区。
1. 局部变量区
编译器将调用方法的局部变量和参数按照声明顺序放入局部变量数组中,从0开始计数。如果是实例方法,this将被存储到数组的第一个位置。java中的对象传递都是按照引用传递的,因此java对象全部存储在堆中,而局部变量区和操作数栈中只存储了对象的引用。
2. 操作数栈
JVM把操作数栈作为它的工作区,大多数指令都要从这里弹出数据,执行运算,然后把结果压回操作数栈,然后等相关的指令将结果再次弹出。操作数栈扮演了暂存操作数的角色。
3. 帧数据区
栈帧还可以存储常量池的解析,正常方法返回和异常派发机制。这些都存在帧数据区。当方法要访问常量池时,会通过帧数据区的指向常量池的指针来访问。
每个线程在创建时,都会得到自己的计数器和栈,每次调用方法的时候,都以帧为单位保存调用信息;在栈帧中,i的值会从主内存中加载进来,线程会创建一个局部变量对象i,栈中会保存局部变量的引用,当i的操作完成后,再向主内存更新数据;这就造成同一时间两个线程读取到了同样的值,操作之后又会写回同样的数据,造成最终的i的值小于1000。volatile
现在我们不禁想起了volatile标签,如果i是volatile类型的变量,结果有变化吗?我们将程序第一行的i声明加上volatile,再执行一次;程序输出:998
从输出结果看,还是没有改进;证明volatile不能将非原子操作的行为限定为原子操作;
volatile的作用是强制线程在操作变量时都从主内存读取数据,并且操作完成后将操作后的值写入主内存。由于i++不是原子操作,分为read,inc,write三步,因此read即使每次都强制从主内存读取,但是依然有可能两个线程读取到相同的值;
CAS
Compare and swap,CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先比较下旧值有没有发生变化,如果没有发生变化,才交换成新值,发生了变化则不交换。
说到CAS,要先了解一下处理器是如何保证原子操作的。
32位IA-32处理器使用基于对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作。
JAVA中利用处理器对CMPXCHG指令来实现原子操作,CMPXCHG在处理器中被增加了#LOCK前缀,因此它在处理时是原子操作;
如下代码可以实现对i计数器的原子操作:
volatile static AtomicInteger i = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
for (int k = 0; k < 100; k++) {
loop();
}
}
public static void loop() throws InterruptedException {
Thread[] threads = new Thread[1000];
for (int j = 0; j < 1000; j++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
safeCount();
}
});
threads[j] = t;
t.start();
}
for (int j = 0; j < 1000; j++) {
threads[j].join();
}
System.out.println(i.get());
}
public static void safeCount() {
for (; ; ) {
int tmp = i.get();
if (i.compareAndSet(tmp, ++tmp)) {
break;
}
}
}
输出:
1000 2000 3000 4000 5000使用CAS会存在下面几个问题
- ABA的问题,即A被修改成为B,又被修改为A,此时使用CAS会发现变量没有变化,实际上却变化了;解决的办法是加入版本号,即1A-2B-3A。从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
- CAS的CPU开销问题,由于存在循环,因此某些时候会使得CPU的开销比较大;
- 只能保证一个共享变量的原子操作,无法对多个变量保证。如果存在多个变量,建议使用锁;
Lock
下面我们用加锁的方式来实现i++的原子操作;
volatile static int i = 0;
static final ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
for(int k=0;k<100;k++){
loop();
}
}
public static void loop() throws InterruptedException {
for(int j = 0;j<1000;j++){
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try{
i++;
}finally {
lock.unlock();
}
}
});
t.start();
}
Thread.sleep(1001);
System.out.println(i);
}
程序输出
参考资料:
1000 2000 3000 4000 5000
线程池(Threadpool)
百度百科关于线程池的定义非常全面。
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
以java中的Excutor组件为实例说明线程池的整体流程;
以Executor组件的ThreadPoolExecutor为例来说明线程池的运行规则;
首先,调用new方法创建一个ThreadPoolExecutor实例,代码:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
我们看到有几个核心参数,- corePoolSize:可以保留运行线程的数量
- maximumPoolSize:最大线程数量
- workQueue:阻塞队列,用于存储任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
以上代码可以看出,线程池的运行机制如下:- 当poolsize<corePoolSize时,任务会被直接以创建workerThread的形式直接执行,不会存储到queue中;
- 当poolsize达到corePoolSize大小时,会向队列存储任务,workQueue.offer(), 这个在前面的blog中已经介绍过,offer是非阻塞方法,并且如果队列满的话,会直接返回false;插入队列的数据会被现有的workerThreads消费执行;
- 当队列无法插入数据的时候,会创建新的workerThread来执行,直到当前的poolSize达到了maximumPoolSize
- 当无法向队列插入数据,并且poolSize= maximumPoolSize,会拒绝这个任务;
- 如果poolsize>corePoolSize,那么多余的线程会在等待keepAliveTime时间后销毁;
![]() |
| 线程池说明 |
参考资料:

没有评论:
发表评论