unit-testing - Testing Spring Cloud Stream with the Kafka Streams binder: using TopologyTestDriver I get the error of "The class is not in the trusted packages"

Reposted. Author: 行者123. Updated: 2023-12-02 13:25:20

I have this simple stream processor (not a consumer/producer) that uses the Kafka Streams binder.

@Bean
fun processFoo(): Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {
    return Function { input ->
        input.map { key, value ->
            println("\nPAYLOAD KEY: ${key.name}\n")
            println("\nPAYLOAD value: ${value.address}\n")
            val output = FooAddressPlus()
            output.address = value.address
            output.name = value.name
            // Braces are needed to interpolate a property; "$value.name" would print value.toString()
            output.plus = "${value.name}-${value.address}"
            KeyValue(key, output)
        }
    }
}
I am trying to test it with TopologyTestDriver:
@SpringBootTest(
    webEnvironment = SpringBootTest.WebEnvironment.NONE,
    classes = [Application::class, FooProcessor::class]
)
class FooProcessorTests {
    var testDriver: TopologyTestDriver? = null
    val INPUT_TOPIC = "input"
    val OUTPUT_TOPIC = "output"

    val inputKeySerde: Serde<FooName> = JsonSerde<FooName>()
    val inputValueSerde: Serde<FooAddress> = JsonSerde<FooAddress>()
    val outputKeySerde: Serde<FooName> = JsonSerde<FooName>()
    val outputValueSerde: Serde<FooAddressPlus> = JsonSerde<FooAddressPlus>()

    fun getStreamsConfiguration(): Properties? {
        val streamsConfiguration = Properties()
        streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "TopologyTestDriver"
        streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
        streamsConfiguration[JsonDeserializer.TRUSTED_PACKAGES] = "*"
        streamsConfiguration["spring.kafka.consumer.properties.spring.json.trusted.packages"] = "*"
        return streamsConfiguration
    }

    @Before
    fun setup() {
        val builder = StreamsBuilder()
        val input: KStream<FooName, FooAddress> = builder.stream(INPUT_TOPIC, Consumed.with(inputKeySerde, inputValueSerde))
        val processor = FooProcessor()
        val output: KStream<FooName, FooAddressPlus> = processor.processFoo().apply(input)
        output.to(OUTPUT_TOPIC, Produced.with(outputKeySerde, outputValueSerde))
        testDriver = TopologyTestDriver(builder.build(), getStreamsConfiguration())
    }

    @After
    fun tearDown() {
        try {
            testDriver!!.close()
        } catch (e: RuntimeException) {
            // https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when executed in Windows, ignoring it
            // Logged stacktrace cannot be avoided
            println("Ignoring exception, test failing in Windows due this exception:" + e.localizedMessage)
        }
    }

    @org.junit.Test
    fun testOne() {
        val inputTopic: TestInputTopic<FooName, FooAddress> =
            testDriver!!.createInputTopic(INPUT_TOPIC, inputKeySerde.serializer(), inputValueSerde.serializer())
        val key = FooName()
        key.name = "sherlock"
        val value = FooAddress()
        value.name = "sherlock"
        value.address = "Baker street"
        inputTopic.pipeInput(key, value)
        val outputTopic: TestOutputTopic<FooName, FooAddressPlus> =
            testDriver!!.createOutputTopic(OUTPUT_TOPIC, outputKeySerde.deserializer(), outputValueSerde.deserializer())
        val message = outputTopic.readValue()

        assertThat(message.name).isEqualTo(key.name)
        assertThat(message.address).isEqualTo(value.address)
    }
}
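When I run it, I get this error on the line inputTopic.pipeInput(key, value):
The class 'package.FooAddress' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
Any ideas on how to solve this? Setting these properties in getStreamsConfiguration() did not help. Note that this is a stream processor, not a consumer/producer.
Thanks a lot!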

Best answer

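When Kafka creates the Serde itself, it applies the properties by calling configure().
Since you are instantiating the Serde yourself, you need to call configure() on it, passing in the map of properties.
That is how the trusted packages property is propagated to the deserializer.
Alternatively, you can call setTrustedPackages() on the deserializer.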
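
The configure() route described in the answer could look roughly like the sketch below. It is not code from the original post: configuredJsonSerde is a hypothetical helper name, and trusting every package with "*" is an assumption that is usually acceptable only in tests. It relies on JsonSerde.configure(Map, Boolean) and the JsonDeserializer.TRUSTED_PACKAGES constant from Spring for Apache Kafka.

```kotlin
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerde

// Hypothetical helper (not part of the original post): creates a JsonSerde and
// configures it by hand, which Kafka would otherwise do only for a Serde that
// it instantiates itself.
fun <T> configuredJsonSerde(isKey: Boolean): JsonSerde<T> {
    val serde = JsonSerde<T>()
    // Assumption: trusting all packages ("*") is fine for a unit test.
    val props = mapOf(JsonDeserializer.TRUSTED_PACKAGES to "*")
    // configure() hands the property map to the wrapped serializer and deserializer.
    serde.configure(props, isKey)
    return serde
}

// Possible use in the test class from the question (FooName, FooAddress and
// FooAddressPlus are the question's own types):
//   val inputKeySerde: Serde<FooName> = configuredJsonSerde<FooName>(isKey = true)
//   val inputValueSerde: Serde<FooAddress> = configuredJsonSerde<FooAddress>(isKey = false)
//   val outputKeySerde: Serde<FooName> = configuredJsonSerde<FooName>(isKey = true)
//   val outputValueSerde: Serde<FooAddressPlus> = configuredJsonSerde<FooAddressPlus>(isKey = false)
```

With the Serdes built this way, the properties passed to TopologyTestDriver no longer need to carry the trusted packages setting, because the test hands its own Serde instances to Consumed.with and Produced.with.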

Regarding "unit-testing - Testing Spring Cloud Stream with the Kafka Streams binder: using TopologyTestDriver I get the error of "The class is not in the trusted packages"", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/64516217/
