Jdbc怎么实现分布式事务数据源动态切换

Jdbc怎么实现分布式事务数据源动态切换

本篇内容介绍了“Jdbc怎么实现分布式事务数据源动态切换”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一:依赖的jar包 Maven配置

Jdbc怎么实现分布式事务数据源动态切换

<dependency>

  1. <groupId>com.atomikos</groupId>

  2. <artifactId>transactions</artifactId>

  3. <version>4.0.4</version>

  4. </dependency>

  5. <dependency>

  6. <groupId>com.atomikos</groupId>

  7. <artifactId>transactions-api</artifactId>

  8. <version>4.0.4</version>

  9. </dependency>

  10. <dependency>

  11. <groupId>com.atomikos</groupId>

  12. <artifactId>atomikos-util</artifactId>

  13. <version>4.0.4</version>

  14. </dependency>

  15. <dependency>

  16. <groupId>com.atomikos</groupId>

  17. <artifactId>transactions-jdbc-deprecated</artifactId>

  18. <version>3.8.0</version>

  19. </dependency>

  20. <dependency>

  21. <groupId>com.atomikos</groupId>

  22. <artifactId>transactions-jta</artifactId>

  23. <version>4.0.4</version>

  24. </dependency>

  25. <dependency>

  26. <groupId>com.atomikos</groupId>

  27. <artifactId>transactions-jdbc</artifactId>

  28. <version>4.0.4</version>

  29. </dependency>


  30. <dependency>

  31. <groupId>cglib</groupId>

  32. <artifactId>cglib-nodep</artifactId>

  33. <version>3.2.5</version>

  34. </dependency>


  35. <dependency>

  36. <groupId>javax.transaction</groupId>

  37. <artifactId>jta</artifactId>

  38. <version>1.1</version>

  39. </dependency>

二:配置

dataSource.properties

点击(此处)折叠或打开

  1. workDesk.jdbc.driverclass=com.mysql.jdbc.Driver

  2. workDesk.jdbc.url=jdbc:mysql://10.243.3.18:3306/system?useUnicode=true&characterEncoding=UTF-8

  3. workDesk.jdbc.username=root

  4. workDesk.jdbc.password=$Fortune2015

  5. workDesk.jdbc.poolsize.max=3

  6. workDesk.jdbc.poolsize.min=3

  7. workDesk.jdbc.poolsize.initial=2

  8. workDesk.jdbc.idletime.max=25000

  9. workDesk.jdbc.idleConnectionTestPeriod=18000


  10. #-------workDesk jdbc--------

  11. workDesk.read.jdbc.driverclass=com.mysql.jdbc.Driver

  12. workDesk.read.jdbc.url=jdbc:mysql://112.74.53.213:3306/gmc?useUnicode=true&characterEncoding=UTF-8

  13. workDesk.read.jdbc.username=root

  14. workDesk.read.jdbc.password=Wanmide@123

  15. workDesk.read.jdbc.poolsize.max=3

  16. workDesk.read.jdbc.poolsize.min=3

  17. workDesk.read.jdbc.poolsize.initial=2

  18. workDesk.read.jdbc.idletime.max=25000

  19. workDesk.read.jdbc.idleConnectionTestPeriod=18000


  20. jdbc.xaDataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

transactions.properties


点击(此处)折叠或打开

  1. # SAMPLE PROPERTIES FILE FOR THE TRANSACTION SERVICE

  2. # THIS FILE ILLUSTRATES THE DIFFERENT SETTINGS FOR THE TRANSACTION MANAGER

  3. # UNCOMMENT THE ASSIGNMENTS TO OVERRIDE DEFAULT VALUES;


  4. # Required: factory implementation class of the transaction core.

  5. # NOTE: there is no default for this, so it MUST be specified!

  6. #

  7. com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory



  8. # Set base name of file where messages are output

  9. # (also known as the 'console file').

  10. #

  11. # com.atomikos.icatch.console_file_name = tm.out


  12. # Size limit (in bytes) for the console file;

  13. # negative means unlimited.

  14. #

  15. # com.atomikos.icatch.console_file_limit=-1


  16. # For size-limited console files, this option

  17. # specifies a number of rotating files to

  18. # maintain.

  19. #

  20. # com.atomikos.icatch.console_file_count=1


  21. # Set the number of log writes between checkpoints

  22. #

  23. # com.atomikos.icatch.checkpoint_interval=500


  24. # Set output directory where console file and other files are to be put

  25. # make sure this directory exists!

  26. #

  27. # com.atomikos.icatch.output_dir = ./


  28. # Set directory of log files; make sure this directory exists!

  29. #

  30. com.atomikos.icatch.log_base_dir = ./


  31. # Set base name of log file

  32. # this name will be used as the first part of

  33. # the system-generated log file name

  34. #

  35. # com.atomikos.icatch.log_base_name = tmlog


  36. # Set the max number of active local transactions

  37. # or -1 for unlimited.

  38. #

  39. # com.atomikos.icatch.max_actives = 50


  40. # Set the default timeout (in milliseconds) for local transactions

  41. #

  42. # com.atomikos.icatch.default_jta_timeout = 10000


  43. # Set the max timeout (in milliseconds) for local transactions

  44. #

  45. # com.atomikos.icatch.max_timeout = 300000


  46. # The globally unique name of this transaction manager process

  47. # override this value with a globally unique name

  48. #

  49. # com.atomikos.icatch.tm_unique_name = tm


  50. # Do we want to use parallel subtransactions? JTA's default is NO for J2EE compatibility.

