2014年8月29日星期五

Facebook/Swift:简化Java中Thrift开发的工具

用Thrift在java中开发过的同学都了解,使用Thrift的IDL语言生成的java文件是无比巨大和复杂的。很多时候对这样一种超大的实体和service定义都感到非常无力。增减方法,增减字段的文件替换都非常不方便。还好发现了一个简化开发的工具,Facebook/Swift,github地址:https://github.com/facebook/swift
这个框架的核心原理是通过定义和实现注解来达到简化Thrift文件的目的,使得在开发时定义的POJO和Interface都能够和普通的java文件一致;
我们直接看例子,我们定义一个实体类,可以简化写法是:
@ThriftStruct
public class ThirdPartyCollection {

    public final long id; // required
    public final String date; // optional

    @ThriftConstructor
    public ThirdPartyCollection(long id, String date) {
        this.id = id;
        this.date = date;
    }
    @ThriftField(1)
    public long getId() {
        return id;
    }
    @ThriftField(2)
    public String getDate() {
        return date;
    }    
这要比直接使用thrift IDL生成的java代码简单非常多的倍数;
定义service写法如下:
@ThriftService("ThirdPartyCollectionService")
public interface ThirdPartyCollectionService {
    @ThriftMethod
    public ThirdPartyCollection save(@ThriftField(name = "collection") ThirdPartyCollection collection) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException;
    @ThriftMethod
    public ThirdPartyCollection update(@ThriftField(name = "collection") ThirdPartyCollection collection);
    @ThriftMethod
    public List<ThirdPartyCollection> getAll();
}
写法需要在类和方法以及方法参数增加注解即可。
同时,启动服务和client调用的代码也有所变动:
public static void main(String[] args) throws IOException, InterruptedException {
        ThriftServiceProcessor processor = new ThriftServiceProcessor(
                new ThriftCodecManager(),
                ImmutableList.<ThriftEventHandler>of(),
                new ThirdPartyCollectionServiceImpl()
        );

        taskWorkerExecutor = newFixedThreadPool(1);

        ThriftServerDef serverDef = ThriftServerDef.newBuilder()
                .listen(8899)
                .withProcessor(processor)
                .using(taskWorkerExecutor)
                .build();

        bossExecutor = newCachedThreadPool();
        ioWorkerExecutor = newCachedThreadPool();

        NettyServerConfig serverConfig = NettyServerConfig.newBuilder()
                .setBossThreadExecutor(bossExecutor)
                .setWorkerThreadExecutor(ioWorkerExecutor)
                .build();

        server = new ThriftServer(serverConfig, serverDef);
        server.start();
}
Client端调用代码:
public static void main(String[] args) throws ExecutionException, InterruptedException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
        ThriftClientManager clientManager = new ThriftClientManager();
        FramedClientConnector connector = new FramedClientConnector(new InetSocketAddress("localhost",8899));
        ThirdPartyCollectionService scribe = clientManager.createClient(connector, ThirdPartyCollectionService.class).get();
        scribe.getAll();
        com.qiyi.thrift.test.core.ThirdPartyCollection collection =
                new com.qiyi.thrift.test.core.ThirdPartyCollection(1001, "2014-08-29");
        scribe.save(collection);
}


2014年8月12日星期二

Session机制和分布式Session

什么是session

    Session是指一个终端用户与交互系统进行通信的时间间隔,通常指从注册进入系统到注销退出系统之间所经过的时间。以及如果需要的话,可能还有一定的操作空间。
    在计算机系统中,Session又被称作“会话”。
    一个Session的概念需要包括特定的客户端,特定的服务器端以及不中断的操作时间。

什么是Cookie

    Cookie最早是网景公司的前雇员Lou Montulli在1993年3月的发明。
    Cookie是由服务器端生成,发送给User-Agent(一般是浏览器),浏览器会将Cookie的key/value保存到某个目录下的文本文件内,下次请求同一网站时就发送该Cookie给服务器(前提是浏览器设置为启用cookie)。Cookie名称和值可以由服务器端开发自己定义,对于JSP而言也可以直接写入jsessionid,这样服务器可以知道该用户是否合法用户以及是否需要重新登录等,服务器可以设置或读取Cookies中包含信息,借此维护用户跟服务器会话中的状态。

session原理

