- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Java 在 Eclipse 中创建 KStream 应用程序。现在我指的是互联网上可用的 KStreams 字数统计程序并对其进行修改。
我想要的是,我从输入主题读取的数据应该写入文件,而不是写入另一个输出主题。
但是当我尝试将 KStream/KTable 打印到本地文件时,我在输出文件中收到以下条目:
org.apache.kafka.streams.kstream.internals.KStreamImpl@4c203ea1
如何实现将 KStream 的输出重定向到文件?
下面是代码:
package KStreamDemo.kafkatest;
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class TemperatureDemo {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "34.73.184.104:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
System.out.println("#1###################################################################################################################################################################################");
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
System.out.println("#2###################################################################################################################################################################################");
KStream<String, String> source = builder.stream("iot-temperature");
System.out.println("#5###################################################################################################################################################################################");
KTable<String, Long> counts = source
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
}
})
.groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
return value;
}
})
.count();
System.out.println("#3###################################################################################################################################################################################");
System.out.println("OUTPUT:"+ counts);
System.out.println("#4###################################################################################################################################################################################");
// need to override value serde to Long type
counts.toStream().to("iot-temperature-max", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
最佳答案
这不正确
System.out.println("OUTPUT:"+ counts);
您需要执行counts.foreach
,然后将消息打印到文件中。
Print Kafka Stream Input out to console? (只需更新以写入文件即可)
<小时/>但是,可能更好地将流写到一个主题。并使用 Kafka Connect 写入文件。这是更符合行业标准的模式。鼓励 Kafka Streams 仅在 Kafka 内的主题之间移动数据,而不是与外部系统(或文件系统)集成
使用所需的主题信息编辑 connect-file-sink.properties
,然后
bin/connect-standalone config/connect-file-sink.properties
关于java - 如何使用 KStreams 将 Kafka 主题的数据写入文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55066840/
我正在尝试在 KStream-KStream 之间执行内部连接。我观察到当来自两个 KStreams 的消息都具有复合键(例如具有许多属性的 java pojo)时,连接不起作用,即使使用了 pojo
在 @StreamListener 的处理方法中,我将 school KStream 映射到 person KStream,并通过 .through() 方法填充主题“person”,我从中生成一个
我想用 KStream 接口(interface)批处理消息。 我有一个带有键/值的流 我试图在一个翻滚的窗口中收集它们,然后我想立即处理整个窗口。 builder.stream(longSerde,
希望有人知道这一点或者能给我指出正确的方向...... 我有一个通过 API REST 请求创建的数据主题。 REST 请求中收到的字段之一是记录 EventTime 的时间戳。这些记录将生成到 Ka
我遇到了基于KStreams的应用程序的问题:它将运行一次,当我停止/重新启动时,它会“卡住”并且不会再继续,直到我删除它所具有的各种主题创建的。这种情况不会每次发生,但往往会发生。 通常,当我将新版
我正在使用 KStream.to("outputtopic"); 编写输出主题在 apache 文档中提到它将自动创建传递给 to() 的主题。如何使用来自该主题的消息? 我可以使用 consumer
我的流是键/值对,我想将其作为“原始”并通过 60 秒聚合保存到数据库中。最初我是这样做的: ->foreach
我想在 Kafka 中使用 Java KStream 来过滤掉所有超过特定值的值。值以 JSON 格式交换,例如: ConsumerRecord(topic=u'test', partition=0,
由于我的基于 KStream 的应用程序不遵循传统的 Kafka 消费者路线,我应该如何跟踪消费者延迟?通常我会使用 ConsumerOffsetChecker (或类似的东西)但它需要一个消费者组名
我正在使用 Kafka Streams 开发 PoC。现在我需要获取流消费者中的偏移值,并使用它为每条消息生成一个唯一的键 (topic-offset)->hash。原因是:生产者是系统日志,只有少数
我创建了一个 Kafka 主题并向其推送了一条消息。 所以 bin/kafka-console-consumer --bootstrap-server abc.xyz.com:9092 --topic
我想创建一个基于 Kafka 流的应用程序,它处理一个主题并以大小为 X(即 50)的批处理获取消息,但如果流的流量较低,则在 Y 秒内(即5). 因此,我没有一条一条地处理消息,而是处理一个 Lis
有没有一种方法可以进行类似分支的操作,但将记录放在谓词计算结果为真的每个输出流中? Brach 将记录放入第一个匹配项(文档:在第一个匹配项中,一条记录被放置到一个且只有一个输出流中)。 最佳答案
我正在使用 Spring 云流,并且想稍微摆弄一下 KStreams/KTables。 我正在寻找从标准 Kafka 主题将其转变为流的方法。 我已经在 KSQL 中完成了此操作,但我试图弄清楚是否有
有没有办法计算 Java/scala 应用程序中 KTable/KStream 随着时间的推移大约将使用多少堆(或任何其他)内存? 我有一些具体的假设,我想知道它们是否正确: Kafka 流仅使用内部
有没有办法进行类似分支的操作,但将记录放置在谓词评估为 true 的每个输出流中? Brach 将记录放置到第一个匹配项(文档:在第一个匹配项上将记录放置到一个且仅一个输出流)。 最佳答案 您可以
似乎我的基于 KStream 的应用程序已经堆积了许多 GB 的文件(.sst、Log.old. 等)。 这些会自行清理还是我需要留意?设置一些参数来剔除它们? 最佳答案 关于这些本地/临时文件:其中
我希望将来自 KStream 的窗口化批处理输出组合在一起,并将它们写入辅助存储。 我期待看到 .punctuate() 大约每 30 秒被调用一次。我得到的反而被保存了here . (原文件几千行)
我读了,但我不能理解太多。我读到我可以使用 KTable 而不是日志压缩。或者它有更多的功能。但是,我找不到这方面的好例子。我也无法在解释工作逻辑的好来源中看到它。你能解释一个 ktable 和 ks
我有以下拓扑: 创建状态存储 根据 SOME_CONDITION 过滤记录,将其值映射到新实体,最后将这些记录发布到另一个主题 STATIONS_LOW_CAPACITY_TOPIC 但是我在 STA
我是一名优秀的程序员,十分优秀!