作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
这是我的变压器:
public class DataEnricher implements Transformer < byte[], EnrichedData, KeyValue < byte[], EnrichedData >> {
private ManagedChannel channel;
private InfoClient infoclient;
private LRUCacheCollector < String,
InfoResponse > cache;
public DataEnricher() {}
@Override
public void init(ProcessorContext context) {
channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
infoclient = new InfoClient(channel);
}
@Override
public KeyValue < byte[],
EnrichedData > transform(byte[] key, EnrichedData request) {
InfoResponse infoResponse = null;
String someInfo = request.getSomeInfo();
try {
infoResponse = infoclient.getMoreInfo(someInfo);
} catch (Exception e) {
logger.warn("An exception has occurred during retrieval.", e.getMessage());
}
EnrichedData enrichedData = EnrichedDataBuilder.addExtraInfo(request, infoResponse);
return new KeyValue < > (key, enrichedData);
}
@Override
public KeyValue < byte[],
DataEnricher > punctuate(long timestamp) {
return null;
}
@Override
public void close() {
client.shutdown();
}
}
在 Kafka Streams 中,每个流线程都会初始化自己的流拓扑副本,然后按 ProcessorContext(即每个任务,即每个分区)实例化该拓扑。那么 init()
不会被调用并覆盖/泄漏每个分区的 channel 吗?而且由于我们有多个线程,甚至会与 channel/client
的创建发生竞争?有没有办法防止这种情况发生?
这在 run()
方法中调用:
public KafkaStreams createStreams() {
final Properties streamsConfiguration = new Properties();
//other configuration is setup here
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
streamsConfiguration.put(
StreamsConfig.NUM_STREAM_THREADS_CONFIG,
3);
StreamsBuilder streamsBuilder = new StreamsBuilder();
RequestJsonSerde requestSerde = new RequestJsonSerde();
DataEnricher dataEnricher = new DataEnricher();
// Get the stream of requests
final KStream < byte[], EnrichedData > requestsStream = streamsBuilder
.stream(requestsTopic, Consumed.with(Serdes.ByteArray(), requestSerde));
final KStream < byte[], EnrichedData > enrichedRequestsStream = requestsStream
.filter((key, request) - > {
return Objects.nonNull(request);
}
.transform(() - > dataEnricher);
enrichedRequestsStream.to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray()));
return new KafkaStreams(streamsBuilder.build(), new StreamsConfig(streamsConfiguration));
}
最佳答案
与ManagedChannel
无关,但您必须在TransformerSupplier
中为每个ProcessContext
提供新的DataEnricher实例。
KStream.transform(DataEnricher::new);
一旦我遇到一些与此相关的 Kafka 流异常,我将尝试重新创建它。
在我看来,如果您不使用标点符号向下游发送更多记录,并且新 key 与输入记录相同,您应该使用 transformValues()
因为 transform()
当应用聚合、连接等基于键的操作时,可能会导致重新分区。
关于java - Transformer Kafka 中的 ManagedChannel 是线程安全的吗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60555175/
我是一名优秀的程序员,十分优秀!