MySQL同步Elasticsearch的6种方案小结
引言
在分布式架构中,mysql与elasticsearch(es)的协同已成为解决高并发查询与复杂检索的标配组合。
然而,如何实现两者间的高效数据同步,是架构设计中绕不开的难题。
这篇文章跟大家一起聊聊mysql同步es的6种主流方案,结合代码示例与场景案例,帮助开发者避开常见陷阱,做出最优技术选型。
方案一:同步双写
场景:适用于对数据实时性要求极高,且业务逻辑简单的场景,如金融交易记录同步。
在业务代码中同时写入mysql与es。
代码如下:
@transactional public void createorder(order order) { // 写入mysql ordermapper.insert(order); // 同步写入es indexrequest request = new indexrequest("orders") .id(order.getid()) .source(json.tojsonstring(order), xcontenttype.json); client.index(request, requestoptions.default); }
痛点:
- 硬编码侵入:所有涉及写操作的地方均需添加es写入逻辑。
- 性能瓶颈:双写操作导致事务时间延长,tps下降30%以上。
- 数据一致性风险:若es写入失败,需引入补偿机制(如本地事务表+定时重试)。
方案二:异步双写
场景:电商订单状态更新后需同步至es供客服系统检索。
我们可以使用mq进行解耦。
架构图如下:
代码示例如下:
// 生产者端 public void updateproduct(product product) { productmapper.update(product); kafkatemplate.send("product-update", product.getid()); } // 消费者端 @kafkalistener(topics = "product-update") public void synctoes(string productid) { product product = productmapper.selectbyid(productid); esclient.index(product); }
优势:
- 吞吐量提升:通过mq削峰填谷,可承载万级qps。
- 故障隔离:es宕机不影响主业务链路。
缺陷:
- 消息堆积:突发流量可能导致消费延迟(需监控lag值)。
- 顺序性问题:需通过分区键保证同一数据的顺序消费。
方案三:logstash定时拉取
场景:用户行为日志的t+1分析场景。
该方案低侵入但高延迟。
配置示例如下:
input { jdbc { jdbc_driver => "com.mysql.jdbc.driver" jdbc_url => "jdbc:mysql://localhost:3306/log_db" schedule => "*/5 * * * *" # 每5分钟执行 statement => "select * from user_log where update_time > :sql_last_value" } } output { elasticsearch { hosts => ["es-host:9200"] index => "user_logs" } }
适用性分析:
- 优点:零代码改造,适合历史数据迁移。
- 致命伤:
- 分钟级延迟(无法满足实时搜索)
- 全表扫描压力大(需优化增量字段索引)
方案四:canal监听binlog
场景:社交平台动态实时搜索(如微博热搜更新)。
技术栈:canal + rocketmq + es
该方案高实时,并且低侵入。
架构流程如下:
关键配置:
# canal.properties canal.instance.master.address=127.0.0.1:3306 canal.mq.topic=canal.es.sync
避坑指南:
- 数据漂移:需处理ddl变更(通过schema registry管理映射)。
- 幂等消费:通过
_id
唯一键避免重复写入。
方案五:datax批量同步
场景:将历史订单数据从分库分表mysql迁移至es。
该方案是大数据迁移的首选。
配置文件如下:
{ "job": { "content": [{ "reader": { "name": "mysqlreader", "parameter": { "splitpk": "id", "querysql": "select * from orders" } }, "writer": { "name": "elasticsearchwriter", "parameter": { "endpoint": "http://es-host:9200", "index": "orders" } } }] } }
性能调优:
- 调整
channel
数提升并发(建议与分片数对齐) - 启用
limit
分批查询避免oom
方案六:flink流处理
场景:商品价格变更时,需关联用户画像计算实时推荐评分。
该方案适合于复杂的etl场景。
代码片段如下:
streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment(); env.addsource(new canalsource()) .map(record -> parsetopriceevent(record)) .keyby(event -> event.getproductid()) .connect(userprofilebroadcaststream) .process(new pricerecommendationprocess()) .addsink(new elasticsearchsink());
优势:
- 状态管理:精准处理乱序事件(watermark机制)
- 维表关联:通过broadcast state实现实时画像关联
总结
对于文章上面给出的这6种技术方案,我们在实际工作中,该如何做选型呢?
下面用一张表格做对比:
方案 | 实时性 | 侵入性 | 复杂度 | 适用阶段 |
---|---|---|---|---|
同步双写 | 秒级 | 高 | 低 | 小型单体项目 |
mq异步 | 秒级 | 中 | 中 | 中型分布式系统 |
logstash | 分钟级 | 无 | 低 | 离线分析 |
canal | 毫秒级 | 无 | 高 | 高并发生产环境 |
datax | 小时级 | 无 | 中 | 历史数据迁移 |
flink | 毫秒级 | 低 | 极高 | 实时数仓 |
苏三的建议:
- 若团队无运维中间件能力 → 选择logstash或同步双写
- 需秒级延迟且允许改造 → mq异步 + 本地事务表
- 追求极致实时且资源充足 → canal + flink双保险
到此这篇关于mysql同步elasticsearch的6种方案小结的文章就介绍到这了,更多相关mysql同步elasticsearch内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
推荐阅读
-
Nginx重启失败排查与解决方案
前言在linux系统中,nginx作为高性能的web服务器和反向代理服务器,广泛应用于各类生产环境中。然而,作为一款强大而灵活的...
-
Apache Sqoop数据采集原理解析
sqoop数据采集格式问题apachesqoop是一款开源的工具,主要用于在hadoop(hive)与传统的数据库(mysql...
-
MySQL中的分组和多表连接详解
mysql中的分组和多表连接一、mysql的分组(groupby)单例函数函数含义lower将列内容变成小写upper将...
-
MySQL主从同步延迟问题的全面解决方案
一、同步延迟原因深度分析1.1主从复制原理回顾mysql主从复制流程:主库binlog→主库dump线程→从库io...
-
浅谈MySQL中drop、truncate和delete的区别
1.前言对于drop、truncate和delete,虽然简单,但是真要使用或者面试时候问到还是需要有一定的总结,今天来简单讲讲...
-
MySQL查询重写如何把复杂查询变简单详解
-
MySQL数据库约束深入详解
-
Linux搭建单机MySQL8.0.26版本的操作方法
-
史上最全nginx详细参数配置
(enginex)是一个轻量级高性能的http和反向代理服务器,同时也是一个通用代理服务器(tcp/udp/imap/po...
-
nginx负载均衡及详细配置方法
一、nginx负载均衡策略nginx作为一种高效的web服务器和反向代理服务器,广泛应用于网站的负载均衡中。负载均衡是指将接收...