gpt4 book ai didi

python实现MySQL指定表增量同步数据到clickhouse的脚本

转载 作者:qq735679552 更新时间:2022-09-29 22:32:09 26 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章python实现MySQL指定表增量同步数据到clickhouse的脚本由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

python实现MySQL指定表增量同步数据到clickhouse,脚本如下

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
# _*_ coding:utf8 _*_
 
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent,)
import clickhouse_driver
import configparser
import os
 
configfile = 'repl.ini'
########## 配置文件repl.ini 操作 ##################
def create_configfile(configfile,log_file,log_pos):
   config = configparser.ConfigParser()
 
   if not os.path.exists(configfile):
     config[ 'replinfo' ] = { 'log_file' :log_file, 'log_pos' : str (log_pos)}
 
     with open (configfile, 'w+' ) as f:
       config.write(f)
 
### repl.ini 写操作 ##################
def write_config(configfile,log_file,log_pos):
   config = configparser.ConfigParser()
   config.read(configfile)
 
   config. set ( 'replinfo' , 'log_file' ,log_file)
   config. set ( 'replinfo' , 'log_pos' , str (log_pos))
 
   if os.path.exists(configfile):
     with open (configfile, 'w+' ) as f:
       config.write(f)
   else :
     create_configfile(configfile)
 
### 配置文件repl.ini 读操作 ##################
def read_config(configfile):
   config = configparser.ConfigParser()
   config.read(configfile)
   # print(config['replinfo']['log_file'])
   # print(config['replinfo']['log_pos'])
   return (config[ 'replinfo' ][ 'log_file' ], int (config[ 'replinfo' ][ 'log_pos' ]))
 
############# clickhouse 操作 ##################
def ops_clickhouse(db,table,sql):
   column_type_dic = {}
   try :
     client = clickhouse_driver.Client(host = '127.0.0.1' ,\
                      port = 9000 ,\
                      user = 'default' ,\
                      password = 'clickhouse' )
     # sql="select name,type from system.columns where database='{0}' and table='{1}'".format(db,table)
     client.execute(sql)
 
   except Exception as error:
     message = "获取clickhouse里面的字段类型错误. %s" % (error)
     # logger.error(message)
     print (message)
     exit( 1 )
 
MYSQL_SETTINGS = { 'host' : '127.0.0.1' , 'port' : 13306 , 'user' : 'root' , 'passwd' : 'Root@0101' }
only_events = (DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent)
def main():
   ## 每次重启时,读取上次同步的log_file,log_pos
   (log_file,log_pos) = read_config(configfile)
   # print(log_file+'|'+ str(log_pos))
   print ( '-----------------------------------------------------------------------------' )
   stream = BinLogStreamReader(connection_settings = MYSQL_SETTINGS, resume_stream = True , blocking = True , \
                 server_id = 10 ,
                  only_tables = 't_repl' , only_schemas = 'test' , \
                 log_file = log_file,log_pos = log_pos, \
                 only_events = only_events, \
                 fail_on_table_metadata_unavailable = True , slave_heartbeat = 10 )
 
   try :
     for binlogevent in stream:
       for row in binlogevent.rows:
         ## delete操作
         if isinstance (binlogevent, DeleteRowsEvent):
           info = dict (row[ "values" ].items())
           # print("DELETE FROM `%s`.`%s` WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key]) )
           # print("ALTER TABLE `%s`.`%s` DELETE WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key]) )
           sql = "ALTER TABLE `%s`.`%s` DELETE WHERE %s = %s ;" % (binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key])
 
         ## update 操作
         elif isinstance (binlogevent, UpdateRowsEvent):
           info_before = dict (row[ "before_values" ].items())
           info_after = dict (row[ "after_values" ].items())
           # info_set = str(info_after).replace(":","=").replace("{","").replace("}","")
           info_set = str (info_after).replace( ":" , "=" ).replace( "{" , " ").replace(" } ", " ").replace(" ' "," ")
           # print("UPDATE `%s`.`%s` SET %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key]  ) )
           # print("ALTER TABLE %s.%s UPDATE %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key]  ) )
           sql = "ALTER TABLE %s.%s UPDATE %s WHERE %s = %s ;" % (binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key]  )
 
         ## insert 操作
         elif isinstance (binlogevent, WriteRowsEvent):
           info = dict (row[ "values" ].items())
           # print("INSERT INTO %s.%s(%s)VALUES%s ;"%(binlogevent.schema,binlogevent.table , ','.join(info.keys()) ,str(tuple(info.values())) ) )
           sql = "INSERT INTO %s.%s(%s)VALUES%s ;" % (binlogevent.schema,binlogevent.table , ',' .join(info.keys()) , str ( tuple (info.values())) )
         ops_clickhouse( 'test' , 't_repl' ,sql )
 
         # 当前log_file,log_pos写入配置文件
         write_config(configfile, stream.log_file, stream.log_pos)
 
   except Exception as e:
     print (e)
   finally :
     stream.close()
 
if __name__ = = "__main__" :
   main()
 
 
 
'''
BinLogStreamReader()参数
ctl_connection_settings:集群保存模式信息的连接设置
resume_stream:从位置或binlog的最新事件或旧的可用事件开始
log_file:设置复制开始日志文件
log_pos:设置复制开始日志pos(resume_stream应该为true)
auto_position:使用master_auto_position gtid设置位置
blocking:在流上读取被阻止
only_events:允许的事件数组
ignored_events:被忽略的事件数组
only_tables:包含要观看的表的数组(仅适用于binlog_format ROW)
ignored_tables:包含要跳过的表的数组
only_schemas:包含要观看的模式的数组
ignored_schemas:包含要跳过的模式的数组
freeze_schema:如果为true,则不支持ALTER TABLE。速度更快。
skip_to_timestamp:在达到指定的时间戳之前忽略所有事件。
report_slave:在SHOW SLAVE HOSTS中报告奴隶。
slave_uuid:在SHOW SLAVE HOSTS中报告slave_uuid。
fail_on_table_metadata_unavailable:如果我们无法获取有关row_events的表信息,应该引发异常
slave_heartbeat:(秒)主站应主动发送心跳连接。这也减少了复制恢复时GTID复制的流量(在许多事件在binlog中跳过的情况下)。请参阅mysql文档中的MASTER_HEARTBEAT_PERIOD以了解语义
'''

知识点扩展:

MySQL备份-增量同步 。

mysql增量同步主要使用binlog文件进行同步,binlog文件主要记录的是数据库更新操作相关的内容.

1. 备份数据的意义 。

针对不同业务,7*24小时提供服务和数据的重要性不同。 数据库数据是比较核心的数据,对企业的经营至关重要,数据库备份显得尤为重要.

2. 备份数据库 。

MySQL数据库自带的备份命令 `mysqldump`,基本使用方法: 语法:`mysqldump -u username -p password dbname > filename.sql` 。

执行备份命令 。

`mysqldump -uroot -pmysqladmin db_test > /opt/mysql_bak.sql` 。

查看备份内容 。

`grep -v "#|\*|--|^$" /opt/mysql_bak.sql` 。

到此这篇关于python实现MySQL指定表增量同步数据到clickhouse的脚本的文章就介绍到这了,更多相关python实现MySQL增量同步数据内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。

原文链接:https://blog.csdn.net/u010719917/article/details/114085351 。

最后此篇关于python实现MySQL指定表增量同步数据到clickhouse的脚本的文章就讲到这里了,如果你想了解更多关于python实现MySQL指定表增量同步数据到clickhouse的脚本的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com