点击(此处)折叠或打开

  1. /**

  2. * Atomikos 数据源A

  3. * @return

  4. */

  5. @Bean(name="dataSourceA",initMethod="init",destroyMethod="close")

  6. public AtomikosDataSourceBean dataSourceA()

  7. {

  8. AtomikosDataSourceBean dataSourceA=new AtomikosDataSourceBean();

  9. dataSourceA.setUniqueResourceName("dataSourceA");

  10. dataSourceA.setXaDataSourceClassName(xaDataSourceClassName);

  11. Properties xaProperties = new Properties();

  12. xaProperties.put("user", user);

  13. xaProperties.put("password", password);

  14. xaProperties.put("url", jdbcUrl);

  15. xaProperties.put("pinGlobalTxToPhysicalConnection", true);

  16. dataSourceA.setXaProperties(xaProperties);

  17. dataSourceA.setMaxPoolSize(maxPoolSize);

  18. dataSourceA.setMinPoolSize(minPoolSize);

  19. dataSourceA.setMaxIdleTime(maxIdleTime);

  20. dataSourceA.setTestQuery("SELECT 1");

  21. return dataSourceA;

  22. }



  23. /**

  24. * Atomikos 数据源A

  25. * @return

  26. */

  27. @Bean(name="dataSourceB",initMethod="init",destroyMethod="close")

  28. public AtomikosDataSourceBean dataSourceB()

  29. {

  30. AtomikosDataSourceBean dataSourceA=new AtomikosDataSourceBean();

  31. dataSourceA.setUniqueResourceName("dataSourceB");

  32. dataSourceA.setXaDataSourceClassName(xaDataSourceClassName);

  33. Properties xaProperties = new Properties();

  34. xaProperties.put("user", readUser);

  35. xaProperties.put("password", readPassword);

  36. xaProperties.put("url", readJdbcUrl);

  37. xaProperties.put("pinGlobalTxToPhysicalConnection", true);

  38. dataSourceA.setXaProperties(xaProperties);

  39. dataSourceA.setMaxPoolSize(readMaxPoolSize);

  40. dataSourceA.setMinPoolSize(readMinPoolSize);

  41. dataSourceA.setMaxIdleTime(readMaxIdleTime);

  42. dataSourceA.setTestQuery("SELECT 1");

  43. return dataSourceA;

  44. }

