本文主要记录了通过封装一个dao底层,来实现多数据源分库操作,通过分布式事务保持数据的强一致性。
SOA的事务,也主要分为两个方向:
1. 弱一致性,即单个的数据操作保持事务一致;这样可以保持高性能,在数据请求量很大的情况下对性能有保证;
2. 强一致性,即一个service方法的数个操作在同一个事务中,由于数个操作的数据源可能不一样,也可能由多个dao service构成,这时就要通过分布式事务保持数据的一致,其中一个操作失败,所有的数据都回滚。这里适用于对数据一致性要求很强的场景。分布式事务会对性能有一定的影响。
言归正传,本文主要使用了hibernate4,spring3.1,atomikos3.8来实现整体结构。设计的目的是一个方便的底层DAO的封装,而分布式事务会在上层的service控制,而不在DAO层控制,避免事务嵌套造成的问题。
第一部分:底层控制代码编写:
1. 配置dynamicDataSource,代码如下:
/**
* 动态数据源
* @author wenfengsun
* @since 2012-9-19 下午3:47:15
*/
public class DynamicDataSource extends AbstractRoutingDataSource{
@Override
protected Object determineCurrentLookupKey() {
String key = DataSourceContextHolder.getDataSource();
if(key == null)
key = "ds0";
return key;
}
}
其中,DataSourceContextHolder利用了ThreadLocal为每个线程设置不同的变量值,DataSourceContextHolder的代码如下:
/**
* 获取或设置当前线程的dataSource
* @author wenfengsun
* @since 2012-9-19 下午2:57:36
*/
public class DataSourceContextHolder {
private static final ThreadLocal contextHolder = new ThreadLocal();
public static void setDataSource(String dsName){
contextHolder.set(dsName);
}
public static String getDataSource(){
return (String) contextHolder.get();
}
public static void clearDataSource(){
contextHolder.remove();
}
}
解释:在每次请求来临时,根据不同的路由策略,设置不同的dataSource,从而达到分开存储的目的。
2. 配置sessionFactory和存储接口
public class Config {
private static Config _instance;
private SessionFactory sessionFactory;
public static Config getInstance(){
if(_instance == null){
synchronized (Config.class) {
_instance = new Config();
}
}
return _instance;
}
Config(){
initSessionFactoryWithTransactionManager();
}
}
我们定义了一个config,它是一个单例模式的实现,通过方法:initSessionFactoryWithTransactionManager来实现数据源和sessionFactory的初始化工作;
3. 方法initSessionFactoryWithTransactionManager的实现:
解释:首先建立了两个数据源,通过一个单例的map存储。并将数据源赋予给之前声明的dynamicDatasource,然后初始化sessionFactory,注意红色部分和删除的部分,不要写错,否则分布式事务会报错。
public void initSessionFactoryWithTransactionManager(){
DynamicDataSource dds = new DynamicDataSource();
然后再增加一个save方法:
public Serializable save(Serializable id, Object obj) throws NotSupportedException, SystemException{
long shardId = (Long)id;
if(shardId%2==1){
DataSourceContextHolder.setDataSource("ds1");
}else
DataSourceContextHolder.setDataSource("ds0");
System.err.println("sessionFactory:"+sessionFactory);
sessionFactory.openSession();
Session session = sessionFactory.getCurrentSession();
// session.beginTransaction();
Serializable ret = session.save(obj);
// session.getTransaction().commit();
return ret;
}
此处,基数存储到数据源ds1,偶数存储到数据源ds2
请注意:删除的部分,由于要在service中控制事务,此处的session的事务代码务必删除,否则会报一个错误:
Cannot call method 'commit' while a global transaction is runing |
第二部分:上层service和分布式事务
第二部分的配置依赖于spring。
<bean id="dao" class="com.xx.dao.impl.DaoImpl"/> <bean id="userService" class="com.xx.service.impl.UserServiceImpl"> <property name="dao" ref="dao"/> </bean> <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close"> <property name="forceShutdown"> <value>true</value> </property> </bean> <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"> <property name="transactionTimeout" value="240"/> </bean> <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> <property name="transactionManager" ref="atomikosTransactionManager"/> <property name="userTransaction" ref="atomikosUserTransaction"/> </bean> <tx:annotation-driven transaction-manager="transactionManager"/>
我们声明了Dao和Service,并且声明了transactionManager。在最后一行tx声明之后,可以在service的实现类中的方法加入@Transactional即可控制事务。例如:
@Transactional
public long save(User user){
User u1 = new User();
u1.setName("tttt---------------");
long res = (Long)dao.save(10L, user);
dao.save(11L, u1);
return res;
}
当其中一个save失败时,整个事务会回滚;
没有评论:
发表评论