如何高效实现 MySQL 与 elasticsearch 的数据同步

MySQL 自身简单、高效、可靠,是又拍云内部使用最广泛的数据库。但是当数据量达到一定程度的时候,对整个 MySQL 的操作会变得非常迟缓。而公司内部 robin/logs 表的数据量已经达到 800w,后续又有全文检索的需求。这个需求直接在 MySQL 上实施是难以做到的。
原数据库的同步问题
系统高耦合,侵入式代码,使得业务逻辑复杂度增加 方案不通用,每一套同步都需要额外定制,不仅增加业务处理时间,还会提升软件复复杂度 工作量和复杂度增加
解决思路及方案
调整架构
改进数据库
成果展示前后对比
方案实施细节

MySQL Kafka Maxwell(监听 binlog) Logstash(将数据同步给 elasticsearch) Elasticsearch
1. MySQL配置
本次使用 MySQL 5.5 作示范,其他版本的配置可能稍许不同需要
-- 创建一个 用户名为 maxwell 密码为 xxxxxx 的用户
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
开启数据库的 `binlog`,修改 `mysql` 配置文件,注意 `maxwell` 需要的 `binlog` 格式必须是`row`。
# /etc/mysql/my.cnf
[mysqld]
# maxwell 需要的 binlog 格式必须是 row
binlog_format=row
# 指定 server_id 此配置关系到主从同步需要按情况设置,
# 由于此mysql没有开启主从同步,这边默认设置为 1
server_id=1
# logbin 输出的文件名, 按需配置
log-bin=master
sudo systemctl restart mysqld
select @@log_bin;
-- 正确结果是 1
select @@binlog_format;
-- 正确结果是 ROW
# /etc/my.cnf
log_slave_updates = 1
-- robin.logs
show create table robin.logs;
-- 表结构
CREATE TABLE `logs` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`content` text NOT NULL,
`user_id` int(11) NOT NULL,
`status` enum('SUCCESS','FAILED','PROCESSING') NOT NULL,
`type` varchar(20) DEFAULT '',
`meta` text,
`created_at` bigint(15) NOT NULL,
`idx_host` varchar(255) DEFAULT '',
`idx_domain_id` int(11) unsigned DEFAULT NULL,
`idx_record_value` varchar(255) DEFAULT '',
`idx_record_opt` enum('DELETE','ENABLED','DISABLED') DEFAULT NULL,
`idx_orig_record_value` varchar(255) DEFAULT '',
PRIMARY KEY (`id`),
KEY `created_at` (`created_at`)
) ENGINE=InnoDB AUTO_INCREMENT=8170697 DEFAULT CHARSET=utf8
2. Maxwell 配置
本次使用 maxwell-1.39.2 作示范, 确保机器中包含 java 环境, 推荐 openjdk11
下载 maxwell 程序
wget https://github.com/zendesk/maxwell/releases/download/v1.39.2/maxwell-1.39.2.tar.gz
tar zxvf maxwell-1.39.2.tar.gz **&&** cd maxwell-1.39.2
一个是需要被监听binlog的数据库(只需要读权限) 另一个是记录maxwell服务状态的数据库,当前这两个数据库可以是同一个
host 需要监听binlog的数据库地址 port 需要监听binlog的数据库端口 user 需要监听binlog的数据库用户名 password 需要监听binlog的密码 replication_host 记录maxwell服务的数据库地址 replication_port 记录maxwell服务的数据库端口 replication_user 记录maxwell服务的数据库用户名 filter 用于监听binlog数据时过滤不需要的数据库数据或指定需要的数据库 producer 将监听到的增量变化数据提交给的消费者 (如 stdout、kafka) kafka.bootstrap.servers kafka 服务地址 kafka_version kafka 版本 kafka_topic 推送到kafka的主题
启动 maxwell
./bin/maxwell
--host=mysql-maxwell.mysql.svc.cluster.fud3
--port=3306
--user=root
--password=password
--replication_host=192.168.5.38
--replication_port=3306
--replication_user=cloner
--replication_password=password
--filter='exclude: *.*, include: robin.logs'
--producer=kafka
--kafka.bootstrap.servers=192.168.30.10:9092
--kafka_topic=maxwell-robinlogs --kafka_version=0.9.0.1
3. 安装 Logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gz
tar zxvf logstash-8.5.0-linux-x86_64.tar.gz
rm config/logstash.yml
# config/logstash-sample.conf
input {
kafka {
bootstrap_servers => "192.168.30.10:9092"
group_id => "main"
topics => ["maxwell-robinlogs"]
}
}
filter {
json {
source => "message"
}
# 将maxwell的事件类型转化为es的事件类型
# 如增加 -> index 修改-> update
translate {
source => "[type]"
target => "[action]"
dictionary => {
"insert" => "index"
"bootstrap-insert" => "index"
"update" => "update"
"delete" => "delete"
}
fallback => "unknown"
}
# 过滤无效的数据
if ([action] == "unknown") {
drop {}
}
# 处理数据格式
if [data][idx_host] {
mutate {
add_field => { "idx_host" => "%{[data][idx_host]}" }
}
} else {
mutate {
add_field => { "idx_host" => "" }
}
}
if [data][idx_domain_id] {
mutate {
add_field => { "idx_domain_id" => "%{[data][idx_domain_id]}" }
}
} else {
mutate {
add_field => { "idx_domain_id" => "" }
}
}
if [data][idx_record_value] {
mutate {
add_field => { "idx_record_value" => "%{[data][idx_record_value]}" }
}
} else {
mutate {
add_field => { "idx_record_value" => "" }
}
}
if [data][idx_record_opt] {
mutate {
add_field => { "idx_record_opt" => "%{[data][idx_record_opt]}" }
}
} else {
mutate {
add_field => { "idx_record_opt" => "" }
}
}
if [data][idx_orig_record_value] {
mutate {
add_field => { "idx_orig_record_value" => "%{[data][idx_orig_record_value]}" }
}
} else {
mutate {
add_field => { "idx_orig_record_value" => "" }
}
}
if [data][type] {
mutate {
replace => { "type" => "%{[data][type]}" }
}
} else {
mutate {
replace => { "type" => "" }
}
}
mutate {
add_field => {
"id" => "%{[data][id]}"
"content" => "%{[data][content]}"
"user_id" => "%{[data][user_id]}"
"status" => "%{[data][status]}"
"meta" => "%{[data][meta]}"
"created_at" => "%{[data][created_at]}"
}
remove_field => ["data"]
}
mutate {
convert => {
"id" => "integer"
"user_id" => "integer"
"idx_domain_id" => "integer"
"created_at" => "integer"
}
}
# 只提炼需要的字段
mutate {
remove_field => [
"message",
"original",
"@version",
"@timestamp",
"event",
"database",
"table",
"ts",
"xid",
"commit",
"tags"
]
}
}
output {
# 结果写到es
elasticsearch {
hosts => ["http://es-zico2.service.upyun:9500"]
index => "robin_logs"
action => "%{action}"
document_id => "%{id}"
document_type => "robin_logs"
}
# 结果打印到标准输出
stdout {
codec => rubydebug
}
}
# 测试配置文件*
bin/logstash -f config/logstash-sample.conf --config.test_and_exit
# 启动*
bin/logstash -f config/logstash-sample.conf --config.reload.automatic
4. 全量同步
INSERT INTO maxwell.bootstrap
( database_name, table_name, where_clause, client_id )
values
( 'robin', 'logs', 'id > 1', 'maxwell' );
# 检测 elasticsearch 中的数据量
GET robin_logs/robin_logs/_count
快 来 找 又 小 拍

