SpringBoot+shardingsphere实现按月分表功能教程

2025-05-14 10:14:41 142
魁首哥

shardingsphere 是一套开源的分布式数据库中间件解决方案,旨在简化数据库分片、读写分离、分布式事务等复杂场景的管理。

它由 apache 软件基金会支持,广泛应用于需要处理大规模数据的系统中

一、shardingsphere 是什么?

主要是为了防止一张表的数据量过大而设计的,数据库本身就支持,但是由于自行设计需要满足跨表查询,事务一致性,分页聚合等很多的复杂场景,还需要很多的配套监控,设计,扩容等方案,所以总体来说是一个任务量很大的任务,故而这里采用shardingsphere 来实现。

二、使用步骤

1.引入库


        
            org.apache.shardingsphere
            shardingsphere-jdbc-core-spring-boot-starter
            5.2.0
        

2.环境配置+mysql表

create table `user` (
  `id` bigint(20) not null auto_increment,
  `username` varchar(255) not null,
  `password` varchar(255) not null,
  `gender` tinyint(4) not null comment '0:男 1:女',
  `createtime` datetime not null default current_timestamp comment '创建时间',
  `updatetime` datetime not null default current_timestamp on update current_timestamp comment '更新时间',
  primary key (`id`)
) engine=innodb auto_increment=1890651990057906179 default charset=utf8mb4;
# 配置服务器端口
server:
  port: 9999

# spring框架下的shardingsphere配置
spring:
  shardingsphere:
    # 模式配置,设置为独立模式
    mode:
      type: standalone
    # 数据源配置
    datasource:
      # 定义数据源名称
      names: ds0
      # 数据源ds0的具体配置
      ds0:
        # 数据源类型为hikaricp
        type: com.zaxxer.hikari.hikaridatasource
        # 数据库驱动类名称
        driver-class-name: com.mysql.cj.jdbc.driver
        # 数据库连接url,包含时区设置
        jdbc-url: jdbc:mysql://localhost:3306/sharding_db?servertimezone=asia/shanghai
        # 数据库用户名
        username: root
        # 数据库密码
        password: root
    # 规则配置
    rules:
      # 分片规则配置
      sharding:
        # 定义分片的表
        tables:
          user:
            # 只配置基础表,其他表会动态创建
            actual-data-nodes: ds0.user,ds0.user_202401,ds0.user_202402,ds0.user_202403,ds0.user_202404,ds0.user_202405
            table-strategy:
              standard:
                sharding-column: createtime
                sharding-algorithm-name: user_inline
            # 添加主键生成策略
            key-generate-strategy:
              column: id
              key-generator-name: snowflake
        sharding-algorithms:
          user_inline:
            type: class_based
            props:
              strategy: standard
              algorithmclassname: com.hhh.sharding.standa.usershardingalgorithm
        # 配置主键生成器
        key-generators:
          snowflake:
            type: snowflake
            props:
              worker-id: 123
        # 添加默认分片策略
        default-sharding-column: gender
    # 属性配置
    props:
      # 是否显示sql语句
      sql-show: true

# mybatis-plus配置
mybatis-plus:
  configuration:
    # 不将下划线转换为驼峰命名
    map-underscore-to-camel-case: false
    # 使用标准输出日志实现
    log-impl: org.apache.ibatis.logging.stdout.stdoutimpl
  global-config:
    enable-sql-runner: true

这里有一个注意事项,那就是id一定要使用bigint使用雪花策略算法来实现,至于为什么这样呢,是为了防止分表的主键id一致的情况,这里首先推荐就是使用mybatisplus来实现,因为他天然支持雪花算法

3.分表代码实现

主要是两个文件一个是自己实现分表算法的usershardingalgorithm文件

package com.hhh.sharding.standa;

import com.baomidou.mybatisplus.extension.toolkit.sqlrunner;
import com.hhh.sharding.domain.user;
import com.hhh.sharding.service.userservice;
import lombok.extern.slf4j.slf4j;
import lombok.var;
import org.apache.shardingsphere.driver.jdbc.core.connection.shardingsphereconnection;
import org.apache.shardingsphere.infra.metadata.database.rule.shardingsphererulemetadata;
import org.apache.shardingsphere.mode.manager.contextmanager;
import org.apache.shardingsphere.sharding.api.config.shardingruleconfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.shardingtableruleconfiguration;
import org.apache.shardingsphere.sharding.rule.shardingrule;
import org.springframework.stereotype.component;

