gpt4 book ai didi

java - 使用Kafka翻滚窗口查询时返回空数据

转载 作者:太空宇宙 更新时间:2023-11-04 09:37:35 24 4
gpt4 key购买 nike

我正在尝试查询状态存储以在 5 分钟的窗口内获取数据。为此,我使用 tumbling window 。添加了REST来查询数据。我有stream A它消耗来自 topic1 的数据并执行一些转换并将键值输出到 topic2 。现在在stream B我正在 topic2 上进行翻滚窗口操作数据。当我运行代码并使用 REST 进行查询时,我在浏览器上看到空数据。我可以看到状态存储中的数据在流动。

我观察到的是,而不是 topic2stream A 获取数据,我使用了一个生产者类将数据注入(inject)到 topic2并能够从浏览器查询数据。但是当 topic2正在从 stream A 获取数据,我得到空数据。

这是我的stream A代码:

public static void main(String[] args) {    

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("topic1");
KStream<String, String> output = source
.map((k,v)->
{
Map<String, Object> Fields = new LinkedHashMap<>();

Fields.put("FNAME","ABC");
Fields.put("LNAME","XYZ");

Map<String, Object> nFields = new LinkedHashMap<>();
nFields.put("ADDRESS1","HY");
nFields.put("ADDRESS2","BA");
nFields.put("addF",Fields);

Map<String, Object> eve = new LinkedHashMap<>();
eve.put("nFields", nFields);

Map<String, Object> fevent = new LinkedHashMap<>();
fevent.put("eve", eve);
LinkedHashMap<String, Object> newMap = new LinkedHashMap<>(fevent);

return new KeyValue<>("JAY1234",newMap.toString());
});

output.to("topic2");

}

这是我的stream B代码(滚动窗口操作发生的地方):

public static void main(String[] args) {

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> eventStream = builder.stream("topic2");

eventStream.groupByKey()
.windowedBy(TimeWindows.of(300000))
.reduce((v1, v2) -> v1 + ";" + v2, Materialized.as("TumblingWindowPoc"));

final Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
}

休息代码:

@GET()
@Path("/{storeName}/{key}")
@Produces(MediaType.APPLICATION_JSON)
public List<KeyValue<String, String>> windowedByKey(@PathParam("storeName") final String storeName,
@PathParam("key") final String key) {

final ReadOnlyWindowStore<String, String> store = streams.store(storeName,
QueryableStoreTypes.<String, String>windowStore());
if (store == null) {
throw new NotFoundException(); }

long timeTo = System.currentTimeMillis();
long timeFrom = timeTo - 30000;
final WindowStoreIterator<String> results = store.fetch(key, timeFrom, timeTo);

final List<KeyValue<String,String>> windowResults = new ArrayList<>();
while (results.hasNext()) {
final KeyValue<Long, String> next = results.next();
windowResults.add(new KeyValue<String,String>(key + "@" + next.key, next.value));
}
return windowResults;
}

这就是我的键值数据的样子:

JAY1234 {eve = {nFields = {ADDRESS1 = HY,ADDRESS2 = BA,Fields = {FNAME = ABC,LNAME = XYZ,}}}}

使用 REST 查询时我应该能够获取数据。任何帮助是极大的赞赏。谢谢!

最佳答案

获取窗口的 timeFrom 应该在窗口启动之前。所以如果你想要最后30秒的数据,你可以减去获取的窗口时长,比如timeTo - 30000 - 300000,然后从整个窗口数据中过滤掉需要事件的事件

关于java - 使用Kafka翻滚窗口查询时返回空数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56315993/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com