- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章详解Mysql如何实现数据同步到Elasticsearch由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
基于mysql的binlog日志订阅:binlog日志是mysql用来记录数据实时的变化 。
mysql数据同步到es中分为两种,分别是全量同步和增量同步 。
全量同步表示第一次建立好es索引之后,将mysql中所有数据一次性导入到es中 。
增量同步表示mysql中产生新的数据,这些新的数据包括三种情况,就是新插入mysql中的数据,更新老的数据,删除的数据,这些数据的变动与新增都要同步到es中 。
logstash官方插件,集成在logstash中,下载logstash即可,通过配置文件实现mysql与elasticsearch数据同步 。
优点 。
缺点 。
go-mysql-elasticsearch 是国内作者开发的一款插件 。
优点 。
缺点 。
elasticsearch-jdbc 目前最新的版本是2.3.4,支持的elasticsearch的版本为2.3.4, 未实践 。
优点 。
缺点 。
第一步安装:
logstash5.x之后,集成了logstash-input-jdbc插件。安装logstash后通过命令安装logstash-input-jdbc插件 。
1
2
|
cd /logstash-6.4.2/bin
./logstash-plugin install logstash-input-jdbc
|
第二步配置:
在logstash-6.4.2/config文件夹下新建jdbc.conf,配置如下 。
在logstash-6.4.2/config 目录下新建jdbc.sql文件 。
1
|
select
*
from
t_employee
|
第三步运行 。
1
2
3
4
5
|
cd logstash-6.4.2
# 检查配置文件语法是否正确
bin/logstash -f config/jdbc.conf
--config.test_and_exit
# 启动
bin/logstash -f config/jdbc.conf
--config.reload.automatic
|
--config.reload.automatic:会自动重新加载配置文件内容 。
在kibana中创建索引后查看同步数据 。
1
2
|
put octopus
get octopus/_search
|
第一步:mysql binlog日志 。
go-mysql-elasticsearch通过mysql中binlog日志实现数据增加,删除,修改同步elasticsearch 。
mysql的binlog日志主要用于数据库的主从复制与数据恢复。binlog中记录了数据的增删改查操作,主从复制过程中,主库向从库同步binlog日志,从库对binlog日志中的事件进行重放,从而实现主从同步.
mysql binlog日志有三种模式,分别为:
1
2
3
|
row: 记录每一行数据被修改的情况,但是日志量太大
statement: 记录每一条修改数据的sql语句,减少了日志量,但是sql语句使用函数或触发器时容易出现主从不一致
mixed: 结合了row和statement的优点,根据具体执行数据操作的sql语句选择使用row或者statement记录日志
|
要通过mysql binlog将数据同步到es集群,只能使用row模式,因为只有row模式才能知道mysql中的数据的修改内容.
以update操作为例,row模式的binlog日志内容示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
set
timestamp
=1527917394/*!*/;
begin
/*!*/;
#
at
3751
#180602 13:29:54 server id 1 end_log_pos 3819 crc32 0x8dabdf01 table_map: `webservice`.`building` mapped
to
number 74
#
at
3819
#180602 13:29:54 server id 1 end_log_pos 3949 crc32 0x59a8ed85 update_rows:
table
id 74 flags: stmt_end_f
binlog
'
uisswxmbaaaaraaaaosoaaaaaeoaaaaaaaeacndlynnlcnzpy2uacgj1awxkaw5naayidwepereg
wacaaqaaaahfq40=
uisswx8baaaaggaaag0paaaaaeoaaaaaaaeaagag///a1gcaaaaaaaalynvpbgrpbmctmtaadwb3
ukrnbjnlylv5d1k3ajvbd64www+ufsdwbwaaaaaaaatidwlszgluzy0xmaepahdsre1um0tivxl3
wtdqnvsprhzbd64whe2owq==
'
/*!*/;
###
update
`webservice`.`building`
###
where
### @1=2006 /* longint meta=0 nullable=0 is_null=0 */
### @2=
'building-10'
/* varstring(192) meta=192 nullable=0 is_null=0 */
### @3=0 /* tinyint meta=0 nullable=0 is_null=0 */
### @4=
'wrdmn3kbuywy7j5'
/* varstring(384) meta=384 nullable=0 is_null=0 */
### @5=1527754262 /*
timestamp
(0) meta=0 nullable=0 is_null=0 */
### @6=1527754262 /*
timestamp
(0) meta=0 nullable=0 is_null=0 */
###
set
### @1=2006 /* longint meta=0 nullable=0 is_null=0 */
### @2=
'building-10'
/* varstring(192) meta=192 nullable=0 is_null=0 */
### @3=1 /* tinyint meta=0 nullable=0 is_null=0 */
### @4=
'wrdmn3kbuywy7j5'
/* varstring(384) meta=384 nullable=0 is_null=0 */
### @5=1527754262 /*
timestamp
(0) meta=0 nullable=0 is_null=0 */
### @6=1527754262 /*
timestamp
(0) meta=0 nullable=0 is_null=0 */
#
at
3949
#180602 13:29:54 server id 1 end_log_pos 3980 crc32 0x58226b8f xid = 182
commit
/*!*/;
|
statement模式下binlog日志内容示例为:
1
2
3
4
5
6
|
set
timestamp
=1527919329/*!*/;
update
building
set
status=1
where
id=2000
/*!*/;
#
at
688
#180602 14:02:09 server id 1 end_log_pos 719 crc32 0x4c550a7d xid = 200
commit
/*!*/;
|
从row模式和statement模式下update操作的日志内容可以看出,row模式完整地记录了要修改的某行数据更新前的所有字段的值以及更改后所有字段的值,而statement模式只单单记录了update操作的sql语句。我们要将mysql的数据实时同步到es, 只能选择row模式的binlog, 获取并解析binlog日志的数据内容,执行es document api,将数据同步到es集群中.
查看,修改binlog模式 。
1
2
3
4
5
6
7
8
9
10
11
|
# 查看binlog模式
mysql> show variables
like
"%binlog_format%"
;
# 修改binlog模式
mysql>
set
global
binlog_format=
'row'
;
# 查看binlog是否开启
mysql> show variables
like
'log_bin'
;
# 开启bīnlog
修改my.cnf文件log-bin = mysql-bin
|
第二步安装 。
1
2
3
4
5
6
7
8
9
10
11
12
|
# 安装go
sudo apt-get install go
# 安装godep
go get github.com/tools/godep
# 获取go-mysql-elasticsearch插件
go get github.com/siddontang/go-mysql-elasticsearch
# 安装go-mysql-elasticsearch插件
cd go/src/github.com/siddontang/go-mysql-elasticsearch
make
|
第三步配置 。
go/src/github.com/siddontang/go-mysql-elasticsearch/etc/river.toml 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# mysql address,
user
and
password
#
user
must have replication privilege
in
mysql.
my_addr =
"127.0.0.1:3306"
# 需要同步的mysql基本设置
my_user =
"root"
my_pass =
"root"
# elasticsearch address
es_addr =
"127.0.0.1:9200"
# 本地elasticsearch配置
# path
to
store data,
like
master.info,
and
dump mysql data
data_dir =
"./var"
# 数据存储的url
# 以下配置保存默认不变
#
inner
http status address
stat_addr =
"127.0.0.1:12800"
# pseudo server id
like
a slave
server_id = 1001
# mysql
or
mariadb
flavor =
"mysql"
# mysqldump execution path
mysqldump =
"mysqldump"
# mysql data source
[[source]]
schema
=
"test"
//elasticsearch 与 mysql 同步时对应的数据库名称
#
only
below tables will be synced
into
elasticsearch.
# 要同步test这个
database
里面的几张表。对于一些项目如果使用了分表机制,我们可以用通配符来匹配,譬如t_[0-9]{4},就可# 以匹配
table
t_0000 到 t_9999。
tables = [
"t"
,
"t_[0-9]{4}"
,
"tfield"
,
"tfilter"
]
# below
is
for
special
rule
mapping
# 对一个
table
,我们需要指定将它的数据同步到 es 的哪一个
index
的 type 里面。如果不指定,我们默认会用起
schema
#
name
作为 es 的
index
和 type
[[
rule
]]
schema
=
"test"
//数据库名称
table
=
"t"
//表名称
index
=
"test"
//对应的索引名称
type =
"t"
//对应的类型名称
# 将所有满足格式 t_[0-9]{4} 的
table
同步到 es 的
index
为 test,type 为 t 的下面。当然,这些表需要保证
#
schema
是一致的
[[
rule
]]
schema
=
"test"
table
=
"t_[0-9]{4}"
index
=
"test"
type =
"t"
# 对于
table
tfilter,我们只会同步 id 和
name
这两列,其他的都不会同步
filter = [
"id"
,
"name"
]
#
table
tfield 的
column
id ,我们映射成了 es_id,而 tags 则映射成了 es_tags
# list 这个字段,他显示的告知需要将对应的
column
数据转成 es 的 array type。这个现在通常用于 mysql 的
varchar
# 等类型,我们可能会存放类似 “a,b,c” 这样的数据,然后希望同步给 es 的时候变成 [a, b, c] 这样的列表形式。
[
rule
.field]
# map
column
`id`
to
es field `es_id`
id=
"es_id"
# map
column
`tags`
to
es field `es_tags`
with
array type
tags=
"es_tags,list"
# map
column
`keywords`
to
es
with
array type
keywords=
",list"
|
第四步运行 。
1
2
|
cd go/src/github.com/siddontang/go-mysql-elasticsearch
bin/go-mysql-elasticsearch -config=./etc/river.toml
|
下载工具 。
解压:unzip elasticsearch-jdbc-2.3.2.0-dist.zip 。
设置环境变量 。
1
2
|
[root@autofelix /]# vi /etc/profile
export jdbc_importer_home=/elasticsearch-jdbc-2.3.2.0
|
使环境变量生效 。
1
|
[root@autofelix /]# source /etc/profile
|
配置参考 。
第一步:在根目录下建立根目录下新建文件夹odbc_es 如下 。
1
2
3
|
[root@autofelix /]# ll /odbc_es/
drwxr-xr-x 2 root root 4096 jun 16 03:11 logs
-rwxrwxrwx 1 root root 542 jun 16 04:03 mysql_import_es.sh
|
第二步:新建脚本mysql_import_es.sh,内容如下 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
[root@autofelix odbc_es]# cat mysql_import_es.sh
'#!/bin/sh
bin=$jdbc_importer_home/bin
lib=$jdbc_importer_home/lib
echo '
{
"type"
:
"jdbc"
,
"jdbc"
: {
"elasticsearch.autodiscover"
:
true
,
"elasticsearch.cluster"
:
"my-application"
, #簇名,详见:/usr/
local
/elasticsearch/config/elasticsearch.yml
"url"
:
"jdbc:mysql://10.8.5.101:3306/test"
, #mysql数据库地址
"user"
:
"root"
, #mysql用户名
"password"
:
"123456"
, #mysql密码
"sql"
:
"select * from cc"
,
"elasticsearch"
: {
"host"
:
"10.8.5.101"
,
"port"
: 9300
},
"index"
:
"myindex"
, #新的
index
"type"
:
"mytype"
#新的type
}
}'| java \
-cp
"${lib}/*"
\
-dlog4j.configurationfile=${bin}/log4j2.xml \
org.xbib.tools.runner \
org.xbib.tools.jdbcimporter
|
第三步:为 mysql_import_es.sh 添加可执行权限.
1
|
[root@autofelix odbc_es]# chmod a+x mysql_import_es.sh
|
第四步:执行脚本mysql_import_es.sh 。
1
|
[root@autofelix odbc_es]# ./mysql_import_es.sh
|
到此这篇关于详解mysql如何实现数据同步到elasticsearch的文章就介绍到这了,更多相关mysq数据同步到elasticsearch内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://blog.csdn.net/weixin_41635750/article/details/121445723 。
最后此篇关于详解Mysql如何实现数据同步到Elasticsearch的文章就讲到这里了,如果你想了解更多关于详解Mysql如何实现数据同步到Elasticsearch的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在实现 IMAP 客户端,但 IMAP 邮箱同步出现问题。 首先,可以从 IMAP 服务器获取新邮件,但我不知道如何从邮箱中查找已删除的邮件。 我是否应该从服务器获取所有消息并将其与本地数据进行比
我研究线程同步。当我有这个例子时: class A { public synchronized void methodA(){ } public synchronized void met
嗨,我做了一个扩展线程的东西,它添加了一个包含 IP 的对象。然后我创建了该线程的两个实例并启动它们。他们使用相同的列表。 我现在想使用 Synchronized 来阻止并发更新问题。但它不起作用,我
我正在尝试使用 FTP 定期将小数据文件从程序上传到服务器。用户从使用 javascript XMLHttpRequest 函数读取数据的网页访问数据。这一切似乎都有效,但我正在努力解决由 FTP 和
我不知道如何同步下一个代码: javascript: (function() { var s2 = document.createElement('script'); s2.src =
关闭。这个问题需要更多focused .它目前不接受答案。 想改进这个问题吗? 更新问题,使其只关注一个问题 editing this post . 关闭 7 年前。 Improve this qu
一 点睛 1 Message 在基于 Message 的系统中,每一个 Event 也可以被称为 Message,Message 是对 Event 更高一个层级的抽象,每一个 Message 都有一个
一 点睛 1 Message 在基于 Message 的系统中,每一个 Event 也可以被称为 Message,Message 是对 Event 更高一个层级的抽象,每一个 Message 都有一个
目标:我所追求的是每次在数据库中添加某些内容时(在 $.ajax 到 Submit_to_db.php 之后),从数据库获取数据并刷新 main.php(通过 draw_polygon 更明显)。 所
我有一个重复动画,需要与其他一些 transient 动画同步。重复动画是一条在屏幕上移动 4 秒的扫描线。当它经过下面的图像时,这些图像需要“闪烁”。 闪烁的图像可以根据用户的意愿来来去去和移动。它
我有 b 个块,每个块有 t 个线程。 我可以用 __syncthreads() 同步特定块中的线程。例如 __global__ void aFunction() { for(i=0;i #
我正在使用azure表查询来检索分配给用户的所有错误实体。 此外,我更改了实体的属性以声明该实体处于处理模式。 处理完实体后,我将从表中删除该实体。 当我进行并行测试时,可能会发生查询期间,一个实体已
我想知道 SQLite 是如何实现它的。它基于文件锁定吗?当然,并不是每个访问它的用户都锁定了整个数据库;那效率极低。它是基于多个文件还是仅基于一个大文件? 如果有人能够简要概述一下 sqlite 中
我想post到php,当id EmpAgree1时,然后它的post变量EmpAgree=1;当id为EmpAgree2时,则后置变量EmpAgree=2等。但只是读取i的最后一个值,为什么?以及如何
CUBLAS 文档提到我们在读取标量结果之前需要同步: “此外,少数返回标量结果的函数,例如 amax()、amin、asum()、rotg()、rotmg()、dot() 和 nrm2(),通过引用
我知道下面的代码中缺少一些内容,我的问题是关于 RemoteImplementation 中的同步机制。我还了解到该网站和其他网站上有几个关于 RMI 和同步的问题;我在这里寻找明确的确认/矛盾。 我
我不太确定如何解决这个问题......所以我可能需要几次尝试才能正确回答这个问题。我有一个用于缓存方法结果的注释。我的代码目前是一个私有(private)分支,但我正在处理的部分从这里开始: http
我对 Java 非常失望,因为它不允许以下代码尽可能地并发移动。当没有同步时,两个线程会更频繁地切换,但是当尝试访问同步方法时,在第二个线程获得锁之前以及在第一个线程获得锁之前再次花费太长时间(比如
过去几周我一直在研究java多线程。我了解了synchronized,并理解synchronized避免了多个线程同时访问相同的属性。我编写此代码是为了在同一线程中运行两个线程。 val gate =
我有一个关于 Java 同步的简单问题。 请假设以下代码: public class Test { private String address; private int age;
我是一名优秀的程序员,十分优秀!