import javax.annotation.postconstruct;
import javax.annotation.resource;
import javax.sql.datasource;
import java.sql.sqlexception;
import java.text.simpledateformat;
import java.util.*;
import java.util.stream.collectors;

@component
@slf4j
public class dynamicshardingmanager {

    @resource
    private datasource datasource;

    @resource
    private userservice userservice;


    private static final string logic_table_name = "user";

    private static final string database_name = "sharding_db"; // 配置文件中的数据库名称

    @postconstruct
    public void initialize() {
        log.info("初始化动态分表配置...");
        updateshardingtablenodes();
    }


    /**
     * 获取所有用户相关的表名
     * 此方法旨在动态地收集所有用户表的表名,以支持可能存在的不同性别用户表
     * 如果无法获取动态表名或列表为空,则默认返回包含单一的默认用户表名"user"
     *
     * @return 包含所有用户表名的集合
     */
    private set fetchallusertablenames() {
        //获取所有动态化表名
        set tablenames = new hashset<>();
        try {
            // 获取用户列表
            list users = userservice.list();
            // 如果用户列表不为空,则映射每个用户到对应的表名,并收集到集合中
            if (users != null) {
                tablenames = users.stream()
                        .map(user -> "user_" + user.getgender())
                        .collect(collectors.toset());
            }
            // 确保至少包含默认表
            tablenames.add("user");
        } catch (exception e) {
            // 记录获取表名时发生的错误
            log.error("获取所有动态化表名失败", e);
            // 发生异常时至少返回默认表
            tablenames.add("user");
        }
        // 返回收集到的表名集合
        return tablenames;
    }


    /**
     * 动态更新分片表节点配置
     * 
     * 本方法旨在根据当前的用户表名称,动态地更新分片表的节点配置
     * 它首先获取所有用户表的名称,然后构建新的分片表节点配置,并尝试更新到数据库的元数据中
     */
    private void updateshardingtablenodes() {
        try {
            // 获取所有用户表的名称
            set tablenames = fetchallusertablenames();
            if (tablenames.isempty()) {
                // 如果未获取到任何表名,则使用默认的表配置
                log.warn("未获取到任何表名,将使用默认表配置");
                tablenames.add("user");
            }
    
            // 确保包含所有可能的表
            tablenames.add("user");
            tablenames.add("user_0");
            tablenames.add("user_1");
    
            // 构建新的分片表节点配置
            string newactualdatanodes = tablenames.stream()
                    .distinct()
                    .map(tablename -> "ds0." + tablename)
                    .collect(collectors.joining(","));
            log.info("动态分表 actual-data-nodes 配置: {}", newactualdatanodes);
    
            // 获取 contextmanager 实例
            contextmanager contextmanager = getcontextmanager();
            if (contextmanager == null) {
                log.error("获取 contextmanager 失败");
                return;
            }
    
            // 获取 metadatacontexts 实例
            var metadatacontexts = contextmanager.getmetadatacontexts();
            if (metadatacontexts == null) {
                log.error("获取 metadatacontexts 失败");
                return;
            }
    
            // 获取 metadata 实例
            var metadata = metadatacontexts.getmetadata();
            if (metadata == null) {
                log.error("获取 metadata 失败");
                return;
            }
    
            // 检查数据库是否存在
            var databases = metadata.getdatabases();
            if (databases == null || !databases.containskey(database_name)) {
                log.error("数据库 {} 不存在", database_name);
                return;
            }
    
            // 获取 shardingsphere 的规则元数据
            shardingsphererulemetadata rulemetadata = databases.get(database_name).getrulemetadata();
            if (rulemetadata == null) {
                log.error("获取规则元数据失败");
                return;
            }
    
            // 查找 shardingrule
            optional shardingrule = rulemetadata.findsinglerule(shardingrule.class);
            if (shardingrule.ispresent()) {
                // 获取分片规则配置
                shardingruleconfiguration ruleconfig = (shardingruleconfiguration) shardingrule.get().getconfiguration();
                if (ruleconfig.gettables() == null || ruleconfig.gettables().isempty()) {
                    log.error("分片规则配置为空");
                    return;
                }
    
                // 更新分片表规则配置
                list updatedrules = ruleconfig.gettables()
                        .stream()
                        .map(oldtablerule -> {
                            if (logic_table_name.equals(oldtablerule.getlogictable())) {
                                shardingtableruleconfiguration newtableruleconfig = new shardingtableruleconfiguration(logic_table_name, newactualdatanodes);
                                newtableruleconfig.setdatabaseshardingstrategy(oldtablerule.getdatabaseshardingstrategy());
                                newtableruleconfig.settableshardingstrategy(oldtablerule.gettableshardingstrategy());
                                newtableruleconfig.setkeygeneratestrategy(oldtablerule.getkeygeneratestrategy());
                                newtableruleconfig.setauditstrategy(oldtablerule.getauditstrategy());
                                return newtableruleconfig;
                            }
                            return oldtablerule;
                        })
                        .collect(collectors.tolist());
                ruleconfig.settables(updatedrules);
                
                // 尝试更新分片规则配置
                try {
                    contextmanager.alterruleconfiguration(database_name, collections.singleton(ruleconfig));
                    contextmanager.reloaddatabase(database_name);
                    log.info("动态分表规则更新成功!");
                } catch (exception e) {
                    log.error("更新分片规则失败", e);
                }
            } else {
                log.error("未找到 shardingsphere 的分片规则配置,动态分表更新失败。");
            }
        } catch (exception e) {
            log.error("更新分片规则时发生异常", e);
        }
    }

