本文主要记录了通过封装一个dao底层,来实现多数据源分库操作,通过分布式事务保持数据的强一致性。
SOA的事务,也主要分为两个方向:
1. 弱一致性,即单个的数据操作保持事务一致;这样可以保持高性能,在数据请求量很大的情况下对性能有保证;
2. 强一致性,即一个service方法的数个操作在同一个事务中,由于数个操作的数据源可能不一样,也可能由多个dao service构成,这时就要通过分布式事务保持数据的一致,其中一个操作失败,所有的数据都回滚。这里适用于对数据一致性要求很强的场景。分布式事务会对性能有一定的影响。
言归正传,本文主要使用了hibernate4,spring3.1,atomikos3.8来实现整体结构。设计的目的是一个方便的底层DAO的封装,而分布式事务会在上层的service控制,而不在DAO层控制,避免事务嵌套造成的问题。
第一部分:底层控制代码编写:
1. 配置dynamicDataSource,代码如下:
/** * 动态数据源 * @author wenfengsun * @since 2012-9-19 下午3:47:15 */ public class DynamicDataSource extends AbstractRoutingDataSource{ @Override protected Object determineCurrentLookupKey() { String key = DataSourceContextHolder.getDataSource(); if(key == null) key = "ds0"; return key; } }
其中,DataSourceContextHolder利用了ThreadLocal为每个线程设置不同的变量值,DataSourceContextHolder的代码如下:
/** * 获取或设置当前线程的dataSource * @author wenfengsun * @since 2012-9-19 下午2:57:36 */ public class DataSourceContextHolder { private static final ThreadLocalcontextHolder = new ThreadLocal (); public static void setDataSource(String dsName){ contextHolder.set(dsName); } public static String getDataSource(){ return (String) contextHolder.get(); } public static void clearDataSource(){ contextHolder.remove(); } }
解释:在每次请求来临时,根据不同的路由策略,设置不同的dataSource,从而达到分开存储的目的。
2. 配置sessionFactory和存储接口
public class Config { private static Config _instance; private SessionFactory sessionFactory; public static Config getInstance(){ if(_instance == null){ synchronized (Config.class) { _instance = new Config(); } } return _instance; } Config(){ initSessionFactoryWithTransactionManager(); } }
我们定义了一个config,它是一个单例模式的实现,通过方法:initSessionFactoryWithTransactionManager来实现数据源和sessionFactory的初始化工作;
3. 方法initSessionFactoryWithTransactionManager的实现:
解释:首先建立了两个数据源,通过一个单例的map存储。并将数据源赋予给之前声明的dynamicDatasource,然后初始化sessionFactory,注意红色部分和删除的部分,不要写错,否则分布式事务会报错。
public void initSessionFactoryWithTransactionManager(){ DynamicDataSource dds = new DynamicDataSource();
然后再增加一个save方法:
public Serializable save(Serializable id, Object obj) throws NotSupportedException, SystemException{ long shardId = (Long)id; if(shardId%2==1){ DataSourceContextHolder.setDataSource("ds1"); }else DataSourceContextHolder.setDataSource("ds0"); System.err.println("sessionFactory:"+sessionFactory); sessionFactory.openSession(); Session session = sessionFactory.getCurrentSession(); // session.beginTransaction(); Serializable ret = session.save(obj); // session.getTransaction().commit(); return ret; }
此处,基数存储到数据源ds1,偶数存储到数据源ds2
请注意:删除的部分,由于要在service中控制事务,此处的session的事务代码务必删除,否则会报一个错误:
Cannot call method 'commit' while a global transaction is runing |
第二部分:上层service和分布式事务
第二部分的配置依赖于spring。
<bean id="dao" class="com.xx.dao.impl.DaoImpl"/> <bean id="userService" class="com.xx.service.impl.UserServiceImpl"> <property name="dao" ref="dao"/> </bean> <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close"> <property name="forceShutdown"> <value>true</value> </property> </bean> <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"> <property name="transactionTimeout" value="240"/> </bean> <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> <property name="transactionManager" ref="atomikosTransactionManager"/> <property name="userTransaction" ref="atomikosUserTransaction"/> </bean> <tx:annotation-driven transaction-manager="transactionManager"/>
我们声明了Dao和Service,并且声明了transactionManager。在最后一行tx声明之后,可以在service的实现类中的方法加入@Transactional即可控制事务。例如:
@Transactional public long save(User user){ User u1 = new User(); u1.setName("tttt---------------"); long res = (Long)dao.save(10L, user); dao.save(11L, u1); return res; }
当其中一个save失败时,整个事务会回滚;
没有评论:
发表评论