session 交互流程图
  1. 客户端请求服务器端,服务器会分配给客户端一个sessionID,这个sessionID会以cookie的形式保存在客户端浏览器中;
  2. 客户端此后的请求都会附带这个cookie请求服务器端接口和数据,当客户端请求登录时,服务器会校验用户信息,如果登录成功会更新本地的session数据,并返回客户端登录成功;
  3. 此后客户端就可以访问那些登录受限页面,获取用户数据,每次请求时,服务器端都会判断session是否过期,一旦过期,则返回错误页面,否则返回用户需要的数据;
  4. 当客户端cookie过期或服务器端session过期后,用户的session也就结束了。

分布式session

    由于最简单的session是以jvm为存储介质的,也就是一台JVM中存储了在这个系统访问的用户session信息,当处于分布式环境下,例如多个JVM提供服务时,这些session是无法共享数据的,此时就涉及了分布式session的概念。
    原理就是将session存储到一个公共的服务中,每个JVM都同时存取这些公共数据,保证数据可以互通;
    实现的方案有很多,例如以分布式缓存系统存储,例如redis,memcached,mongoDB等,另外一个思路是使用zookeeper存储sessionID信息,以Zookeeper的强一致性保证数据的唯一性和可用性;
    下面介绍一种非侵入式的分布式缓存系统,原理是重写tomcat的底层session,将session存储到redis中。这种方式的优点是上层业务逻辑代码可以不变,只需要配置tomcat即可达到目标;
    这是一个github上的开源项目:https://github.com/jcoleman/tomcat-redis-session-manager
    下载它对应tomcat7,jdk7的包,还有jedis2.0.0版本的jar包以及apache commons组件中的commons-pool-1.4.jar,将这三个jar拷贝到tomcat的lib目录。
    编辑conf目录下的context.xml文件,加入以下配置:
<Valve className="com.radiadesign.catalina.session.RedisSessionHandlerValve" />
<Manager className="com.radiadesign.catalina.session.RedisSessionManager"
         host=“192.168.10.11" 
         port="6379" 
         database="0" 
         maxInactiveInterval="60" />
    其中,host是redis的主机地址,port是redis服务的端口,database是redis的db名称,一般是0-15,默认redis有16个db。maxInactiveInterval是session过期时间,如果在代码层设置了过期时间,那么这个值会被覆盖,这个值的单位是秒;
    设置之后重启tomcat,通过redis-cli接入redis,查看keys:
keys *
可以通过这个命令查看redis的基本信息:
info
注:源码有一个bug,请查看RedisSessionManager的这段代码:
try{
   jedis = acquireConnection();
   // Ensure generation of a unique session identifier.
   do {
      if (null == sessionId) {
          sessionId = generateSessionId();
      }
      if (jvmRoute != null) {
         sessionId += '.' + jvmRoute;
      }
   } while (jedis.setnx(sessionId.getBytes(), NULL_SESSION) == 1L); // 1 = key set; 0 = key already existed
请注意看jvmRoute这段,如果在tomcat中配置了这个参数,那么这段代码会进入死循环。假如jvmRoute不为空,那么sessionId每次循环都会变更,导致while条件一直为true,一旦触发,会导致redis的内存以非常快速的速度增长。
改动就是把代码if(jvmRoute !=null)挪到if(null == sessionId)内;
另外,这段代码是为了避免分布式sessionID重现重复而加的,当不考虑jvmRoute时,一旦生成了redis中已存在的sessionId, 那么setnx会返回0,导致while结束;但是一旦生成了重复的sessionId也会导致循环结束,因此最后的while循环也会导致sessionId重复的问题;
所以考虑修改这段代码为:
try{
    jedis = acquireConnection();
    // Ensure generation of a unique session identifier.
    if (null == sessionId) {
        sessionId = generateSessionId();
    }
    while (jedis.setnx(sessionId.getBytes(), NULL_SESSION) == 0L){//contains key, re-generate
        sessionId = generateSessionId();
    }
}catch(Exception e){}

Tomcat容器如何管理session的

一般在代码中,我们使用下面这个方法获取session:
HttpSession session = request.getSession();
request这里类型一般是javax.servlet.http.HttpServletRequest,这是一个接口,扩展了ServletRequest接口,
public interface HttpServletRequest extends ServletRequest{
    ...
     /**
     *
     * Returns the current session associated with this request,
     * or if the request does not have a session, creates one.
     * 
     * @return  the HttpSession associated
     *   with this request
     *
     * @see #getSession(boolean)
     *
     */
    public HttpSession getSession();
}
接口说明:返回当前request相关的session对象,如果当前request没有session,则创建一个。
调用这个接口时,实际的实现类就到了tomcat容器org.apache.catalina.connector.RequestFacade中,这是一个门面封装类,它中间有实际getSession的实现方法:
@Override
    public HttpSession getSession(boolean create) {

        if (request == null) {
            throw new IllegalStateException(
                            sm.getString("requestFacade.nullRequest"));
        }

        if (SecurityUtil.isPackageProtectionEnabled()){
            return AccessController.
                doPrivileged(new GetSessionPrivilegedAction(create));
        } else {
            return request.getSession(create);
        }
    }
根据这个方法的实现,最终是通过org.apache.catalina.connector.Request类中的getSession方法获取或创建当前session对象;
/**
     * Return the session associated with this Request, creating one
     * if necessary and requested.
     *
     * @param create Create a new session if one does not exist
     */
    @Override
    public HttpSession getSession(boolean create) {
        Session session = doGetSession(create);
        if (session == null) {
            return null;
        }

        return session.getSession();
    }
Session在doGetSession中创建,
protected Session doGetSession(boolean create) {

        // There cannot be a session if no context has been assigned yet
        if (context == null) {
            return (null);
        }

        // Return the current session if it exists and is valid
        if ((session != null) && !session.isValid()) {
            session = null;
        }
        if (session != null) {
            return (session);
        }

        // Return the requested session if it exists and is valid
        Manager manager = null;
        if (context != null) {
            manager = context.getManager();
        }
        if (manager == null)
         {
            return (null);      // Sessions are not supported
        }
        if (requestedSessionId != null) {
            try {
                session = manager.findSession(requestedSessionId);
            } catch (IOException e) {
                session = null;
            }
            if ((session != null) && !session.isValid()) {
                session = null;
            }
            if (session != null) {
                session.access();
                return (session);
            }
        }

        // Create a new session if requested and the response is not committed
        if (!create) {
            return (null);
        }
        if ((context != null) && (response != null) &&
            context.getServletContext().getEffectiveSessionTrackingModes().
                    contains(SessionTrackingMode.COOKIE) &&
            response.getResponse().isCommitted()) {
            throw new IllegalStateException
              (sm.getString("coyoteRequest.sessionCreateCommitted"));
        }

        // Attempt to reuse session id if one was submitted in a cookie
        // Do not reuse the session id if it is from a URL, to prevent possible
        // phishing attacks
        // Use the SSL session ID if one is present.
        if (("/".equals(context.getSessionCookiePath())
                && isRequestedSessionIdFromCookie()) || requestedSessionSSL ) {
            session = manager.createSession(getRequestedSessionId());
        } else {
            session = manager.createSession(null);
        }

        // Creating a new session cookie based on that session
        if ((session != null) && (getContext() != null)
               && getContext().getServletContext().
                       getEffectiveSessionTrackingModes().contains(
                               SessionTrackingMode.COOKIE)) {
            Cookie cookie =
                ApplicationSessionCookieConfig.createSessionCookie(
                        context, session.getIdInternal(), isSecure());

            response.addSessionCookieInternal(cookie);
        }

        if (session == null) {
            return null;
        }

        session.access();
        return session;
    }
上面一段方法的逻辑是:先从现有的session池中查找session,如果当前request不含有sessionID或没有查到对应的session,则直接创建一个session,通过调用:org.apache.catalina.Manager.createSession()方法实现;
正常的方法实现如下:
/**
     * Construct and return a new session object, based on the default
     * settings specified by this Manager's properties.  The session
     * id specified will be used as the session id.  
     * If a new session cannot be created for any reason, return 
     * <code>null</code>.
     * 
     * @param sessionId The session id which should be used to create the
     *  new session; if <code>null</code>, a new session id will be
     *  generated
     * @exception IllegalStateException if a new session cannot be
     *  instantiated for any reason
     */
    @Override
    public Session createSession(String sessionId) {
        
        if ((maxActiveSessions >= 0) &&
                (getActiveSessions() >= maxActiveSessions)) {
            rejectedSessions++;
            throw new TooManyActiveSessionsException(
                    sm.getString("managerBase.createSession.ise"),
                    maxActiveSessions);
        }
        
        // Recycle or create a Session instance
        Session session = createEmptySession();

        // Initialize the properties of the new session and return it
        session.setNew(true);
        session.setValid(true);
        session.setCreationTime(System.currentTimeMillis());
        session.setMaxInactiveInterval(this.maxInactiveInterval);
        String id = sessionId;
        if (id == null) {
            id = generateSessionId();
        }
        session.setId(id);
        sessionCounter++;

        SessionTiming timing = new SessionTiming(session.getCreationTime(), 0);
        synchronized (sessionCreationTiming) {
            sessionCreationTiming.add(timing);
            sessionCreationTiming.poll();
        }
        return (session);

    }
创建一个空的session对象,如果没有sessionId,则声称一个id(生成时会滤重,本地有一个Map负责存储所有已生成的sessionId值,一旦重复则重新生成,避免由于重复产生错误;也就是generateSessionId方法(代码本处略);
分布式session需要重写这个方法,判重要根据redis中的数据判断,而不是根据本地的Map;
重写的分布式session中的createSession方法:
@Override
    public Session createSession(String sessionId) {
        RedisSession session = (RedisSession) createEmptySession();

        // Initialize the properties of the new session and return it
        session.setNew(true);
        session.setValid(true);
        session.setCreationTime(System.currentTimeMillis());
        session.setMaxInactiveInterval(getMaxInactiveInterval());

        String jvmRoute = getJvmRoute();

        Boolean error = true;
        Jedis jedis = null;

        try {
            jedis = acquireConnection();

            // Ensure generation of a unique session identifier.
            if (null == sessionId) {
                sessionId = generateSessionId();
            }
            while (jedis.setnx(sessionId.getBytes(), NULL_SESSION) == 0L){//contains key, re-generate
                sessionId = generateSessionId();
            }
      /* Even though the key is set in Redis, we are not going to flag
         the current thread as having had the session persisted since
         the session isn't actually serialized to Redis yet.
         This ensures that the save(session) at the end of the request
         will serialize the session into Redis with 'set' instead of 'setnx'. */

            error = false;

            session.setId(sessionId);
            session.tellNew();

            currentSession.set(session);
            currentSessionId.set(sessionId);
            currentSessionIsPersisted.set(false);
        } finally {
            if (jedis != null) {
                returnConnection(jedis, error);
            }
        }

        return session;
    }
会判断sessionId是否在redis中存在,如果存在则重新生成新的sessionId;
关于session的Attribute,是存储在session对象的Map中,每次获取、更新、删除都操作这个map对象;
关于session的存储,是利用了tomcat的valve责任链模式,将自定义的valve实现类增加到责任链中,通过每次执行invoke来保存数据到redis,代码:
public class RedisSessionHandlerValve extends ValveBase {
  private final Log log = LogFactory.getLog(RedisSessionManager.class);
  private RedisSessionManager manager;

  public void setRedisSessionManager(RedisSessionManager manager) {
    this.manager = manager;
  }

  @Override
  public void invoke(Request request, Response response) throws IOException, ServletException {
    try {
      getNext().invoke(request, response);
    } finally {
      final Session session = request.getSessionInternal(false);
      storeOrRemoveSession(session);
      manager.afterRequest();
    }
  }

  private void storeOrRemoveSession(Session session) {
    try {
      if (session != null) {
        if (session.isValid()) {
          log.trace("Request with session completed, saving session " + session.getId());
          if (session.getSession() != null) {
            log.trace("HTTP Session present, saving " + session.getId());
            manager.save(session);
          } else {
            log.trace("No HTTP Session present, Not saving " + session.getId());
          }
        } else {
          log.trace("HTTP Session has been invalidated, removing :" + session.getId());
          manager.remove(session);
        }
      }
    } catch (Exception e) {
      // Do nothing.
    }
  }
}

invoke会调用本地的storeOrRemoveSession,通过manager中的save方法将session保存更新到redis中。

参考资料

2014年8月8日星期五

IO操作之零拷贝(ZeroCopy)

简介

零拷贝技术是指在计算机操作过程中,CPU避免由于数据在内存间拷贝而浪费资源的技术。特别是在将文件在网络中输出的场景下,通过零拷贝技术可以节省很多的计算资源和内存资源。
下图描述了传统的IO操作的流程。
non-zerocopy
不使用零拷贝技术,当文件输出到网络时,首先要将数据从内核缓冲区拷贝到程序缓冲区,然后再由程序将缓冲区拷贝的数据输出到内核缓冲区的网络输出的socketBuffer中;我们可以看出拷贝到程序缓冲区这步实际上是没有必要的。
引入了零拷贝,流程见下图:
数据将不会拷贝到程序缓冲区,而是由程序调用系统的零拷贝命令,数据将会从输入流直接被写入到输出流中,避免了一次无用的拷贝过程;

Linux中的零拷贝

在linux中,可以通过mmap(), sendfile(), splice()实现零拷贝。

mmap

通过使用mmap接口替代read可以达到减少拷贝次数的目的。
tmp_buf = mmap(file, len); 
write(socket, tmp_buf, len);
使用mmap后,数据会通过DMA拷贝到操作系统内核缓冲区中,接着应用程序和操作系统共享这个缓冲区的数据,因此数据不需要再拷贝到应用程序缓冲区了。调用write后,操作系统将数据从内核缓冲区拷贝到与socket相关的内核缓冲区中,最后再拷贝到协议引擎中,一共三次数据拷贝;
使用 mmap 是 POSIX 兼容的,但是使用 mmap 并不一定能获得理想的数据传输性能。数据传输的过程中仍然需要一次 CPU 拷贝操作,而且映射操作也是一个开销很大的虚拟存储操作,这种操作需要通过更改页表以及冲刷 TLB (使得 TLB 的内容无效)来维持存储的一致性。但是,因为映射通常适用于较大范围,所以对于相同长度的数据来说,映射所带来的开销远远低于 CPU 拷贝所带来的开销。

sendfile

Linux2.1引入sendfile技术,与mmap的区别主要在它不需要维持内核缓冲区数据到程序缓冲区的映射操作,因此它极大的减少了对存储的开销。但是它仍然有一次在操作系统内核缓冲区的数据拷贝过程,将数据拷贝到socket相关的缓冲区中。

带有DMA收集功能的sendfile

sendfile() 系统调用利用 DMA 引擎将文件内容拷贝到内核缓冲区去;然后,将带有文件位置和长度信息的缓冲区描述符添加到 socket 缓冲区中去,此过程不需要将数据从操作系统内核缓冲区拷贝到 socket 缓冲区中,DMA 引擎会将数据直接从内核缓冲区拷贝到协议引擎中去,这样就避免了最后一次数据拷贝

splice

Linux 2.6.17 内核引入了 splice() 系统调用,它和sendfile非常类似,但是它不需要指定输出一端是socket,任何的系统文件输出都可以。从这一点上讲,sendfile实际上是splice的一个子集。

java中的零拷贝

Java 类库通过 java.nio.channels.FileChannel 中的 transferTo() 方法来在 Linux 和 UNIX 系统上支持零拷贝。可以使用 transferTo() 方法直接将字节从它被调用的通道上传输到另外一个可写字节通道上,数据无需流经应用程序。
下面是一段将文件输出到httpServletResponse的代码:
//输出流
servletOutputStream = response.getOutputStream();
FileChannel channel = new FileInputStream(imgPath).getChannel();
response.setHeader("Content-Length", String.valueOf(channel == null ? 0 : channel.size()));
channel.transferTo(0, channel.size(), Channels.newChannel(servletOutputStream));
通过零拷贝技术可以提高数据输出的时间延迟,下面是使用传统的IO输出和零拷贝输出的时间对比:
文件大小正常文件传输(ms)transferTo(ms)
7MB15645
21MB337128
63MB843387
98MB1320617
200MB21241150
350MB36311762
700MB134984422
1GB183998537
参考文档:



2014年8月4日星期一

JAVA多线程编程(二)

原子操作

原子操作,不可被中断的一个或一系列的操作;即该操作具有原子性;
例如为什么i++不是一个原子操作?
原因是执行i++是分为三步,read, inc, write。如果i=1,执行两次i++之后,那么i的最终结果可能不会是3,而是2。
加入现在如果有一个多线程的程序对线程的共享变量i进行操作,每次执行一次i++操作,最终i的输出和执行次数是一致的吗?例如下面的代码:
static int i = 0;
public static void loop() throws InterruptedException {
        Thread[] threads = new Thread[1000];
        for (int j = 0; j < 1000; j++) {
            Thread t = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
            });
            threads[j] = t;
            t.start();
        }
        for (int j = 0; j < 1000; j++) {
            threads[j].join();//等待所有线程执行完毕
        }
        System.out.println(i.get());
    }

