2015年9月22日星期二

分布式事务实践

关于Java事务,可以参考《Java事务设计策略》一书

事务

一个数据库事务通常包含了一个序列的对数据库的读/写操作。它的存在包含有以下两个目的:

  1. 为数据库操作序列提供了一个从失败中恢复到正常状态的方法,同时提供了数据库即使在异常状态下仍能保持一致性的方法。
  2. 当多个应用程序在并发访问数据库时,可以在这些应用程序之间提供一个隔离方法,以防止彼此的操作互相干扰。
        当事务被提交给了DBMS(数据库管理系统),则DBMS(数据库管理系统)需要确保该事务中的所有操作都成功完成且其结果被永久保存在数据库中,如果事务中有的操作没有成功完成,则事务中的所有操作都需要被回滚,回到事务执行前的状态;同时,该事务对数据库或者其他事务的执行无影响,所有的事务都好像在独立的运行。

        但在现实情况下,失败的风险很高。在一个数据库事务的执行过程中,有可能会遇上事务操作失败、数据库系统/操作系统失败,甚至是存储介质失败等情况。这便需要DBMS对一个执行失败的事务执行恢复操作,将其数据库状态恢复到一致状态(数据的一致性得到保证的状态)。为了实现将数据库状态恢复到一致状态的功能,DBMS通常需要维护事务日志以追踪事务中所有影响数据库数据的操作。

        并非任意的对数据库的操作序列都是数据库事务。数据库事务拥有以下四个特性,习惯上被称之为ACID特性。

  • 原子性(Atomicity):事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。
  • 一致性(Consistency):事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。
  • 隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行。
  • 持久性(Durability):已被提交的事务对数据库的修改应该永久保存在数据库中。

分布式事务

        分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

二阶段提交

        为了解决分布式事务的问题,普遍的算法是二阶段提交。是指,在计算机网络以及数据库领域内,为了使基于分布式系统架构下的所有节点在进行事务提交时保持一致性而设计的一种算法(Algorithm),具体可以参考维基百科-二阶段提交

Spring+ibatis+多数据源+分布式事务+Hession

        本文主要讨论如何在实际应用中,使用spring框架,ibatis框架和mysql数据库实现的分布式事务控制。

关于XA事务

        在计算技术上,XA规范是开放群组关于分布式事务处理 (DTP)的规范。规范描述了全局的事务管理器与局部的资源管理器之间的接口。XA规范的目的是允许的多个资源(如数据库,应用服务器,消息队列,等等)在同一事务中访问,这样可以使ACID属性跨越应用程序而保持有效。XA使用两阶段提交来保证所有资源同时提交或回滚任何特定的事务。
        
        mysql数据库是支持XA控制的,请参考:https://dev.mysql.com/doc/refman/5.0/en/xa.html
        非必要性的在您的应用中引入XA数据库驱动,会导致不可预料的后果与错误,特别是在使用本地事务模型(Local Transaction Model)时。因此,一般来说在您不需要XA的时候,应该尽量避免使用它。下面的最佳实践描述了什么时候应当使用XA:

最佳实践

        仅在同一个事务上下文中需要协调多种资源(即数据库,以及消息主题或队列)时,才有必要使用X/Open XA接口。

更多请参考:XA事务处理

AbstractRoutingDataSource 和 AtomikosNonXADataSourceBean

在Spring框架中,我们可以使用AbstractRoutingDataSource来配置多数据源。并且在配置数据源的时候,我们使用atomikos的事务控制框架来控制事务。由于我们不想盲目使用XA事务控制,因此我们设计我们的系统时尽量避免复杂的事务模型,只是保证数据在同一个数据库中的事务,因此我们不使用atomikos的XA datasource,使用AtomikosNonXADataSourceBean。代码如下:
public class DynamicDataSource extends AbstractRoutingDataSource{
    private DataSourceConfiguration dataSourceConfiguration;

    public void setDataSourceConfiguration(DataSourceConfiguration dataSourceConfiguration) {
        this.dataSourceConfiguration = dataSourceConfiguration;
    }

    @Override
    public void afterPropertiesSet(){
        Map<String, Map<String, String>> configurations = dataSourceConfiguration.getDatasources();
        Iterator<String> keys = configurations.keySet().iterator();
        Map<Object, Object> datasources = new HashMap<>();
        while(keys.hasNext()){
            String key = keys.next();
            Map<String, String> config = configurations.get(key);
            AtomikosNonXADataSourceBean dataSource = new AtomikosNonXADataSourceBean();
            dataSource.setDriverClassName(config.get("driverClassName"));
            dataSource.setUniqueResourceName(key);
            dataSource.setMaxIdleTime(NumberUtils.toInt(config.get("maxIdle"), 6));
            dataSource.setUrl(config.get("url"));
            dataSource.setUser(config.get("username"));
            dataSource.setPassword(config.get("password"));
            dataSource.setMaxPoolSize(NumberUtils.toInt(config.get("maxActive"), 64));
            dataSource.setPoolSize(NumberUtils.toInt(config.get("maxActive"), 32));
            dataSource.setMinPoolSize(NumberUtils.toInt(config.get("minPoolSize"), 1));
            dataSource.setMaxIdleTime(NumberUtils.toInt(config.get("maxIdle"), 6));
            datasources.put(key, dataSource);
        }
        super.setTargetDataSources(datasources);
        super.afterPropertiesSet();
    }
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceSwitcher.getDataSource()==null?dataSourceConfiguration.defaultDataSource():DataSourceSwitcher.getDataSource();
    }
}

