- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
最近在开发的过程中遇到这么一个问题,当产生某种类型的工单后,需要实时通知到另外的系统,由另外的系统进行数据的研判操作。 由于某种原因, 像向消息队列中推送工单消息、或直接调用另外系统的接口、或者部署Cannal 等都不可行,因此此处使用 mysql-binlog-connector-java 这个库来完成数据库binlog的监听,从而通知到另外的系统.
mysql-binlog-connector-java是一个Java库,通过它可以实现mysql binlog日志的监听和解析操作。它提供了一系列可靠的方法,使开发者通过监听数据库的binlog日志,来实时获取数据库的变更信息,比如:数据的插入、更新、删除等操作.
github地址 https://github.com/osheroff/mysql-binlog-connector-java 。
mysql> show variables like '%log_bin%';
+---------------------------------+------------------------------------+
| Variable_name | Value |
+---------------------------------+------------------------------------+
| log_bin | ON |
| log_bin_basename | /usr/local/mysql/data/binlog |
| log_bin_index | /usr/local/mysql/data/binlog.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+------------------------------------+
log_bin 的值为 ON 时,表示开启了binlog 。
# 修改 my.cnf 配置文件
[mysqld]
#binlog日志的基本文件名,需要注意的是启动mysql的用户需要对这个目录(/usr/local/var/mysql/binlog)有写入的权限
log_bin=/usr/local/var/mysql/binlog/mysql-bin
# 配置binlog日志的格式
binlog_format = ROW
# 配置 MySQL replaction 需要定义,不能和已有的slaveId 重复
server-id=1
CREATE USER binlog_user IDENTIFIED BY 'binlog#Replication2024!';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlog_user'@'%';
FLUSH PRIVILEGES;
注意:不同的mysql版本事件类型可能不同,我们本地是mysql8 。
TABLE_MAP: 在表的 insert、update、delete 前的事件,用于记录操作的数据库名和表名。
EXT_WRITE_ROWS: 插入数据事件类型,即 insert 类型
EXT_UPDATE_ROWS: 插入数据事件类型,即 update 类型
EXT_DELETE_ROWS: 插入数据事件类型,即 delete 类型
ROTATE: 当mysqld切换到新的二进制日志文件时写入。当发出一个FLUSH LOGS 语句。或者当前二进制日志文件超过max_binlog_size。
一般情况下,当我们向数据库中执行insert、update或delete事件时,一般会先有一个TABLE_MAP事件发出,通过这个事件,我们就知道当前操作的是那个数据库和表。 但是如果我们操作的表上存在触发器时,那么可能顺序就会错乱,导致我们获取到错误的数据库名和表名.
此处以 EXT_UPDATE_ROWS 事件为列,当我们往数据库中update一条记录时,触发此事件,事件内容为
Event{header=EventHeaderV4{timestamp=1727498351000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=201, nextPosition=785678, flags=0}, data=UpdateRowsEventData{tableId=264, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5, 6, 7}, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
{before=[1, zhangsan, 张三-update, 0, [B@7b720427, [B@238552f, 1727524798000, 1727495998000], after=[1, zhangsan, 张三-update, 0, [B@21dae489, [B@2c0fff72, 1727527151000, 1727498351000]}
]}}
从上面的语句中可以看到includedColumnsBeforeUpdate和includedColumns这2个字段表示更新前的列名和更新后的列名,但是这个时候展示的数字,那么如果展示具体的列名呢? 可以通过information_schema.COLUMNS获取.
默认情况下,就是从最新的binlog位置开始监听.
BinaryLogClient client = new BinaryLogClient(hostname, port, username, password);
BinaryLogClient client = new BinaryLogClient(hostname, port, username, password);
// binlog的文件名
client.setBinlogFilename("");
// binlog的具体位置
client.setBinlogPosition(11);
这个指的是,当我们的 mysql-binlog-connector-java 程序宕机后,如果数据发生了binlog的变更,我们应该从程序上次宕机的位置的position进行监听,而不是程序重启后从最新的binlog position位置开始监听。默认情况下mysql-binlog-connector-java程序没有为我们实现,需要我们自己去实现。大概的实现思路为:
ROTATE
事件,可以获取到最新的binlog文件名和位置。CREATE TABLE `binlog_demo`
(
`id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_name` varchar(64) DEFAULT NULL COMMENT '用户名',
`nick_name` varchar(64) DEFAULT NULL COMMENT '昵称',
`sex` tinyint DEFAULT NULL COMMENT '性别 0-女 1-男 2-未知',
`address` text COMMENT '地址',
`ext_info` json DEFAULT NULL COMMENT '扩展信息',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT NULL COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_username` (`user_name`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='测试binlog'
-- 0、删除数据
truncate table binlog_demo;
-- 1、添加数据
insert into binlog_demo(user_name, nick_name, sex, address, ext_info, create_time, update_time)
values ('zhangsan', '张三', 1, '地址', '[
"aaa",
"bbb"
]', now(), now());
-- 2、修改数据
update binlog_demo
set nick_name = '张三-update',
sex = 0,
address = '地址-update',
ext_info = '{
"ext_info": "扩展信息"
}',
create_time = now(),
update_time = now()
where user_name = 'zhangsan';
-- 3、删除数据
delete
from binlog_demo
where user_name = 'zhangsan';
通过mysql-binlog-connector-java库,当数据库中的表数据发生变更时,进行监听.
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 监听 mysql binlog -->
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.29.2</version>
</dependency>
</dependencies>
package com.huan.binlog;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 初始化 binary log client
*
* @author huan.fu
* @date 2024/9/22 - 16:23
*/
@Component
public class BinaryLogClientInit {
private static final Logger log = LoggerFactory.getLogger(BinaryLogClientInit.class);
private BinaryLogClient client;
@PostConstruct
public void init() throws IOException, TimeoutException {
/**
* # 创建用户
* CREATE USER binlog_user IDENTIFIED BY 'binlog#Replication2024!';
* GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlog_user'@'%';
* FLUSH PRIVILEGES;
*/
String hostname = "127.0.0.1";
int port = 3306;
String username = "binlog_user";
String password = "binlog#Replication2024!";
// 创建 BinaryLogClient客户端
client = new BinaryLogClient(hostname, port, username, password);
// 这个 serviceId 不可重复
client.setServerId(12);
// 反序列化配置
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
// 将日期类型的数据反序列化成Long类型
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(new BinaryLogClient.EventListener() {
@Override
public void onEvent(Event event) {
EventType eventType = event.getHeader().getEventType();
log.info("接收到事件类型: {}", eventType);
log.warn("接收到的完整事件: {}", event);
log.info("============================");
}
});
client.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener() {
@Override
public void onConnect(BinaryLogClient client) {
log.info("客户端连接到 mysql 服务器 client: {}", client);
}
@Override
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
log.info("客户端和 mysql 服务器 通讯失败 client: {}", client);
}
@Override
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
log.info("客户端序列化失败 client: {}", client);
}
@Override
public void onDisconnect(BinaryLogClient client) {
log.info("客户端断开 mysql 服务器链接 client: {}", client);
}
});
// client.connect 在当前线程中进行解析binlog,会阻塞当前线程
// client.connect(xxx) 会新开启一个线程,然后在这个线程中解析binlog
client.connect(10000);
}
@PreDestroy
public void destroy() throws IOException {
client.disconnect();
}
}
从上图中可以看到,我们获取到了更新后的数据,但是具体更新了哪些列名这个我们是不清楚的.
此处以更新数据为例,大体的实现思路如下:
TABLE_MAP
事件,用于获取到 insert
、update
或delete
语句操作前的数据库
和表
。information_schema.COLUMNS
表获取 某个表在某个数据库中具体的列信息(比如:列名、列的数据类型等操作)。<!-- 操作数据库 -->
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
成员变量
,database
和tableName
用于接收数据库和表名。/**
* 数据库
*/
private String database;
/**
* 表名
*/
private String tableName;
TABLE_MAP
事件,获取数据库和表名// 成员变量 - 数据库名
private String database;
// 成员变量 - 表名
private String tableName;
client.registerEventListener(new BinaryLogClient.EventListener() {
@Override
public void onEvent(Event event) {
EventType eventType = event.getHeader().getEventType();
log.info("接收到事件类型: {}", eventType);
log.info("============================");
if (event.getData() instanceof TableMapEventData) {
TableMapEventData eventData = (TableMapEventData) event.getData();
database = eventData.getDatabase();
tableName = eventData.getTable();
log.info("获取到的数据库名: {} 和 表名为: {}", database, tableName);
}
}
});
/**
* 数据库工具类
*
* @author huan.fu
* @date 2024/10/9 - 02:39
*/
public class DbUtils {
public static Map<String, String> retrieveTableColumnInfo(String database, String tableName) throws SQLException {
Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/temp_work", "binlog_user", "binlog#Replication2024!");
QueryRunner runner = new QueryRunner();
Map<String, String> columnInfoMap = runner.query(
connection,
"select a.COLUMN_NAME,a.ORDINAL_POSITION from information_schema.COLUMNS a where a.TABLE_SCHEMA = ? and a.TABLE_NAME = ?",
resultSet -> {
Map<String, String> result = new HashMap<>();
while (resultSet.next()) {
result.put(resultSet.getString("ORDINAL_POSITION"), resultSet.getString("COLUMN_NAME"));
}
return result;
},
database,
tableName
);
connection.close();
return columnInfoMap;
}
public static void main(String[] args) throws SQLException {
Map<String, String> stringObjectMap = DbUtils.retrieveTableColumnInfo("temp_work", "binlog_demo");
System.out.println(stringObjectMap);
}
}
client.registerEventListener(new BinaryLogClient.EventListener() {
@Override
public void onEvent(Event event) {
EventType eventType = event.getHeader().getEventType();
log.info("接收到事件类型: {}", eventType);
log.warn("接收到的完整事件: {}", event);
log.info("============================");
// 通过 TableMap 事件获取 数据库名和表名
if (event.getData() instanceof TableMapEventData) {
TableMapEventData eventData = (TableMapEventData) event.getData();
database = eventData.getDatabase();
tableName = eventData.getTable();
log.info("获取到的数据库名: {} 和 表名为: {}", database, tableName);
}
// 监听更新事件
if (event.getData() instanceof UpdateRowsEventData) {
try {
// 获取表的列信息
Map<String, String> columnInfo = DbUtils.retrieveTableColumnInfo(database, tableName);
// 获取更新后的数据
UpdateRowsEventData eventData = ((UpdateRowsEventData) event.getData());
// 可能更新多行数据
List<Map.Entry<Serializable[], Serializable[]>> rows = eventData.getRows();
for (Map.Entry<Serializable[], Serializable[]> row : rows) {
// 更新前的数据
Serializable[] before = row.getKey();
// 更新后的数据
Serializable[] after = row.getValue();
// 保存更新后的一行数据
Map<String, Serializable> afterUpdateRowMap = new HashMap<>();
for (int i = 0; i < after.length; i++) {
// 因为 columnInfo 中的列名的位置是从1开始,而此处是从0开始
afterUpdateRowMap.put(columnInfo.get((i + 1) + ""), after[i]);
}
log.info("监听到更新的数据为: {}", afterUpdateRowMap);
}
} catch (Exception e) {
log.error("监听更新事件发生了异常");
}
}
// 监听插入事件
if (event.getData() instanceof WriteRowsEventData) {
log.info("监听到插入事件");
}
// 监听删除事件
if (event.getData() instanceof DeleteRowsEventData) {
log.info("监听到删除事件");
}
}
});
update binlog_demo
set nick_name = '张三-update11',
-- sex = 0,
-- address = '地址-update1',
-- ext_info = '{"ext_info":"扩展信息"}',
-- create_time = now(),
update_time = now()
where user_name = 'zhangsan';
从下图中可知,针对 text 类型的字段,默认转换成了byte[]类型,那么怎样将其转换成String类型呢?
此处针对更新语句来演示 。
注意:断点跟踪源码发现text类型的数据映射成了blob类型,因此需要重写 deserializeBlob 方法 。
public class CustomUpdateRowsEventDataDeserializer extends UpdateRowsEventDataDeserializer {
public CustomUpdateRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
super(tableMapEventByTableId);
}
@Override
protected Serializable deserializeBlob(int meta, ByteArrayInputStream inputStream) throws IOException {
byte[] bytes = (byte[]) super.deserializeBlob(meta, inputStream);
if (null != bytes && bytes.length > 0) {
return new String(bytes, StandardCharsets.UTF_8);
}
return null;
}
}
注意: 需要通过 EventDeserializer 来进行注册 。
// 反序列化配置
EventDeserializer eventDeserializer = new EventDeserializer();
Field field = EventDeserializer.class.getDeclaredField("tableMapEventByTableId");
field.setAccessible(true);
Map<Long, TableMapEventData> tableMapEventByTableId = (Map<Long, TableMapEventData>) field.get(eventDeserializer);
eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS, new CustomUpdateRowsEventDataDeserializer(tableMapEventByTableId)
.setMayContainExtraInformation(true));
// 反序列化配置
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
// 将日期类型的数据反序列化成Long类型
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
);
// 表示对 删除事件不感兴趣 ( 对于DELETE事件的反序列化直接返回null )
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, new NullEventDataDeserializer());
对于不感兴趣的事件直接使用NullEventDataDeserializer,可以提高程序的性能.
当binlog的信息发生变更时,需要保存起来,下次程序重新启动时,读取之前保存好的binlog信息.
此处为了模拟,将binlog的信息保存到文件中.
/**
* binlog position 的持久化处理
*
* @author huan.fu
* @date 2024/10/11 - 12:54
*/
public class FileBinlogPositionHandler {
/**
* binlog 信息实体类
*/
public static class BinlogPositionInfo {
/**
* binlog文件的名字
*/
public String binlogName;
/**
* binlog的位置
*/
private Long position;
/**
* binlog的server id的值
*/
private Long serverId;
}
/**
* 保存binlog信息
*
* @param binlogName binlog文件名
* @param position binlog位置信息
* @param serverId binlog server id
*/
public void saveBinlogInfo(String binlogName, Long position, Long serverId) {
List<String> data = new ArrayList<>(3);
data.add(binlogName);
data.add(position + "");
data.add(serverId + "");
try {
Files.write(Paths.get("binlog-info.txt"), data);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 获取 binlog 信息
*
* @return BinlogPositionInfo
*/
public BinlogPositionInfo retrieveBinlogInfo() {
try {
List<String> lines = Files.readAllLines(Paths.get("binlog-info.txt"));
BinlogPositionInfo info = new BinlogPositionInfo();
info.binlogName = lines.get(0);
info.position = Long.parseLong(lines.get(1));
info.serverId = Long.parseLong(lines.get(2));
return info;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
// 设置 binlog 信息
FileBinlogPositionHandler fileBinlogPositionHandler = new FileBinlogPositionHandler();
FileBinlogPositionHandler.BinlogPositionInfo binlogPositionInfo = fileBinlogPositionHandler.retrieveBinlogInfo();
if (null != binlogPositionInfo) {
log.info("获取到了binlog 信息 binlogName: {} position: {} serverId: {}", binlogPositionInfo.binlogName,
binlogPositionInfo.position, binlogPositionInfo.serverId);
client.setBinlogFilename(binlogPositionInfo.binlogName);
client.setBinlogPosition(binlogPositionInfo.position);
client.setServerId(binlogPositionInfo.serverId);
}
// FORMAT_DESCRIPTION(写入每个二进制日志文件前的描述事件) HEARTBEAT(心跳事件)这2个事件不进行binlog位置的记录
if (eventType != EventType.FORMAT_DESCRIPTION && eventType != EventType.HEARTBEAT) {
// 当有binlog文件切换时产生
if (event.getData() instanceof RotateEventData) {
RotateEventData eventData = event.getData();
// 保存binlog position 信息
fileBinlogPositionHandler.saveBinlogInfo(eventData.getBinlogFilename(), eventData.getBinlogPosition(), event.getHeader().getServerId());
} else {
// 非 rotate 事件,保存位置信息
EventHeaderV4 header = event.getHeader();
FileBinlogPositionHandler.BinlogPositionInfo info = fileBinlogPositionHandler.retrieveBinlogInfo();
long position = header.getPosition();
long serverId = header.getServerId();
fileBinlogPositionHandler.saveBinlogInfo(info.binlogName, position, serverId);
}
}
address
的值为 地址-update2
address
的值为地址-offline-update
地址-offline-update
的事件最后此篇关于在Java程序中监听mysql的binlog的文章就讲到这里了,如果你想了解更多关于在Java程序中监听mysql的binlog的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
1、binlog简介 binlog即binary log,二进制日志文件。它记录了数据库所有执行的DDL和DML语句(除了数据查询语句select、show等),以事件形式记录并保存在二进制
如果MySQL服务器启用了二进制日志,你可以使用mysqlbinlog工具来恢复从指定的时间点开始 (例如,从你最后一次备份)直到现在或另一个指定的时间点的数据。“mysqlbinlog:用于处理二
我想编写一个服务来跟踪 mysql bin 日志以获取有关数据库更改的通知。 有没有开源的库可以读取和解析ROW格式的mysql bin log? 最佳答案 使用mysqlbinlog。 The se
canal 定位:基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql。 原理: canal模拟mysql slave的交互协议,伪装自己为mysql sla
以前备份binlog时,都是先在本地进行备份压缩,然后发送到远程服务器中。但是这其中还是有一定风险的,因为日志的备份都是周期性的,如果在某个周期中,服务器宕机了,硬盘损坏了,就可能导致这段时间的bi
mysql binlog3种格式,row,mixed,statement. 解析工作 mysqlbinlog --base64-output=DECODE-ROWS -v mysql-bin.00
要禁用给定表的 bin 日志记录,我知道我可以使用以下命令: replicate-ignore-table=db_name.tbl_name 但是我不确定需要在哪里执行它? 谢谢 最佳答案 您必须将其
我正在设置复制,但我面临的问题是,当我执行插入更新的任何查询甚至创建表时,二进制日志文件没有被更新。这里是我的主人 my.cnf server-id = 1 log_bin
如果我正在读取 MySql binlog,我可以获得同一事务中发生哪些语句的指示吗? 最佳答案 有nothing built-in yet ,但也许this page会提供一些帮助。他们提供了一个 a
我不同意这个问题得到了有效的回答:decode mysqlbinlog in C# . 我有,我认为是同一个问题:我想从 C# 应用程序中读取 MySql 二进制日志,但不知道文件的格式。如何正确解析
我有许多运行 5.1.63 版的 mysql 服务器,本周早些时候在对从属服务器运行一些查询时,我注意到从属服务器上的一些数据本应使用主服务器上的更新语句删除。 我最初的想法是: 团队中的某个人正在更
在mysql中,我通过做来调试记录是如何改变的 mysqlbinlog bin-88.log | grep "record-id"--before=2 --after=2 我如何用 mongo 做类似
例如,在创建新表或更新现有表上的数据时,这些事件将存储在mysql binlog中,也就是MySQL数据库的二进制日志。 二进制日志在MySQL复制中非常有用,主服务器将数据从二进制日志发送到远
我读到过,MariaDB 的 Maxscale(BinLog Server) 可用于将 bin 日志从 MySQL 集群中继到单个 BinLog Server,但是我想知道是否可以从不同的 MySQL
抱歉,这个问题可能非常基本,但我找不到任何东西来解决这个问题。 我有一个 aws rds,当我尝试获取我的 binlog 列表时,它只向我显示 2 个最新的列表。 SHOW BINARY LOGS;
我正在尝试查看 MySQL binlog 文件以追踪特定查询的来源。我使用 SHOW BINARY LOGS 查询来获取现有日志文件的名称,但是当尝试使用 mysqlbinlog 命令访问时,我不断收
我们希望打开查询日志记录,以便我们可以找到更改数据的查询。 是否可以将事务日志写入数据库?binlog和普通查询日志有什么区别? 最佳答案 如果启用,二进制日志将包含所有修改数据的查询。但您应该注意格
我有几个实时数据库需要迁移到新服务器。这些是经常使用的大型数据库。 我想在新服务器上使用从服务器设置复制并开始跨服务器移植数据。但是,我想尝试避免对当前主数据执行 mysqldump 以获得初始 bi
我在一对服务器上设置了复制。一个是主人,第二个是奴隶。 最近在 master 上,binlog 文件被过早清除(通过文件名,所以 mysql 没有阻止过早删除文件)。 现在 SLAVE 有状态: Go
如果全过程使用的是Mysql用户,应该可以正常启动。 如果用的ROOT用户,可能不能正常启动,原因是新建的目录权限不对。 可能会这样的错误提示: /usr/loc
我是一名优秀的程序员,十分优秀!