gpt4 book ai didi

java - 一对多 KStream-KStream 连接(join)

转载 作者:行者123 更新时间:2023-12-02 02:32:09 26 4
gpt4 key购买 nike

如何在两个 Kafka KStream 之间执行一对多连接?下面给出的代码以一对一的方式连接两个 Kafka KStream。有人可以指导如何在 KStream 之间执行一对多连接吗?主题中接收到的数据是泛型 <String, JsonNode>。写入主题的数据的形式为 {"from order":"test1","from orderitem":"test2"} {"from order":"test1","from orderitem":"test3"}

是否可以获取这种格式的数据:{"from order":"test1",{"from orderitem":"test2"},{"from orderitem":"test3"}}

/**
 * Reads change records from {@code jre1}, splits them into an order stream and an
 * order-item stream, joins the two on the order id, writes the joined records to
 * {@code jvm1}, and then polls {@code jvm1}, handing every non-empty batch to a worker
 * in a fixed-size thread pool.
 *
 * <p>NOTE(review): {@link #startConsuming()} runs as a {@code @PostConstruct} callback and
 * never returns normally because {@link #consume()} loops forever; confirm that blocking
 * the container's initialization thread is intended.
 */
public class ConsumerThreadPool {

    private static final String TOPIC = "jre1";
    private static final String NEXTTOPIC = "Kafka";
    private static final String FINALTOPIC = "jvm1";
    private static final Integer NUM_THREADS = 1;

    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();

    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    final Serde<String> stringSerde = Serdes.String();

    /** Running counter passed to each submitted {@code MessageConsumer}. */
    int threadNumber = 0;

    @Autowired
    private ConsumerConfigFactory consumerConfigFactory;

    // Assigned in startConsuming() but not read anywhere in this class.
    @SuppressWarnings("unused")
    private ConsumerConnector consumer;
    private ExecutorService threadPool;

    /** Creates the worker pool with {@code NUM_THREADS} threads. */
    public ConsumerThreadPool() {
        threadPool = Executors.newFixedThreadPool(NUM_THREADS);
    }

    /**
     * Builds and starts the join topology, then blocks in {@link #consume()}.
     *
     * <p>The Kafka Streams instance is closed in a {@code finally} block: {@link #consume()}
     * only ever exits by throwing, so a close placed after the call would never run.
     */
    @PostConstruct
    public void startConsuming() {
        ConsumerConfig consumerConfig = consumerConfigFactory.getConsumerConfig();
        consumer = createJavaConsumerConnector(consumerConfig);
        KStreamBuilder builder = new KStreamBuilder();

        // Earlier KTable-based attempt, kept for reference:
        // KTable<String,JsonNode> message = builder.table(stringSerde,jsonSerde,TOPIC);
        // KTable<String,JsonNode> orderstream = message
        //     .filter((k,v)-> v.path("table").asText().equals("TEST.S_ORDER"));
        // KTable<String,JsonNode> orderlist=message.filter((k,v)-> v.path("table").asText().equals("TEST.S_ORDER_ITEM"));
        // orderstream.to(stringSerde,jsonSerde,FINALTOPIC);
        // orderlist.to(stringSerde,jsonSerde,FINALTOPIC);

        // No serdes are passed here, so the source relies on the defaults from the streams config.
        KStream<String, JsonNode> streams = builder.stream(TOPIC);

        // Orders, re-keyed by their own ROW_ID.
        KStream<String, JsonNode> orderstream = streams
                .filter((k, v) -> v.path("table").asText().equals("TEST.S_ORDER"))
                .map((k, v) -> KeyValue.pair(v.path("after").path("ROW_ID").asText(), v));

        // Order items, re-keyed by the ORDER_ID they belong to so both sides share a key.
        KStream<String, JsonNode> orderlist = streams
                .filter((k, v) -> v.path("table").asText().equals("TEST.S_ORDER_ITEM"))
                .map((k, v) -> KeyValue.pair(v.path("after").path("ORDER_ID").asText(), v));

        // Windowed stream-stream join with a 30-second window; one output record per matching pair.
        KStream<String, JsonNode> nextstream = orderstream.join(
                orderlist,
                ConsumerThreadPool::joinOpTypes,
                JoinWindows.of(TimeUnit.SECONDS.toMillis(30)),
                stringSerde,
                jsonSerde,
                jsonSerde);

        nextstream.to(stringSerde, jsonSerde, FINALTOPIC);

        KafkaStreams stream = new KafkaStreams(builder, consumerConfigFactory.getConsumeConfig());
        stream.start();
        try {
            consume();
        } finally {
            stream.close();
        }
    }

    /**
     * Combines one order record and one order-item record into the joined output value.
     *
     * @param first the order record
     * @param second the order-item record
     * @return an object with {@code "from order"} and {@code "from orderitem"} set to the
     *     {@code op_type} text of the respective input
     */
    private static JsonNode joinOpTypes(JsonNode first, JsonNode second) {
        ObjectNode joined = JsonNodeFactory.instance.objectNode();
        return joined.put("from order", first.get("op_type").textValue())
                .put("from orderitem", second.get("op_type").textValue());
    }

    /**
     * Polls {@code jvm1} forever and submits each non-empty batch to the thread pool.
     *
     * <p>The loop has no exit condition, so this method returns only by throwing; the
     * try-with-resources block makes sure the consumer is closed when that happens.
     */
    public void consume() {
        // Named differently from the ConsumerConnector field to avoid shadowing it.
        try (KafkaConsumer<String, String> finalTopicConsumer =
                new KafkaConsumer<>(consumerConfigFactory.createConsume())) {
            finalTopicConsumer.subscribe(Arrays.asList(FINALTOPIC));

            while (true) {
                ConsumerRecords<String, String> records = finalTopicConsumer.poll(100);
                if (!records.isEmpty()) {
                    System.out.println("ConsumerRecords object created: " + records);
                    threadPool.submit(new MessageConsumer(records, threadNumber));
                    threadNumber++;
                }
            }
        }
    }
}

最佳答案

正如您已经指出的,KStream-KStream 已经是一对多连接。看来您想将唯一键的所有连接结果聚合到一条记录中。

您可以应用 .groupByKey().aggregate() 来执行此操作。聚合函数使用空 JSON 进行初始化,并且每次新的联接结果到达时都会将新记录添加到 JSON。

关于 java - 一对多 KStream-KStream 连接,我们在 Stack Overflow 上找到一个类似的问题: https://stackoverflow.com/questions/46934945/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com