程序输出:999

原因分析

《JVM原理简述》中,在栈的部分提到了下面的部分:
每一个线程被创建的时候,都会得到自己的程序计数器和java栈。java栈以帧为单位保存调用信息。当线程调用一个方法,JVM会向栈中压入一个新的栈帧,当方法调用结束时,栈帧会被弹出。java栈帧是线程私有的,不存在多线程的安全性问题。
        栈帧由3部分构成,局部变量区,操作数栈,帧数据区。
        1. 局部变量区
        编译器将调用方法的局部变量和参数按照声明顺序放入局部变量数组中,从0开始计数。如果是实例方法,this将被存储到数组的第一个位置。java中的对象传递都是按照引用传递的,因此java对象全部存储在堆中,而局部变量区和操作数栈中只存储了对象的引用。
        2. 操作数栈
        JVM把操作数栈作为它的工作区,大多数指令都要从这里弹出数据,执行运算,然后把结果压回操作数栈,然后等相关的指令将结果再次弹出。操作数栈扮演了暂存操作数的角色。
        3. 帧数据区
        栈帧还可以存储常量池的解析,正常方法返回和异常派发机制。这些都存在帧数据区。当方法要访问常量池时,会通过帧数据区的指向常量池的指针来访问。
每个线程在创建时,都会得到自己的计数器和栈,每次调用方法的时候,都以帧为单位保存调用信息;在栈帧中,i的值会从主内存中加载进来,线程会创建一个局部变量对象i,栈中会保存局部变量的引用,当i的操作完成后,再向主内存更新数据;这就造成同一时间两个线程读取到了同样的值,操作之后又会写回同样的数据,造成最终的i的值小于1000。

