- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我正在做一个项目,我需要使用 TwitterAPI 检索 Twitter 消息,处理它们并将它们存储在数据库中。我正在使用 Producer/Consumer BlockingQueue,其中元素的作用如下:
这是主类:
// Creating shared object
BlockingQueue<TwitterMessage> sharedQueue = new ArrayBlockingQueue<TwitterMessage>(1);
// Creating Producer and Consumer Thread
Thread prodThread = new Thread(new TwitterStreamProducer(sharedQueue));
Thread consThread = new Thread(new TwitterStreamConsumer(sharedQueue));
// Starting producer and Consumer thread
prodThread.start();
consThread.start();
生产者处理 TwitterAPI 响应并将对象添加到队列中。
@Override
public void run() {
while (true) {
try {
message = extractData(); // extract data from TwitterAPI response and return TwitterMessage object
sharedQueue.put(message);
System.out.println("Produced: " + message.getTwitterMessage());
} catch (Exception ex) {
Logger.getLogger(TwitterStreamProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
消费者做如下:
private final BlockingQueue<TwitterMessage> sharedQueue;
private TwitterProcessor twitterProcessor;
private TwitterMessage twitterMessage;
public TwitterStreamConsumer(BlockingQueue<TwitterMessage> sharedQueue) {
this.sharedQueue = sharedQueue;
twitterProcessor = new TwitterProcessor();
}
@Override
public void run() {
while (true) {
try {
twitterMessage = this.twitterProcessor.process(sharedQueue.take());
if (twitterMessage.getTwitterMessage().length() > 1) {
System.out.printf("Consumed: %s\n", twitterMessage.getTwitterMessage());
}
} catch (InterruptedException ex) {
Logger.getLogger(TwitterStreamConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
我希望看到的是以下内容:
Produced: …twittermessage1…
Consumed: …twittermessage1…
Produced: …twittermessage2…
Consumed: …twittermessage2…
Produced: …twittermessage3…
Consumed: …twittermessage3…
...
但是,我得到的结果如下:
Produced: …twittermessage1…
Produced: …twittermessage2… <= problem
Consumed: …twittermessage1…
Produced: …twittermessage3…
Consumed: …twittermessage3…
Consumed: …twittermessage3… <= problem
Produced: …twittermessage4… <= problem
Produced: …twittermessage5…
Consumed: …twittermessage5…
如您所见,有时生产者和消费者之间存在重叠,生产者生产的消息过多而未被消费。有时一条消息会被消费两次(有时甚至超过两次)
编辑1这是控制台上打印的内容:
Produced: @1StevenGeorgiou thanks for the follow #ff
Processed: follow
Produced: @nmagliozzi6 @_PatrickKealy_ but of course!!!!!
Produced: @taylorgaglia Thanks Tayl 😊 miss you tooo
Processed: tayl miss
Produced: Hate this who to follow tab in #twitter it's shows the most pathetic people you know. Accidently added one I had to act fast to unfollow
Processed: hate follow tabshow pathet peopl accid ad act fast unfollow
编辑2正如 John Vint 建议打印出“System.identityHashCode(sharedQueue.take())”,我得到以下信息:
Produced: …
Consumed: 1206857787
Produced: …
Consumed: 1206857787
…
有人可以帮我解决这个问题吗?
谢谢!
最佳答案
代码运行正常:线程的执行顺序未定义。因此,生产者很有可能在处理前一条消息之前生成多条消息。这甚至是一个理想的特性,因为它允许有多个线程处理获取(生产者),这将花费一些时间作为阻塞,并且只有更少甚至单个消费者实际处理这些中间结果。
但是在您的代码中,您违反了生产者/消费者的基本规则,即他们之间的关系需要有所不同。由于您目前每条消息都有一个生产者/消费者对,因此使用的模式只会减慢速度。您应该增加 getter 的数量(并接受异步处理),或者 - 如果您不想要异步处理 - 完全删除该模式并让“消费者”自行获取消息。
编辑:如果您使用像 LinkedBlockingQueue 这样的并发队列你的问题应该已经解决了。
也看看 ExecutorService类,它大大简化了 Runnable 的线程处理。
关于Java BlockingQueue 生产/消费不正确,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22013336/
我在一个网站上工作,该网站在生产中只有 aspx 文件和 bin 目录和文件。任何人都知道这个网站是如何部署的,我通常有我的网站,我也会提交代码。 我的问题 2. 如何在同一台服务器上创建测试网站?我
您好,我认为这应该是一个相当简单的问题,但我对管理 git 不太熟悉。 我使用的是非常流行的 http://nvie.com/posts/a-successful-git-branching-mode
目前我的网站(生产服务器)已经有很多代码了。现在我想开始在我的项目中使用 Git 并为我的团队设置一个暂存服务器。谁能给我任何建议? 这是我脑海中的画面: Production
我目前正在学习 Erlang SO 用户能否提供有关他们的任何 Erlang 应用程序部署的有趣示例? 我想深入了解 Erlang 在过去的电信中的常见用途,以及 Erlang 在开发/部署过程中带来
我关注了Ryan's screencast并部署到 VPS。所以我使用 Unicorn + nginx + github + Ubuntu 12.04 LTS + capistrano。我也使用 i1
我想在 Azure 中维护临时环境和生产环境。每个都应该有自己的 blob 存储和 sql 存储。实现这一目标的最佳方法是什么?设置临时和生产 SQL Server 以及两个 Blob 存储帐户? 最
我无法使用 Electron 打包程序在内置的 Electron 应用程序中打开chrome开发工具。 我已经尝试过mainWindow.webContents.openDevTools(),但这没有
我有一个 Azure 应用程序服务环境。 可以在同一个 ASE 中运行多个应用服务计划(开发、测试和生产)吗? 基本上,我知道他们会共享前端池,我认为这很好,因为那里没有运行应用程序代码,并且它“..
我是 Maven 新手,有 Rails 背景。在较高级别上,如果我正在运行测试、在本地运行应用程序以及在部署到生产环境时,我希望连接到不同的数据库。 这就是我的想法。当我运行 mvn test 时,它
我有一个 Azure 应用程序服务环境。 可以在同一个 ASE 中运行多个应用服务计划(开发、测试和生产)吗? 基本上,我知道他们会共享前端池,我认为这很好,因为那里没有运行应用程序代码,并且它“..
我正在使用 faSTLane\produce 脚本制作一个新应用程序,我收到以下错误消息: in `parse_response': {"data"=>nil, "messages"=>{"warn"
使开发人员能够构建包含私有(private)数据的系统的当前做法是什么?谁能指出这类事情的“最佳实践”指南? 我们这里有一个 Catch-22,因为开发人员需要编写与具有被认为是“私有(private
我有一个连接 Azure SQL Server 的 Azure 云服务。当我第一次设置这个时,我真的不太了解自己在做什么,只是想熟悉 Azure。所以现在我想利用我所拥有的东西并将其转变为可靠的部署结
我是 Cordova 的新手。抱歉,如果这些是业余问题。我想详细了解典型手机应用程序的设置和架构。 我有一个本地版本的 Meteor Cordova 正在运行,它通过 Modulus 连接到远程服务器
我一直在寻找一些在一些 POS(销售点)设备和服务器之间同步数据的选项。 SymmetricDS似乎是具有商业友好许可证的选项之一。作为一个 Codehaus 项目确实保证了一定程度的质量,所以我同意
在 PHP 开发中,可以通过服务器的“环境”变量确定应用程序是在生产环境还是开发环境中运行。 在 tomcat 服务器上是否有类似的变量可用,或者是否有更好的方法将应用程序用于生产和开发? 最佳答案
我正在做一个项目,我需要使用 TwitterAPI 检索 Twitter 消息,处理它们并将它们存储在数据库中。我正在使用 Producer/Consumer BlockingQueue,其中元素的作
这个问题类似于:iPhone development - what is the difference between a development and distribution provision
我正在尝试根据 URL 在 Drupal 中设置环境。例如,如果我访问 mysite.local,它将使用 localdb 并将站点名称更改为“Local Mysite”;如果我转到 mysite.c
我今天一直在阅读 Magento 中的数据库同步。 我目前正在努力解决的一件事是在开发期间和上传到生产期间需要同步什么。现在假设一批更改将包含对数据库和类似代码的更改,下面是我对模型工作流的理解(我目
我是一名优秀的程序员,十分优秀!