- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
大纲 。
1.⽅案设计 。
2.安装与配置环境 。
。
1.⽅案设计 。
步骤一:首先需要配置一个crontab定时调度shell脚本,然后该脚本每天凌晨会通过rdbtools⼯具解析Redis的RDB⽂件,接着对解析出的内容进行过滤,把RDB⽂件中的⼤key导出到CSV⽂件.
。
步骤二:使⽤SQL导⼊CSV⽂件到MySQL数据库中,同时使⽤Canal监听MySQL的binlog⽇志.
。
步骤三:Canal会发送增量的大key数据消息到RocketMQ,RocketMQ的消费者系统会对增量的大key数据消息进⾏消费,消息中便会包含⼤key的详情信息。这样消费者就可以将⼤key的信息通过邮件等⽅式,通知开发⼈员.
。
为什么要把⼤key的CSV⽂件导⼊到MySQL存储?为什么不直接监听⼤key的CSV⽂件进⾏通知?
。
原因一:如果不导⼊MySQL,那么就⽆法使⽤Canal来监听。这样就要开发⼀个程序,定时去扫描Redis节点下解析出来的CSV⽂件。如果Redis集群中有多个节点,那么每⼀个节点都要去扫描。⽽将CSV导⼊到MySQL后,只需要使⽤Canal去监听MySQL表的binlog,就可以把增量数据同步到RocketMQ中,由消费者统⼀进⾏处理.
。
原因二:解析CSV⽂件⽐直接从MySQL中查询复杂很多,尤其是需要进行信息过滤。导⼊到MySQL后可以通过SQL轻松的对⼤key的记录进⾏条件筛选,并且可以对每天产⽣的⼤key数据进⾏存储分析.
。
RDB解析⽣成的CSV⽂件结构如下:
database,type,key,size_in_bytes,encoding,num_elements,len_largest_element, expiry
0,string,key1-string,20536,string,17280,17280,
0,list,key1-list,4006,quicklist,24,1530,
。
2.安装与配置环境 。
(1)依赖环境 。
(2)安装Python3 & pip3 。
(3)安装rdb-tools 。
(4)安装RocketMQ 。
(5)安装Canal 。
(6)rdbtools扫描RDB⽂件 。
(7)将CSV⽂件导⼊MySQL 。
。
(1)依赖环境 。
Python3、pip3、rdb-tools、Redis、MySQL、JDK、RocketMQ、Canal.
。
rdb-tools是开源的⼀个python项⽬,它可以⽤来解析Redis的RDB⽂件,但是要先安装Python环境.
连接地址:
https://github.com/sripathikrishnan/redis-rdb-tools
pip是Python的包管理⼯具,安装Python后,这个⼯具就会配套安装好.
。
(2)安装Python3 & pip3 。
# 安装编译⼯具
$ yum -y groupinstall "Development tools"
$ yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel
$ yum install libffi-devel -y
# 下载python3.7.0
$ cd /usr/local
$ wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tar.xz
$ tar -xvJf Python-3.7.0.tar.xz
# 编译
$ mkdir /usr/local/python3
$ cd Python-3.7.0
$ ./configure --prefix=/usr/local/python3
$ make && make install
(3)安装rdb-tools 。
# 使⽤pip包管理程序安装rdb-tools
$ pip3 install rdbtools python-lzf
# 配置环境变量
$ vim /etc/profile
# 在⽂件底部最末尾,追加如下两⾏内容
PATH=/usr/local/python3/bin:$PATH
export PATH
验证安装:
# 验证rdbtools是否安装成功
$ rdb -h
(4)安装RocketMQ 。
一.下载安装包 。
# 下载安装包,要注意版本与项⽬中依赖的RocketMQ版本兼容
$ wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
二.修改默认配置 。
# 解压
$ unzip rocketmq-all-4.7.1-bin-release.zip
# 切换⽬录
$ cd /usr/local/rocketmq-all-4.7.1-bin-release
# 修改nameserver默认堆栈⼤⼩
$ vim ./bin/runserver.sh
# 修改brokerserver默认堆栈⼤⼩
$ vim bin/runbroker.sh
NameServer默认配置如下:
-server -Xms4g -Xmx4g -Xmn2g
-XX:MetaspacesSize=128m -XX:MaxMetaspaceSize=320m
BrokerServer默认配置如下:
-server -Xms8g -Xmx8g -Xmn4g
三.修改BrokerServer的IP地址 。
$ vim /usr/local/rocketmq-all-4.7.1-bin-release/conf/broker.conf
在broker.conf⽂件中追加如下内容:
# brokerserver所在机器的公⽹IP地址
brokerIP1=192.168.95.129
四.启动RocketMQ 。
# 启动nameserver
$ nohup sh ./bin/mqnamesrv &
# 查看nameserver启动⽇志
$ tailf ~/logs/rocketmqlogs/namesrv.log
# 启动brokerserver
$ nohup sh bin/mqbroker -n 127.0.0.1:9876 -c conf/broker.conf &
# 查看brokerserver启动⽇志
$ tailf ~/logs/rocketmqlogs/broker.log
五.启动RocketMQ控制台 。
github地址:https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0
可以从github上clone下载,然后使⽤maven命令打包,然后如下启动:
$ nohup java -jar -server -Xms256m -Xmx256m \
-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Dserver.port=8080 \
/usr/local/rocketmq-console-ng-1.0.1.jar &
(5)安装Canal 。
一.下载安装包 。
# 下载canal-admin
$ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
# 下载canal-deployer
$ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
二.安装canal-admin 。
# 创建解压⽬录
$ mkdir /usr/local/canal-admin
# 解压
$ tar -zxvf canal.admin-1.1.5.tar.gz -c /usr/local/canal-admin
目录结构如下:
三.初始化Canal数据库 。
执⾏conf⽬录下的canal_manager.sql⽂件 。
# 执⾏conf⽬录下的canal_manager.sql⽂件
$ cd /usr/local/canal-admin/conf
$ mysql -u⽤户名 -p密码 -hIP地址 -P端⼝号 < canal_manager.sql
canal_manger.sql⽂件内容如下:
CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULT
CHARACTER SET utf8 COLLATE utf8_bin */;
USE `canal_manager`;
SET NAMES utf8;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for canal_adapter_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_adapter_config`;
CREATE TABLE `canal_adapter_config` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`category` varchar(45) NOT NULL,
`name` varchar(45) NOT NULL,
`status` varchar(45) DEFAULT NULL,
`content` text NOT NULL,
`modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_cluster
-- ----------------------------
DROP TABLE IF EXISTS `canal_cluster`;
CREATE TABLE `canal_cluster` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(63) NOT NULL,
`zk_hosts` varchar(255) NOT NULL,
`modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_config`;
CREATE TABLE `canal_config` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`cluster_id` bigint(20) DEFAULT NULL,
`server_id` bigint(20) DEFAULT NULL,
`name` varchar(45) NOT NULL,
`status` varchar(45) DEFAULT NULL,
`content` text NOT NULL,
`content_md5` varchar(128) NOT NULL,
`modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `sid_UNIQUE` (`server_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_instance_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_instance_config`;
CREATE TABLE `canal_instance_config` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`cluster_id` bigint(20) DEFAULT NULL,
`server_id` bigint(20) DEFAULT NULL,
`name` varchar(45) NOT NULL,
`status` varchar(45) DEFAULT NULL,
`content` text NOT NULL,
`content_md5` varchar(128) DEFAULT NULL,
`modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `name_UNIQUE` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_node_server
-- ----------------------------
DROP TABLE IF EXISTS `canal_node_server`;
CREATE TABLE `canal_node_server` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`cluster_id` bigint(20) DEFAULT NULL,
`name` varchar(63) NOT NULL,
`ip` varchar(63) NOT NULL,
`admin_port` int(11) DEFAULT NULL,
`tcp_port` int(11) DEFAULT NULL,
`metric_port` int(11) DEFAULT NULL,
`status` varchar(45) DEFAULT NULL,
`modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_user
-- ----------------------------
DROP TABLE IF EXISTS `canal_user`;
CREATE TABLE `canal_user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`username` varchar(31) NOT NULL,
`password` varchar(128) NOT NULL,
`name` varchar(31) NOT NULL,
`roles` varchar(31) NOT NULL,
`introduction` varchar(255) DEFAULT NULL,
`avatar` varchar(255) DEFAULT NULL,
`creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;
-- ----------------------------
-- Records of canal_user
-- ----------------------------
BEGIN;
INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
四.修改conf⽬录下的application.yml⽂件 。
# 修改conf⽬录下的application.yml⽂件
$ vim /usr/local/canal-admin/conf/application.yml
application.yml⽂件内容如下:
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
五.启动canal-admin 。
$ cd /usr/local/canal-admin
$ bin/startup.sh
然后访问Canal管理控制台:http://ip:8089,⽤户名和密码分别是:admin | 123456.
。
此时登录进⼊后,会发现⽬前什么数据都没有。但这没有关系,接着会启动canal-server,因为要⽤canal-admin来管理每个canal-server的实例.
。
采⽤canal-admin来管理canal-server:当canal-server启动时,canal-server是会⾃动注册到canal-admin上的.
。
六.关闭canal-admin 。
$ cd /usr/local/canal.admin-1.1.5
$ bin/stop.sh
注意:关闭时,请不要使⽤kill进程号的⽅式来关闭,而使⽤执⾏脚本的⽅式关闭。因为如果kill进程后,下次再次执⾏启动脚本时,会出现found admin.pid , Please run stop.sh first ,then startup.sh的提示。当然,出现这种情况的时候,可以到bin⽬录下将admin.pid删除掉.
。
七.安装canal-server 。
# 创建解压⽬录
$ mkdir /usr/local/canal-server
# 解压
$ tar -zxvf canal.deployer-1.1.5.tar.gz -c /usr/local/canal-server
目录结构如下:
八.修改conf⽬录下的canal_local.properties 。
# 修改conf⽬录下的canal_local.properties⽂件
$ vim /usr/local/canal-server/conf/canal_local.properties
canal_local.properties⽂件内容如下:
# register ip 这⾥的ip选择您本机的ip(也就是启动canal-server机器的所在ip地址)
canal.register.ip = 192.168.95.129
# canal admin config 这⾥是部署canal-admin的所在机器的ip,当然也可以把canal-admin和canal-server部署到⼀台机器
canal.admin.manager = 192.168.95.129:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# admin auto register 这⾥⼀定得是true,否则⽆法在启动canal-server时候注册到canal-admin上
canal.admin.register.auto = true
canal.admin.register.cluster =
# canal-server注册到canal-admin控制台的名称,这⾥⽤canal-server的ip地址,注意改ip
canal.admin.register.name = 192.168.95.129
九.启动canal-server 。
$ cd /usr/local/canal-server
# 切记后⾯加⼊local的参数
$ bin/startup.sh local
这时可以到canal-admin的界面中查看canal-server是否已经注册成功.
。
十.在canal-admin中配置Canal Instance来监听数据库 。
⾸先点击界面右侧的Instance管理,再点击新建Instance来创建实例.
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
# 需要修改
# 需要监听数据库的ip加端⼝
canal.instance.master.address=192.168.95.129:3306
# mysql主库连接时起始的binlog⽂件,这⾥可以不写,默认为mysql-bin
canal.instance.master.journal.name=
# binlog⽇志的位置
canal.instance.master.position=
# 开始同步binlog⽇志的时间戳,也就是从哪个时间点开始同步binlog⽇志,13位时间戳格式
canal.instance.master.timestamp=
# 如果数据库开启了gtid模式,这⾥填写master节点的gtid我们这⾥不写也是可以的,如果要开启,记得将canal.instance.gtidon改为true
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# 需要修改
# username/password这⾥填写要连接数据库的⽤户名和密码
canal.instance.dbUsername=root
# 需要修改
# canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# 需要修改
# table regex,这⾥填写订阅的数据库的库与表的相关正则表达式
canal.instance.filter.regex=careerplan_eshop_redis.redis_large_key_log
# table black regex
canal.instance.filter.black.regex=
# table field filter(format:
schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format:schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# 需要修改
# MQ Config,这⾥填写实例名称即可
# 如果canal.serverMode选择的不是tcp模式,这⾥填写相关的topic的名称,kafka和rocketmq默认的主题为example
# 同时需要注意,如果期望使⽤的canal-server的⼯作模式是MQ的⽅式来运⾏,那么需要修改canal.properties的配置
canal.mq.topic=binlog_monitor_large_key_topic
# dynamic topic route by schema or table regex
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
接着点击保存,就会跳转到列表页,然后在新创建的canal-instance后⾯点击启动,接着点击操作⽇志去查看相关⽇志。⾄此,⼀个canal-instance就启动成功了.
。
十一.关闭canal-server 。
$ cd /usr/local/canal-server
$ bin/stop.sh
(6)rdbtools扫描RDB⽂件 。
# 切换⽬录
[root@localhost bin]# cd /usr/local/redis/bin
# 在这个⽬录下存放着Redis的rdb⽂件
[root@localhost bin]# ls
dump.rdb redis-benchmark redis-check-aof redis-check-rdb redis-cli
redis-sentinel redis-server
# 使⽤rdbtools⼯具过滤dump.rdb⽂件中的⼤key,⽣成dump.csv⽂件
[root@localhost bin]# rdb -c memory dump.rdb --bytes 10240 -f dump.csv
# 可以看到⽣成的dump.csv⽂件
[root@localhost bin]# ls
dump.csv dump.rdb redis-benchmark redis-check-aof redis-check-rdb
redis-cli redis-sentinel redis-server
# 查看dump.csv⽂件的内容
[root@localhost bin]# vim dump.csv
rdb参数说明:
rdb -c memory dump.rdb --bytes 10240 -f dump.csv
dump.rdb是指定Redis的rdb⽂件的路径,--bytes 10240表示过滤出key值⼤⼩超过10240B的key,也就是10K.
。
dump.csv⽂件内容如下:
database,type,key,size_in_bytes,encoding,num_elements,len_largest_element,expiry
0,string,key1-string,20536,string,17280,17280,
(7)将CSV⽂件导⼊MySQL 。
一.先查看secure_file_priv属性是否开启 。
secure_file_priv属性指定导⼊⽂件的位置,只有在该属性指定的⽬录下的⽂件才可以导⼊MySQL.
mysql> show variables like '%secure%';
+--------------------------+------------------------------------------------+
| Variable_name | Value |
+--------------------------+------------------------------------------------+
| require_secure_transport | OFF |
| secure_auth | ON |
| secure_file_priv | ... |
+--------------------------+------------------------------------------------+
二.修改MySQL配置⽂件⽂件 。
# 找到mysqld⽂件的位置
[root@localhost bin]# find / -name "mysqld"
/run/mysqld
/usr/sbin/mysqld
# 找到mysql的默认配置⽂件位置
[root@localhost bin]# /usr/sbin/mysqld --verbose --help |grep -A 1
'Default options'
Default options are read from the following files in the given order:
/etc/my.cnf /etc/mysql/my.cnf /usr/etc/my.cnf ~/.my.cn
# 修改mysql的默认配置⽂件
[root@localhost etc]# vim /etc/my.cnf
在[mysqld]模块下,如果存在下列属性就修改,如果不存在就追加:
[mysqld]
# 关闭安全⽂件导⼊路径
secure-file-priv=""
# 开启binlog
log-bin=mysql-bin
binlog-format=ROW
server_id=1
三.重启MySQL服务 。
# 检查mysql服务运⾏状态
$ service mysqld status
# 重启服务
$ service mysqld restart
重启服务后重新连接MySQL,查看secure_file_priv和log_bin的值:
mysql> show variables like '%secure%';
+--------------------------+-------+
| Variable_name | Value |
+--------------------------+-------+
| require_secure_transport | OFF |
| secure_auth | ON |
| secure_file_priv | |
+--------------------------+-------+
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
四.创建redis_large_key_log表 。
create table redis_large_key_log (
`id` bigint primary key auto_increment,
`database` tinyint comment 'Redis数据库索引',
`type` varchar(20) comment 'Redis数据类型',
`key` varchar(256) comment 'Redis key',
`size_in_bytes` int comment 'value对于的bytes',
`encoding` varchar(30) comment '编码',
`num_elements` int comment '元素数量',
`len_largest_element` int comment '元素⻓度',
`expiry` varchar(30) comment '过期时间',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
五.编写导⼊csv⽂件的SQL脚本 。
$ cd /usr/local/redis/bin/
# 创建SQL脚本⽂件
$ touch csv-transfer-db.sql
# 编辑SQL脚本
vim csv-transfer-db.sql
csv-transfer-db.sql⽂件内容如下:
# 指定数据库
USE `careerplan_eshop_redis`;
load data infile '/usr/local/redis/bin/dump.csv' into table redis_large_key_log fields terminated by',' lines terminated by'\n' ignore 1 lines
(`database`,`type`,`key`,size_in_bytes,encoding,num_elements,len_largest_element,expiry);
六.编写定时任务脚本 。
脚本职能:
调⽤rdbtools⼯具,扫描⼤key,dump出csv⽂件.
调⽤SQL脚本,将csv⽂件导⼊数据库.
$ touch monitor-large-key-to-db.sh
$ vim monitor-large-key-to-db.sh
monitor-large-key-to-db.sh⽂件内容如下:
# crontab没有环境变量给你运⾏,所以要在shell开头⼿动添加环境
source /etc/profile
. ~/.bash_profile
#!/bin/bash
echo "开始执⾏monitor-large-key-to-db.sh脚本" >> /usr/local/redis/monitor-large-key-log.txt
rdb -c memory /usr/local/redis/bin/dump.rdb --bytes 102400 -f /usr/local/redis/bin/dump.csv
echo "扫描redis过滤出⼤key,⼤key数据保存到/usr/local/redis/bin/dump.csv⽂件" >> /usr/local/redis/monitor-large-key-log.txt
mysql -u⽤户名 -p密码 -hIP地址 -P端⼝号 < /usr/local/redis/bin/csv-transfer-db.sql
echo "csv⽂件数据已导⼊mysql" >> /usr/local/redis/monitor-large-key-log.txt
七.创建调度任务 。
$ crontab -e
# 每天凌晨3点进⾏⼀次调度,将会扫描rdb⽂件,将⼤key存储到MySQL
0 3 * * * sh /usr/local/redis/bin/monitor-large-key-to-db.sh
(8)binlog数据消费者 。
一.接⼝说明 。
消费redis_large_key_log表的binlog数据,该数据包含Redis的⼤key信息.
。
二.代码位置 。
com.demo.eshop.monitor.mq.consumer.ConsumerBeanConfig#receiveLargeKeyMonitorConsumer
具体实现如下:
@Configuration
public class ConsumerBeanConfig {
//配置内容对象
@Autowired
private RocketMQProperties rocketMQProperties;
//Redis大key binlog 消费者
@Bean("cookbookLargeKeyMonitorTopic")
public DefaultMQPushConsumer receiveLargeKeyMonitorConsumer(CookbookLargeKeyMonitorListener cookbookLargeKeyMonitorListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.BINLOG_MONITOR_LARGE_KEY_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(RocketMqConstant.BINLOG_MONITOR_LARGE_KEY_TOPIC, "*");
consumer.registerMessageListener(cookbookLargeKeyMonitorListener);
consumer.start();
return consumer;
}
...
}
三.参数说明 。
CookbookLargeKeyMonitorListener表示针对BINLOG_MONITOR_LARGE_KEY_GROUP的Listener,它会监听Canal推送的BINLOG_MONITOR_LARGE_KEY_TOPIC消息,然后对消息解析,通过邮件、钉钉等推送给开发⼈员。具体实现如下:
@Component
public class CookbookLargeKeyMonitorListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt messageExt : list) {
String msg = new String(messageExt.getBody());
// 解析binlog数据模型
BinlogDataDTO binlogData = BinlogUtils.getBinlogData(msg);
log.info("消费到binlog消息, binlogData: {}", binlogData);
// 推送通知
informByPush(binlogData);
}
} catch (Exception e) {
// 本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//第三方平台推送消息到app
private void informByPush(BinlogDataDTO binlogData) {
log.info("消息推送中:消息内容:{}", binlogData);
}
}
。
最后此篇关于Redis应用—7.大Value处理方案的文章就讲到这里了,如果你想了解更多关于Redis应用—7.大Value处理方案的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在通过 labrepl 工作,我看到了一些遵循此模式的代码: ;; Pattern (apply #(apply f %&) coll) ;; Concrete example user=> (a
我从未向应用商店提交过应用,但我会在不久的将来提交。 到目前为止,我对为 iPhone 而非 iPad 进行设计感到很自在。 我了解,通过将通用PAID 应用放到应用商店,客户只需支付一次就可以同时使
我有一个应用程序,它使用不同的 Facebook 应用程序(2 个不同的 AppID)在 Facebook 上发布并显示它是“通过 iPhone”/“通过 iPad”。 当 Facebook 应用程序
我有一个要求,我们必须通过将网站源文件保存在本地 iOS 应用程序中来在 iOS 应用程序 Webview 中运行网站。 Angular 需要服务器来运行应用程序,但由于我们将文件保存在本地,我们无法
所以我有一个单页客户端应用程序。 正常流程: 应用程序 -> OAuth2 服务器 -> 应用程序 我们有自己的 OAuth2 服务器,因此人们可以登录应用程序并获取与用户实体关联的 access_t
假设我有一个安装在用户设备上的 Android 应用程序 A,我的应用程序有一个 AppWidget,我们可以让其他 Android 开发人员在其中以每次安装成本为基础发布他们的应用程序推广广告。因此
Secrets of the JavaScript Ninja中有一个例子它提供了以下代码来绕过 JavaScript 的 Math.min() 函数,该函数需要一个可变长度列表。 Example:
当我分别将数组和对象传递给 function.apply() 时,我得到 NaN 的 o/p,但是当我传递对象和数组时,我得到一个数字。为什么会发生这种情况? 由于数组也被视为对象,为什么我无法使用它
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界. 这篇CFSDN的博客文章ASP转换格林威治时间函数DateDiff()应用由作者收集整理,如果你
我正在将列表传递给 map并且想要返回一个带有合并名称的 data.frame 对象。 例如: library(tidyverse) library(broom) mtcars %>% spl
我有一个非常基本的问题,但我不知道如何实现它:我有一个返回数据框,其中每个工具的返回值是按行排列的: tmp<-as.data.frame(t(data.frame(a=rnorm(250,0,1)
我正在使用我的 FB 应用创建群组并邀请用户加入我的应用群组,第一次一切正常。当我尝试创建另一个组时,出现以下错误: {"(OAuthException - #4009) (#4009) 在有更多用户
我们正在开发一款类似于“会说话的本”应用程序的 child 应用程序。它包含大量用于交互式动画的 JPEG 图像序列。 问题是动画在 iPad Air 上播放正常,但在 iPad 2 上播放缓慢或滞后
我关注 clojure 一段时间了,它的一些功能非常令人兴奋(持久数据结构、函数式方法、不可变状态)。然而,由于我仍在学习,我想了解如何在实际场景中应用,证明其好处,然后演化并应用于更复杂的问题。即,
我开发了一个仅使用挪威语的应用程序。该应用程序不使用本地化,因为它应该仅以一种语言(挪威语)显示。但是,我已在 Info.plist 文件中将“本地化 native 开发区域”设置为“no”。我还使用
读完 Anthony's response 后上a style-related parser question ,我试图说服自己编写单体解析器仍然可以相当紧凑。 所以而不是 reference ::
multicore 库中是否有类似 sapply 的东西?还是我必须 unlist(mclapply(..)) 才能实现这一点? 如果它不存在:推理是什么? 提前致谢,如果这是一个愚蠢的问题,我们深表
我喜欢在窗口中弹出结果,以便更容易查看和查找(例如,它们不会随着控制台继续滚动而丢失)。一种方法是使用 sink() 和 file.show()。例如: y <- rnorm(100); x <- r
我有一个如下所示的 spring mvc Controller @RequestMapping(value="/new", method=RequestMethod.POST) public Stri
我正在阅读 StructureMap关于依赖注入(inject),首先有两部分初始化映射,具体类类型的接口(interface),另一部分只是实例化(请求实例)。 第一部分需要配置和设置,这是在 Bo
我是一名优秀的程序员,十分优秀!