volatile

现在我们不禁想起了volatile标签,如果i是volatile类型的变量,结果有变化吗?我们将程序第一行的i声明加上volatile,再执行一次;
程序输出:998
从输出结果看,还是没有改进;证明volatile不能将非原子操作的行为限定为原子操作;
volatile的作用是强制线程在操作变量时都从主内存读取数据,并且操作完成后将操作后的值写入主内存。由于i++不是原子操作,分为read,inc,write三步,因此read即使每次都强制从主内存读取,但是依然有可能两个线程读取到相同的值;

CAS

Compare and swap,CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先比较下旧值有没有发生变化,如果没有发生变化,才交换成新值,发生了变化则不交换。
说到CAS,要先了解一下处理器是如何保证原子操作的。
32位IA-32处理器使用基于对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作。
JAVA中利用处理器对CMPXCHG指令来实现原子操作,CMPXCHG在处理器中被增加了#LOCK前缀,因此它在处理时是原子操作;
如下代码可以实现对i计数器的原子操作:
volatile static AtomicInteger i = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException {
        for (int k = 0; k < 100; k++) {
            loop();
        }
    }

    public static void loop() throws InterruptedException {
        Thread[] threads = new Thread[1000];
        for (int j = 0; j < 1000; j++) {
            Thread t = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    safeCount();
                }
            });
            threads[j] = t;
            t.start();
        }
        for (int j = 0; j < 1000; j++) {
            threads[j].join();
        }
        System.out.println(i.get());
    }

    public static void safeCount() {
        for (; ; ) {
            int tmp = i.get();
            if (i.compareAndSet(tmp, ++tmp)) {
                break;
            }
        }
    }
