您的位置:首页 > 游戏 > 手游 > 动画片制作教程_公共资源交易中心怎么报名投标_今天的新闻头条_交换友情链接的渠道有哪些

动画片制作教程_公共资源交易中心怎么报名投标_今天的新闻头条_交换友情链接的渠道有哪些

2025/5/3 22:11:02 来源:https://blog.csdn.net/weixin_63443072/article/details/146340906  浏览:    关键词:动画片制作教程_公共资源交易中心怎么报名投标_今天的新闻头条_交换友情链接的渠道有哪些
动画片制作教程_公共资源交易中心怎么报名投标_今天的新闻头条_交换友情链接的渠道有哪些

目录

引言

一、为什么需要MySQL到ES的同步?

二、四大同步方案对比

三、方案详解与代码实战

1. 应用层双写:简单但强耦合

2. 定时任务同步:可控的准实时

3. Logstash JDBC:离线迁移利器

4. Binlog监听:生产级实时同步(推荐)

四、避坑指南:关键注意事项

五、总结


引言

Elasticsearch(ES)凭借其强大的全文搜索和实时分析能力,已成为现代应用的核心组件。但当数据存储在MySQL时,如何实现高效、可靠的双向同步?本文将深入解析四种主流同步方案,涵盖从简单双写到生产级实时同步的全场景,并提供可落地的代码实现。

官网


一、为什么需要MySQL到ES的同步?

1. 全文搜索:ES支持分词、模糊匹配,弥补MySQL LIKE查询性能差的缺陷

2. 复杂聚合:ES Bucket和Metric聚合实现毫秒级多维分析

3. 数据异构:ES支持嵌套文档、向量搜索等灵活的数据结构

4. 读写分离:将复杂查询流量从MySQL卸载到ES,提升系统整体性能


二、四大同步方案对比

方案实时性数据一致性开发成本适用场景
应用层双写实时难保证小型项目,数据量小
定时任务同步分钟级最终一致允许延迟,增量同步场景
Logstash JDBC小时级最终一致离线历史数据迁移
Binlog监听秒级强一致生产环境高实时性要求

三、方案详解与代码实战

1. 应用层双写:简单但强耦合

原理:在业务代码中同步写入MySQL和ES,适合初创项目快速验证。

// Node.js 示例(注意事务回滚!)
async function createOrder(orderData) {// 1. MySQL写入const [mysqlResult] = await mysql.query('INSERT INTO orders SET ?', orderData);// 2. ES同步try {await elasticClient.index({index: 'orders',id: mysqlResult.insertId.toString(),body: orderData});} catch (e) {// ES写入失败则回滚MySQLawait mysql.query('DELETE FROM orders WHERE id = ?', [mysqlResult.insertId]);throw e;}
}

缺陷

•业务侵入性强,需维护两套数据模型

•分布式事务难题(建议本地事务表+补偿机制)


2. 定时任务同步:可控的准实时

核心步骤

1. MySQL表添加`updatedat`字段

2. 定时扫描增量数据批量推送到ES

// 使用Node.js定时任务(示例:每10分钟)
const schedule = require('node-schedule');
let lastSyncTime = new Date('2024-01-01');
schedule.scheduleJob('*/10 * * * *', async () => {const results = await mysql.query(`SELECT * FROM orders WHERE updated_at > ?`, [lastSyncTime]);// 构造ES Bulk API请求体const bulkBody = results.flatMap(doc => [{ index: { _index: 'orders', _id: doc.id } },{ ...doc, timestamp: new Date() } // 可追加自定义字段]);if (bulkBody.length > 0) {await elasticClient.bulk({ body: bulkBody });lastSyncTime = new Date(); // 持久化存储时间戳防宕机}
});

优化技巧

•使用`trackingcolumn`记录断点(如Redis存储`lastSyncTime`)

•分页查询避免内存溢出


3. Logstash JDBC:离线迁移利器

配置要点

•安装MySQL驱动到Logstash的`/logstash-core/lib/jars/`

•定时轮询策略

# mysql-to-es.conf
input {jdbc {jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"jdbc_user => "admin"jdbc_password => "Passw0rd!"schedule => "*/30 * * * *" # 每30分钟statement => "SELECT * FROM products WHERE updated_at > :sql_last_value"tracking_column => "updated_at"tracking_column_type => "timestamp"last_run_metadata_path => "/tmp/products_last_run.time"}
}
output {elasticsearch {hosts => ["http://es-node1:9200"]index => "products"document_id => "%{id}"}
}

启动命令

bin/logstash -f mysql-to-es.conf

4. Binlog监听:生产级实时同步(推荐)

架构

`MySQL -> Canal/Debezium -> Kafka -> ES Consumer`

Debezium实战步骤

1. 启动Kafka集群

docker-compose up -d zookeeper kafka schema-registry

2. 部署Debezium MySQL Connector

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "mysql-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.hostname": "mysql","database.user": "debezium","database.password": "dbz","database.server.name": "inventory","table.include.list": "inventory.products","database.history.kafka.bootstrap.servers": "kafka:9092"}
}'

3. 编写ES消费者

const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['kafka:9092'] });
const consumer = kafka.consumer({ groupId: 'es-sync' });
consumer.connect().then(() => {consumer.subscribe({ topic: 'inventory.products' });consumer.run({eachMessage: async ({ message }) => {const event = JSON.parse(message.value);switch (event.op) {case 'c':case 'u':await esClient.index({index: 'products',id: event.after.id,body: event.after});break;case 'd':await esClient.delete({ index: 'products', id: event.before.id });break;}}});
});

四、避坑指南:关键注意事项

1. 数据一致性

  • 使用`version`字段实现乐观锁(ES的`ifseqno`和`ifprimaryterm`)
  • 幂等写入:确保重复消费消息不会导致数据错误

2. 性能优化

  • ES批量写入使用`Bulk API`,建议每批1000-5000条
  • 调整MySQL的Binlog格式为`ROW`,确保Debezium正确解析

3. 错误处理

  • 死信队列(DLQ)存储同步失败的数据
  • 监控延迟:通过Kafka的`consumer lag`检测同步进度

五、总结

初创项目:从应用层双写快速起步

存量数据迁移:Logstash JDBC + 定时任务组合拳

生产环境:必选Binlog监听方案,保障实时性与可靠性

技术选型建议:根据团队技术栈选择中间件——熟悉Java生态选Canal,云原生环境用Debezium+Kafka。

通过本文的代码示例和架构解析,您可快速构建适合自身业务的MySQL到ES同步管道。同步方案无银弹,合理权衡实时性、复杂度与运维成本是关键。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com