apache-kafka - Spring Kafka - Event Sourcing - Example of how to query an entity's state with Kafka + the Kafka Streams API


I'm using Kafka to implement an architecture based on event sourcing.

Suppose I store the events in JSON format:

{"name": "ProductAdded", "productId":"1", quantity=3, dateAdded="2017-04-04" }

I want to implement a query that gets the quantity of the product with productId=X on a certain date.

Could you show a rough implementation of this query with Spring Kafka KStreams?

UPDATE: I've made some progress on this with Spring Kafka KStreams, but I'm running into deserialization errors.

This is my Spring Cloud Stream Kafka producer:
public interface ProductProducer {

    final String OUTPUT = "productsOut";

    @Output(ProductProducer.OUTPUT)
    MessageChannel output();
}
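For this binding to be active, the interface has to be registered with Spring Cloud Stream via @EnableBinding. A minimal sketch of the application class (the class name here is illustrative, it does not appear in the question):

@SpringBootApplication
@EnableBinding(ProductProducer.class) // binds the productsOut channel declared above
public class ProductGeneratorServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProductGeneratorServiceApplication.class, args);
    }
}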

Configuration:
spring:
  application:
    name: product-generator-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
            - kafka
          zk-nodes:
            - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsOut:
          destination: orders
          content-type: application/json

I send messages with the following code, and the Map is correctly serialized as a JSON object:
Map<String, Object> event = new HashMap<>();
event.put("name", "ProductCreated");
event.put("productId", product.getId());
event.put("quantity", product.getQuantity());
event.put("dateAdded", new Date());
productProducer.output().send(MessageBuilder.withPayload(event).build(), 500);
MessageBuilder.withPayload(event).build() produces:

GenericMessage [payload={quantity=1, productId=1, name=ProductCreated, dateAdded="xxxxx"}, headers={id=fc531176-e3e9-61b8-40e3-08074fabee4d, timestamp=1499845483095}]
In the Product Service application I can read this message with a Spring Cloud Stream listener:
@Component
public class ProductListener {

    @StreamListener(ProductConsumer.INPUT)
    public void handleProduct(Map<String, Object> event) {
        // process the incoming event
    }
}
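The ProductConsumer.INPUT constant referenced above belongs to a sink interface that isn't shown in the question. A minimal sketch, assuming the binding name productsIn used in the configuration of UPDATE 3 below:

public interface ProductConsumer {

    final String INPUT = "productsIn";

    @Input(ProductConsumer.INPUT)
    SubscribableChannel input();
}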

However, with KStream I get a deserialization error:
@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-kstream");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public FactoryBean<KStreamBuilder> myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(null, integerSerde, jsonSerde, STREAMING_TOPIC1);
        stream.print();
        return stream;
    }
}

Exception:
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
at [Source: [B@288e4e9a; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
at [Source: [B@288e4e9a; line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3528)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:30)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:46)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

UPDATE 2:

To find out what was actually reaching the KStream, I changed both the key and the value to String deserializers, and this is what gets printed:
KStream<Integer, String> stream = kStreamBuilder.stream(null, integerSerde, stringSerde, STREAMING_TOPIC1);

Printed value:
[KSTREAM-SOURCE-0000000000]: null , �contentType

Why am I not getting a JSON string?
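One way to dig further (a sketch I'm adding here, not part of the original question) is to consume the topic with byte-array serdes and print the raw bytes, which makes visible any non-JSON prefix the producer may have added:

// Inspect the raw bytes on the topic without any deserialization
// (uses java.util.Arrays and the same kStreamBuilder/topic as above).
Serde<byte[]> bytesSerde = Serdes.ByteArray();
KStream<byte[], byte[]> raw = kStreamBuilder.stream(bytesSerde, bytesSerde, STREAMING_TOPIC1);
raw.foreach((key, value) -> System.out.println(Arrays.toString(value)));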

UPDATE 3:
I fixed the deserialization problem. The cause was that the message producer (Spring Cloud Stream) embeds some headers as part of the payload by default. I just had to disable this header embedding to start receiving the messages correctly in Kafka Streams:
spring:
  application:
    name: product-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
            - kafka
          zk-nodes:
            - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsIn:
          group: product-service
          destination: orders
          consumer:
            max-attempts: 5
            header-mode: raw
        productsOut:
          destination: orders
          content-type: application/json
          producer:
            header-mode: raw

KStream definition:
KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

Output:
[KSTREAM-SOURCE-0000000000]: null , {"quantity":0,"productId":0,"name":"ProductCreated","dateAdded":1499930385450}

Now that everything is set up correctly, how do I implement an interactive query like the one I need? That is: get the quantity of the product with productId=X on a certain date.

Best Answer

I managed to make this work with a mix of Spring Cloud Stream (to produce the messages) and Spring Kafka (to process them with Kafka Streams and implement the interactive query). Important: note UPDATE 3 in the question, which is what makes it possible to combine the two:

Kafka Streams configuration:

@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-streams");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public KStreamBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

        // Re-key each event by productId and sum the quantities into the
        // queryable "ProductsStock" state store
        stream.map((key, value) -> new KeyValue<>(value.get("productId").asInt(), value.get("quantity").asInt()))
              .groupByKey()
              .reduce((v1, v2) -> v1 + v2, "ProductsStock");

        stream.print();
        return stream;
    }
}

Note how I produce a KTable backed by the "ProductsStock" state store, which I query later from the service.

Product Service:
@Autowired
private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;

@Override
public Integer getProductStock(Integer id) {
    KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
            streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
    return keyValueStore.get(id);
}
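This store only answers the current total per product. To cover the "on a certain date" part of the original question, one option is a windowed aggregation queried through a window store. A sketch under the same pre-1.0 Kafka Streams API; the store name ProductsStockByDay and the productId/dayStartMillis/dayEndMillis variables are illustrative, not from the answer:

// Build side: aggregate the quantities per product in one-day windows.
KTable<Windowed<Integer>, Integer> dailyStock = stream
        .map((key, value) -> new KeyValue<>(value.get("productId").asInt(), value.get("quantity").asInt()))
        .groupByKey()
        .reduce((v1, v2) -> v1 + v2, TimeWindows.of(TimeUnit.DAYS.toMillis(1)), "ProductsStockByDay");

// Query side: fetch the window(s) covering the requested date.
ReadOnlyWindowStore<Integer, Integer> windowStore =
        streams.store("ProductsStockByDay", QueryableStoreTypes.windowStore());
try (WindowStoreIterator<Integer> it = windowStore.fetch(productId, dayStartMillis, dayEndMillis)) {
    while (it.hasNext()) {
        KeyValue<Long, Integer> entry = it.next(); // key = window start, value = quantity that day
    }
}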

A similar question about this topic can be found on Stack Overflow: https://stackoverflow.com/questions/45035723/
