- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
Redis系列1:深刻理解高性能Redis的本质 Redis系列2:数据持久化提高可用性 Redis系列3:高可用之主从架构 Redis系列4:高可用之Sentinel(哨兵模式) Redis系列5:深入分析Cluster 集群模式 追求性能极致:Redis6.0的多线程模型 追求性能极致:客户端缓存带来的革命 Redis系列8:Bitmap实现亿万级数据计算 Redis系列9:Geo 类型赋能亿级地图位置计算 Redis系列10:HyperLogLog实现海量数据基数统计 Redis系列11:内存淘汰策略 Redis系列12:Redis 的事务机制 Redis系列13:分布式锁实现 Redis系列14:使用List实现消息队列 Redis系列15:使用Stream实现消息队列 Redis系列16:聊聊布隆过滤器(原理篇) 。
布隆过滤器(Bloom Filter)是 Redis 4.0 版本提供的新功能,我们一般将它当做插件加载到 Redis 服务器中,给 Redis 提供强大的去重功能。 它是一种概率性数据结构,可用于判断一个元素是否存在于一个集合中。相比较之 Set 集合的去重功能,布隆过滤器空间上能节省 90% +,不足之处是去重率大约在 99% 左右,那就是有 1% 左右的误判率,这种误差是由布隆过滤器的自身结构决定的.
我们在遇到数据量大的时候,为了去重并避免大批量的重复计算,可以考虑使用 Bloom Filter 进行过滤。 具体常用的经典场景如下:
下面以缓存穿透为解决目标进行讲解.
缓存穿透是指访问一个不存在的key,缓存不起作用,请求会穿透到DB,流量井喷时会导致DB挂掉。 比如 我们查询用户的信息,程序会根据用户的编号去缓存中检索,如果找不到,再到数据库中搜索。如果你给了一个不存在的编号:XXXXXXXX,那么每次都比对不到,就透过缓存进入数据库。 这样风险很大,如果因为某些原因导致大量不存在的编号被查询,甚至被恶意伪造编号进行攻击,那将是灾难。 解决方案质疑就是在缓存之前在加一层 BloomFilter :
下面以火车票订购和查询为案例进行说明,如果火车票被恶意攻击,模拟了一模一样的火车票订单编号,那很可能通过大量的请求穿透过缓存层把数据库打雪崩了,所以使用布隆过滤器为服务提供一层保障。 具体的做法就是,我们在购买火车票成功的时候,把订单号的ID写入(异步或者消息队列的方式)到布隆过滤器中,保障后续的查询都在布隆过滤器中走一遍再进到缓存中去查询。火车票订单Id同步到 Bloom Filter 的步骤如下:
创建 Bloom Filter 的语法如下:
# BF.RESERVE {key} {error_rate} {capacity} [EXPANSION {expansion}] [NONSCALING]
BF.RESERVE ticket_orders 0.01 1000000
而上面那句命令是,通过BF.RESERVE命令手动创建一个名字为 ticket_orders,错误率为 0.01 ,初始容量为 1000000 的布隆过滤器。 这边需要注意的一些点是:
# BF.ADD {key} {value ... }
# 添加单个订单号
BF.ADD ticket_orders 2023061008795
(integer) 1
# 添加多个订单号
BF.MADD ticket_orders 2023061008796 2023061008797 2023061008798
1) (integer) 1
2) (integer) 1
3) (integer) 1
以上的语句是将已经订好的车票订单号存储到Bloom Filter中,包括一次存储单个和一次存储多个.
# BF.EXISTS {key} {value} ,存在的话返回 1,不存在返回 0
BF.EXISTS ticket_orders 2023061008795
(integer) 1
# 批量判断多个值是否存在于布隆过滤器,语句如下:
BF.MEXISTS ticket_orders 2023061008796 2023061008797 2023061008798
1) (integer) 0
2) (integer) 1
3) (integer) 0
BF.EXISTS 判断一个元素是否存在于 Bloom Filter中,返回值 = 1 表示存在,返回值 = 0 表示不存在。可以一次性判断单个元素,或者一次性判断多个元素.
# 使用 BF.INFO {key} 语法查看
BF.INFO ticket_orders
1) Capacity
2) (integer) 1000000
3) Size
4) (integer) 3
5) Number of filters
6) (integer) 1
7) Number of items inserted
8) (integer) 3
9) Expansion rate
10) (integer) 2
返回值解析: Capacity:预设容量,我们前面设置了1000000。 Size:实际占用情况,我们前面设置了3个值:2023061008796、 2023061008797、 2023061008798。 Number of filters:过滤器的层数。 Number of items inserted:实际已插入的元素数量。 Expansion rate:子过滤器扩容的系数,咱们前面创建的时候未设值,所以这边是默认 2.
综上,我们通过 BF.RESERVE、BF.ADD、BF.EXISTS、BF.INFO 等几个指令就能实现布隆过滤器的建设,避免缓存穿透的情况发生。 因为你查询缓存的时候,必然你先到Bloom Filter中先过滤一次,这样就不会因为无效的key把缓存打穿.
Spring Boot版本: 2.5.x.
如果实际情况可以使用更高版本 。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.1</version>
</dependency>
spring:
application:
name: redission
redis:
cluster:
nodeAddresses: [
"redis://127.0.0.1:8000",
"redis://127.0.0.1:8001",
"redis://127.0.0.1:8002",
"redis://127.0.0.1:8003",
"redis://127.0.0.1:8004",
"redis://127.0.0.1:8005"
]
password: ********
single:
address: "redis://127.0.0.1:6379"
database: 6
@Service
public class BloomFilterService {
@Autowired
private RedissonClient redissonClient;
/**
* 创建布隆过滤器
* @param filterKey 过滤器名称,等同上面的key
* @param expectedCapacity 预计元素容量,等同于上面的capacity
* @param falseRate 允许的误判率,等同于上面的error_rate
* @param <T>
* @return
*/
public <T> RBloomFilter<T> create(String filterKey, long expectedCapacity, double falseRate) {
// 集群模式
RClusteredBloomFilter<T> bloomFilter = redissonClient.getClusteredBloomFilter(filterKey);
// 以下是单实例模式
// RBloomFilter<T> bloomFilter = redissonClient.getBloomFilter(filterKey);
bloomFilter.tryInit(expectedCapacity, falseRate);
return bloomFilter;
}
}
@Autowired
private BloomFilterService bloomFilterService;
@Test
public void testBloomFilter() {
// 预计元素容量 1000000
long expectedCapacity = 1000000L;
// 错误率
double falseRate = 0.01;
RBloomFilter<Long> bloomFilter = bloomFilterService.create("ticket_orders", expectedCapacity, falseRate);
// 元素增加测试并输出统计
for (long idx = 0; idx < expectedCapacity; idx++) {
bloomFilter.add(idx);
}
long eleCount = bloomFilter.count();
log.info("eleCount = {}.", elementCount);
}
本篇介绍了布隆过滤器的几种实现场景。 并以火车票订单信息查询为案例进行说明,如何使用布隆过滤器避免缓存穿透,避免被恶意攻击.
最后此篇关于Redis系列17:聊聊布隆过滤器(实践篇)的文章就讲到这里了,如果你想了解更多关于Redis系列17:聊聊布隆过滤器(实践篇)的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我有一个关于 Redis Pubsub 的练习,如下所示: 如果发布者发布消息但订阅者没有收到服务器崩溃。订阅者如何在重启服务器时收到该消息? 请帮帮我,谢谢! 最佳答案 在这种情况下,消息将永远消失
我们正在使用 Service Stack 的 RedisClient 的 BlockingDequeue 来保存一些数据,直到它可以被处理。调用代码看起来像 using (var client =
我有一个 Redis 服务器和多个 Redis 客户端。每个 Redis 客户端都是一个 WebSocket+HTTP 服务器,其中包括管理 WebSocket 连接。这些 WebSocket+HTT
我有多个 Redis 实例。我使用不同的端口创建了一个集群。现在我想将数据从预先存在的 redis 实例传输到集群。我知道如何将数据从一个实例传输到集群,但是当实例多于一个时,我无法做到这一点。 最佳
配置:三个redis集群分区,跨三组一主一从。当 Master 宕机时,Lettuce 会立即检测到中断并开始重试。但是,Lettuce 没有检测到关联的 slave 已经将自己提升为 master
我想根据从指定集合中检索这些键来删除 Redis 键(及其数据集),例如: HMSET id:1 password 123 category milk HMSET id:2 password 456
我正在编写一个机器人(其中包含要禁用的命令列表),用于监视 Redis。它通过执行禁用命令,例如 (rename-command ZADD "")当我重新启动我的机器人时,如果要禁用的命令列表发生变化
我的任务是为大量听众使用发布/订阅。这是来自 docs 的订阅的简化示例: r = redis.StrictRedis(...) p = r.pubsub() p.subscribe('my-firs
我一直在阅读有关使用 Redis 哨兵进行故障转移的内容。我打算有1个master+1个slave,如果master宕机超过1分钟,就把slave变成master。我知道这在 Sentinel 中是
与仅使用常规 Redis 和创建分片相比,使用 Redis 集群有哪些优势? 在我看来,Redis Cluster 更注重数据安全(让主从架构解决故障)。 最佳答案 我认为当您需要在不丢失任何数据的情
由于 Redis 以被动和主动方式使 key 过期, 有没有办法得到一个 key ,即使它的过期时间已过 (但 在 Redis 中仍然存在 )? 最佳答案 DEBUG OBJECT myKey 将返回
我想用redis lua来实现monitor命令,而不是redis-cli monitor。但我不知道怎么办。 redis.call('monitor') 不起作用。 最佳答案 您不能从 Redis
我读过 https://github.com/redisson/redisson 我发现有几个 Redis 复制设置(包括对 AWS ElastiCache 和 Azure Redis 缓存的支持)
Microsoft.AspNet.SignalR.Redis 和 StackExchange.Redis.Extensions.Core 在同一个项目中使用。前者需要StackExchange.Red
1. 认识 Redis Redis(Remote Dictionary Server)远程词典服务器,是一个基于内存的键值对型 NoSQL 数据库。 特征: 键值(key-value)型,value
1. Redis 数据结构介绍 Redis 是一个 key-value 的数据库,key 一般是 String 类型,但 value 类型多种多样,下面就举了几个例子: value 类型 示例 Str
1. 什么是缓存 缓存(Cache) 就是数据交换的缓冲区,是存贮数据的临时地方,一般读写性能较高。 缓存的作用: 降低后端负载 提高读写效率,降低响应时间 缓存的成本: 数据一致性成本 代码维护成本
我有一份记录 list 。对于我的每条记录,我都需要进行一些繁重的计算,因为我要在Redis中创建反向索引。为了达到到达记录,需要在管道中执行多个redis命令(sadd为100 s + set为1
我有一个三节点Redis和3节点哨兵,一切正常,所有主服务器和从属服务器都经过验证,并且哨兵配置文件已与所有Redis和哨兵节点一起更新,但是问题是当Redis主服务器关闭并且哨兵希望选举失败者时再次
我正在尝试计算Redis中存储的消息之间的响应时间。但是我不知道该怎么做。 首先,我必须像这样存储chat_messages的时间流 ZADD conversation:CONVERSATION_ID
我是一名优秀的程序员,十分优秀!