2012年9月20日星期四

数据底层的多数据源和分布式事务

    SOA(面向服务的架构)在已经相当流行;在大数据处理的时候,分表,分区,分库等操作也一直伴随着一起发展,这里就涉及到了数个数据源,读写分离,事务等一系列的问题。
    本文主要记录了通过封装一个dao底层,来实现多数据源分库操作,通过分布式事务保持数据的强一致性。
    SOA的事务,也主要分为两个方向:
    1. 弱一致性,即单个的数据操作保持事务一致;这样可以保持高性能,在数据请求量很大的情况下对性能有保证;
    2. 强一致性,即一个service方法的数个操作在同一个事务中,由于数个操作的数据源可能不一样,也可能由多个dao service构成,这时就要通过分布式事务保持数据的一致,其中一个操作失败,所有的数据都回滚。这里适用于对数据一致性要求很强的场景。分布式事务会对性能有一定的影响。

    言归正传,本文主要使用了hibernate4,spring3.1,atomikos3.8来实现整体结构。设计的目的是一个方便的底层DAO的封装,而分布式事务会在上层的service控制,而不在DAO层控制,避免事务嵌套造成的问题。

第一部分:底层控制代码编写:

1. 配置dynamicDataSource,代码如下:
/**
 * 动态数据源
 * @author wenfengsun
 * @since 2012-9-19 下午3:47:15
 */
public class DynamicDataSource extends AbstractRoutingDataSource{
      @Override
      protected Object determineCurrentLookupKey() {
             String key = DataSourceContextHolder.getDataSource();
             if(key == null)
                   key = "ds0";
             return key;
      }
}
其中,DataSourceContextHolder利用了ThreadLocal为每个线程设置不同的变量值,DataSourceContextHolder的代码如下:
/**
 * 获取或设置当前线程的dataSource
 * @author wenfengsun
 * @since 2012-9-19 下午2:57:36
 */
public class DataSourceContextHolder {
     private static final ThreadLocal contextHolder = new ThreadLocal();  
     public static void setDataSource(String dsName){  
          contextHolder.set(dsName);  
     }  
     public static String getDataSource(){  
          return (String) contextHolder.get();  
     }  
     public static void clearDataSource(){  
          contextHolder.remove();  
     }  
}

解释:在每次请求来临时,根据不同的路由策略,设置不同的dataSource,从而达到分开存储的目的。
2. 配置sessionFactory和存储接口
public class Config {
       private static Config _instance;
       private SessionFactory sessionFactory;

       public static Config getInstance(){
              if(_instance == null){
                       synchronized (Config.class) {
                            _instance = new Config();
                       }
              }
              return _instance;
       }
       Config(){
              initSessionFactoryWithTransactionManager();
       }
}

我们定义了一个config,它是一个单例模式的实现,通过方法:initSessionFactoryWithTransactionManager来实现数据源和sessionFactory的初始化工作;
3. 方法initSessionFactoryWithTransactionManager的实现:
解释:首先建立了两个数据源,通过一个单例的map存储。并将数据源赋予给之前声明的dynamicDatasource,然后初始化sessionFactory,注意红色部分和删除的部分,不要写错,否则分布式事务会报错。
public void initSessionFactoryWithTransactionManager(){
    DynamicDataSource dds = new DynamicDataSource();
然后再增加一个save方法:
public Serializable save(Serializable id, Object obj) throws NotSupportedException, SystemException{
        long shardId = (Long)id;
        if(shardId%2==1){
              DataSourceContextHolder.setDataSource("ds1");
        }else
              DataSourceContextHolder.setDataSource("ds0");
        System.err.println("sessionFactory:"+sessionFactory);
        sessionFactory.openSession();
        Session session = sessionFactory.getCurrentSession();
     //  session.beginTransaction();
        Serializable ret = session.save(obj);
    //  session.getTransaction().commit();
        return ret;
}
此处,基数存储到数据源ds1,偶数存储到数据源ds2
请注意:删除的部分,由于要在service中控制事务,此处的session的事务代码务必删除,否则会报一个错误:

Cannot call method 'commit' while a global transaction is runing

好,至此我们底层的控制已经完成,现在新建一个接口Dao和一个实现DaoImpl,调用save方法存储对象;(代码掠过)

第二部分:上层service和分布式事务

第二部分的配置依赖于spring。
<bean id="dao" class="com.xx.dao.impl.DaoImpl"/>
<bean id="userService" class="com.xx.service.impl.UserServiceImpl">
          <property name="dao" ref="dao"/>
</bean>
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init"  destroy-method="close">
          <property name="forceShutdown">
                   <value>true</value>
          </property>
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
          <property name="transactionTimeout" value="240"/>
</bean>
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
          <property name="transactionManager" ref="atomikosTransactionManager"/>
          <property name="userTransaction" ref="atomikosUserTransaction"/>
</bean>
<tx:annotation-driven  transaction-manager="transactionManager"/>


我们声明了Dao和Service,并且声明了transactionManager。在最后一行tx声明之后,可以在service的实现类中的方法加入@Transactional即可控制事务。例如:
@Transactional
public long save(User user){
       User u1 = new User();
       u1.setName("tttt---------------");
       long res = (Long)dao.save(10L, user);
       dao.save(11L, u1);
       return res;
}


当其中一个save失败时,整个事务会回滚;

没有评论:

发表评论