
java - Writing to a Kafka topic with Kafka Streams based on record content


I am trying to write from one Kafka topic (the parent) to another topic (the child) based on the content of the records in the parent. A sample record, when I consume from the parent topic, is {"date":{"string":"2017-03-20"},"time":{"string":"20:04:13:563"},"event_nr":1572470,"interface":"Transaction Manager","event_id":5001,"date_time":1490040253563,"entity":"Transaction Manager","state":0,"msg_param_1":{"string":"ISWSnk"},"msg_param_2":{"string":"Application startup"},"msg_param_3":null,"msg_param_4":null,"msg_param_5":null,"msg_param_6":null,"msg_param_7":null,"msg_param_8":null,"msg_param_9":null,"long_msg_param_1":null,"long_msg_param_2":null,"long_msg_param_3":null,"long_msg_param_4":null,"long_msg_param_5":null,"long_msg_param_6":null,"long_msg_param_7":null,"long_msg_param_8":null,"long_msg_param_9":null,"last_sent":{"long":1490040253563},"transmit_count":{"int":1},"team_id":null,"app_id":{"int":4},"logged_by_app_id":{"int":4},"entity_type":{"int":3},"binary_data":null}

I want to use the value of entity to write to a topic with the same name as that value (there is a fixed set of entity values, so I can create those topics statically if creating them programmatically proves difficult). This is what I am trying:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;

public class entityDataLoader {
  public static void main(final String[] args) throws Exception {
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-lambda-example");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    // Set up serializers and deserializers, which we will use for overriding the default serdes
    // specified above.
    final Serde<String> stringSerde = Serdes.String();
    final Serde<byte[]> byteArraySerde = Serdes.ByteArray();

    // In the subsequent lines we define the processing topology of the Streams application.
    final KStreamBuilder builder = new KStreamBuilder();

    // Read the input Kafka topic into a KStream instance.
    final KStream<byte[], String> textLines = builder.stream(byteArraySerde, stringSerde, "postilion-events");

    String content = textLines.toString();
    String entity = JSONExtractor.returnJSONValue(content, "entity");
    System.out.println(entity);

    textLines.to(entity);

    final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.cleanUp();
    streams.start();

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}

The content of content is org.apache.kafka.streams.kstream.internals.KStreamImpl@568db2f2, which makes it clear that KStream.toString() is not the right way to get the value of entity.
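The @568db2f2 suffix is the giveaway: KStreamImpl does not override Object.toString(), so the call returns the default className@identityHash form rather than any record contents. A minimal stand-alone demonstration, using a hypothetical KStreamLike class purely to mirror that behavior:

```java
public class ToStringDemo {
    // Stand-in for KStreamImpl: no toString() override, so Object's default applies.
    static class KStreamLike {}

    public static void main(String[] args) {
        String content = new KStreamLike().toString();
        // Prints the class name plus an identity hash (e.g. "ToStringDemo$KStreamLike@1b6d3586"),
        // never the records flowing through the stream.
        System.out.println(content);
        System.out.println(content.startsWith("ToStringDemo$KStreamLike@"));
    }
}
```

A KStream is a description of a processing topology, not a container of data, so per-record values can only be accessed inside operators such as map, filter, or branch predicates.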

P.S. The JSONExtractor class is defined as:

import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

class JSONExtractor {

  public static String returnJSONValue(String args, String value) {
    JSONParser parser = new JSONParser();
    String app = null;
    System.out.println(args);
    try {
      Object obj = parser.parse(args);
      JSONObject JObj = (JSONObject) obj;
      app = (String) JObj.get(value);
      return app;
    } catch (ParseException pe) {
      System.out.println("No Object found");
      System.out.println(pe);
    }
    return app;
  }
}

Best Answer

You can use branch() to split your parent stream into "sub streams" and write each "sub stream" to one output topic (see http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations).

Your branch() must create one "sub stream" for each of your output topics, but because you know all your topics, this should not be a problem.

Also, for Kafka Streams it is recommended to create all output topics before you start your application (see http://docs.confluent.io/current/streams/developer-guide.html#user-topics).
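branch() itself only runs inside a live Streams topology, but the predicates it takes are ordinary key/value functions, one per known entity value. A minimal sketch of the per-record test each predicate would perform; the EntityRouter class, its regex-based entityOf helper (standing in for JSONExtractor to keep the sketch dependency-free), and the topic names are illustrative assumptions, not part of the original code:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EntityRouter {
    // Hypothetical helper: pull the top-level "entity" value out of a JSON record.
    // A real application would use a JSON parser; a regex is enough for this sketch.
    private static final Pattern ENTITY = Pattern.compile("\"entity\"\\s*:\\s*\"([^\"]+)\"");

    static String entityOf(String json) {
        Matcher m = ENTITY.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String record = "{\"event_nr\":1572470,\"entity\":\"Transaction Manager\",\"state\":0}";
        // In the topology, each branch() predicate would look like:
        //   (key, value) -> "Transaction Manager".equals(entityOf(value))
        // and the matching sub stream would be written with .to("Transaction Manager").
        boolean routesToTransactionManager = "Transaction Manager".equals(entityOf(record));
        System.out.println(routesToTransactionManager);
    }
}
```

Because branch() evaluates the predicates per record, this extracts the entity from each message as it flows through, which is exactly what calling toString() on the whole KStream cannot do.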

For java - Writing to a Kafka topic with Kafka Streams based on record content, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43019358/