DataSourceSwitcher(数据源切换控制器)

通过一个ThreadLocal变量来控制当前的数据源。ThreadLocal变量可以仅可以被当前的线程访问,也可以看做是当前线程的局部变量,线程之间是不能互相访问的。关于ThreadLocal原理
public class DataSourceSwitcher {
    private static final Log log = LogFactory.getLog(DataSourceSwitcher.class);

    private static final ThreadLocal contextHolder = new ThreadLocal();

    public static void setDataSource(String dataSource) {
        if (dataSource != null) {
            contextHolder.set(dataSource);
        } else {
            throw new RuntimeException("数据访问异常");
        }

    }

    public static void setSlave() {
        setDataSource("slave");
    }

    public static void setMaster() {
        clearDbType();
    }

    public static String getDataSource() {
        return (String) contextHolder.get();

    }

    public static void clearDbType() {
        contextHolder.remove();
    }
}

使用注解动态切换数据源

利用spring中的MethodInterceptor(反射)来实现通过注解动态切换数据源。
拦截器代码:
public class AnnotationMethodInterceptor implements MethodInterceptor {
    @Override
    public Object invoke(MethodInvocation methodInvocation) throws Throwable {

        Class thisClass = methodInvocation.getThis().getClass();
        Method implMethod = thisClass.getMethod(methodInvocation.getMethod().getName(),
                methodInvocation.getMethod().getParameterTypes());
        DBAccess dbAccess = implMethod.getAnnotation(DBAccess.class);
        if (dbAccess != null) {
            String datasource = dbAccess.value();
            if (!StringUtils.isBlank(datasource)) {
                DataSourceSwitcher.setDataSource(datasource);
            }
        }
        try {
            Object obj = methodInvocation.proceed();
            return obj;
        } finally {
            if (dbAccess != null) {
                DataSourceSwitcher.clearDbType();
            }
        }
    }
}
注解代码:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DBAccess {
    String value() default "";
}
配置xml:
<bean id="serviceInterceptor" class="com.xx.dao.annotation.AnnotationMethodInterceptor"></bean>
<bean class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator">
    <property name="beanNames" value="*Dao"/>
    <property name="interceptorNames" value="serviceInterceptor"/>
</bean>

配置DAO层和Service层

我们通过区分DAO层和Service层来达到逻辑分层的目的。
DAO层主要负责数据交互,主要的数据库访问注解和SqlMapper都在这一层实现。
Service主要负责组合的数据逻辑以及缓存配置等业务需求逻辑,通过调用DAO层来实现数据的存取更新等操作。

配置数据源

在spring里配置数据源。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"
    default-autowire="byName">
 <context:component-scan base-package="com.xx.client.service.impl" />

 <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  <property name="location" value="classpath:mysql-dbcp-config.properties" />
 </bean>

 <bean id="dataSourceConfiguration" class="com.xx.dao.datasource.DataSourceConfiguration"/>

 <bean id="dataSource" class="com.xx.dao.datasource.DynamicDataSource">
        <property name="dataSourceConfiguration" ref="dataSourceConfiguration"/>
    </bean>
 <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
  <property name="dataSource" ref="dataSource" />
  <property name="configLocation" value="classpath:sqlmap-config.xml"/>
 </bean>
 <!-- prepare for daoImpl -->
 <bean id="sqlSession" class="org.mybatis.spring.SqlSessionTemplate">
  <constructor-arg index="0" ref="sqlSessionFactory" />
 </bean>
</beans>

定义atomikos事务控制

<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init"  destroy-method="close">
    <property name="forceShutdown" value="true"/>
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="240"/>
</bean>
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager"/>
    <property name="userTransaction" ref="atomikosUserTransaction"/>
</bean>
<tx:annotation-driven  transaction-manager="transactionManager"/>
其他的配置数据请参考spring相关资料。

使用事务

通过使用JTA提供的Transactional注解就可以很容易的实现事务控制。我们要在service层控制事务,可以参考如下代码
    @Override
    @Transactional
    public int updateByPrimaryKey(AgentInfo record) {
        removeAgentListCache();
        removeAgentCache(record);
        record.setUpdatedAt(System.currentTimeMillis());
        record = agentInfoDao.update(record);
        AgentOrder order = orderDao.getById(record.getOrderId());
        order.setStatus = 1;
        orderDao.update(order);
        return record == null ? 0 : 1;
    }
以上代码,如果其中一个dao保存时出错,整个事务就会回滚。

其他

        在Transactional注解下,由于使用了AtomikosNonXADataSourceBean,因此在代码中不允许使用其他的数据源,atomikos会锁定第一个使用的数据源,例如,在A库查询了一条数据,再更新B库的表,此时事务会锁定第一个使用的数据源,中途无法切换数据源,因此即使是get操作也不能跨库。
        在Transactional内部使用try时要注意,需要跑出的exception不能catch住,否则即使底层的DAO报错,也会因为被catch导致数据无法回滚。

没有评论:

发表评论