gpt4 book ai didi

apache-kafka - 我们如何将相同键的所有值合并为一个列表,并返回键和值为字符串的 Kafka Streams

转载 作者:行者123 更新时间:2023-12-05 07:14:37 25 4
gpt4 key购买 nike

我在一个 Kafka 主题中有形如 (key: id, value: {id: 1, body: ...}) 的数据,也就是说消息的 key 与消息体中的 id 相同。但是同一个 id 可能对应多条正文(body)不同的消息。因此我得到的是一个 KStream<String, String>。

现在我想把具有相同 id(键)的所有消息的值合并为一个列表,并以如下类型返回:

KStream<String, List<String>>

有什么建议吗?

最佳答案

    //Create a Stream with a state store

// Assemble the topology: one persistent key/value store whose values are lists of
// strings, plus a source stream read from TOPIC.
final StreamsBuilder builder = new StreamsBuilder();

// Register the store on the builder; keys use the String serde, values use ListSerde.
builder.addStateStore(
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(LOG_TRACE_STATE_STORE),
                Serdes.String(),
                new ListSerde<String>(Serdes.String())));

// Hand the source stream to splitProcessor (defined elsewhere) for wiring.
final KStream<String, String> source = builder.stream(TOPIC);
splitProcessor(source);
logger.info("creating stream for topic {} ..", TOPIC);

// Build the topology first, then the configuration, and return the streams instance.
return new KafkaStreams(builder.build(), streamConfiguration(bootstrapServers));


// Stream List Serde

/**
 * Serde for {@code List<T>} values, assembled from a {@link ListSerializer} and a
 * {@link ListDeserializer} that wrap the element serde's serializer and deserializer.
 *
 * @param <T> element type of the list
 */
public class ListSerde<T> implements Serde<List<T>> {

    private final Serializer<List<T>> listSerializer;
    private final Deserializer<List<T>> listDeserializer;

    /**
     * @param avroSerde serde used for the individual list elements
     */
    public ListSerde(final Serde<T> avroSerde) {
        this.listSerializer = new ListSerializer<>(avroSerde.serializer());
        this.listDeserializer = new ListDeserializer<>(avroSerde.deserializer());
    }

    /** Returns the list serializer created in the constructor. */
    @Override
    public Serializer<List<T>> serializer() {
        return listSerializer;
    }

    /** Returns the list deserializer created in the constructor. */
    @Override
    public Deserializer<List<T>> deserializer() {
        return listDeserializer;
    }

    /** Passes the configuration to the serializer first, then to the deserializer. */
    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        listSerializer.configure(configs, isKey);
        listDeserializer.configure(configs, isKey);
    }

    /** Closes the serializer first, then the deserializer. */
    @Override
    public void close() {
        listSerializer.close();
        listDeserializer.close();
    }
}

// Serializer & deserializers

/**
 * Serializes a {@code List<T>} into a length-prefixed binary frame.
 *
 * <p>Wire format: a 4-byte element count, followed for each element by a 4-byte length
 * and that many bytes as produced by the wrapped element serializer.
 *
 * @param <T> element type of the list
 */
public class ListSerializer<T> implements Serializer<List<T>> {

    private final Serializer<T> valueSerializer;

    /**
     * @param valueSerializer serializer applied to each list element
     */
    public ListSerializer(final Serializer<T> valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        // do nothing
    }

    /**
     * Encodes the list using the wire format described on the class.
     *
     * @param topic topic name, passed through to the element serializer
     * @param list  list to encode; may be {@code null}
     * @return the encoded bytes, or {@code null} when {@code list} is {@code null}
     * @throws RuntimeException if writing to the in-memory buffer fails
     */
    @Override
    public byte[] serialize(final String topic, final List<T> list) {
        // A null list has no frame to write; return null instead of failing on list.size().
        if (list == null) {
            return null;
        }
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // try-with-resources closes the stream even when an element fails to serialize.
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(list.size());
            for (final T element : list) {
                // The element serializer must return non-null bytes: the length is read below.
                final byte[] bytes = valueSerializer.serialize(topic, element);
                out.writeInt(bytes.length);
                out.write(bytes);
            }
        } catch (final IOException e) {
            throw new RuntimeException("unable to serialize List", e);
        }
        return baos.toByteArray();
    }

    @Override
    public void close() {
        // nothing to release
    }

}

//------------
/**
 * Deserializes a length-prefixed binary frame back into a {@code List<T>}.
 *
 * <p>Expected wire format: a 4-byte element count, followed for each element by a
 * 4-byte length and that many bytes, which are handed to the wrapped element deserializer.
 *
 * @param <T> element type of the list
 */
public class ListDeserializer<T> implements Deserializer<List<T>> {

    private final Deserializer<T> valueDeserializer;

    /**
     * @param valueDeserializer deserializer applied to each element's bytes
     */
    public ListDeserializer(final Deserializer<T> valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        // do nothing
    }

