
java - SubmissionPublisher does not call the subscriber's onNext on submit

Reposted. Author: 搜寻专家. Updated: 2023-11-01 02:01:07

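At a fixed interval I retrieve tweets that match a specific query. These tweets have to be passed to services that compute statistics on them and otherwise process them, so those services subscribe to my publisher, and publisher.hasSubscribers() returns true. However, neither submit() nor offer() invokes my subscribers' onNext. As a "fix", I loop over my subscribers and call onNext myself, but that should not be necessary.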

This is the constructor of my publisher.

public TwitterStreamer(Executor executor, int maxBufferCapacity, long period, TimeUnit unit, String searchQuery) {
    super(executor, maxBufferCapacity);
    this.searchQuery = searchQuery;
    scheduler = new ScheduledThreadPoolExecutor(1);
    this.tweetGetter = scheduler.scheduleAtFixedRate(
        () -> {
            List<String> tweets = getTweets(searchQuery);
            /* this.lastCall = LocalDateTime.now();
            for(Flow.Subscriber sub : this.getSubscribers()){
                sub.onNext(tweets);
            }*/
            this.submit(tweets);
            if (tweets.size() >= 20) this.close();
        }, 0, period, unit);
}

This is my subscriber.

package myFlowAPI;

import Interfaces.IProcess;
import Services.LogToFileService;

import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class MySubscriber implements Flow.Subscriber<List<String>> {
    private Flow.Subscription subscription;
    private AtomicInteger count;

    private IProcess processor;

    private String name;
    private int DEMAND = 0;

    public MySubscriber(String name, IProcess processor) {
        this.name = name;
        this.processor = processor;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(List<String> item) {
        Object result = this.processor.process(item);
        this.readResult(result);

        switch (this.processor.getClass().getSimpleName()) {
            case "CalculateTweetStatsService":
                if ((Integer) result >= 20) {
                    this.subscription.cancel();
                }
                break;
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error is thrown " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        if (this.processor instanceof LogToFileService) {
            ((LogToFileService) processor).closeResource();
        }
        System.out.println("complete");
    }

    private void readResult(Object result) {
        System.out.println("Result of " + this.processor.getClass().getSimpleName() + " processor is " + result.toString());
    }
}

This is the main method, where I subscribe to the publisher.

public static void main(String[] args) {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

    String searchQuery;
    try {
        searchQuery = args[0] != null ? args[0] : "#capgemini50";
    } catch (ArrayIndexOutOfBoundsException ex) {
        searchQuery = "#capgemini50";
    }

    TwitterStreamer streamer = new TwitterStreamer(executor, 5, 15L, SECONDS, searchQuery);

    MySubscriber subscriber1 = new MySubscriber("LogFileSubscriber", new LogToFileService("./tweetsLogger.txt"));
    MySubscriber subscriber2 = new MySubscriber("TotalTweetSubscriber", new CalculateTweetStatsService());
    streamer.subscribe(subscriber1);
    streamer.subscribe(subscriber2);
}

Best answer

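A SubmissionPublisher only delivers items that a subscriber has asked for, so the subscriber has to request data explicitly, for example right after subscribing (see https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Subscription.html#request-long-):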

@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    this.subscription.request(1);
}

Likewise, request the next item in onNext() once the current one has been processed.
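To make the effect of request() visible, here is a minimal, self-contained sketch rather than the asker's actual code. The names DemandDemo, CountingSubscriber and run are hypothetical, and a plain SubmissionPublisher with its default executor and buffer size stands in for TwitterStreamer. One subscriber never signals demand, as in the question; the other requests one item in onSubscribe and one more after each onNext.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DemandDemo {

    /** Hypothetical subscriber that counts batches; 'requestsOnSubscribe' toggles the fix. */
    static final class CountingSubscriber implements Flow.Subscriber<List<String>> {
        final AtomicInteger received = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);
        private final boolean requestsOnSubscribe;
        private Flow.Subscription subscription;

        CountingSubscriber(boolean requestsOnSubscribe) {
            this.requestsOnSubscribe = requestsOnSubscribe;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            if (requestsOnSubscribe) {
                subscription.request(1); // signal demand for the first item
            }
        }

        @Override
        public void onNext(List<String> item) {
            received.incrementAndGet();
            subscription.request(1); // ask for the next item once this one is handled
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Submits 'batches' lists and returns {count without request, count with request}. */
    static int[] run(int batches) throws InterruptedException {
        CountingSubscriber silent = new CountingSubscriber(false);
        CountingSubscriber demanding = new CountingSubscriber(true);
        // Keep 'batches' below the default buffer size (256): submit() blocks
        // once the buffer of a subscriber that never requests is full.
        try (SubmissionPublisher<List<String>> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(silent);
            publisher.subscribe(demanding);
            for (int i = 0; i < batches; i++) {
                publisher.submit(List.of("tweet " + i));
            }
        } // close() completes a subscriber after its buffered items are delivered
        demanding.done.await(5, TimeUnit.SECONDS);
        return new int[] { silent.received.get(), demanding.received.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = run(3);
        System.out.println("without request: " + counts[0] + ", with request: " + counts[1]);
    }
}
```

With three submitted batches, the subscriber that never calls request() should report 0 received items, while the one that requests should report 3. Requesting one item at a time keeps backpressure simple; a subscriber that can process faster could request a larger number up front.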

Regarding "java - SubmissionPublisher does not call the subscriber's onNext on submit", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46712882/