输出:
1000
2000
3000
4000
5000
使用CAS会存在下面几个问题

  • ABA的问题,即A被修改成为B,又被修改为A,此时使用CAS会发现变量没有变化,实际上却变化了;解决的办法是加入版本号,即1A-2B-3A。从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
  • CAS的CPU开销问题,由于存在循环,因此某些时候会使得CPU的开销比较大;
  • 只能保证一个共享变量的原子操作,无法对多个变量保证。如果存在多个变量,建议使用锁;

Lock

下面我们用加锁的方式来实现i++的原子操作;
    volatile static int i = 0;
    static final ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        for(int k=0;k<100;k++){
            loop();
        }
    }
    public static void loop() throws InterruptedException {
        for(int j = 0;j<1000;j++){
            Thread t = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lock.lock();
                    try{
                        i++;
                    }finally {
                        lock.unlock();
                    }

                }
            });
            t.start();
        }
        Thread.sleep(1001);
        System.out.println(i);
    }
程序输出
1000
2000
3000
4000
5000

线程池(Threadpool)

百度百科关于线程池的定义非常全面。
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
以java中的Excutor组件为实例说明线程池的整体流程;
以Executor组件的ThreadPoolExecutor为例来说明线程池的运行规则;
首先,调用new方法创建一个ThreadPoolExecutor实例,代码:
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
我们看到有几个核心参数,

  • corePoolSize:可以保留运行线程的数量
  • maximumPoolSize:最大线程数量
  • workQueue:阻塞队列,用于存储任务
下面再来看一下主要的方法,execute(),添加任务,
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }
以上代码可以看出,线程池的运行机制如下:

  1. 当poolsize<corePoolSize时,任务会被直接以创建workerThread的形式直接执行,不会存储到queue中;
  2. 当poolsize达到corePoolSize大小时,会向队列存储任务,workQueue.offer(), 这个在前面的blog中已经介绍过,offer是非阻塞方法,并且如果队列满的话,会直接返回false;插入队列的数据会被现有的workerThreads消费执行;
  3. 当队列无法插入数据的时候,会创建新的workerThread来执行,直到当前的poolSize达到了maximumPoolSize
  4. 当无法向队列插入数据,并且poolSize= maximumPoolSize,会拒绝这个任务;
  5. 如果poolsize>corePoolSize,那么多余的线程会在等待keepAliveTime时间后销毁;