    /**
     * Decodes a frame in the wire format described on the class.
     *
     * @param s     topic name, passed through to the element deserializer
     * @param bytes encoded list; may be {@code null} or empty
     * @return the decoded list, or {@code null} when {@code bytes} is {@code null} or empty
     * @throws RuntimeException if the payload ends before all announced bytes were read,
     *                          or on any other I/O failure
     */
    @Override
    public List<T> deserialize(final String s, final byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        final List<T> list = new ArrayList<>();
        try (DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes))) {
            final int records = dataInputStream.readInt();
            for (int i = 0; i < records; i++) {
                final byte[] valueBytes = new byte[dataInputStream.readInt()];
                // readFully fails with EOFException on a truncated payload; plain read()
                // would leave the remainder of the buffer zero-filled without any error.
                dataInputStream.readFully(valueBytes);
                list.add(valueDeserializer.deserialize(s, valueBytes));
            }
        } catch (final IOException e) {
            throw new RuntimeException("Unable to deserialize List", e);
        }
        return list;
    }

    @Override
    public void close() {
        // nothing to release
    }

}
/// Now create Stream Processors

/**
 * Stream processor that forwards every incoming record value to a monitoring store.
 *
 * <p>The store is obtained from {@code MonitoringStateStoreFactory} in {@code init} and is
 * handed the processor context there; {@code process} then passes each value to it.
 * The record key is not used by this class.
 */
public class LogTraceStreamStateProcessor implements Processor<String, String>{

// NOTE(review): messages below use "{}" placeholders with a logger from Logger.getLogger —
// confirm the logging framework in use supports parameterized messages.
private static final Logger logger = Logger.getLogger(LogTraceStreamStateProcessor.class);
// Assigned in init(); has no initializer, so it is null until init() has run.
IStore stateStore;

/**
* Initializes the processor: fetches the store from the factory singleton and
* passes the processor context to the store's initLogTraceStoreProcess.
*
* @param context processor context supplied by the caller of init
*/
@Override
public void init(ProcessorContext context) {
logger.info("initializing processor and looking for monitoring store");
stateStore = MonitoringStateStoreFactory.getInstance().getStore();
logger.debug("found the monitoring store - {} ", stateStore);
stateStore.initLogTraceStoreProcess(context);
logger.debug("initalizing monitoring store.");
}

/**
* Hands the record value to the store's storeLogTrace, logging at debug level
* before and after the call.
*
* @param key record key; not used here
* @param value record value passed to the store
*/
@Override
public void process(String key, String value) {

logger.debug("Storing the value for logtrace storage - {} ", value);
stateStore.storeLogTrace(value);
logger.debug("finished Storing the value for logtrace storage - {} ", value);

}

/** No-op: this class releases nothing on close. */
@Override
public void close() {
// Intentionally empty.

}

}

// Look up the key/value state store by name via traceStreamContext and cast the result
// to the list-valued store type (the cast to a parameterized type is unchecked).
// NOTE(review): the name used here (EXEID_REQ_REL_STORE) differs from the one the store was
// registered under in the setup snippet (LOG_TRACE_STATE_STORE) — confirm they refer to the same store.
KeyValueStore<String, List<String>> stateStore = (KeyValueStore<String, List<String>>) traceStreamContext.getStateStore(EXEID_REQ_REL_STORE);

//Now add a list to new key for a new message and if the key exists then add a new message in the list

/**
 * Reads a trace event from its string form and appends an entry to the list stored
 * under the event's execution id, creating the list when none exists (or when the
 * stored list is empty). The context is committed afterwards whether or not
 * processing succeeded.
 *
 * <p>Failures while parsing or storing are logged and not rethrown; a failing commit
 * is logged as well.
 *
 * <p>NOTE(review): {@code requestId} is not declared in this method — it is presumably
 * a field or should be derived from the parsed event; confirm against the enclosing class.
 *
 * @param traceData serialized trace event, parsed with {@code mapper.readValue}
 */
public void storeTraceData(String traceData) {
    try {
        logger.debug("Received the Trace value - {}", traceData);
        // Parse directly into the local; no placeholder instance is needed beforehand.
        final TraceEvent tracer = mapper.readValue(traceData, TraceEvent.class);
        logger.debug("trace unmarshelling has been completed successfully !!!");

        final String key = tracer.getExecutionId();

        List<String> listEvents = stateStore.get(key);

        if (listEvents != null && !listEvents.isEmpty()) {
            logger.debug("event is already in store so storing in the list for execution id - {}", key);
        } else {
            logger.debug(
                    "event is not present in the store so creating a new list and adding into store for execution id - {}",
                    key);
            listEvents = new ArrayList<>();
        }

        // Both branches end the same way: append, then write the list back to the store.
        listEvents.add(requestId);
        stateStore.put(key, listEvents);

    } catch (Exception e) {
        // Catch Exception rather than Throwable so JVM Errors are not silently swallowed.
        logger.error("exception while processing the trace event .. ", e);
    } finally {
        try {
            traceStreamContext.commit();
        } catch (Exception e2) {
            // Report through the logger instead of printStackTrace().
            logger.error("exception while committing the trace stream context .. ", e2);
        }
    }

}
/**
 * Returns a read-only view of the list-valued store, for reading stored messages.
 *
 * <p>Delegates to {@code waitUntilStoreIsQueryable} with {@code KEY_NAME}. That helper is
 * not shown here; its name suggests it waits until the store can be queried — confirm.
 *
 * @return the read-only key/value store returned by the helper
 */
public ReadOnlyKeyValueStore<String, List<String>> tracerStore() {
return waitUntilStoreIsQueryable(KEY_NAME);
}




关于apache-kafka - 我们如何将相同键的所有值合并为一个列表,并返回键和值为字符串的 Kafka Streams,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59861276/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com