
docker - Kafka client timeout of 60000ms expired before the position for partition could be determined


I am trying to connect Flink to Kafka as a consumer.

I am using Docker Compose to build 4 containers: zookeeper, kafka, a Flink JobManager, and a Flink TaskManager.

For zookeeper and Kafka I am using the wurstmeister images, and for Flink the official image.

docker-compose.yml

version: '3.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    hostname: zookeeper
    expose:
      - "2181"
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    hostname: kafka
    links:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: 'pipeline:1:1:compact'

  jobmanager:
    build: ./flink_pipeline
    depends_on:
      - kafka
    links:
      - zookeeper
      - kafka
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
      BOOTSTRAP_SERVER: kafka:9092
      ZOOKEEPER: zookeeper:2181

  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    links:
      - jobmanager
      - zookeeper
      - kafka
    depends_on:
      - jobmanager
    command: taskmanager
    # links:
    #   - "jobmanager:jobmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager

When I submit a simple job to the Dispatcher, the job fails with the following error:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition pipeline-0 could be determined

My job code is:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class Main {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read the connection settings from the container environment
        Properties properties = new Properties();
        String bootstrapServer = System.getenv("BOOTSTRAP_SERVER");
        String zookeeperServer = System.getenv("ZOOKEEPER");

        if (bootstrapServer == null) {
            System.exit(1);
        }

        properties.setProperty("zookeeper", zookeeperServer);
        properties.setProperty("bootstrap.servers", bootstrapServer);
        properties.setProperty("group.id", "pipeline-analysis");

        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>("pipeline", new SimpleStringSchema(), properties);
        // kafkaConsumer.setStartFromGroupOffsets();
        kafkaConsumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Defining Pipeline here

        // Printing Outputs
        stream.print();

        env.execute("Stream Pipeline");
    }
}
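To narrow down whether the timeout is a connectivity problem rather than a Flink problem, the same position lookup can be reproduced with a plain Kafka client from inside the Docker network. The following is a minimal diagnostic sketch, not part of the original job: the class name PositionCheck is made up, kafka-clients 2.x is assumed on the classpath, and the bootstrap address and topic name match the compose file above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PositionCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker address the Flink job receives via BOOTSTRAP_SERVER
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            TopicPartition tp = new TopicPartition("pipeline", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToEnd(Collections.singletonList(tp));
            // position() performs the same offset lookup that times out in
            // Flink; a short timeout fails fast instead of waiting 60 s.
            long position = consumer.position(tp, Duration.ofSeconds(10));
            System.out.println("pipeline-0 position: " + position);
        }
    }
}

If this client also times out, the broker's advertised address (KAFKA_ADVERTISED_HOST_NAME above) is a likely suspect; if it succeeds, the problem is more likely on the Flink side.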

Best Answer

I know I'm late to the party, but I hit exactly the same error. In my case the TopicPartitions were not set up correctly. My topic had 2 partitions and my producer was producing messages just fine, but my consumer, a Spark streaming application, complained with the same error after 60 seconds and gave up without ever really starting.

The code I had that was wrong:

List<TopicPartition> topicPartitionList = Arrays.asList(new TopicPartition(topicName, Integer.parseInt(numPartitions)));

The correct code:
List<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>();

for (int i = 0; i < Integer.parseInt(numPartitions); i++) {
    topicPartitionList.add(new TopicPartition(topicName, i));
}
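Rather than parsing a hard-coded partition count, the partition list can also be built from the broker's own metadata, which rules out this kind of mismatch entirely. Below is a minimal sketch, not from the original answer: the helper name partitionsOf is made up, and it assumes an already-configured kafka-clients KafkaConsumer.

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class PartitionLister {
    // Hypothetical helper: builds the TopicPartition list from broker
    // metadata; partitionsFor returns one PartitionInfo per partition.
    public static List<TopicPartition> partitionsOf(KafkaConsumer<?, ?> consumer, String topicName) {
        List<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>();
        for (PartitionInfo info : consumer.partitionsFor(topicName)) {
            topicPartitionList.add(new TopicPartition(topicName, info.partition()));
        }
        return topicPartitionList;
    }
}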

Regarding docker - Kafka client timeout of 60000ms expired before the position for partition could be determined, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54853195/
