apache-kafka - KafkaStreamsStateStore not working when the stored value is an Avro SpecificRecord

I have a Spring Cloud Kafka Streams application that uses a StateStore in the Processor API, with a Transformer that performs deduplication.

The state store's key and value have the following types: <String, TransferEmitted>.

When running the application, at the moment of putting a value into the state store (dedupStore.put(key, value)), I get this exception:

Caused by: java.lang.ClassCastException: com.codependent.outboxpattern.account.TransferEmitted cannot be cast to java.lang.String

This is because the default value serde of a KafkaStreamsStateStore is StringSerde.
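
A minimal sketch of why that surfaces as a ClassCastException (TransferEmitted stands in for the Avro-generated class of this project; the unchecked cast mimics what happens at the store boundary after type erasure):

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes

fun main() {
    // After type erasure the store hands the TransferEmitted value to
    // StringSerializer, whose bridge method casts it to String at runtime.
    @Suppress("UNCHECKED_CAST")
    val valueSerde = Serdes.String() as Serde<Any>
    valueSerde.serializer().serialize("transfer", TransferEmitted()) // ClassCastException
}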

Therefore, I added the valueSerde parameter to the KafkaStreamsStateStore annotation, indicating a SpecificAvroSerde:

@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE,
        valueSerde = "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde")

Now I get a NullPointerException in AbstractKafkaAvroSerializer.serializeImpl, because at id = this.schemaRegistry.getId(subject, schema); the schemaRegistry is null:

Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)

This happens despite having configured the schema registry as a Spring bean...

@Configuration
class SchemaRegistryConfiguration {

    @Bean
    fun schemaRegistryClient(@Value("\${spring.cloud.stream.schema-registry-client.endpoint}") endpoint: String): SchemaRegistryClient {
        val client = ConfluentSchemaRegistryClient()
        client.setEndpoint(endpoint)
        return client
    }

}

...because when Kafka sets up the SpecificAvroSerde it uses the no-arg constructor, so it does not initialize the schema registry client:

public class SpecificAvroSerde<T extends SpecificRecord> implements Serde<T> {
    private final Serde<T> inner;

    public SpecificAvroSerde() {
        this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(), new SpecificAvroDeserializer());
    }

    public SpecificAvroSerde(SchemaRegistryClient client) {
        if (client == null) {
            throw new IllegalArgumentException("schema registry client must not be null");
        } else {
            this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(client), new SpecificAvroDeserializer(client));
        }
    }
    // ...
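
Two details are worth noting here. The SchemaRegistryClient bean above is Spring Cloud Stream's client abstraction, a different interface from Confluent's io.confluent.kafka.schemaregistry.client.SchemaRegistryClient that this constructor expects, so the bean never reaches the serde. And a serde created through the no-arg constructor only learns the registry URL through its configure() method. A minimal sketch of that call (the endpoint value is assumed from the application.yml below):

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

// Minimal sketch: a reflectively-created SpecificAvroSerde must be configured
// explicitly, otherwise its inner serializer's schemaRegistry stays null.
val valueSerde = SpecificAvroSerde<TransferEmitted>()
valueSerde.configure(
    mapOf("schema.registry.url" to "http://localhost:8081"), // endpoint assumed from application.yml
    false // isKey = false: this serde is used for the store's values
)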

How can I configure this application so that it can serialize the StateStore<String, TransferEmitted>?

Project excerpt (source code at https://github.com/codependent/kafka-outbox-pattern)

KStream

const val DEDUP_STORE = "dedup-store"

@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val fraudDetectionService: FraudDetectionService) {

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
        return input
            .transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
            .filter { _, value -> fraudDetectionService.isFraudulent(value) }
    }

}

Transformer

@Suppress("UNCHECKED_CAST")
class DeduplicationTransformer : Transformer<String, TransferEmitted, KeyValue<String, TransferEmitted>> {

    private lateinit var dedupStore: KeyValueStore<String, TransferEmitted>
    private lateinit var context: ProcessorContext

    override fun init(context: ProcessorContext) {
        this.context = context
        dedupStore = context.getStateStore(DEDUP_STORE) as KeyValueStore<String, TransferEmitted>
    }

    override fun transform(key: String, value: TransferEmitted): KeyValue<String, TransferEmitted>? {
        return if (isDuplicate(key)) {
            null
        } else {
            dedupStore.put(key, value)
            KeyValue(key, value)
        }
    }

    private fun isDuplicate(key: String) = dedupStore[key] != null

    override fun close() {
    }
}

application.yml

spring:
  application:
    name: fraud-service
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8081
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: fraud-service
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              schema:
                registry:
                  url: http://localhost:8081
      bindings:
        input:
          destination: transfer
          contentType: application/*+avro
        output:
          destination: fraudulent-transfer
          contentType: application/*+avro

server:
  port: 8086

logging:
  level:
    org.springframework.cloud.stream: debug

Best answer

I ran into the same problem and had forgotten that schema.registry.url needs to be passed in to make sure you can store Avro records in your state store.

For example:

@Bean
public StoreBuilder eventStore(Map<String, String> schemaConfig) {
    final Duration windowSize = Duration.ofMinutes(DUPLICATION_WINDOW_DURATION);

    // retention period must be at least window size -- for this use case, we don't need a longer retention period
    // and thus just use the window size as retention time
    final Duration retentionPeriod = windowSize;

    // We have to specify schema.registry.url here, otherwise schemaRegistry value will end up null
    KafkaAvroSerializer serializer = new KafkaAvroSerializer();
    KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
    // isKey = true because the Avro record is the key of this window store
    serializer.configure(schemaConfig, true);
    deserializer.configure(schemaConfig, true);

    final StoreBuilder<WindowStore<Object, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
            Stores.persistentWindowStore(STORE_NAME,
                    retentionPeriod,
                    windowSize,
                    false
            ),
            Serdes.serdeFrom(serializer, deserializer),
            // timestamp value is long
            Serdes.Long());
    return dedupStoreBuilder;
}

@Bean
public Map<String, String> schemaConfig(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String url) {
    return Collections.singletonMap("schema.registry.url", url);
}

Here is the application.yml file:

spring:
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:8081

After doing this, I was able to configure this store correctly and no longer saw the NullPointerException.
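
For the question's <String, TransferEmitted> key-value store, the same idea could look roughly like this in Kotlin (a hypothetical adaptation, not part of the original answer; DEDUP_STORE and the schemaConfig bean are reused from the snippets above):

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.StoreBuilder
import org.apache.kafka.streams.state.Stores
import org.springframework.context.annotation.Bean

// Hypothetical adaptation: key-value store with String keys and Avro values.
@Bean
fun dedupStoreBuilder(schemaConfig: Map<String, String>): StoreBuilder<KeyValueStore<String, TransferEmitted>> {
    val valueSerde = SpecificAvroSerde<TransferEmitted>()
    // Configure explicitly so the inner serializer gets a schema registry client
    valueSerde.configure(schemaConfig, false) // isKey = false: the Avro record is the value
    return Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(DEDUP_STORE),
        Serdes.String(),
        valueSerde
    )
}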

Regarding apache-kafka - KafkaStreamsStateStore not working when the stored value is an Avro SpecificRecord, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56634598/
