- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
背景:我设置了一个 python
模块,用于从流式 API 中获取 JSON 对象,并使用 pymongo 将它们(一次批量插入 25 个)存储在 MongoDB 中。为了比较,我还有一个 bash 命令可以从同一个流 API 中 curl
并将其 pipe
到 mongoimport
。这两种方法都将数据存储在单独的集合中。
我定期监控集合的 count()
以检查它们的表现。
到目前为止,我看到 python
模块落后于 curl | 大约 1000 个 JSON 对象。 mongoimport
方法。
问题:如何优化我的 python
模块,使其与 curl | 同步mongoimport
?
我不能使用 tweetstream
,因为我使用的不是 Twitter API,而是第 3 方流媒体服务。
有人可以帮我吗?
Python
模块:
class StreamReader:
def __init__(self):
try:
self.buff = ""
self.tweet = ""
self.chunk_count = 0
self.tweet_list = []
self.string_buffer = cStringIO.StringIO()
self.mongo = pymongo.Connection(DB_HOST)
self.db = self.mongo[DB_NAME]
self.raw_tweets = self.db["raw_tweets_gnip"]
self.conn = pycurl.Curl()
self.conn.setopt(pycurl.ENCODING, 'gzip')
self.conn.setopt(pycurl.URL, STREAM_URL)
self.conn.setopt(pycurl.USERPWD, AUTH)
self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
self.conn.perform()
except Exception as ex:
print "error ocurred : %s" % str(ex)
def handle_data(self, data):
try:
self.string_buffer = cStringIO.StringIO(data)
for line in self.string_buffer:
try:
self.tweet = json.loads(line)
except Exception as json_ex:
print "JSON Exception occurred: %s" % str(json_ex)
continue
if self.tweet:
try:
self.tweet_list.append(self.tweet)
self.chunk_count += 1
if self.chunk_count % 1000 == 0
self.raw_tweets.insert(self.tweet_list)
self.chunk_count = 0
self.tweet_list = []
except Exception as insert_ex:
print "Error inserting tweet: %s" % str(insert_ex)
continue
except Exception as ex:
print "Exception occurred: %s" % str(ex)
print repr(self.buff)
def __del__(self):
self.string_buffer.close()
感谢阅读。
最佳答案
原来您的代码中有一个错误。
if self.chunk_count % 50 == 0
self.raw_tweets.insert(self.tweet_list)
self.chunk_count = 0
您重置了 chunk_count,但没有重置 tweet_list。因此,您第二次尝试插入 100 个项目(50 个新项目加上 50 个之前已发送到 DB 的项目)。您已解决此问题,但仍发现性能有所不同。
整个批量大小的事情结果是一个红鲱鱼。我尝试使用一个大的 json 文件并通过 python 加载它,而不是通过 mongoimport 加载它,Python 总是更快(即使在安全模式下 - 见下文)。
仔细查看您的代码,我意识到问题在于流式 API 实际上是以 block 的形式向您传递数据。您应该只获取这些 block 并将它们放入数据库中(这就是 mongoimport 正在做的事情)。您的 python 为拆分流、将其添加到列表然后定期向 Mongo 发送批处理所做的额外工作可能是我看到的和您看到的之间的区别。
为您的 handle_data() 尝试此代码段
def handle_data(self, data):
try:
string_buffer = StringIO(data)
tweets = json.load(string_buffer)
except Exception as ex:
print "Exception occurred: %s" % str(ex)
try:
self.raw_tweets.insert(tweets)
except Exception as ex:
print "Exception occurred: %s" % str(ex)
需要注意的一点是,您的 python inserts are not running in "safe mode" - 你应该通过在插入语句中添加一个参数 safe=True
来改变它。然后,您将在任何插入失败时获得异常,并且您的 try/catch 将打印暴露问题的错误。
它的性能成本也不高 - 我目前正在运行一个测试,大约五分钟后,两个集合的大小为 14120 14113。
关于python - 优化: Dumping JSON from a Streaming API to Mongo,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10855518/
我在 mongo 中有一个查询,看起来像 db.Table_Name.group({ "key": { "Camp": true }, "initial": { "Clicks": 0 }
这是一个示例查询: db.readings.find( {"_id.s": ISODate("2012-11-01T00:05:00Z") }).count() 查询在 mongo shell 中工作
我正在使用 mongo 模板查询基于状态和邮政编码的文档,这是使用以下标准和查询方法实现的: List modelClass = null; Criteria criteria = new Crite
目前,我有一个旧版本的 mongo,即 2.6 在我的系统上运行。我的网站已经投入生产,并且拥有大量客户数据。我正在计划升级到 mongo 3.2。 所以,我的问题是 mongo v3.2 的 mon
我构建了一个 MongoDB。我想通过某些分组进行聚合。我找到了 document ,这将为我做到这一点。一切正常,但确定limitations指出: 管道的输出只能包含 16 兆字节。如果你的结果s
我无法连接到 MongoDB .在 Ubuntu 中它可以工作,但我在 CentOS 工作现在。这是错误信息: MongoDB shell version: 2.4.2 connecting to:
我试图使用mongo运行最简单的mongo-express和docker-compose容器。我遇到了许多错误,这些错误将在以后解释。 我尝试了以下docker-compose配置: 1。 versi
我有一个 mongo 查询,如下所示。 db.Course.find( { $and: [{courseCallNo: {$in : [/^ssoapicall1$/i]} }, {clientId
我想知道为什么我会收到以下 php 通知: ( ! ) Notice: Mongo::__construct(): parsing servers in C:\htdocs\multishop\lib
(问题灵感来自this one) 给定一个数据集: db.mycollection.insert([ {a:1, b:2, c:3}, {a:1, b:3, c:4}, {a:0, b:1
如果我已经使用 PECL 成功安装了 mongoDB,还需要获取 debian 软件包“php5-mongo”吗?有什么不同?(这个问题应该迁移吗?) 最佳答案 区别与从 CPAN 或 debian(
Mongo 一天前还运行良好。然后今天早上我起床并尝试打开我得到这个: MongoDB shell version: 2.6.4 2015-01-06T11:10:54.142-0500 SE
我正在尝试使用 C# Mongo 驱动程序将文件上传到 Mongo Atlas。但我不明白如何连接类 MongoServerSettings。我试过这个: private static MongoSe
我有两个版本的 mongodb,2.4.3 和 2.6.0。我可以在不同的端口上启动这两个版本,但是在使用 ./mongos 运行它时出现错误: BadValue error: no args for
我需要在 不 关闭的情况下进行分片,或者在端口 27017 中重启现有的 mongo 实例。 我尝试了以下操作,(当默认端口正在运行和 mongo 实例时) mongod --shardsvr --d
在无意中升级了 mongodb 包(3.4.9 -> 3.6.1)后,有没有办法升级 mongo 数据库? 根据 mongo 文档,作为 prerequisite在升级过程中,featureCompa
我正在使用 Sails 0.12.3 和 mongo 3.2.7 这是我的 config/connections.js。 mongo: { adapter: 'sails-mongo', host
我正在使用 mongodb 构建一个基本的搜索引擎,我已经验证了基本查询在 mongo shell 中的工作。不过,我不太明白如何将其翻译成 PHP。 输入字符串中的空格表示“和”运算符和 |或管道字
我有一个用 @Document 注释的 Mongo 集合,我希望能够从字符串 (JSON) 中获取该 Java 对象,因为我们正在将这些类作为字符串插入队列。 Spring-Data-Mongo 中是
我正在使用 Linux Debian 9。我已经安装了 JDK 1.8。我使用的maven版本是3.6,springboot的版本是2.1。 mongodb版本是3.6。 下面是我试图保存在 mong
我是一名优秀的程序员,十分优秀!