    /**
     * 获取 shardingsphere contextmanager
     */
    private contextmanager getcontextmanager() {
        try {
            if (datasource == null) {
                log.error("数据源未注入");
                return null;
            }
            
            var connection = datasource.getconnection();
            if (connection == null) {
                log.error("获取数据库连接失败");
                return null;
            }

            shardingsphereconnection shardingconnection = connection.unwrap(shardingsphereconnection.class);
            if (shardingconnection == null) {
                log.error("无法获取 shardingsphereconnection");
                connection.close();
                return null;
            }

            contextmanager contextmanager = shardingconnection.getcontextmanager();
            connection.close();
            return contextmanager;
        } catch (sqlexception e) {
            log.error("获取 shardingsphere contextmanager 失败", e);
            return null;
        }
    }

    /**
     * 根据用户信息创建用户表
     * 表名基于用户创建时间生成,格式为:logic_table_name_yyyymm
     * 如果表已存在,则不进行创建操作
     * 
     * @param user 用户对象,包含用户创建时间等信息
     */
    public void createusertable(user user) {
        // 获取用户创建时间
        date createtime = user.getcreatetime();
        // 创建日期格式化对象,用于生成表名
        simpledateformat dateformat = new simpledateformat("yyyymm");
        // 生成完整的表名
        string tablename = logic_table_name + "_" + dateformat.format(createtime);
        
        try {
            // 首先检查表是否已存在
            string checktablesql = "show tables like '" + tablename + "'";
            list> tables = sqlrunner.db().selectlist(checktablesql);
            
            // 如果表存在,记录日志并结束方法
            if (tables != null && !tables.isempty()) {
                log.info("表 {} 已经存在,无需创建", tablename);
                return;
            }
            
            // 创建表
            string createtablesql = "create table if not exists " + tablename + " like user";
            log.info("开始创建表,sql: {}", createtablesql);
            
            sqlrunner.db().update(createtablesql);
            log.info("表 {} 创建成功", tablename);
            
            // 更新分片配置
            updateshardingtablenodes();
        } catch (exception e) {
            log.error("创建分表 {} 失败: {}", tablename, e.getmessage(), e);
            // 检查异常消息,如果表已存在,则记录日志并结束方法
            if (e.getmessage() != null && e.getmessage().contains("already exists")) {
                log.info("表 {} 已经存在,继续处理", tablename);
                return;
            }
            // 如果异常与表已存在无关,则抛出运行时异常
            throw new runtimeexception("创建分表失败: " + e.getmessage(), e);
        }
    }

}
package com.hhh.sharding.standa;

