java - OSGi Kafka Streams application throws LogAndFailExceptionHandler

Reposted. Author: 行者123. Updated: 2023-12-02 09:13:29

I cannot run my simple Kafka Streams application inside the Apache Felix framework, although it runs fine as a plain jar. It throws the following exception:

ERROR: bundle com.openet.odf.streamer-simple:1.0.0.SNAPSHOT (149)[com.openet.streamer.impl.streamerImpl(0)] : The activate method has thrown an exception
java.lang.ExceptionInInitializerError
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:544)
at com.openet.streamer.impl.streamerImpl.activate(streamerImpl.java:122)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.streams.errors.LogAndFailExceptionHandler for configuration default.deserialization.exception.handler: Class org.apache.kafka.streams.errors.LogAndFailExceptionHandler could not be found.

My code:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.150.12:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
builder.stream("test").to("streams-pipe-output");
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props); // the exception is thrown here

I am using Java 8 with Apache Felix Framework 6.0.3.

Steps to reproduce:

  1. Copy org.apache.servicemix.bundles.kafka-clients-2.3.1_1.jar to the bundle folder
  2. Copy org.apache.servicemix.bundles.kafka-streams-2.3.1_1.jar to the bundle folder
  3. Copy the application jar to the bundle folder
  4. java -jar bin/felix.jar

Any help or pointers are appreciated.

Best answer

I managed to solve my problem by doing the following, thanks to a colleague who knows OSGi better than I do =)

More details can be found here: OSGi Classloading

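KafkaStreams streams = null;
// Remember the current thread context class loader so it can be restored afterwards.
ClassLoader currentCL = Thread.currentThread().getContextClassLoader();
try {
    // Kafka looks up configured class names through the thread context class loader,
    // so point it at the bundle class loader that can see the Kafka Streams classes.
    Thread.currentThread().setContextClassLoader(LogAndFailExceptionHandler.class.getClassLoader());
    streams = new KafkaStreams(builder.build(), props);
    streams.start();
} catch (Exception e) {
    System.out.println(e.getMessage());
} finally {
    // Always restore the original context class loader.
    Thread.currentThread().setContextClassLoader(currentCL);
}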
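
The swap-and-restore pattern above can be factored into a small reusable helper. The following is only a sketch: ContextClassLoaders and callWith are hypothetical names that do not exist in Kafka or OSGi, and the anonymous ClassLoader in main merely stands in for LogAndFailExceptionHandler.class.getClassLoader() so that the example runs without Kafka on the classpath.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper (an assumption for illustration, not a Kafka or OSGi API). */
final class ContextClassLoaders {

    private ContextClassLoaders() {
    }

    /**
     * Runs the action with the given loader installed as the thread context
     * class loader, then restores the previous loader even if the action throws.
     */
    static <T> T callWith(ClassLoader loader, Callable<T> action) throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the bundle class loader that can see the Kafka Streams classes.
        ClassLoader target = new ClassLoader(ContextClassLoaders.class.getClassLoader()) { };
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        // Inside the action, the context class loader is the one we passed in.
        ClassLoader seen = callWith(target, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("installed during call: " + (seen == target));
        System.out.println("restored after call: "
                + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

In the application, the call would presumably look like ContextClassLoaders.callWith(LogAndFailExceptionHandler.class.getClassLoader(), () -> new KafkaStreams(builder.build(), props)). Unlike the snippet above, this helper lets exceptions propagate to the caller instead of printing only their message.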

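OK, on to the next problem: this solution only works for trivial topologies. As soon as I do something more meaningful (counting, for example), I get a RocksDB-related error.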

Caused by: java.lang.NoClassDefFoundError: org/rocksdb/Options
at org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier.get(RocksDbKeyValueBytesStoreSupplier.java:41)
at org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier.get(RocksDbKeyValueBytesStoreSupplier.java:23)
at org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder.build(TimestampedKeyValueStoreBuilder.java:55)
at org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder.build(TimestampedKeyValueStoreBuilder.java:35)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder$StateStoreFactory.build(InternalTopologyBuilder.java:135)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.buildProcessorNode(InternalTopologyBuilder.java:953)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:856)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:809)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:792)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:671)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:634)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:544)
at com.openet.streamer.impl.streamerImpl.activate(streamerImpl.java:69)
... 81 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.rocksdb.Options not found by org.apache.servicemix.bundles.kafka-streams [150]
at org.apache.felix.framework.BundleWiringImpl.findClassOrResourceByDelegation(BundleWiringImpl.java:1639)
at org.apache.felix.framework.BundleWiringImpl.access$200(BundleWiringImpl.java:80)
at org.apache.felix.framework.BundleWiringImpl$BundleClassLoader.loadClass(BundleWiringImpl.java:2053)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
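
The last "Caused by" suggests that the class loader of the kafka-streams bundle cannot see the org.rocksdb package at all, which would be a bundle wiring issue rather than a context class loader issue. One way to check this kind of visibility is sketched below. ClassVisibility and isVisible are hypothetical names introduced for illustration; inside the OSGi container the loader to test would presumably be KafkaStreams.class.getClassLoader(), whereas main uses the helper's own loader so the example runs without Kafka or RocksDB present.

```java
/** Hypothetical diagnostic helper (an assumption, not part of Kafka, Felix, or RocksDB). */
final class ClassVisibility {

    private ClassVisibility() {
    }

    /** Returns true if the loader can resolve the named class; the class is not initialized. */
    static boolean isVisible(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError also covers NoClassDefFoundError for missing dependencies.
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ClassVisibility.class.getClassLoader();
        System.out.println("java.lang.String visible: "
                + isVisible(loader, "java.lang.String"));
        // The result depends on whether a RocksDB jar is reachable from this loader.
        System.out.println("org.rocksdb.Options visible: "
                + isVisible(loader, "org.rocksdb.Options"));
    }
}
```

If the check returns false for the kafka-streams bundle's loader, swapping the context class loader is unlikely to help, because the trace shows the lookup going through that bundle's own loader.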

Regarding "java - OSGi Kafka Streams application throws LogAndFailExceptionHandler", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/59206188/
