gpt4 book ai didi

hadoop - 如何使用 Kafka 从文件中读取新条目

转载 作者:可可西里 更新时间:2023-11-01 16:45:14 25 4
gpt4 key购买 nike

这是我从文件中读取的 Kafka 代码:

public void run() throws ClassNotFoundException, FileNotFoundException, IOException, ParseException{
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<String, String>(prop);

String currentLine = null;
reader = new BufferedReader(new FileReader(filePath));
while((currentLine = reader.readLine()) != null){
System.out.println("---------------------");
System.out.println(currentLine);
System.out.println("---------------------");
ProducerRecord<String, String> message = new ProducerRecord<String, String>(topicName, currentLine);
producer.send(message);
}
reader.close();
producer.close();
}

但是这段代码只运行了一个,这是显而易见的。
但是我有一个文件,它每小时不断更新新行。
如果我使用上面的代码,这将再次运行整个文件。如何只对添加到文件中的新行重新运行。

最佳答案

答案应该取决于文件的管理方式。如果它可以旋转并且您需要处理恢复,那么 Flume 或 tail -F(Flume 1.6 无论如何都是这样实现的)对您不起作用。另外,您使用的是什么操作系统?
另一种方法可能是 tail with logstash 或 file beats 并写入 Kafka。如果你喜欢 python,看看 tailcher ( https://github.com/thanos/tailchaser )

关于hadoop - 如何使用 Kafka 从文件中读取新条目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37367410/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com