- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
MySQL 自身简单、高效、可靠,是又拍云内部使用最广泛的数据库。但是当数据量达到一定程度的时候,对整个 MySQL 的操作会变得非常迟缓。而公司内部 robin/logs 表的数据量已经达到 800w,后续又有全文检索的需求。这个需求直接在 MySQL 上实施是难以做到的.
由于传统的 mysql 数据库并不擅长海量数据的检索,当数据量到达一定规模时(估算单表两千万左右),查询和插入的耗时会明显增加。同样,当需要对这些数据进行模糊查询或是数据分析时,MySQL作为事务型关系数据库很难提供良好的性能支持。使用适合的数据库来实现模糊查询是解决这个问题的关键.
但是,切换数据库会迎来两个问题,一是已有的服务对现在的 MySQL 重度依赖,二是 MySQL 的事务能力和软件生态仍然不可替代,直接迁移数据库的成本过大。我们综合考虑了下,决定同时使用多个数据库的方案,不同的数据库应用于不同的使用场景。而在支持模糊查询功能的数据库中,elasticsearch 自然是首选的查询数据库。这样后续对业务需求的切换也会非常灵活.
那具体该如何实现呢?在又拍云以往的项目中,也有遇到相似的问题。之前采用的方法是在业务中编写代码,然后同步到 elasticsearch 中。具体是这样实施的:每个系统编写特定的代码,修改 MySQL 数据库后,再将更新的数据直接推送到需要同步的数据库中,或推送到队列由消费程序来写入到数据库中.
但这个方案有一些明显的缺点:
系统高耦合,侵入式代码,使得业务逻辑复杂度增加 。
方案不通用,每一套同步都需要额外定制,不仅增加业务处理时间,还会提升软件复复杂度 。
工作量和复杂度增加 。
在业务中编写同步方案,虽然在项目早期比较方便,但随着数据量和系统的发展壮大,往往最后会成为业务的大痛点.
既然以往的方案有明显的缺点,那我们如何来解决它呢?优秀的解决方案往往是 “通过架构来解决问题“,那么能不能通过架构的思想来解决问题呢?
答案是可以的。我们可以将程序伪装成 “从数据库”,主库的增量变化会传递到从库,那这个伪装成 “从数据库” 的程序就能实时获取到数据变化,然后将增量的变化推送到消息队列 MQ,后续消费者消耗 MQ 的数据,然后经过处理之后再推送到各自需要的数据库.
这个架构的核心是通过监听 MySQL 的 binlog 来同步增量数据,通过基于 query 的查询旧表来同步旧数据,这就是本文要讲的一种异构数据库同步的实践.
经过深度的调研,成功得到了一套异构数据库同步方案,并且成功将公司生产环境下的 robin/logs 的表同步到了 elasticsearch 上.
首先对 MySQL 开启 binlog,但是由于 maxwell 需要的 binlog_format=row 原本的生产环境的数据库不宜修改。这里请教了海杨前辈,他提供了”从库联级“的思路,在从库中监听 binlog 绕过了操作生产环境重启主库的操作,大大降低了系统风险.
后续操作比较顺利,启动 maxwell 监听从库变化,然后将增量变化推送到 kafka ,最后配置 logstash 消费 kafka中的数据变化事件信息,将结果推送到 elasticsearch。配置 logstash需要结合表结构,这是整套方案实施的重点.
这套方案使用到了kafka、maxwell、logstash、elasticsearch。其中 elasticsearch 与 kafka已经在生产环境中有部署,所以无需单独部署维护。而 logstash 与 maxwell 只需要修改配置文件和启动命令即可快速上线。整套方案的意义不仅在于成本低,而且可以大规模使用,公司内有 MySQL 同步到其它数据库的需求时,都可以上任.
使用该方案同步和业务实现同步的对比 。
写入到 elasticsearch 性能对比 (8核4G内存) 。
经过对比测试,800w 数据量全量同步,使用 logstash 写到 elasticsearch,实际需要大概 3 小时,而旧方案的写入时间需要 2.5 天.
接下来,我们来看看具体是如何实现的.
本方案无需编写额外代码,非侵入式的,实现 MySQL 数据与 elasticsearch 数据库的同步.
下列是本次方案需要使用所有的组件:
MySQL 。
Kafka 。
Maxwell(监听 binlog) 。
Logstash(将数据同步给 elasticsearch) 。
Elasticsearch 。
本次使用 MySQL 5.5 作示范,其他版本的配置可能稍许不同需要 。
首先我们需要增加一个数据库只读的用户,如果已有的可以跳过.
-- 创建一个 用户名为 maxwell 密码为 xxxxxx 的用户
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
开启数据库的 binlog ,修改 mysql 配置文件,注意 maxwell 需要的 binlog 格式必须是 row .
# /etc/mysql/my.cnf
[mysqld]
# maxwell 需要的 binlog 格式必须是 row
binlog_format=row
# 指定 server_id 此配置关系到主从同步需要按情况设置,
# 由于此mysql没有开启主从同步,这边默认设置为 1
server_id=1
# logbin 输出的文件名, 按需配置
log-bin=master
重启 MySQL 并查看配置是否生效:
sudo systemctl restart mysqld
select @@log_bin;
-- 正确结果是 1
select @@binlog_format;
-- 正确结果是 ROW
如果要监听的数据库开启了主从同步,并且不是主数据库,需要再从数据库开启 binlog 联级同步.
# /etc/my.cnf
log_slave_updates = 1
需要被同步到 elasticsearch 的表结构.
-- robin.logs
show create table robin.logs;
-- 表结构
CREATE TABLE `logs` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`content` text NOT NULL,
`user_id` int(11) NOT NULL,
`status` enum('SUCCESS','FAILED','PROCESSING') NOT NULL,
`type` varchar(20) DEFAULT '',
`meta` text,
`created_at` bigint(15) NOT NULL,
`idx_host` varchar(255) DEFAULT '',
`idx_domain_id` int(11) unsigned DEFAULT NULL,
`idx_record_value` varchar(255) DEFAULT '',
`idx_record_opt` enum('DELETE','ENABLED','DISABLED') DEFAULT NULL,
`idx_orig_record_value` varchar(255) DEFAULT '',
PRIMARY KEY (`id`),
KEY `created_at` (`created_at`)
) ENGINE=InnoDB AUTO_INCREMENT=8170697 DEFAULT CHARSET=utf8
本次使用 maxwell-1.39.2 作示范, 确保机器中包含 java 环境, 推荐 openjdk11 。
下载 maxwell 程序 。
wget https://github.com/zendesk/maxwell/releases/download/v1.39.2/maxwell-1.39.2.tar.gz
tar zxvf maxwell-1.39.2.tar.gz **&&** cd maxwell-1.39.2
maxwell 使用了两个数据库:
一个是需要被监听binlog的数据库(只需要读权限) 。
另一个是记录maxwell服务状态的数据库,当前这两个数据库可以是同一个 。
重要参数说明:
host 需要监听binlog的数据库地址 。
port 需要监听binlog的数据库端口 。
user 需要监听binlog的数据库用户名 。
password 需要监听binlog的密码 。
replication_host 记录maxwell服务的数据库地址 。
replication_port 记录maxwell服务的数据库端口 。
replication_user 记录maxwell服务的数据库用户名 。
filter 用于监听binlog数据时过滤不需要的数据库数据或指定需要的数据库 。
producer 将监听到的增量变化数据提交给的消费者 (如 stdout、kafka) 。
kafka.bootstrap.servers kafka 服务地址 。
kafka_version kafka 版本 。
kafka_topic 推送到kafka的主题 。
启动 maxwell 。
注意,如果 kafka 配置了禁止自动创建主题,需要先自行在 kafka 上创建主题,kafka_version 需要根据情况指定, 此次使用了两张不同的库 。
./bin/maxwell
--host=mysql-maxwell.mysql.svc.cluster.fud3
--port=3306
--user=root
--password=password
--replication_host=192.168.5.38
--replication_port=3306
--replication_user=cloner
--replication_password=password
--filter='exclude: *.*, include: robin.logs'
--producer=kafka
--kafka.bootstrap.servers=192.168.30.10:9092
--kafka_topic=maxwell-robinlogs --kafka_version=0.9.0.1
Logstash 包中已经包含了 openjdk,无需额外安装.
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gz
tar zxvf logstash-8.5.0-linux-x86_64.tar.gz
删除不需要的配置文件.
rm config/logstash.yml
修改 logstash 配置文件,此处语法参考官方文档( https://www.elastic.co/guide/en/logstash/current/input-plugins.html) .
# config/logstash-sample.conf
input {
kafka {
bootstrap_servers => "192.168.30.10:9092"
group_id => "main"
topics => ["maxwell-robinlogs"]
}
}
filter {
json {
source => "message"
}
# 将maxwell的事件类型转化为es的事件类型
# 如增加 -> index 修改-> update
translate {
source => "[type]"
target => "[action]"
dictionary => {
"insert" => "index"
"bootstrap-insert" => "index"
"update" => "update"
"delete" => "delete"
}
fallback => "unknown"
}
# 过滤无效的数据
if ([action] == "unknown") {
drop {}
}
# 处理数据格式
if [data][idx_host] {
mutate {
add_field => { "idx_host" => "%{[data][idx_host]}" }
}
} else {
mutate {
add_field => { "idx_host" => "" }
}
}
if [data][idx_domain_id] {
mutate {
add_field => { "idx_domain_id" => "%{[data][idx_domain_id]}" }
}
} else {
mutate {
add_field => { "idx_domain_id" => "" }
}
}
if [data][idx_record_value] {
mutate {
add_field => { "idx_record_value" => "%{[data][idx_record_value]}" }
}
} else {
mutate {
add_field => { "idx_record_value" => "" }
}
}
if [data][idx_record_opt] {
mutate {
add_field => { "idx_record_opt" => "%{[data][idx_record_opt]}" }
}
} else {
mutate {
add_field => { "idx_record_opt" => "" }
}
}
if [data][idx_orig_record_value] {
mutate {
add_field => { "idx_orig_record_value" => "%{[data][idx_orig_record_value]}" }
}
} else {
mutate {
add_field => { "idx_orig_record_value" => "" }
}
}
if [data][type] {
mutate {
replace => { "type" => "%{[data][type]}" }
}
} else {
mutate {
replace => { "type" => "" }
}
}
mutate {
add_field => {
"id" => "%{[data][id]}"
"content" => "%{[data][content]}"
"user_id" => "%{[data][user_id]}"
"status" => "%{[data][status]}"
"meta" => "%{[data][meta]}"
"created_at" => "%{[data][created_at]}"
}
remove_field => ["data"]
}
mutate {
convert => {
"id" => "integer"
"user_id" => "integer"
"idx_domain_id" => "integer"
"created_at" => "integer"
}
}
# 只提炼需要的字段
mutate {
remove_field => [
"message",
"original",
"@version",
"@timestamp",
"event",
"database",
"table",
"ts",
"xid",
"commit",
"tags"
]
}
}
output {
# 结果写到es
elasticsearch {
hosts => ["http://es-zico2.service.upyun:9500"]
index => "robin_logs"
action => "%{action}"
document_id => "%{id}"
document_type => "robin_logs"
}
# 结果打印到标准输出
stdout {
codec => rubydebug
}
}
执行程序:
# 测试配置文件*
bin/logstash -f config/logstash-sample.conf --config.test_and_exit
# 启动*
bin/logstash -f config/logstash-sample.conf --config.reload.automatic
完成启动后,后续的增量数据 maxwell 会自动推送给 logstash 最终推送到 elasticsearch ,而之前的旧数据可以通过 maxwell 的 bootstrap 来同步,往下面表中插入一条任务,那么 maxwell 会自动将所有符合条件的 where_clause 的数据推送更新.
INSERT INTO maxwell.bootstrap
( database_name, table_name, where_clause, client_id )
values
( 'robin', 'logs', 'id > 1', 'maxwell' );
后续可以在 elasticsearch 检测数据是否同步完成,可以先查看数量是否一致,然后抽样对比详细数据.
# 检测 elasticsearch 中的数据量
GET robin_logs/robin_logs/_count
最后此篇关于如何高效实现MySQL与elasticsearch的数据同步的文章就讲到这里了,如果你想了解更多关于如何高效实现MySQL与elasticsearch的数据同步的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
背景: 我最近一直在使用 JPA,我为相当大的关系数据库项目生成持久层的轻松程度给我留下了深刻的印象。 我们公司使用大量非 SQL 数据库,特别是面向列的数据库。我对可能对这些数据库使用 JPA 有一
我已经在我的 maven pom 中添加了这些构建配置,因为我希望将 Apache Solr 依赖项与 Jar 捆绑在一起。否则我得到了 SolarServerException: ClassNotF
interface ITurtle { void Fight(); void EatPizza(); } interface ILeonardo : ITurtle {
我希望可用于 Java 的对象/关系映射 (ORM) 工具之一能够满足这些要求: 使用 JPA 或 native SQL 查询获取大量行并将其作为实体对象返回。 允许在行(实体)中进行迭代,并在对当前
好像没有,因为我有实现From for 的代码, 我可以转换 A到 B与 .into() , 但同样的事情不适用于 Vec .into()一个Vec . 要么我搞砸了阻止实现派生的事情,要么这不应该发
在 C# 中,如果 A 实现 IX 并且 B 继承自 A ,是否必然遵循 B 实现 IX?如果是,是因为 LSP 吗?之间有什么区别吗: 1. Interface IX; Class A : IX;
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我正在阅读标准haskell库的(^)的实现代码: (^) :: (Num a, Integral b) => a -> b -> a x0 ^ y0 | y0 a -> b ->a expo x0
我将把国际象棋游戏表示为 C++ 结构。我认为,最好的选择是树结构(因为在每个深度我们都有几个可能的移动)。 这是一个好的方法吗? struct TreeElement{ SomeMoveType
我正在为用户名数据库实现字符串匹配算法。我的方法采用现有的用户名数据库和用户想要的新用户名,然后检查用户名是否已被占用。如果采用该方法,则该方法应该返回带有数据库中未采用的数字的用户名。 例子: “贾
我正在尝试实现 Breadth-first search algorithm , 为了找到两个顶点之间的最短距离。我开发了一个 Queue 对象来保存和检索对象,并且我有一个二维数组来保存两个给定顶点
我目前正在 ika 中开发我的 Python 游戏,它使用 python 2.5 我决定为 AI 使用 A* 寻路。然而,我发现它对我的需要来说太慢了(3-4 个敌人可能会落后于游戏,但我想供应 4-
我正在寻找 Kademlia 的开源实现C/C++ 中的分布式哈希表。它必须是轻量级和跨平台的(win/linux/mac)。 它必须能够将信息发布到 DHT 并检索它。 最佳答案 OpenDHT是
我在一本书中读到这一行:-“当我们要求 C++ 实现运行程序时,它会通过调用此函数来实现。” 而且我想知道“C++ 实现”是什么意思或具体是什么。帮忙!? 最佳答案 “C++ 实现”是指编译器加上链接
我正在尝试使用分支定界的 C++ 实现这个背包问题。此网站上有一个 Java 版本:Implementing branch and bound for knapsack 我试图让我的 C++ 版本打印
在很多情况下,我需要在 C# 中访问合适的哈希算法,从重写 GetHashCode 到对数据执行快速比较/查找。 我发现 FNV 哈希是一种非常简单/好/快速的哈希算法。但是,我从未见过 C# 实现的
目录 LRU缓存替换策略 核心思想 不适用场景 算法基本实现 算法优化
1. 绪论 在前面文章中提到 空间直角坐标系相互转换 ,测绘坐标转换时,一般涉及到的情况是:两个直角坐标系的小角度转换。这个就是我们经常在测绘数据处理中,WGS-84坐标系、54北京坐标系
在软件开发过程中,有时候我们需要定时地检查数据库中的数据,并在发现新增数据时触发一个动作。为了实现这个需求,我们在 .Net 7 下进行一次简单的演示. PeriodicTimer .
二分查找 二分查找算法,说白了就是在有序的数组里面给予一个存在数组里面的值key,然后将其先和数组中间的比较,如果key大于中间值,进行下一次mid后面的比较,直到找到相等的,就可以得到它的位置。
我是一名优秀的程序员,十分优秀!