点击(此处)折叠或打开

  1. @Configuration

  2. public class DynamicTransactionManagerElConfig {


  3. // @Autowired

  4. // @Qualifier("platformTomcat")

  5. // private DataSource platformTomcat;

  6. //

  7. // @Autowired

  8. // @Qualifier("platformReadTomcat")

  9. // private DataSource platformReadTomcat;


  10. @Autowired

  11. @Qualifier("dataSourceA")

  12. private DataSource dataSourceA;


  13. @Autowired

  14. @Qualifier("dataSourceB")

  15. private DataSource dataSourceB;


  16. @Bean(name = "dataSource")

  17. public DynamicDataSource dataSource() {

  18. DynamicDataSource dataSource = new DynamicDataSource();

  19. Map<Object, Object> targetDataSources = new HashMap<>();

  20. targetDataSources.put("master", dataSourceA);

  21. targetDataSources.put("slave", dataSourceB);

  22. dataSource.setTargetDataSources(targetDataSources);

  23. dataSource.setDefaultTargetDataSource(dataSourceA);

  24. return dataSource;

  25. }


  26. @Bean(name = "jdbcTemplate")

  27. public JdbcTemplate jdbcTemplate(DynamicDataSource dataSource) {

  28. JdbcTemplate jdbcTemplate = new JdbcTemplate();

  29. jdbcTemplate.setDataSource(dataSource);

  30. return jdbcTemplate;

  31. }


  32. @Bean(name = "jdbcReadTemplate")

  33. public JdbcTemplate jdbcReadTemplate(DynamicDataSource dataSource) {

  34. JdbcTemplate jdbcReadTemplate = new JdbcTemplate();

  35. jdbcReadTemplate.setDataSource(dataSource);

  36. return jdbcReadTemplate;

  37. }


  38. @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")

  39. public UserTransactionManager atomikosTransactionManager() {

  40. UserTransactionManager atomikosTransactionManager = new UserTransactionManager();

  41. atomikosTransactionManager.setForceShutdown(true);

  42. return atomikosTransactionManager;

  43. }


  44. @Bean(name = "atomikosUserTransaction")

  45. public UserTransactionImp atomikosUserTransaction() {

  46. UserTransactionImp atomikosUserTransaction = new UserTransactionImp();

  47. try {

  48. atomikosUserTransaction.setTransactionTimeout(300);

  49. }

  50. catch (SystemException e) {

  51. e.printStackTrace();

  52. }

  53. return atomikosUserTransaction;

  54. }


  55. // @Bean(name = "transactionManager")

  56. // public DataSourceTransactionManager transactionManager(DynamicDataSource

  57. // dataSource) {

  58. // DataSourceTransactionManager transactionManager = new

  59. // DataSourceTransactionManager();

  60. // transactionManager.setDataSource(dataSource);

  61. // return transactionManager;

  62. // }


  63. @Bean(name = "transactionManager")

  64. public JtaTransactionManager transactionManager(UserTransactionManager atomikosTransactionManager,

  65. UserTransactionImp atomikosUserTransaction) {

  66. JtaTransactionManager transactionManager = new JtaTransactionManager();

  67. transactionManager.setTransactionManager(atomikosTransactionManager);

  68. transactionManager.setUserTransaction(atomikosUserTransaction);

  69. transactionManager.setAllowCustomIsolationLevels(true);

  70. return transactionManager;

  71. }


  72. }

三:AOP 注解方式数据源动态切换


点击(此处)折叠或打开

  /**
   * Method-level marker naming the data source lookup key a method should run against.
   * It is retained at runtime so it can be read reflectively.
   */
  @Documented
  @Target(ElementType.METHOD)
  @Retention(RetentionPolicy.RUNTIME)
  public @interface DataSource {

      /**
       * Lookup key of the target data source, for example {@code "master"} or {@code "slave"}.
       *
       * @return the lookup key
       */
      String value();

  }

点击(此处)折叠或打开

  1. public class DynamicDataSource extends AbstractRoutingDataSource {


  2. @Override

  3. protected Object determineCurrentLookupKey() {

  4. return DataSourceContextHolder.getDataSource();

  5. }


  6. }

点击(此处)折叠或打开

  /**
   * Per-thread holder of the data source lookup key.
   */
  public class DataSourceContextHolder {

      /** One key slot per thread; empty (null) until a key is set. */
      private static final ThreadLocal<String> CURRENT_KEY = new ThreadLocal<String>();

      /**
       * Stores the lookup key for the calling thread.
       *
       * @param dataSource the key to store
       */
      public static void setDataSource(String dataSource) {
          CURRENT_KEY.set(dataSource);
      }

      /**
       * Reads the lookup key of the calling thread.
       *
       * @return the stored key, or {@code null} if none has been set
       */
      public static String getDataSource() {
          return CURRENT_KEY.get();
      }

      /** Clears the lookup key of the calling thread. */
      public static void removeDataSource() {
          CURRENT_KEY.remove();
      }

  }

点击(此处)折叠或打开

  1. @Aspect

  2. @Order(1)

  3. @Component

  4. public class DataSourceAspect {


  5. @Pointcut("@annotation(com.gemdale.ghome.business.async.deal.center.demo.datasource.DataSource)")

  6. public void dataSourcePointCut() {

  7. };


  8. @Before("dataSourcePointCut()")

  9. public void before(JoinPoint joinPoint) {


  10. System.out.println("=============dataSourcePointCut:before=============");

  11. Object target = joinPoint.getTarget();

  12. String method = joinPoint.getSignature().getName();


  13. // Class[] classz = target.getClass().getInterfaces();

  14. Class<?> classz = target.getClass();

  15. Class<?>[] parameterTypes = ((MethodSignature) joinPoint.getSignature()).getMethod().getParameterTypes();


  16. try {

  17. // Method m = classz[0].getMethod(method, parameterTypes);

  18. Method m = classz.getMethod(method, parameterTypes);

  19. if (null != m && m.isAnnotationPresent(DataSource.class)) {

  20. DataSource dataSource = m.getAnnotation(DataSource.class);

  21. DataSourceContextHolder.setDataSource(dataSource.value());


  22. System.out.println("=============dataSource:" + dataSource.value());

  23. }

  24. }

  25. catch (Exception e) {

  26. e.printStackTrace();

  27. }

  28. }


  29. }

