
scala - Alpakka Consumer does not consume messages from Kafka running via Docker


I have Kafka and Zookeeper running via Docker Compose. I can send and consume messages on a topic with the Kafka console tools, and I can monitor everything through Conduktor. Unfortunately, I cannot consume messages from my Scala application via the Alpakka connector. The application connects to the topic, but whenever I send a message to it, nothing happens.

Only Kafka and Zookeeper run via docker-compose. I run the Scala consumer application directly on the host.

Docker Compose

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper

Scala application

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.duration._
import scala.util.{Failure, Success}

object Main extends App {
  implicit val actorSystem = ActorSystem()

  import actorSystem.dispatcher

  // String keys/values; start from the earliest offset when the group has none committed.
  val kafkaConsumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
    .withGroupId("novo_id")
    .withCommitRefreshInterval(1.seconds)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withBootstrapServers("localhost:9092")

  // Print every record value; report stream failure or completion.
  Consumer
    .plainSource(kafkaConsumerSettings, Subscriptions.topics("test1"))
    .map(msg => msg.value())
    .runWith(Sink.foreach(println))
    .onComplete {
      case Failure(exception) => exception.printStackTrace()
      case Success(value)     => println("done")
    }
}

Application - console output

16:58:33.877 INFO  [akka.event.slf4j.Slf4jLogger]                     Slf4jLogger started
16:58:34.470 INFO [akka.kafka.internal.SingleSourceLogic] [1955f] Starting. StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$a#-591284224]
16:58:34.516 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = novo_id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

16:58:34.701 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka version: 2.4.0
16:58:34.702 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka commitId: 77a89fcf8d7fa018
16:58:34.702 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka startTimeMs: 1585256314699
16:58:34.715 INFO [org.apache.kafka.clients.consumer.KafkaConsumer] [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Subscribed to topic(s): test1
16:58:35.308 INFO [org.apache.kafka.clients.Metadata] [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Cluster ID: c2XBuDIJTI-gBs9guTvG

Best Answer

Export KAFKA_ADVERTISED_LISTENERS

Describes the host name that is advertised and can be reached by clients. The value is published to ZooKeeper for clients to use.

If using the SSL or SASL protocol, the endpoint value must specify the protocols in the following formats:

  • SSL: SSL:// or SASL_SSL://

  • SASL: SASL_PLAINTEXT:// or SASL_SSL://

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092
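
With the wurstmeister/kafka image this value is supplied through the compose environment. Below is a minimal sketch of the kafka service, assuming the image's documented INSIDE/OUTSIDE dual-listener pattern (the listener names, the 29092 port mapping, and the protocol map are illustrative assumptions, not part of the original answer):

kafka:
  image: wurstmeister/kafka
  ports:
    - "29092:29092"
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    # Bind one listener for other containers and one for the host.
    KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:29092
    # Advertise an address each kind of client can actually reach.
    KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:29092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
  depends_on:
    - zookeeper

This replaces the HOSTNAME_COMMAND-based advertisement from the original file: containers on the compose network keep using kafka:9092, while the host application connects to localhost:29092 and receives back an address it can reach.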

Now your consumer can use port 29092:

.withBootstrapServers("localhost:29092")

Regarding scala - Alpakka Consumer does not consume messages from Kafka running via Docker, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60876449/
