- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想编写一个始终显示 Redis 流(响应式(Reactive))的最新消息的端点。
实体看起来像这样 {'key' : 'some_key', 'status' : 'some_string'}
。
所以我想得到以下结果:
{'key' : 'abc', 'status' : 'status_A'}
页面未关闭
XADD mystream * key abc status statusB
{'key' : 'abc', 'status' : 'status_A'}
{'key' : 'abc', 'status' : 'status_B'}
@GetMapping(value="/light/live/mock", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Light> liveLightMock() {
List<Light> test = Arrays.asList(new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"));
return Flux.fromIterable(test).delayElements(Duration.ofMillis(500));
}
列表的各个元素一个接一个地显示,项目之间有 500 毫秒的延迟。
但是,当我尝试访问 Redis 而不是模拟变体时,它不再有效。我尝试依次测试部分功能。为了让我的想法首先起作用,保存 (1) 功能必须起作用,如果保存功能起作用,则显示没有重新激活功能的旧记录必须起作用 (2) 最后但并非最不重要的是,如果两者都起作用,我有点需要让重新激活部分继续进行。
也许你们可以帮助我让 Reactive Part 正常工作。我为此工作了好几天,但没有得到任何改进。
伙计们:)
看起来正常。
@GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public Flux<Light> createTestLight() {
String status = (++statusIdx % 2 == 0) ? "on" : "off";
Light light = new Light(Consts.LIGHT_ID, status);
return LightRepository.save(light).flux();
}
@Override
public Mono<Light> save(Light light) {
Map<String, String> lightMap = new HashMap<>();
lightMap.put("key", light.getKey());
lightMap.put("status", light.getStatus());
return operations.opsForStream(redisSerializationContext)
.add("mystream", lightMap)
.map(__ -> light);
}
似乎在工作,但不是 reaktiv -> 我在 WebView 打开时添加了一个新实体,该 View 显示了所有项目但在我添加新项目后没有更新。重新加载后我看到了每一个项目
如何让 getLights
返回与订阅流的 TEXT_EVENT_STREAM_VALUE
一起工作的内容?
@Override
public Flux<Object> getLights() {
ReadOffset readOffset = ReadOffset.from("0");
StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest
Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
Map<Object, Object> kvp = entries.getValue();
String key = (String) kvp.get("key");
String status = (String) kvp.get("status");
Light light = new Light(key, status);
return Flux.just(light);
};
return operations.opsForStream()
.read(offset)
.flatMap(mapFunc);
}
@GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Object> lightLive() {
return LightRepository.getLights();
}
端点和保存函数是不同类的一部分。字符串状态 = (++statusIdx % 2 == 0) ? "on": "off";
触发器状态从开到关,到开,到关,...
@GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public Flux<Light> createTestLight() {
String status = (++statusIdx % 2 == 0) ? "on" : "off";
Light light = new Light(Consts.LIGHT_ID, status);
return LightRepository.save(light).flux();
}
@Override
public Mono<Light> save(Light light) {
Map<String, String> lightMap = new HashMap<>();
lightMap.put("key", light.getKey());
lightMap.put("status", light.getStatus());
return operations.opsForStream(redisSerializationContext)
.add("mystream", lightMap)
.map(__ -> light);
}
验证函数 i
127.0.0.1:6379> del mystream
(integer) 1
127.0.0.1:6379> XLEN myStream
(integer) 0
两次调用创建端点/light/create
我希望 Stream 现在有两个 Item,on with status = on,一个 with off
127.0.0.1:6379> XLEN mystream
(integer) 2
127.0.0.1:6379> xread STREAMS mystream 0-0
1) 1) "mystream"
2) 1) 1) "1610456865517-0"
2) 1) "key"
2) "light_1"
3) "status"
4) "off"
2) 1) "1610456866708-0"
2) 1) "key"
2) "light_1"
3) "status"
4) "on"
看起来保存部分正在工作。
似乎在工作,但不是 reaktiv -> 我添加了一个新实体并且页面更新了它的值
@Override
public Flux<Object> getLights() {
ReadOffset readOffset = ReadOffset.from("0");
StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest
Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
Map<Object, Object> kvp = entries.getValue();
String key = (String) kvp.get("key");
String status = (String) kvp.get("status");
Light light = new Light(key, status);
return Flux.just(light);
};
return operations.opsForStream()
.read(offset)
.flatMap(mapFunc);
}
@GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Object> lightLive() {
return LightRepository.getLights();
}
/light/live
-> 我应该有 N
个条目-> 如果我能看到条目,则正常显示正常工作(非 react 性)/light/create
两次 -> 现场少数人应该添加了 2 个条目 -> N+2
条目N+2
个 Reactiv 部分的条目才能正常工作/light/live
) 刷新 View ,如果 Reactiv 有效,应该仍显示相同的数量显示信息有效 (1),(2) 的添加部分有效,每个终端检查,4) 无效
因此显示器正在工作,但它没有反应
刷新浏览器后 (5) 我得到了预期的 N+2
条目 - 所以 (2) 也正常工作
最佳答案
这里有一个误解,响应式地从 Redis 读取并不意味着您已经订阅了新事件。
Reactive 不会为您提供实时更新,它会调用 Redis 一次并显示那里的任何内容。因此,即使您等待一两天,UI/控制台中也不会发生任何变化,您仍然会看到 N 个条目。
您需要使用 Redis PUB/SUB,或者您需要重复调用 Redis 以获取最新更新。
编辑:
一个可行的解决方案..
private List<Light> reactiveReadToList() {
log.info("reactiveReadToList");
return read().collectList().block();
}
private Flux<Light> read() {
StreamOffset<Object> offset = StreamOffset.fromStart("mystream");
return redisTemplate
.opsForStream()
.read(offset)
.flatMap(
e -> {
Map<Object, Object> kvp = e.getValue();
String key = (String) kvp.get("key");
String id = (String) kvp.get("id");
String status = (String) kvp.get("status");
Light light = new Light(id, key, status);
log.info("{}", light);
return Flux.just(light);
});
}
读取器使用 react 模板按需从 Redis 读取数据并使用偏移量将其发送到客户端,它一次只发送一个事件,我们可以发送所有事件。
@RequiredArgsConstructor
class DataReader {
@NonNull FluxSink<Light> sink;
private List<Light> readLights = null;
private int currentOffset = 0;
void register() {
readLights = reactiveReadToList();
sink.onRequest(
e -> {
long demand = sink.requestedFromDownstream();
for (int i = 0; i < demand && currentOffset < readLights.size(); i++, currentOffset++) {
sink.next(readLights.get(currentOffset));
}
if (currentOffset == readLights.size()) {
readLights = reactiveReadToList();
currentOffset = 0;
}
});
}
}
使用DataReader
产生通量的方法
public Flux<Light> getLights() {
return Flux.create(e -> new DataReader(e).register());
}
现在我们在接收器上添加了一个onRequest
方法来处理客户端的需求,它根据需要从Redis流中读取数据并将其发送给客户端。
这看起来非常占用 CPU,如果没有更多新事件,也许我们应该延迟调用,如果我们发现没有新元素,也许可以在 register
方法中添加 sleep 调用流。
关于Java - Spring Boot - Reactive Redis Stream (TEXT_EVENT_STREAM_VALUE),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65685159/
我有一个关于 Redis Pubsub 的练习,如下所示: 如果发布者发布消息但订阅者没有收到服务器崩溃。订阅者如何在重启服务器时收到该消息? 请帮帮我,谢谢! 最佳答案 在这种情况下,消息将永远消失
我们正在使用 Service Stack 的 RedisClient 的 BlockingDequeue 来保存一些数据,直到它可以被处理。调用代码看起来像 using (var client =
我有一个 Redis 服务器和多个 Redis 客户端。每个 Redis 客户端都是一个 WebSocket+HTTP 服务器,其中包括管理 WebSocket 连接。这些 WebSocket+HTT
我有多个 Redis 实例。我使用不同的端口创建了一个集群。现在我想将数据从预先存在的 redis 实例传输到集群。我知道如何将数据从一个实例传输到集群,但是当实例多于一个时,我无法做到这一点。 最佳
配置:三个redis集群分区,跨三组一主一从。当 Master 宕机时,Lettuce 会立即检测到中断并开始重试。但是,Lettuce 没有检测到关联的 slave 已经将自己提升为 master
我想根据从指定集合中检索这些键来删除 Redis 键(及其数据集),例如: HMSET id:1 password 123 category milk HMSET id:2 password 456
我正在编写一个机器人(其中包含要禁用的命令列表),用于监视 Redis。它通过执行禁用命令,例如 (rename-command ZADD "")当我重新启动我的机器人时,如果要禁用的命令列表发生变化
我的任务是为大量听众使用发布/订阅。这是来自 docs 的订阅的简化示例: r = redis.StrictRedis(...) p = r.pubsub() p.subscribe('my-firs
我一直在阅读有关使用 Redis 哨兵进行故障转移的内容。我打算有1个master+1个slave,如果master宕机超过1分钟,就把slave变成master。我知道这在 Sentinel 中是
与仅使用常规 Redis 和创建分片相比,使用 Redis 集群有哪些优势? 在我看来,Redis Cluster 更注重数据安全(让主从架构解决故障)。 最佳答案 我认为当您需要在不丢失任何数据的情
由于 Redis 以被动和主动方式使 key 过期, 有没有办法得到一个 key ,即使它的过期时间已过 (但 在 Redis 中仍然存在 )? 最佳答案 DEBUG OBJECT myKey 将返回
我想用redis lua来实现monitor命令,而不是redis-cli monitor。但我不知道怎么办。 redis.call('monitor') 不起作用。 最佳答案 您不能从 Redis
我读过 https://github.com/redisson/redisson 我发现有几个 Redis 复制设置(包括对 AWS ElastiCache 和 Azure Redis 缓存的支持)
Microsoft.AspNet.SignalR.Redis 和 StackExchange.Redis.Extensions.Core 在同一个项目中使用。前者需要StackExchange.Red
1. 认识 Redis Redis(Remote Dictionary Server)远程词典服务器,是一个基于内存的键值对型 NoSQL 数据库。 特征: 键值(key-value)型,value
1. Redis 数据结构介绍 Redis 是一个 key-value 的数据库,key 一般是 String 类型,但 value 类型多种多样,下面就举了几个例子: value 类型 示例 Str
1. 什么是缓存 缓存(Cache) 就是数据交换的缓冲区,是存贮数据的临时地方,一般读写性能较高。 缓存的作用: 降低后端负载 提高读写效率,降低响应时间 缓存的成本: 数据一致性成本 代码维护成本
我有一份记录 list 。对于我的每条记录,我都需要进行一些繁重的计算,因为我要在Redis中创建反向索引。为了达到到达记录,需要在管道中执行多个redis命令(sadd为100 s + set为1
我有一个三节点Redis和3节点哨兵,一切正常,所有主服务器和从属服务器都经过验证,并且哨兵配置文件已与所有Redis和哨兵节点一起更新,但是问题是当Redis主服务器关闭并且哨兵希望选举失败者时再次
我正在尝试计算Redis中存储的消息之间的响应时间。但是我不知道该怎么做。 首先,我必须像这样存储chat_messages的时间流 ZADD conversation:CONVERSATION_ID
我是一名优秀的程序员,十分优秀!