- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章python实现MySQL指定表增量同步数据到clickhouse的脚本由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
python实现MySQL指定表增量同步数据到clickhouse,脚本如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
#!/usr/bin/env python3
# _*_ coding:utf8 _*_
from
pymysqlreplication
import
BinLogStreamReader
from
pymysqlreplication.row_event
import
(DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent,)
import
clickhouse_driver
import
configparser
import
os
configfile
=
'repl.ini'
########## 配置文件repl.ini 操作 ##################
def
create_configfile(configfile,log_file,log_pos):
config
=
configparser.ConfigParser()
if
not
os.path.exists(configfile):
config[
'replinfo'
]
=
{
'log_file'
:log_file,
'log_pos'
:
str
(log_pos)}
with
open
(configfile,
'w+'
) as f:
config.write(f)
### repl.ini 写操作 ##################
def
write_config(configfile,log_file,log_pos):
config
=
configparser.ConfigParser()
config.read(configfile)
config.
set
(
'replinfo'
,
'log_file'
,log_file)
config.
set
(
'replinfo'
,
'log_pos'
,
str
(log_pos))
if
os.path.exists(configfile):
with
open
(configfile,
'w+'
) as f:
config.write(f)
else
:
create_configfile(configfile)
### 配置文件repl.ini 读操作 ##################
def
read_config(configfile):
config
=
configparser.ConfigParser()
config.read(configfile)
# print(config['replinfo']['log_file'])
# print(config['replinfo']['log_pos'])
return
(config[
'replinfo'
][
'log_file'
],
int
(config[
'replinfo'
][
'log_pos'
]))
############# clickhouse 操作 ##################
def
ops_clickhouse(db,table,sql):
column_type_dic
=
{}
try
:
client
=
clickhouse_driver.Client(host
=
'127.0.0.1'
,\
port
=
9000
,\
user
=
'default'
,\
password
=
'clickhouse'
)
# sql="select name,type from system.columns where database='{0}' and table='{1}'".format(db,table)
client.execute(sql)
except
Exception as error:
message
=
"获取clickhouse里面的字段类型错误. %s"
%
(error)
# logger.error(message)
print
(message)
exit(
1
)
MYSQL_SETTINGS
=
{
'host'
:
'127.0.0.1'
,
'port'
:
13306
,
'user'
:
'root'
,
'passwd'
:
'Root@0101'
}
only_events
=
(DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent)
def
main():
## 每次重启时,读取上次同步的log_file,log_pos
(log_file,log_pos)
=
read_config(configfile)
# print(log_file+'|'+ str(log_pos))
print
(
'-----------------------------------------------------------------------------'
)
stream
=
BinLogStreamReader(connection_settings
=
MYSQL_SETTINGS, resume_stream
=
True
, blocking
=
True
, \
server_id
=
10
,
only_tables
=
't_repl'
, only_schemas
=
'test'
, \
log_file
=
log_file,log_pos
=
log_pos, \
only_events
=
only_events, \
fail_on_table_metadata_unavailable
=
True
, slave_heartbeat
=
10
)
try
:
for
binlogevent
in
stream:
for
row
in
binlogevent.rows:
## delete操作
if
isinstance
(binlogevent, DeleteRowsEvent):
info
=
dict
(row[
"values"
].items())
# print("DELETE FROM `%s`.`%s` WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key]) )
# print("ALTER TABLE `%s`.`%s` DELETE WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key]) )
sql
=
"ALTER TABLE `%s`.`%s` DELETE WHERE %s = %s ;"
%
(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key])
## update 操作
elif
isinstance
(binlogevent, UpdateRowsEvent):
info_before
=
dict
(row[
"before_values"
].items())
info_after
=
dict
(row[
"after_values"
].items())
# info_set = str(info_after).replace(":","=").replace("{","").replace("}","")
info_set
=
str
(info_after).replace(
":"
,
"="
).replace(
"{"
, "
").replace("
}
", "
").replace("
'
","
")
# print("UPDATE `%s`.`%s` SET %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key] ) )
# print("ALTER TABLE %s.%s UPDATE %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key] ) )
sql
=
"ALTER TABLE %s.%s UPDATE %s WHERE %s = %s ;"
%
(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key] )
## insert 操作
elif
isinstance
(binlogevent, WriteRowsEvent):
info
=
dict
(row[
"values"
].items())
# print("INSERT INTO %s.%s(%s)VALUES%s ;"%(binlogevent.schema,binlogevent.table , ','.join(info.keys()) ,str(tuple(info.values())) ) )
sql
=
"INSERT INTO %s.%s(%s)VALUES%s ;"
%
(binlogevent.schema,binlogevent.table ,
','
.join(info.keys()) ,
str
(
tuple
(info.values())) )
ops_clickhouse(
'test'
,
't_repl'
,sql )
# 当前log_file,log_pos写入配置文件
write_config(configfile, stream.log_file, stream.log_pos)
except
Exception as e:
print
(e)
finally
:
stream.close()
if
__name__
=
=
"__main__"
:
main()
'''
BinLogStreamReader()参数
ctl_connection_settings:集群保存模式信息的连接设置
resume_stream:从位置或binlog的最新事件或旧的可用事件开始
log_file:设置复制开始日志文件
log_pos:设置复制开始日志pos(resume_stream应该为true)
auto_position:使用master_auto_position gtid设置位置
blocking:在流上读取被阻止
only_events:允许的事件数组
ignored_events:被忽略的事件数组
only_tables:包含要观看的表的数组(仅适用于binlog_format ROW)
ignored_tables:包含要跳过的表的数组
only_schemas:包含要观看的模式的数组
ignored_schemas:包含要跳过的模式的数组
freeze_schema:如果为true,则不支持ALTER TABLE。速度更快。
skip_to_timestamp:在达到指定的时间戳之前忽略所有事件。
report_slave:在SHOW SLAVE HOSTS中报告奴隶。
slave_uuid:在SHOW SLAVE HOSTS中报告slave_uuid。
fail_on_table_metadata_unavailable:如果我们无法获取有关row_events的表信息,应该引发异常
slave_heartbeat:(秒)主站应主动发送心跳连接。这也减少了复制恢复时GTID复制的流量(在许多事件在binlog中跳过的情况下)。请参阅mysql文档中的MASTER_HEARTBEAT_PERIOD以了解语义
'''
|
知识点扩展:
MySQL备份-增量同步 。
mysql增量同步主要使用binlog文件进行同步,binlog文件主要记录的是数据库更新操作相关的内容.
1. 备份数据的意义 。
针对不同业务,7*24小时提供服务和数据的重要性不同。 数据库数据是比较核心的数据,对企业的经营至关重要,数据库备份显得尤为重要.
2. 备份数据库 。
MySQL数据库自带的备份命令 `mysqldump`,基本使用方法: 语法:`mysqldump -u username -p password dbname > filename.sql` 。
执行备份命令 。
`mysqldump -uroot -pmysqladmin db_test > /opt/mysql_bak.sql` 。
查看备份内容 。
`grep -v "#|\*|--|^$" /opt/mysql_bak.sql` 。
到此这篇关于python实现MySQL指定表增量同步数据到clickhouse的脚本的文章就介绍到这了,更多相关python实现MySQL增量同步数据内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://blog.csdn.net/u010719917/article/details/114085351 。
最后此篇关于python实现MySQL指定表增量同步数据到clickhouse的脚本的文章就讲到这里了,如果你想了解更多关于python实现MySQL指定表增量同步数据到clickhouse的脚本的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
目前我正在构建相当大的网络系统,我需要强大的 SQL 数据库解决方案。我选择 Mysql 而不是 Postgres,因为一些任务需要只读(MyISAM 引擎)而其他任务需要大量写入(InnoDB)。
我在 mysql 中使用如下命令。当它显示表格数据时,它被格式化为一个非常干净的表格,间距均匀且 |作为列分隔符。 SELECT * FROM TABLE_NAME; 当我从 CLI 运行命令时,如下
我知道这个问题之前已经被问过好几次了,我已经解决了很多问题,但到目前为止没有任何效果。 MySQL 试图将自身安装到的目录 (usr/local/mysql) 肯定有问题。关于我的错误的奇怪之处在于我
以下是我的 SQL 数据结构,我正在尝试如下两个查询: Select Wrk_ID, Wrk_LastName, Skill_Desc from Worker, Skill where
我们有一个本地 mysql 服务器(不在公共(public)域上),并希望将该服务器复制到我们拥有的 google 云 sql 实例。我的问题是:1.这可能吗?2.我们的本地服务器只能在本地网络上访问
我有一个表(test_table),其中一些字段值(例如字段 A、B 和 C)是从外部应用程序插入的,还有一个字段(字段 D),我想从现有表(store_table)插入其值,但在插入前者(A、B 和
我想创建一个 AWS RDS 实例,然后使用 terraform 管理数据库用户。因此,首先,我创建了一个 RDS 实例,然后使用创建的 RDS 实例初始化 mysql 提供程序,以进一步将其用于用户
当用户在我的网站上注册时,他们会在我的一个数据库中创建自己的表格。该表存储用户发布的所有帖子。我还想做的是也为他们生成自己的 MySql 用户——该用户仅有权从他们的表中读取、写入和删除。 创建它应该
我有一个关于 ColdFusion 和 Mysql 的问题。我有两个表:PRODUCT 和 PRODUCT_CAT。我想列出包含一些标记为:IS_EXTRANET=1 的特殊产品的类别。所以我写了这个
我想获取 recipes_id 列的值,以获取包含 ingredient_id 的 2,17 和 26 条目的值。 假设 ingredient_id 2 丢失则不获取记录。 我已经尝试过 IN 运算符
在 Ubuntu 中,我通常安装两者,但 MySQL 的客户端和服务器之间有什么区别。 作为奖励,当一个新语句提到它需要 MySQL 5.x 时,它是指客户端、服务器还是两者兼而有之。例如这个链接ht
我重新访问了我的数据库并注意到我有一些 INT 类型的主键。 这还不够独特,所以我想我会有一个指导。 我来自微软 sql 背景,在 ssms 中你可以 选择类型为“uniqeidentifier”并自
我的系统上有 MySQL,我正在尝试确定它是 Oracle MySQL 还是 MySQL。 Oracle MySQL 有区别吗: http://www.oracle.com/us/products/m
我是在生产 MySQL 中运行的应用程序的新维护者。之前的维护者已经离开,留下的文档很少,而且联系不上了。 我面临的问题是执行以下请求大约需要 10 秒: SELECT COUNT(*) FROM `
我有两个位于不同机器上的 MySQL 数据库。我想自动将数据从一台服务器传输到另一台服务器。比方说,我希望每天早上 4:00 进行数据传输。 可以吗?是否有任何 MySQL 内置功能可以让我们做到这一
有什么方法可以使用 jdbc 查询位于 mysql 根目录之外的目录中的 mysql 表,还是必须将它们移动到 mysql 根目录内的数据库文件夹中?我在 Google 上搜索时没有找到任何东西。 最
我在 mysql 数据库中有两个表。成员和 ClassNumbers。两个表都有一个付费年份字段,都有一个代码字段。我想用代码数字表中的值更新成员表中的付费年份,其中成员中的代码与 ClassNumb
情况:我有 2 台服务器,其中一台当前托管一个实时 WordPress 站点,我希望能够将该站点转移到另一台服务器,以防第一台服务器出现故障。传输源文件很容易;传输数据库是我需要弄清楚如何做的。两台服
Phpmyadmin 有一个功能是“复制数据库到”..有没有mysql查询来写这个函数?类似于将 db A 复制到新的 db B。 最佳答案 首先创建复制数据库: CREATE DATABASE du
我有一个使用 mySQL 作为后端的库存软件。我已经在我的计算机上对其进行了测试,并且运行良好。 当我在计算机上安装我的软件时,我必须执行以下步骤: 安装 mySQL 服务器 将用户名指定为“root
我是一名优秀的程序员,十分优秀!