- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
社区,您能帮我理解为什么 ~3% 的消息没有在 HDFS
中结束吗?我用 JAVA
编写了一个简单的生成器来生成 1000 万条消息。
public static final String TEST_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"str1\", \"type\":\"string\" },"
+ " { \"name\":\"str2\", \"type\":\"string\" },"
+ " { \"name\":\"int1\", \"type\":\"int\" }"
+ "]}";
public KafkaProducerWrapper(String topic) throws UnknownHostException {
// store topic name
this.topic = topic;
// initialize kafka producer
Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("bootstrap.servers", "myserver-1:9092");
config.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
config.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
config.put("schema.registry.url", "http://myserver-1:8089");
config.put("acks", "all");
producer = new KafkaProducer(config);
// parse schema
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(TEST_SCHEMA);
}
public void send() {
// generate key
int key = (int) (Math.random() * 20);
// generate record
GenericData.Record r = new GenericData.Record(schema);
r.put("str1", "text" + key);
r.put("str2", "text2" + key);
r.put("int1", key);
final ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, "K" + key, (GenericRecord) r);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
logger.error("Send failed for record {}", record, e);
messageErrorCounter++;
return;
}
logger.debug("Send succeeded for record {}", record);
messageCounter++;
}
});
}
public String getStats() { return "Messages sent: " + messageCounter + "/" + messageErrorCounter; }
public long getMessageCounter() {
return messageCounter + messageErrorCounter;
}
public void close() {
producer.close();
}
public static void main(String[] args) throws InterruptedException, UnknownHostException {
// initialize kafka producer
KafkaProducerWrapper kafkaProducerWrapper = new KafkaProducerWrapper("my-test-topic");
long max = 10000000L;
for (long i = 0; i < max; i++) {
kafkaProducerWrapper.send();
}
logger.info("producer-demo sent all messages");
while (kafkaProducerWrapper.getMessageCounter() < max)
{
logger.info(kafkaProducerWrapper.getStats());
Thread.sleep(2000);
}
logger.info(kafkaProducerWrapper.getStats());
kafkaProducerWrapper.close();
}
我在独立模式下使用Confluent HDFS Connector
将数据写入HDFS
。配置如下:
name=hdfs-consumer-test
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=my-test-topic
hdfs.url=hdfs://my-cluster/kafka-test
hadoop.conf.dir=/etc/hadoop/conf/
flush.size=100000
rotate.interval.ms=20000
# increase timeouts to avoid CommitFailedException
consumer.session.timeout.ms=300000
consumer.request.timeout.ms=310000
heartbeat.interval.ms= 60000
session.timeout.ms= 100000
连接器将数据写入 HDFS,但在等待 20000 毫秒后(由于 rotate.interval.ms
)并未收到所有消息。
scala> spark.read.avro("/kafka-test/topics/my-test-topic/partition=*/my-test-topic*")
.count()
res0: Long = 9749015
知道这种行为的原因是什么吗?我的错误在哪里?我正在使用 Confluent 3.0.1/Kafka 10.0.0.1。
最佳答案
您是否看到最后几条消息没有移动到 HDFS?如果是这样,您可能遇到了此处描述的问题 https://github.com/confluentinc/kafka-connect-hdfs/pull/100
尝试在 rotate.interval.ms 过期后向主题发送一条消息,以验证这是否是您遇到的问题。如果您需要根据时间轮换,升级以获取修复可能是个好主意。
关于hadoop - Confluent HDFS 连接器正在丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40879750/
我已经在 https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html 的帮助下使用 docker 在我的 Window
说明页面显示为:- 安装 Confluent Cloud CLI运行此命令以安装 Confluent Cloud CLI。curl -L --http1.1 https://cnfl.io/cclou
以下内容基于此处的快速入门指南:http://docs.confluent.io/current/cp-docker-images/docs/quickstart.html 在那里,他们在自己的 do
我们计划将 AWS MSK 服务用于 Managed Kafka 和 Schema Registry 以及来自 Confluent 的 Kafka Connect 服务来运行我们的连接器(Elasti
我正在使用 Confluent 管理的 Kafka 集群、Schema Registry 服务并尝试在 Flink 作业中处理 Debezium 消息。该作业配置为使用 Table & SQL Con
confluent control center 没有启动。 我执行了以下命令来启动 Confluent 平台 Zookeeper 开始(1 号航站楼) Kafka 启动(2 号航站楼) Schema
目前,我将Maven与io.fabric8 docker-maven-plugin一起使用,以自动启动Kafka和ZooKeeper。这是我当前的配置,可以正常工作: 2181 12
应用概述: 接受来自客户端的日志的提取服务。处理日志,然后将其推送到Kafka。由于这些日志非常重要,我无法承受同样的损失,因此我决定采用相同的交易记录。 文档状态: 通过设置生产者实例来配置交易 t
我正在寻找安装融合模式注册表的选项,是否可以单独下载和安装注册表并使其与现有的 kafka 设置一起使用? 谢谢 最佳答案 假设您已经运行了 Zookeeper/Kafka,您可以通过运行以下命令使用
我想将 kafka 日志文件移动到 hadoop 日志文件。所以我遵循 HDFS 连接器配置 /quickstart-hdfs.properties name=hdfs-sink connector.
社区,您能帮我理解为什么 ~3% 的消息没有在 HDFS 中结束吗?我用 JAVA 编写了一个简单的生成器来生成 1000 万条消息。 public static final String TEST_
我是 kafka 的新手并且很流畅。我写了一个与 https://www.confluent.fr/blog/schema-registry-avro-in-spring-boot-applicati
有人能帮我理解融合控制中心时间线上的红色条纹是什么意思吗? 最佳答案 红色条纹表示在处理监控数据时遇到了丢失或重复的拦截器记录。文档描述了这种行为: It’s also possible for me
我正在使用 confluent 连接我的数据库和 ES,因为出现异常: org.apache.kafka.connect.errors.DataException: STRUCT is not sup
我正在使用 confluentinc/cp-kafka Docker 镜像在本地运行 Kafka,我正在设置以下日志记录容器环境变量: KAFKA_LOG4J_ROOT_LOGLEVEL: ERROR
我正在查看 Apache Kafka v2.0.0,我无法弄清楚它支持哪个版本的 java(OpenJDK 或 Oracle JDK 的新生命周期)?到目前为止,我看到了相互矛盾的文档。例如:在 Ap
我正在运行 AWS linux 服务器 (centOS 7.x) 中试用开源 Confluent 平台。在以 root 身份在 oss(开源版本)版本上执行 yum install 后,可以很直接地使
我尝试在以下位置运行 kafka-streams 示例:https://github.com/confluentinc/examples/tree/master/kafka-streams 分支“ka
我想问一下我在Mac OS Mojave上安装confluent golang kafka客户端的问题。 为什么安装golang kafka客户端时总是报错 go get -u github.com/
我正在使用 python 3.7 和 confluent-kafka。 以下是我用来轮询 kafka 服务器并读取消息的伪代码。 while True: MSG
我是一名优秀的程序员,十分优秀!