gpt4 book ai didi

spring-boot - EmbeddedKafka 不工作导致 Scala 错误

转载 作者:行者123 更新时间:2023-12-05 02:58:32 29 4
gpt4 key购买 nike

我有一个可用的基于 java-gradle 的 spring-boot 服务,可以生成和使用 Kafka 消息。但是我无法使用带有 @EmbeddedKafka 注释的 spring-kafka-test 库或使用 @ClassRule 方式创建集成测试。在这两种方式中,我最终都会遇到相同的错误(下面针对 Scala 指出)。如果有人对幕后可能发生的事情有任何线索,那将非常有帮助。

Spring Boot 版本:2.1.6.RELEASESpring Kafka 版本:2.2.7.RELEASE

生产者配置:

@EnableKafka
@Configuration
public class KafkaProducerConfig {

private String bootstrapAddress;

public KafkaProducerConfig(@Value("${spring.kafka.bootstrap-servers}") String bootstrapAddress) {
this.bootstrapAddress = bootstrapAddress;
}

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
}

生产者代码:

@Component
public class KafkaProducer {

@Autowired
private KafkaTemplate<String, Greeting> kafkaTemplate;

@Value(value = "${kiosk.kafka.topic.greeting}")
private String greetingTopicName;

public void sendMessage(Greeting greeting) {

ListenableFuture<SendResult<String, Greeting>> future = kafkaTemplate.send(greetingTopicName, greeting);

future.addCallback(new ListenableFutureCallback<SendResult<String, Greeting>>() {

@Override
public void onSuccess(SendResult<String, Greeting> result) {
System.out.println("Sent greeting=[" + greeting + "] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send greeting=[" + greeting + "] due to : " + ex.getMessage());
}
});
}
}

属性文件:

spring.kafka.consumer.auto-offset-reset=earliest
spring.embedded.kafka.brokers=localhost:9092
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.consumer.group-id=1

而且我在开始测试时尝试了两种方法和相同的错误:

测试类规则:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class PKafkaProducerClassRuleTest {

private static final Logger LOGGER =
LoggerFactory.getLogger(PKafkaProducerClassRuleTest.class);

private static String SENDER_TOPIC = "sender.t";

static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}

@Autowired
private PinPadKafkaProducer sender;

private KafkaMessageListenerContainer<String, String> container;

private BlockingQueue<ConsumerRecord<String, String>> records;

@ClassRule
public static EmbeddedKafkaRule embeddedKafka =
new EmbeddedKafkaRule(1, true, SENDER_TOPIC);

使用 EmbeddedKafka 进行测试:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
"sender.t" })
public class PKafkaProducerEmbeddedKafkaTest {

private static final Logger LOGGER =
LoggerFactory.getLogger(PKafkaProducerEmbeddedKafkaTest.class);

private static String SENDER_TOPIC = "sender.t";

static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY,
"spring.kafka.bootstrap-servers");
}

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

@Autowired
private PinPadKafkaProducer sender;

@Test
public void someTest() {
sender.sendMessage(new Greeting("msgtest", "nametest"));
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, SENDER_TOPIC);
ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
}

}

两种情况下的错误:

19/11/2019 13:51:56.774+0100 ERROR o.s.t.c.TestContextManager [] - Caught exception while allowing TestExecutionListener [org.springframework.test.context.web.ServletTestExecutionListener@e9cee0d] to prepare test instance [com.goldcar.kiosk.PKafkaProducerEmbeddedKafkaTest@39271181]
java.lang.IllegalStateException: Failed to load ApplicationContext
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125)
at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:108)
at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190)
at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132)
at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:246)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:405)
at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:109)
at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:300)
at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:621)
at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:365)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:310)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:119)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
... 49 common frames omitted
Caused by: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
at kafka.cluster.EndPoint$.<init>(EndPoint.scala:32)
at kafka.cluster.EndPoint$.<clinit>(EndPoint.scala)
at kafka.server.Defaults$.<init>(KafkaConfig.scala:68)
at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:781)
at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
at kafka.utils.TestUtils$.createBrokerConfig(TestUtils.scala:234)
at kafka.utils.TestUtils.createBrokerConfig(TestUtils.scala)
at org.springframework.kafka.test.EmbeddedKafkaBroker.createBrokerProperties(EmbeddedKafkaBroker.java:239)
at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:214)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1837)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1774)
... 58 common frames omitted

最佳答案

看来您在类路径上的 kafka jar 版本不匹配。

如果您使用与 Boot 默认设置不同的 kafka-clients 版本,就会发生这种情况。

参见 this appendix有关如何覆盖所有 kafka jar 版本的示例。

When you use spring-kafka-test (version 2.2.x) with the 2.1.x kafka-clients jar, you need to override certain transitive dependencies, as follows:

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring.kafka.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring.kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
<classifier>test</classifier>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.1.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.1.1</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>

Note that when switching to scala 2.12 (recommended for 2.1.x and higher), the 2.11 version must be excluded from spring-kafka-test.

关于spring-boot - EmbeddedKafka 不工作导致 Scala 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58936059/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com