gpt4 book ai didi

Java BlockingQueue 生产/消费不正确

转载 作者:塔克拉玛干 更新时间:2023-11-02 20:05:40 27 4
gpt4 key购买 nike

我正在做一个项目,我需要使用 TwitterAPI 检索 Twitter 消息,处理它们并将它们存储在数据库中。我正在使用 Producer/Consumer BlockingQueue,其中元素的作用如下:

  • 生产者:使用 TwitterAPI 检索 Twitter 消息并将其存储在 BlockingQueue 中。
  • 消费者:从队列中取出一个元素,对其进行处理并将其存储在数据库中。

这是主类:

    // Creating shared object
BlockingQueue<TwitterMessage> sharedQueue = new ArrayBlockingQueue<TwitterMessage>(1);

// Creating Producer and Consumer Thread
Thread prodThread = new Thread(new TwitterStreamProducer(sharedQueue));
Thread consThread = new Thread(new TwitterStreamConsumer(sharedQueue));

// Starting producer and Consumer thread
prodThread.start();
consThread.start();

生产者处理 TwitterAPI 响应并将对象添加到队列中。

@Override
public void run() {
while (true) {
try {
message = extractData(); // extract data from TwitterAPI response and return TwitterMessage object
sharedQueue.put(message);

System.out.println("Produced: " + message.getTwitterMessage());
} catch (Exception ex) {
Logger.getLogger(TwitterStreamProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}

消费者做如下:

private final BlockingQueue<TwitterMessage> sharedQueue;
private TwitterProcessor twitterProcessor;
private TwitterMessage twitterMessage;

public TwitterStreamConsumer(BlockingQueue<TwitterMessage> sharedQueue) {
this.sharedQueue = sharedQueue;
twitterProcessor = new TwitterProcessor();
}

@Override
public void run() {
while (true) {
try {
twitterMessage = this.twitterProcessor.process(sharedQueue.take());
if (twitterMessage.getTwitterMessage().length() > 1) {
System.out.printf("Consumed: %s\n", twitterMessage.getTwitterMessage());
}
} catch (InterruptedException ex) {
Logger.getLogger(TwitterStreamConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}

我希望看到的是以下内容:

Produced: …twittermessage1… 
Consumed: …twittermessage1…
Produced: …twittermessage2…
Consumed: …twittermessage2…
Produced: …twittermessage3…
Consumed: …twittermessage3…
...

但是,我得到的结果如下:

Produced: …twittermessage1…
Produced: …twittermessage2… <= problem
Consumed: …twittermessage1…
Produced: …twittermessage3…
Consumed: …twittermessage3…
Consumed: …twittermessage3… <= problem
Produced: …twittermessage4… <= problem
Produced: …twittermessage5…
Consumed: …twittermessage5…

如您所见,有时生产者和消费者之间存在重叠,生产者生产的消息过多而未被消费。有时一条消息会被消费两次(有时甚至超过两次)

编辑1这是控制台上打印的内容:

Produced: @1StevenGeorgiou thanks for the follow #ff
Processed: follow
Produced: @nmagliozzi6 @_PatrickKealy_ but of course!!!!!
Produced: @taylorgaglia Thanks Tayl 😊 miss you tooo
Processed: tayl miss
Produced: Hate this who to follow tab in #twitter it's shows the most pathetic people you know. Accidently added one I had to act fast to unfollow
Processed: hate follow tabshow pathet peopl accid ad act fast unfollow

编辑2正如 John Vint 建议打印出“System.identityHashCode(sharedQueue.take())”,我得到以下信息:

Produced: …
Consumed: 1206857787
Produced: …
Consumed: 1206857787

有人可以帮我解决这个问题吗?

谢谢!

最佳答案

代码运行正常:线程的执行顺序未定义。因此,生产者很有可能在处理前一条消息之前生成多条消息。这甚至是一个理想的特性,因为它允许有多个线程处理获取(生产者),这将花费一些时间作为阻塞,并且只有更少甚至单个消费者实际处理这些中间结果。

但是在您的代码中,您违反了生产者/消费者的基本规则,即他们之间的关系需要有所不同。由于您目前每条消息都有一个生产者/消费者对,因此使用的模式只会减慢速度。您应该增加 getter 的数量(并接受异步处理),或者 - 如果您不想要异步处理 - 完全删除该模式并让“消费者”自行获取消息。

编辑:如果您使用像 LinkedBlockingQueue 这样的并发队列你的问题应该已经解决了。
也看看 ExecutorService类,它大大简化了 Runnable 的线程处理。

关于Java BlockingQueue 生产/消费不正确,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22013336/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com