四:数据源动态切换实例

点击(此处)折叠或打开

  1. @Repository("gmcSmsInfoDaoImpl")

  2. public class GmcSmsInfoDaoImpl extends BaseDaoSupport implements GmcSmsInfoDAO{


  3. /* (non-Javadoc)

  4. * @see com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfoDAO#addMaster(com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfo)

  5. */

  6. @Override

  7. @DataSource("master")

  8. public Integer addMaster(GmcSmsInfo smsInfo) throws FrameworkDAOException {

  9. return save(smsInfo);

  10. }


  11. /* (non-Javadoc)

  12. * @see com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfoDAO#addSlave(com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfo)

  13. */

  14. @Override

  15. @DataSource("slave")

  16. public Integer addSlave(GmcSmsInfo smsInfo) throws FrameworkDAOException {

  17. return save(smsInfo);

  18. }



  19. }

五:分布式事务下的数据源动态切换
情况一:事务下唯一数据源切换

点击(此处)折叠或打开

  1. @Transactional(rollbackFor={Exception.class,RuntimeException.class})

  2. @DataSource("master")

  3. public GmcSmsInfo addMaster(GmcSmsInfo smsInfo) throws BusinessServiceException {

  4. try {

  5. smsInfo.setSmsId(gmcSmsInfoDaoImpl.save(smsInfo));

  6. }

  7. catch (FrameworkDAOException e) {

  8. throw new BusinessServiceException(e);

  9. }

  10. return smsInfo;

  11. }

@Transactional 是通过数据库的 connection 来建立事务的,因此数据源必须在事务开启(获取连接)之前完成切换,也就是要保证 @DataSource 切面优先于 @Transactional 执行。实现办法:在 DataSourceAspect 切面上增加注解 @Order(1)。此处需要了解切面的层次和执行顺序等相关知识。本例中 @Transactional 和 @DataSource 位于 service 层的同一个方法上。

情况二:事务下多数据源切换

service类

点击(此处)折叠或打开

  1. @Autowired

  2. private GmcSmsInfoDAO gmcSmsInfoDaoImpl;


  3. @Autowired

  4. @Qualifier("transactionManager")

  5. private JtaTransactionManager transactionManager;



  6. public void addMasterAndSlave(GmcSmsInfo smsInfo) throws BusinessServiceException {

  7. UserTransaction userTransaction = transactionManager.getUserTransaction();

  8. try {

  9. userTransaction.begin();

  10. gmcSmsInfoDaoImpl.addMaster(smsInfo);

  11. smsInfo = new GmcSmsInfo();


  12. smsInfo.setChannel("test2");

  13. smsInfo.setContent("test2");

  14. smsInfo.setStatus("001");

  15. smsInfo.setCreateDate(Calendar.getInstance().getTime());

  16. smsInfo.setMobile("88888888");

  17. gmcSmsInfoDaoImpl.addSlave(smsInfo);

  18. userTransaction.commit();

  19. }

  20. catch (Exception e) {

  21. try {

  22. userTransaction.rollback();

  23. }

  24. catch (IllegalStateException e1) {

  25. e1.printStackTrace();

  26. }

  27. catch (SecurityException e1) {

  28. e1.printStackTrace();

  29. }

  30. catch (SystemException e1) {

  31. e1.printStackTrace();

  32. }

  33. throw new BusinessServiceException(e);

  34. }


  35. }


dao实现类


点击(此处)折叠或打开

  1. /* (non-Javadoc)

  2. * @see com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfoDAO#addMaster(com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfo)

  3. */

  4. @Override

  5. @DataSource("master")

  6. public Integer addMaster(GmcSmsInfo smsInfo) throws FrameworkDAOException {

  7. return save(smsInfo);

  8. }


  9. /* (non-Javadoc)

  10. * @see com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfoDAO#addSlave(com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfo)

  11. */

  12. @Override

  13. @DataSource("slave")

  14. public Integer addSlave(GmcSmsInfo smsInfo) throws FrameworkDAOException {

  15. return save(smsInfo);

  16. }


注意,此处采用了编程式来实现事务,注解式暂时还没有好的解决方法,欢迎大家讨论分享。 此处的@DataSource 放在了dao的实现层。

“Jdbc怎么实现分布式事务数据源动态切换”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注恰卡编程网网站,小编将为大家输出更多高质量的实用文章!

发布于 2021-12-22 21:56:54
收藏
分享
海报
0 条评论
49
上一篇:Android面试的重点要点题有哪些 下一篇:Flutter怎么用
目录

    0 条评论

    本站已关闭游客评论,请登录或者注册后再评论吧~

    忘记密码?

    图形验证码