java - Is ManagedChannel thread-safe in a Kafka Transformer?

Reposted. Author: 行者123. Updated: 2023-12-02 01:01:51

This is my transformer:

public class DataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {

    private ManagedChannel channel;
    private InfoClient infoclient;
    private LRUCacheCollector<String, InfoResponse> cache;

    public DataEnricher() {}

    @Override
    public void init(ProcessorContext context) {
        channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
        infoclient = new InfoClient(channel);
    }

    @Override
    public KeyValue<byte[], EnrichedData> transform(byte[] key, EnrichedData request) {
        InfoResponse infoResponse = null;
        String someInfo = request.getSomeInfo();
        try {
            infoResponse = infoclient.getMoreInfo(someInfo);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged.
            logger.warn("An exception has occurred during retrieval.", e);
        }
        EnrichedData enrichedData = EnrichedDataBuilder.addExtraInfo(request, infoResponse);
        return new KeyValue<>(key, enrichedData);
    }

    @Override
    public KeyValue<byte[], EnrichedData> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {
        // Shut down the channel opened in init().
        channel.shutdown();
    }
}

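In Kafka Streams, each stream thread initializes its own copy of the stream topology and then instantiates that topology per ProcessorContext (that is, per task, which means per partition). Wouldn't init() then be called for every partition and overwrite or leak the channel? And since we have multiple threads, couldn't there even be a race on creating the channel/client? Is there a way to prevent this?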

This is called from the run() method:

public KafkaStreams createStreams() {
    final Properties streamsConfiguration = new Properties();
    // other configuration is set up here
    streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);

    StreamsBuilder streamsBuilder = new StreamsBuilder();

    RequestJsonSerde requestSerde = new RequestJsonSerde();
    DataEnricher dataEnricher = new DataEnricher();
    // Get the stream of requests
    final KStream<byte[], EnrichedData> requestsStream = streamsBuilder
        .stream(requestsTopic, Consumed.with(Serdes.ByteArray(), requestSerde));
    final KStream<byte[], EnrichedData> enrichedRequestsStream = requestsStream
        .filter((key, request) -> Objects.nonNull(request))
        // The supplier returns the same DataEnricher instance on every call.
        .transform(() -> dataEnricher);

    // Assumes requestSerde also serializes the enriched values.
    enrichedRequestsStream.to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray(), requestSerde));

    return new KafkaStreams(streamsBuilder.build(), new StreamsConfig(streamsConfiguration));
}

Best answer

This is not related to ManagedChannel, but you must provide a new DataEnricher instance per ProcessorContext in the TransformerSupplier.

KStream.transform(DataEnricher::new);

I once ran into some Kafka Streams exceptions related to this; I will try to reproduce it.
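To illustrate why the supplier should hand out a fresh instance, here is a minimal single-threaded simulation that uses only the Java standard library, not Kafka Streams or gRPC. All names in it (SupplierDemo, FakeChannel, Enricher) are hypothetical stand-ins. It assumes the runtime calls the supplier once per task, then init(), and later close(), and it counts how many opened channels were never shut down. It does not model the cross-thread race, only the overwrite/leak.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SupplierDemo {

    // Hypothetical stand-in for a gRPC channel; it only tracks whether it was shut down.
    static final class FakeChannel {
        boolean shutdown = false;
    }

    // Hypothetical stand-in for DataEnricher: init() opens a channel, close() shuts it down.
    static final class Enricher {
        private FakeChannel channel;
        private final List<FakeChannel> opened;

        Enricher(List<FakeChannel> opened) {
            this.opened = opened;
        }

        void init() {
            channel = new FakeChannel(); // overwrites any channel from an earlier init()
            opened.add(channel);
        }

        void close() {
            channel.shutdown = true; // only the most recently assigned channel is reachable
        }
    }

    // Mimics the assumed lifecycle: one get() + init() per task, then one close() per task.
    // Returns how many opened channels were never shut down.
    static long leakedChannels(Supplier<Enricher> supplier, List<FakeChannel> opened, int tasks) {
        List<Enricher> perTask = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            Enricher enricher = supplier.get();
            enricher.init();
            perTask.add(enricher);
        }
        perTask.forEach(Enricher::close);
        return opened.stream().filter(c -> !c.shutdown).count();
    }

    // Same pattern as transform(() -> dataEnricher): every task gets the same object.
    static long leaksWithSharedInstance(int tasks) {
        List<FakeChannel> opened = new ArrayList<>();
        Enricher shared = new Enricher(opened);
        return leakedChannels(() -> shared, opened, tasks);
    }

    // Same pattern as transform(DataEnricher::new): every task gets its own object.
    static long leaksWithFreshInstances(int tasks) {
        List<FakeChannel> opened = new ArrayList<>();
        return leakedChannels(() -> new Enricher(opened), opened, tasks);
    }

    public static void main(String[] args) {
        System.out.println("shared instance leaks: " + leaksWithSharedInstance(3)); // 2
        System.out.println("fresh instances leak: " + leaksWithFreshInstances(3));  // 0
    }
}
```

With three simulated tasks, the shared instance leaves two channels open because each init() replaces the field, while one instance per task closes every channel it opened.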

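In my opinion, if you are not using punctuate to send more records downstream and the new key is the same as the input record's key, you should use transformValues(), because transform() can trigger a repartition when key-based operations such as aggregations or joins are applied afterwards.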

Regarding "java - Is ManagedChannel thread-safe in a Kafka Transformer?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60555175/
