gpt4 book ai didi

java - Kafka Streams 不读取输入主题

转载 作者:行者123 更新时间:2023-11-30 06:29:25 27 4
gpt4 key购买 nike

我已经根据教程创建了示例 Kafka Streams 应用程序:

    public static void main(String[] args) throws Exception {
Logger log = Logger.getLogger("Name");

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordprint");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final KStreamBuilder builder = new KStreamBuilder();
builder.stream("onecon_postgres").print();

final KafkaStreams streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});

try {
streams.start();
log.info("After Start");
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}

不幸的是,这个应用程序不读取输入流。我有一个来自 PostgreSQL 的 JDBC 源连接器,它可以很好地从一个数据库传输数据(我可以在本主题中的 Kafka Connect UI 数据上看到)。

我遇到的问题是,即使我在属性 IP 中的 BOOTSTRAP_SERVERS_CONFIG 中更改了 IP 是localhost,我也不知道为什么。

[main] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
application.id = streams-linesplit
application.server =
**bootstrap.servers = [localhost:9092]**
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
key.serde = null
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =

为了克服这个问题,我使用了 netsh 来转发流量,但我看不到这个应用程序来消耗我的流。

netsh interface portproxy add v4tov4 listenport=9092 listenaddress=127.0.0.1 connectport=9092 connectaddress=192.168.99.100

最佳答案

Unfortunately this application does not read input stream.

您的 Kafka Streams 应用程序和 Kafka 代理之间似乎存在网络问题。 “Kafka Streams 不起作用”的可能性相当小。

此外,如果您不提供更多信息,我们很难为您提供帮助:

  • 您的 Kafka 代理使用什么版本的 Kafka?
  • 您的应用程序使用哪个 Kafka (Streams) 版本?
  • 哪个操作系统?
  • 网络设置是什么?
    • 运行应用程序的计算机的 IP 地址。
    • 您的 Kafka 代理(或多个代理)在哪个 IP + 端口上监听新连接?是192.168.99.100:9092吗?
  • 您在应用程序日志中看到了什么?您是否看到 ERRORWARN 日志消息?

The problem I have is even though I have changed IP in BOOTSTRAP_SERVERS_CONFIG in Properties IP is localhost I don't know why.

我不明白 - 为什么您认为将 BOOTSTRAP_SERVERS_CONFIG 更改为 localhost:9092 会解决您原来的问题?我了解到 Kafka 代理实际上监听 192.168.99.100:9092

To overcome this I have used netsh to forward traffic but I cannot see this application to consume my stream.

端口转发很可能没有帮助。如果不更新 Kafka 代理的配置,默认情况下代理将仅在其“真实”IP + 端口上进行通信。稍微简化一下:配置为监听 192.168.99.100:9092 的代理不会响应 Kafka Streams 应用程序发送的 localhost:9092 请求,即使您正在这样做来自运行 Kafka Streams 应用程序的计算机上的 localhost:9092 -> 192.168.99.100:9092 的端口转发。

希望这对您有所帮助!

关于java - Kafka Streams 不读取输入主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46428323/

27 4 0