gpt4 book ai didi

python - 优化: Dumping JSON from a Streaming API to Mongo

转载 作者:IT老高 更新时间:2023-10-28 13:16:29 25 4
gpt4 key购买 nike

背景:我设置了一个 python 模块,用于从流式 API 中获取 JSON 对象,并使用 pymongo 将它们(一次批量插入 25 个)存储在 MongoDB 中。为了比较,我还有一个 bash 命令可以从同一个流 API 中 curl 并将其 pipemongoimport。这两种方法都将数据存储在单独的集合中。

我定期监控集合的 count() 以检查它们的表现。

到目前为止,我看到 python 模块落后于 curl | 大约 1000 个 JSON 对象。 mongoimport 方法。

问题:如何优化我的 python 模块,使其与 curl | 同步mongoimport?

我不能使用 tweetstream,因为我使用的不是 Twitter API,而是第 3 方流媒体服务。

有人可以帮我吗?

Python 模块:


class StreamReader:
def __init__(self):
try:
self.buff = ""
self.tweet = ""
self.chunk_count = 0
self.tweet_list = []
self.string_buffer = cStringIO.StringIO()
self.mongo = pymongo.Connection(DB_HOST)
self.db = self.mongo[DB_NAME]
self.raw_tweets = self.db["raw_tweets_gnip"]
self.conn = pycurl.Curl()
self.conn.setopt(pycurl.ENCODING, 'gzip')
self.conn.setopt(pycurl.URL, STREAM_URL)
self.conn.setopt(pycurl.USERPWD, AUTH)
self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
self.conn.perform()
except Exception as ex:
print "error ocurred : %s" % str(ex)

def handle_data(self, data):
try:
self.string_buffer = cStringIO.StringIO(data)
for line in self.string_buffer:
try:
self.tweet = json.loads(line)
except Exception as json_ex:
print "JSON Exception occurred: %s" % str(json_ex)
continue

if self.tweet:
try:
self.tweet_list.append(self.tweet)
self.chunk_count += 1
if self.chunk_count % 1000 == 0
self.raw_tweets.insert(self.tweet_list)
self.chunk_count = 0
self.tweet_list = []

except Exception as insert_ex:
print "Error inserting tweet: %s" % str(insert_ex)
continue
except Exception as ex:
print "Exception occurred: %s" % str(ex)
print repr(self.buff)

def __del__(self):
self.string_buffer.close()

感谢阅读。

最佳答案

原来您的代码中有一个错误。

                if self.chunk_count % 50 == 0
self.raw_tweets.insert(self.tweet_list)
self.chunk_count = 0

您重置了 chunk_count,但没有重置 tweet_list。因此,您第二次尝试插入 100 个项目(50 个新项目加上 50 个之前已发送到 DB 的项目)。您已解决此问题,但仍发现性能有所不同。

整个批量大小的事情结果是一个红鲱鱼。我尝试使用一个大的 json 文件并通过 python 加载它,而不是通过 mongoimport 加载它,Python 总是更快(即使在安全模式下 - 见下文)。

仔细查看您的代码,我意识到问题在于流式 API 实际上是以 block 的形式向您传递数据。您应该只获取这些 block 并将它们放入数据库中(这就是 mongoimport 正在做的事情)。您的 python 为拆分流、将其添加到列表然后定期向 Mongo 发送批处理所做的额外工作可能是我看到的和您看到的之间的区别。

为您的 handle_data() 尝试此代码段

def handle_data(self, data):
try:
string_buffer = StringIO(data)
tweets = json.load(string_buffer)
except Exception as ex:
print "Exception occurred: %s" % str(ex)
try:
self.raw_tweets.insert(tweets)
except Exception as ex:
print "Exception occurred: %s" % str(ex)

需要注意的一点是,您的 python inserts are not running in "safe mode" - 你应该通过在插入语句中添加一个参数 safe=True 来改变它。然后,您将在任何插入失败时获得异常,并且您的 try/catch 将打印暴露问题的错误。

它的性能成本也不高 - 我目前正在运行一个测试,大约五分钟后,两个集合的大小为 14120 14113。

关于python - 优化: Dumping JSON from a Streaming API to Mongo,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10855518/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com