gpt4 book ai didi

go - Golang Consumer连接Kafka后延迟接收Kafka消息

转载 作者:数据小太阳 更新时间:2023-10-29 03:21:08 26 4
gpt4 key购买 nike

我是 Golang 和 Kafa 的新手,所以这似乎是一个愚蠢的问题。

在我的 Kafka 消费者首次连接到 Kafka 服务器后,为什么在与 Kafka 服务器建立连接和接收第一条消息之间存在延迟(约 20 秒)?

它在 consumer.Messages() 之前打印一条消息,并为收到的每条消息打印另一条消息。大约 20 秒的延迟在第一个 fmt.Println 和第二个 fmt.Println 之间。

package main

import (
"fmt"

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)

func main() {

// Create the consumer and listen for new messages
consumer := createConsumer()

// Create a signal channel to know when we are done
done := make(chan bool)

// Start processing messages
go func() {
fmt.Println("Start consuming Kafka messages")
for msg := range consumer.Messages() {
s := string(msg.Value[:])
fmt.Println("Msg: ", s)
}
}()

<-done

}

func createConsumer() *cluster.Consumer {
// Define our configuration to the cluster
config := cluster.NewConfig()
config.Consumer.Return.Errors = false
config.Group.Return.Notifications = false
config.Consumer.Offsets.Initial = sarama.OffsetOldest

// Create the consumer
brokers := []string{"127.0.0.1:9092"}
topics := []string{"orders"}
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
if err != nil {
log.Fatal("Unable to connect consumer to Kafka")
}
go handleErrors(consumer)
go handleNotifications(consumer)
return consumer
}

docker-compose.yml

version: '2'
services:
zookeeper:
image: "confluentinc/cp-zookeeper:5.0.1"
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

broker-1:
image: "confluentinc/cp-enterprise-kafka:5.0.1"
hostname: broker-1
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_BROKER_RACK: rack-a
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://127.0.0.1:9092'
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: 'broker-1'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker-1:9092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
KAFKA_CREATE_TOPICS: "orders:1:1"

最佳答案

在我的 Kafka 消费者首次连接到 Kafka 服务器后,为什么在与 Kafka 服务器建立连接和接收第一条消息之间存在延迟(约 20 秒)?

There can not be that much delay because consumer used message channel which receive messages from kafka. As soon as the message is available in kafka queue it will be sent to message channel which consumer can receive.

为您提供代码实现:-

for msg := range consumer.Messages() {
s := string(msg.Value[:])
fmt.Println("Msg: ", s)
}

consumer.Messages() 返回一个 channel ,for 在 channel 上循环,只要 channel 内有消息就返回一条消息。

引用这个问题How to create a kafka consumer group in Golang?使用 sarama 连接。您不需要 sarama-cluster 进行连接。

关于go - Golang Consumer连接Kafka后延迟接收Kafka消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53671153/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com