gpt4 book ai didi

Java - Spring Boot - Reactive Redis Stream (TEXT_EVENT_STREAM_VALUE)

转载 作者:行者123 更新时间:2023-12-04 03:42:39 26 4
gpt4 key购买 nike

我想编写一个始终显示 Redis 流(响应式(Reactive))的最新消息的端点。


实体看起来像这样 {'key' : 'some_key', 'status' : 'some_string'}

所以我想得到以下结果:

  1. 页面被调用,内容例如显示一个实体:
{'key' : 'abc', 'status' : 'status_A'}

页面关闭

  1. 然后将一个新实体添加到流中
XADD mystream * key abc status statusB
  1. 现在我更愿意在不更新选项卡的情况下查看流中的每个项目
{'key' : 'abc', 'status' : 'status_A'}
{'key' : 'abc', 'status' : 'status_B'}

当我尝试模拟这种行为时,它起作用了,我得到了预期的输出。
    @GetMapping(value="/light/live/mock", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Light> liveLightMock() {
List<Light> test = Arrays.asList(new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"));
return Flux.fromIterable(test).delayElements(Duration.ofMillis(500));
}

列表的各个元素一个接一个地显示,项目之间有 500 毫秒的延迟。

但是,当我尝试访问 Redis 而不是模拟变体时,它不再有效。我尝试依次测试部分功能。为了让我的想法首先起作用,保存 (1) 功能必须起作用,如果保存功能起作用,则显示没有重新激活功能的旧记录必须起作用 (2) 最后但并非最不重要的是,如果两者都起作用,我有点需要让重新激活部分继续进行。

也许你们可以帮助我让 Reactive Part 正常工作。我为此工作了好几天,但没有得到任何改进。

伙计们:)

测试 1) - 保存功能(精简版)

看起来正常。

    @GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public Flux<Light> createTestLight() {
String status = (++statusIdx % 2 == 0) ? "on" : "off";
Light light = new Light(Consts.LIGHT_ID, status);
return LightRepository.save(light).flux();
}
    @Override
public Mono<Light> save(Light light) {
Map<String, String> lightMap = new HashMap<>();
lightMap.put("key", light.getKey());
lightMap.put("status", light.getStatus());

return operations.opsForStream(redisSerializationContext)
.add("mystream", lightMap)
.map(__ -> light);
}

测试 2) - 加载/阅读功能(精简版)

似乎在工作,但不是 reaktiv -> 我在 WebView 打开时添加了一个新实体,该 View 显示了所有项目但在我添加新项目后没有更新。重新加载后我看到了每一个项目

如何让 getLights 返回与订阅流的 TEXT_EVENT_STREAM_VALUE 一起工作的内容?

    @Override
public Flux<Object> getLights() {
ReadOffset readOffset = ReadOffset.from("0");
StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest

Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
Map<Object, Object> kvp = entries.getValue();
String key = (String) kvp.get("key");
String status = (String) kvp.get("status");
Light light = new Light(key, status);
return Flux.just(light);
};

return operations.opsForStream()
.read(offset)
.flatMap(mapFunc);
}
    @GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Object> lightLive() {
return LightRepository.getLights();
}

测试 1) - 保存功能(长版)

端点和保存函数是不同类的一部分。字符串状态 = (++statusIdx % 2 == 0) ? "on": "off"; 触发器状态从开到关,到开,到关,...

    @GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public Flux<Light> createTestLight() {
String status = (++statusIdx % 2 == 0) ? "on" : "off";
Light light = new Light(Consts.LIGHT_ID, status);
return LightRepository.save(light).flux();
}
    @Override
public Mono<Light> save(Light light) {
Map<String, String> lightMap = new HashMap<>();
lightMap.put("key", light.getKey());
lightMap.put("status", light.getStatus());

return operations.opsForStream(redisSerializationContext)
.add("mystream", lightMap)
.map(__ -> light);
}

验证函数 i

  1. 删除流,清空它
127.0.0.1:6379> del mystream
(integer) 1
127.0.0.1:6379> XLEN myStream
(integer) 0

两次调用创建端点/light/create我希望 Stream 现在有两个 Item,on with status = on,一个 with off

