gpt4 book ai didi

java - Spring Cloud Stream kafka聚合使用 Autowiring bean的新实例

转载 作者:行者123 更新时间:2023-12-01 21:50:42 24 4
gpt4 key购买 nike

有没有办法在 Kafka Streams 聚合中使用 Autowired bean 的新实例?

@EnableBinding(Processor.class)
public class MessageReceiver {

@StreamListener(target = Processor.INPUT)
@SendTo(Processor.OUTPUT)
public KStream<String, List<CustomEvent>> process(KStream<String, Event> eventKStream) {

JsonSerde<EventAggregator> eventAggregatorJsonSerde = new JsonSerde<>(EventAggregator.class);

TimeWindowedKStream<String, Event> timeWindowedKStream = eventKStream
.groupByKey()
.windowedBy(TimeWindows.of(60000).advanceBy(30000));

timeWindowedKStream
.aggregate(
EventAggregator::new, // how to use Autowired bean here, which is different for different window and different key
((key, value, aggregator) -> aggregator.add(value)),
Materialized.with(Serdes.String(), eventAggregatorJsonSerde)
);

// continues...
}

有没有办法使用 Autowiring bean而不是EventAggregator::new ??

EventAggregator.java

public class EventAggregator {

//@Autowired
//private SomeClass someClass; // this is null as of now, since spring can't autowire inside a non managed bean

List<CustomEvent> customEventList = new ArrayList<>();
FIFOMap<String, Integer> map = new LinkedHashMap<>(4); // map which stores only 4 value

public EventAggregator add(Event event) {
map.put(new Date().toString(), event.getValue());
generateCustomEvent(); // when I have 4 values, I need to do some operation and generate custom event
return this;
}

private void generateCustomEvent() {
if (map.size() == 4) {
// someClass complex operation
CustomEvent e = new CustomEvent("", "", "");
customEventList.add(e);
}
}
}

最佳答案

如评论中所示, EventAggregator::new 的实例化由 Kafka Streams 触发。如果您想访问此对象内的 spring bean,实现此目的的一种方法是提供一个采用 Spring 应用程序上下文的构造函数,然后显式查询类内的 bean(例如使用 getBean() )方法。您可以在主类中 Autowiring 上下文,然后将其与 EventAggregator 一起传递。构造函数。

关于java - Spring Cloud Stream kafka聚合使用 Autowiring bean的新实例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58764855/

24 4 0