推 荐 阅 读 


设为星标

更新不错过




设为星标

更新不错过
[广告]赞助链接:
关注数据与安全,洞悉企业级服务市场:https://www.ijiandao.com/
让资讯触达的更精准有趣:https://www.0xu.cn/

随时掌握互联网精彩
- QQ浏览器宣布接入DeepSeek-R1满血版:支持深度思考、联网搜索
- 华为公布 HarmonyOS 3 升级最新进展;内部人士回应马斯克决定任命朱晓彤为特斯拉全球 CEO|极客头条
- 一个做了 6 年开源操作系统开发者的内心独白:这不是没有意义!
- 国内首次!3位清华姚班本科生斩获STOC最佳学生论文奖
- B站大量虚拟主播被集体强制退款:收入蒸发,还倒欠B站;乔布斯被追授美国总统自由勋章;Grafana 9 发布|极客头条
- 美国拆除SSNDOB地下市场
- Open Talk 线上专场|算力时代 K8S 集群如何高效运行
- 共创云上新价值,服务型机器人迎来发展新浪潮
- 做技术开发到老 or 晋升管理层,程序员的终极目标是什么?
- 开拍!用华为手机拍摄你的第一部电影
- 披着“智慧”外衣的停车场,明目张胆泄露隐私
- 第二届用友·华为云杯开发者大赛落幕 低代码开发平台将爆发?
赞助链接