127.0.0.1:6379> XLEN mystream
(integer) 2
127.0.0.1:6379> xread STREAMS mystream 0-0
1) 1) "mystream"
2) 1) 1) "1610456865517-0"
2) 1) "key"
2) "light_1"
3) "status"
4) "off"
2) 1) "1610456866708-0"
2) 1) "key"
2) "light_1"
3) "status"
4) "on"

看起来保存部分正在工作。

测试 2) - 加载/阅读功能(长版)

似乎在工作,但不是 reaktiv -> 我添加了一个新实体并且页面更新了它的值

    @Override
public Flux<Object> getLights() {
ReadOffset readOffset = ReadOffset.from("0");
StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest

Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
Map<Object, Object> kvp = entries.getValue();
String key = (String) kvp.get("key");
String status = (String) kvp.get("status");
Light light = new Light(key, status);
return Flux.just(light);
};

return operations.opsForStream()
.read(offset)
.flatMap(mapFunc);
}
    @GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Object> lightLive() {
return LightRepository.getLights();
}
  1. 调用 /light/live -> 我应该有 N 个条目-> 如果我能看到条目,则正常显示正常工作(非 react 性)
  2. 调用 /light/create 两次 -> 现场少数人应该添加了 2 个条目 -> N+2条目
  3. 等待 1 分钟以防万一
  4. View 应显示 N+2 个 Reactiv 部分的条目才能正常工作
  5. 从 1 (/light/live) 刷新 View ,如果 Reactiv 有效,应该仍显示相同的数量

显示信息有效 (1),(2) 的添加部分有效,每个终端检查,4) 无效

因此显示器正在工作,但它没有反应

刷新浏览器后 (5) 我得到了预期的 N+2 条目 - 所以 (2) 也正常工作


最佳答案

这里有一个误解,响应式地从 Redis 读取并不意味着您已经订阅了新事件。

Reactive 不会为您提供实时更新,它会调用 Redis 一次并显示那里的任何内容。因此,即使您等待一两天,UI/控制台中也不会发生任何变化,您仍然会看到 N 个条目。

您需要使用 Redis PUB/SUB,或者您需要重复调​​用 Redis 以获取最新更新。

编辑:

一个可行的解决方案..

  private List<Light> reactiveReadToList() {
log.info("reactiveReadToList");
return read().collectList().block();
}

private Flux<Light> read() {
StreamOffset<Object> offset = StreamOffset.fromStart("mystream");
return redisTemplate
.opsForStream()
.read(offset)
.flatMap(
e -> {
Map<Object, Object> kvp = e.getValue();
String key = (String) kvp.get("key");
String id = (String) kvp.get("id");
String status = (String) kvp.get("status");
Light light = new Light(id, key, status);
log.info("{}", light);
return Flux.just(light);
});
}

读取器使用 react 模板按需从 Redis 读取数据并使用偏移量将其发送到客户端,它一次只发送一个事件,我们可以发送所有事件。

  @RequiredArgsConstructor
class DataReader {
@NonNull FluxSink<Light> sink;
private List<Light> readLights = null;
private int currentOffset = 0;
void register() {
readLights = reactiveReadToList();
sink.onRequest(
e -> {
long demand = sink.requestedFromDownstream();
for (int i = 0; i < demand && currentOffset < readLights.size(); i++, currentOffset++) {
sink.next(readLights.get(currentOffset));
}
if (currentOffset == readLights.size()) {
readLights = reactiveReadToList();
currentOffset = 0;
}
});
}
}

使用DataReader产生通量的方法

  public Flux<Light> getLights() {
return Flux.create(e -> new DataReader(e).register());
}

现在我们在接收器上添加了一个onRequest方法来处理客户端的需求,它根据需要从Redis流中读取数据并将其发送给客户端。

这看起来非常占用 CPU,如果没有更多新事件,也许我们应该延迟调用,如果我们发现没有新元素,也许可以在 register 方法中添加 sleep 调用流。

关于Java - Spring Boot - Reactive Redis Stream (TEXT_EVENT_STREAM_VALUE),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65685159/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com