下图解释了整个流程。
线程池说明


参考资料:

2014年8月1日星期五

JAVA多线程编程(一)

Thread.join(), Thread.start()

Java线程类java.lang.Thread中,有两个方法,start和join。start和直接执行run有何区别?调用join方法会导致什么后果?

start()

启动一个线程并调用run()方法;API中的注释说明:调用start()方法会导致线程开始执行;JVM会调用本线程的run()方法;结果是两个线程并行运行:当前线程和另外执行run方法的线程;
如果直接调用run()方法,则当前线程不会启动;

join()

join()方法在API的注释是:等待直到当前线程死去;这个方法可能会抛出InterruptedException,当其他任何线程打断当前线程时;
它有两个重载方法:join(long millis)和join(long millis, int nanos);
join(long millis):在millis(单位毫秒)时间内等待线程死去;第二个是再多等nanos纳秒的时间;
join可以用来使线程顺序执行,例如:
Thread t2 = new Thread(consumer);
Thread t3 = new Thread(consumer);
Thread t4 = new Thread(consumer);
try {
   t2.start();
   t2.join();
   t3.start();
   t3.join();
   t4.start();
   t4.join();
} catch (InterruptedException e) {
 e.printStackTrace();
}

Lock接口和synchronized

@since 1.5,在JDK5中添加的锁接口;它相比synchronized关键字,提供了更多的锁操作;除了可以实现和synchronized关键字一样的效果之外,Lock接口还提供了读写锁,读操作和写操作的加锁是分开的。提高了锁的效率;也可以满足类似ConcurrentHashMap这种数据结构的高性能的有条件锁;
如图,ConcurrentHashMap的锁结构,

