- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我试图将大约 800 万条记录插入 Mongo,它似乎以每秒 1000 条记录的速度插入它们,这非常慢。
代码是用python写的,所以可能是python的问题,但我怀疑。这是代码:
def str2datetime(str):
return None if (not str or str == r'\N') else datetime.strptime(str, '%Y-%m-%d %H:%M:%S')
def str2bool(str):
return None if (not str or str == r'\N') else (False if str == '0' else True)
def str2int(str):
return None if (not str or str == r'\N') else int(str)
def str2float(str):
return None if (not str or str == r'\N') else float(str)
def str2float2int(str):
return None if (not str or str == r'\N') else int(float(str) + 0.5)
def str2latin1(str):
return unicode(str, 'latin-1')
_ = lambda x: x
converters_map = {
'test_id': str2int,
'android_device_id': str2int,
'android_fingerprint': _,
'test_date': str2datetime,
'client_ip_address': _,
'download_kbps': str2int,
'upload_kbps': str2int,
'latency': str2int,
'server_name': _,
'server_country': _,
'server_country_code': _,
'server_latitude': str2float,
'server_longitude': str2float,
'client_country': _,
'client_country_code': _,
'client_region_name': str2latin1,
'client_region_code': _,
'client_city': str2latin1,
'client_latitude': str2float,
'client_longitude': str2float,
'miles_between': str2float2int,
'connection_type': str2int,
'isp_name': _,
'is_isp': str2bool,
'network_operator_name': _,
'network_operator': _,
'brand': _,
'device': _,
'hardware': _,
'build_id': _,
'manufacturer': _,
'model': str2latin1,
'product': _,
'cdma_cell_id': str2int,
'gsm_cell_id': str2int,
'client_ip_id': str2int,
'user_agent': _,
'client_net_speed': str2int,
'iphone_device_id': str2int,
'carrier_name': _,
'iso_country_code': _,
'mobile_country_code': str2int,
'mobile_network_code': str2int,
'model': str2latin1,
'version': _,
'server_sponsor_name': _,
}
def read_csv_zip(path):
with ZipFile(path) as z:
with z.open(z.namelist()[0]) as input:
r = csv.reader(input)
header = r.next()
converters = tuple((title if title != 'test_id' else '_id', converters_map[title]) for title in header)
for row in r:
row = {converter[0]:converter[1](value) for converter, value in zip(converters, row)}
yield row
argv = [x for x in argv if not x == '']
if len(argv) == 1:
print("Usage: " + argv[0] + " zip-file")
exit(1)
zip_file = argv[1]
collection_name = zip_file[:zip_file.index('_')]
print("Populating " + collection_name + " with the data from " + zip_file)
with Connection() as connection:
db = connection.db
collection = db.__getattr__(collection_name)
i = 0;
try:
start = time()
for item in read_csv_zip(zip_file):
i += 1
if (i % 1000) == 0:
stdout.write("\r%d " % i)
stdout.flush()
try:
collection.insert(item)
except Exception as exc:
print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
print exc
print("Elapsed time = {0} seconds, {1} records.".format(time() - start, i))
raw_input("Press ENTER to exit")
except Exception as exc:
print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
print exc
exit(1)
插入262796条记录(一个csv文件)需要350秒。
mongo 服务器在同一台机器上运行,没有人使用它。所以,如果有办法的话,我可以直接写入数据库文件。
我对分片不感兴趣,因为 800 万条记录不应该需要分片,不是吗?
我的问题是我做错了什么?也许我选择的数据库是错误的?典型的流程是每月刷新一次记录,然后仅对数据库进行查询。
谢谢。
编辑
事实证明,瓶颈不是 mongo,而是读取 zip 文件。我更改了代码,以 1000 行为一组读取 zip 文件,然后通过调用 Collection.insert
将它们提供给 mongo。它是 zip 文件,它需要所有时间。这是修改后的代码:
def insert_documents(collection, source, i, batch_size):
count = 0;
while True:
items = list(itertools.islice(source, batch_size))
if len(items) == 0:
break;
old_i = i
count += len(items)
i += len(items)
if (old_i / 1000) != (i / 1000):
sys.stdout.write("\r%d " % i)
sys.stdout.flush()
try:
collection.insert(items)
except Exception as exc:
print("Failed at some record between #{0} (id = {1}) and #{2} (id = {3})".format(old_i,items[0]['_id'],i,items[-1]['_id']))
print exc
return count
def main():
argv = [x for x in sys.argv if not x == '']
if len(argv) == 1:
print("Usage: " + argv[0] + " zip-file")
exit(1)
zip_file = argv[1]
collection_name = zip_file[:zip_file.index('_')]
print("Populating " + collection_name + " with the data from " + zip_file)
with Connection() as connection:
ookla = connection.ookla
collection = ookla.__getattr__(collection_name)
i = 0;
start = time()
count = insert_documents(collection, read_csv_zip(zip_file), i, 1000)
i += count
print("Elapsed time = {0} seconds, {1} records.".format(time() - start, count))
raw_input("Press ENTER to exit")
if __name__ == "__main__":
main()
事实证明,大部分时间都进入了 items = list(itertools.islice(source, batch_size))
。
关于如何改进它有什么想法吗?
最佳答案
尽管您在评论中指出您不能使用 mongoimport,但您可以而且应该使用。可以完美导入日期以及您的 str2latin 转换。只需预处理您的 csv 使其与 mongoimport 兼容,您就大功告成了。
将日期转换为 {myDate:{$date: msSinceEpoch}}
并且 mongoimport 会理解它。因此,通过一个预处理步骤,您可以使用 mongoimport 并根据您的用例,我不明白为什么这会成为问题。
也就是说,mongoimport 不应比批量插入快一个数量级,尽管 1000/秒并不慢,但它肯定不符合我在简单的开发机器上获得的性能类型。如果我使用批量插入而不是单声道插入,我可以轻松达到 30k/sec 甚至更高,尤其是使用 safe=false 写入(在这种情况下应该没问题,因为您可以在导入后作为第二步进行验证)。你的瓶颈是什么资源? (检查 mongostat 和 top)
关于python - 如何有效地将大型压缩 csv 文件中的数百万条记录插入到 mongo 数据库中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9760242/
我有一个网站。 必须登录才能看到里面的内容。 但是,我使用此代码登录。 doc = Jsoup.connect("http://46.137.207.181/Account/Login.aspx")
我正在尝试为我的域创建一个 SPF 记录并使我的邮件服务器能够对其进行评估。我在邮件服务器上使用 Postfix 并使用 policyd-spf (Python) 来评估记录。目前,我通过我的私有(p
我需要为负载平衡的 AWS 站点 mywebsite.com 添加 CName 记录。记录应该是: @ CNAME mywebsite.us-east-1.elb.amazon
我目前正在开发一个相当大的多层应用程序,该应用程序将部署在海外。虽然我希望它在解聚后不会折叠或爆炸,但我不能 100% 确定这一点。因此,如果我知道我可以请求日志文件,以准确找出问题所在以及原因,那就
我使用以下命令从我的网络摄像头录制音频和视频 gst-launch-0.10 v4l2src ! video/x-raw-yuv,width=640,height=480,framerate=30/1
我刚刚开始使用 ffmpeg 将视频分割成图像。我想知道是否可以将控制台输出信息保存到日志文件中。我试过“-v 10”参数,也试过“-loglevel”参数。我在另一个 SO 帖子上看到使用 ffmp
我想针对两个日期查询我的表并检索其中的记录。 我这样声明我的变量; DECLARE @StartDate datetime; DECLARE @EndDate datetime; 并像这样设置我的变量
在 javascript 中,我可以使用简单的 for 循环访问对象的每个属性,如下所示 var myObj = {x:1, y:2}; var i, sum=0; for(i in myObj) s
最近加入了一个需要处理大量代码的项目,我想开始记录和可视化调用图的一些流程,让我更好地理解一切是如何组合在一起的。这是我希望在我的理想工具中看到的: 每个节点都是一个函数/方法 如果一个函数可以调用另
如何使用反射在F#中创建记录类型?谢谢 最佳答案 您可以使用 FSharpValue.MakeRecord [MSDN]创建一个记录实例,但是我认为F#中没有任何定义记录类型的东西。但是,记录会编译为
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 3年前关闭。 Improve thi
我是 Sequelize 的新手并且遇到了一些语法问题。我制作了以下模型: // User sequelize.define('user', { name: { type: DataTyp
${student.name} Notify 这是我的output.jsp。请注意,我已经放置了一个链接“Notify”以将其转发到 display.jsp 上。但我不确定如何将 Stud
例如,这是我要做的查询: server:"xxx.xxx.com" AND request_url:"/xxx/xxx/xxx" AND http_X_Forwarded_Proto:(https O
我一直在开发大量 Java、PHP 和 Python。所有这些都提供了很棒的日志记录包(分别是 Log4J、Log 或logging)。这在调试应用程序时有很大帮助。特别是当应用程序 headless
在我的Grails应用程序中,我异步运行一些批处理过程,并希望该过程记录各种状态消息,以便管理员以后可以检查它们。 我考虑过将log4j JDBC附加程序用作最简单的解决方案,但是据我所知,它不使用D
我想将进入 MQ 队列的消息记录到数据库/文件或其他日志队列,并且我无法修改现有代码。是否有任何方法可以实现某种类似于 HTTP 嗅探器的消息记录实用程序?或者也许 MQ 有一些内置的功能来记录消息?
如果我有一条包含通用字段的记录,在更改通用字段时是否有任何方法可以模仿方便的 with 语法? 即如果我有 type User = // 'photo can be Bitmap or Url {
假设我有一个名为 Car 的自定义对象。其中的所有字段都是私有(private)的。 public class Car { private String mName; private
当记录具有特定字段时,我需要返回 true 的函数,反之亦然。示例: -record(robot, {name, type=industrial, ho
我是一名优秀的程序员,十分优秀!