import lombok.extern.slf4j.slf4j;
import org.apache.shardingsphere.sharding.api.sharding.standard.preciseshardingvalue;
import org.apache.shardingsphere.sharding.api.sharding.standard.rangeshardingvalue;
import org.apache.shardingsphere.sharding.api.sharding.standard.standardshardingalgorithm;

import java.text.simpledateformat;
import java.util.arraylist;
import java.util.collection;
import java.util.date;
import java.util.properties;

@slf4j
public class usershardingalgorithm implements standardshardingalgorithm {
    private static final simpledateformat date_format = new simpledateformat("yyyymm");

    @override
    public string dosharding(collection availabletargetnames, preciseshardingvalue preciseshardingvalue) {
        date createtime = preciseshardingvalue.getvalue();
        string logictablename = preciseshardingvalue.getlogictablename();
        
        log.info("分片算法执行 - 可用目标表: {}, 分片值: {}, 逻辑表名: {}", 
                availabletargetnames, createtime, logictablename);
        
        if (createtime == null) {
            log.info("createtime为空,返回逻辑表名: {}", logictablename);
            return logictablename;
        }
        
        // 根据 createtime 动态生成分表名
        string suffix = date_format.format(createtime);
        string realtablename = "user_" + suffix;
        log.info("计算得到的实际表名: {}", realtablename);
        
        if (availabletargetnames.contains(realtablename)) {
            log.info("找到匹配的目标表: {}", realtablename);
            return realtablename;
        } else {
            log.warn("未找到匹配的目标表,返回逻辑表名: {}", logictablename);
            return logictablename;
        }
    }

    @override
    public collection dosharding(collection collection, rangeshardingvalue rangeshardingvalue) {
        return new arraylist<>();
    }

    @override
    public properties getprops() {
        return new properties();
    }

    @override
    public void init(properties properties) {
        // 可以添加初始化逻辑
    }
}

4.测试用例

package com.hhh.sharding.controller;

import cn.hutool.core.util.randomutil;
import com.hhh.sharding.domain.user;
import com.hhh.sharding.service.userservice;
import com.hhh.sharding.standa.dynamicshardingmanager;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

import javax.annotation.resource;
import java.util.date;
import java.util.list;
import java.text.simpledateformat;

@restcontroller
@requestmapping("/user")
public class usercontroller {

    @resource
    private userservice userservice;

    @resource
    private dynamicshardingmanager dynamicshardingmanager;

    @getmapping("/add")
    public boolean user() {
        // 创建一些2024年的随机日期
        date[] dates = {
            getdate("2024-01-15"),
            getdate("2024-02-20"),
            getdate("2024-03-10"),
            getdate("2024-04-05"),
            getdate("2024-05-25")
        };
        
        for (int i = 0; i < 10; i++) {
            user user = new user();
            user.setusername(generaterandomusername());
            user.setpassword("123456");
            user.setgender(randomutil.randomint(2));
            // 随机选择一个2024年的日期
            date randomdate = dates[randomutil.randomint(dates.length)];
            user.setcreatetime(randomdate);
            user.setupdatetime(randomdate);
            //这里每一次新增数据的时候去判断是否要创建出来当月的数据表,这张表一定要在    
            //application.yml中的actual-data-nodes中去添加
            dynamicshardingmanager.createusertable(user);
            userservice.save(user);
        }
        return true;
    }

    private date getdate(string datestr) {
        try {
            return new simpledateformat("yyyy-mm-dd").parse(datestr);
        } catch (exception e) {
            return new date();
        }
    }

    // 生成10位随机数字的用户名
    private string generaterandomusername() {
        return randomutil.randomnumbers(10);  // 生成10位数字
    }

    @getmapping("/all")
    public list all() {
        return userservice.list();
    }
}

5.测试结果

  • 新增数据

  • 查询数据

  • 数据库情况

  • 数据库表数据展示

总结

由于公司有一个需求那就是按月来分表展示数据,看了好多人的文章都没有效果,最终三天得以解决这个功能,故而写下此文章。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

分享
海报
142
上一篇:SpringBoot如何实现一个Redis限流注解 下一篇:Java字符串操作全解析之语法、示例与应用场景分析

忘记密码?

图形验证码