ConcurrentHashMap通过分段锁的机制来达到提高并发处理能力的目的,每个Segment是一个重入锁(ReentrantLock,每次写数据的时候,会根据key的Hash值来使用不同的Segment加锁;

wait和sleep方法的不同

wait()方法是Object的一个方法,调用这个方法会导致当前线程处于等待的状态,直到另外一个线程调用它的notify()或notifyAll()方法,或者一个指定的超时时间已经结束;这个方法会导致当前线程将自己放在一个等待集合中,然后放弃所有在这个对象的同步锁;sleep()方法会导致线程休眠指定的时间,而休眠期间,所有关于当前线程的锁都不会被释放;

volatile关键字

volatile关键字修饰的共享变量在线程并发读取时,JAVA内存模型保证每个线程在读取的时刻的值永远是最新的数据;
解释volatile要说到JMM(JAVA内存模型);
我们看下面的代码,执行一个volatile类型的变量的new操作会导致:
0x01a3de1d: movb $0x0,0x1104800(%esi);
0x01a3de24: lock addl $0x0,(%esp);
volatile变量会比普通变量多了第二个汇编操作,这个操作在多核处理器会引起:

  • 将当前处理器缓存的数据保存到系统内存
  • 其他cpu的缓存数据失效
操作系统中,处理器为了处理数据速度更快,通常会将数据缓存到L1,L2级缓存中;但是处理器不知道何时将数据回写到主存中,当volatile类型的变量被创建时,JVM会向处理器发送一个lock指令,此时处理器会在更新数据之后,将缓存行数据回写到主存中。其他的处理器的数据还是旧的,为了实现缓存一致性,各个处理器会实现一个缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来判断自己的数据是否过期了。当处理器发现自己的缓存行数据地址被修改,当前处理器会将这个数据标记为无效,当需要操作这个数据时,处理器会强制从系统内存中重新读取这个数据;

JAVA中阻塞队列

java.util.concurrent.BlockingQueue是java中的阻塞队列接口; 他的核心方法包括:
抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用


  • add()向队列写入一条数据,如果对象写入失败,则抛出异常,否则返回true
  • remove() 从队列删除数据,如果队列不包括这个对象,则抛出异常,否则返回true
  • offer()想队列写入一条数据,如果写入成功返回true,否则返回false
  • pool()从队列头获取一条数据,在指定超时时间内未能成功获取,则返回null
  • put() 向队列写入一条数据,如果队列没有空间写入,则阻塞知道写入成功为止;
  • take() 从队列头部获取一条数据,如果队列没有数据可以获取,则阻塞到有数据获取为止;

BlockingQueue特点

  • BlockingQueue可以限定容量;没有容量限制的队列,总会返回Integer.MAX_VALUE的剩余容量;
  • BlockingQueue主要用于生产者-消费者队列;关于生产者-消费者问题
  • BlockingQueue的实现是线程安全的;
  • BlockingQueue不支持使用一种类似close或shutdown的操作来指定不再添加对象;

CountdownLatch和CyclicBarriar

CountdownLatch

    可以看做是一个同步计数器;任何调用该对象的await()方法的函数都会被阻塞;调用该对象的countdown方法可以使计数器减一,这是一个原子操作,同时只有一个线程可以执行这个方法,直到计数器=0之后,await方法的阻塞会被取消;这个典型的应用场景就是需要等待众多线程工作完才能继续执行的场合,可以利用它来阻塞等待线程执行完毕;

CyclicBarriar

    正好相反,CyclicBarriar也是一个同步计数器,当计数器达到指定数目之后,会执行CyclicBarriar中自定义的run()方法,然后被await()方法阻塞的线程会被唤醒;这个典型应用场景是多个线程在执行前,要等待一个公用的方法执行完毕,可以利用CyclicBarriar计数器来完成;

总结

    上面两个计数器中的CountDownLatch. countdown()是通过CAS实现的原子性;而CyclicBarriar.await()是通过ReentrantLock实现的原子性;
    有关原子性会在下一篇文章中详细讨论;

不可变对象(immutable objects)

不可变对象:即被创建出来之后状态不可改变的对象,例如String,Integer等各种基础类型的包装类,都是不可变对象;每次对它的改变都会创建新的不可变对象;
StringBuffer是可变对象,因为每次修改StringBuffer是修改该对象本身;
图.String's Immutablity
如图,我们创建一个String类型的变量s,
String s = "abcd";
s实际上存储了一个引用地址,该地址指向heap中的一个String对象,"abcd";
s = s.concat(s, "ef");
当对String执行concat操作之后,实际上会创建一个新的String对象:"abcdef", 并且s的引用地址修改指向到新的String对象;
不可变对象在多线程中可以不使用锁机制就可以被多个线程共享;它在多线程编程中有以下特点:
  • 线程安全
  • 不需要锁同步,可以直接使用
  • 提高了性能,避免使用锁或synchronized关键字
  • 可以重复使用;
上图也显示了,不可变对象存在的问题,会产生大量垃圾,给垃圾回收带来压力;对String的concat操作会使之前创建的对象"abcd"变成没有引用的垃圾对象,会逐步被垃圾回收机制回收。

线程调度

分时操作系统

CPU将时间切割为时间片,然后将时间片分配给程序,一个程序的时间片运行结束后,下一个程序的时间片继续执行,多个程序轮流执行;由于CPU的高速处理特性,给人感觉是同时处理一样;

进程和线程

    我们使用的操作系统(Linux,Windows)都有进程的概念,进程就是我们上面所说的程序,它有自己独立的内存空间和系统资源,每个进程的内部的数据和状态都是独立的。创建并执行一个进程的开销比较大,因此线程出现了;
    线程是程序执行流的最小单元,它属于某一个进程,并能和其他线程共享该进程内部的所有资源;线程通常只有寄存器和堆栈,用于存储该线程运行的数据;它比进程更加轻量级,系统在线程间切换比进程消耗更小的资源;一个进程包含多个线程;

线程状态

图. 线程运行机制

  • 创建状态
    • 线程刚刚被创建
  • 可运行状态
    • 线程准备好,等待调度,获取CPU时间片运行前的状态
  • 运行状态
    • 获得CPU资源,处于运行状态
  • 阻塞状态
    • 调用了sleep使得线程处于阻塞状态,此时不会释放锁;
    • 调用await方法,使得线程处于等待状态,此时线程会释放锁
    • 等待获得锁
    • 被阻塞事件阻塞,例如等待数据输入等
  • 死亡状态
    • 线程运行结束

Java线程调度器

java的线程调度器负责调度线程,线程被调度器分为10个级别,1-10,默认是NORMAL_PRIORITY=5.
java的线程调度器负责按照优先级调度线程,一旦时间片有空余,会先让优先级高的线程运行,直到线程dead或sleep或wait,才会执行低优先级的线程;
当有多个线程处于可运行状态,并且优先级相同时,JVM会随机选取一个线程运行。
调度算法有两种:分时和独占式。分时是让所有线程轮流使用CPU时间片;独占则是让一个线程一直执行直到执行结束;JVM采用独占式调度算法;