gpt4 book ai didi

python - 如何使用 Python 通过 App Function 向 Azure 存储表插入数千行

转载 作者:行者123 更新时间:2023-12-02 22:57:59 25 4
gpt4 key购买 nike

我正在尝试使用 Python 编写 Azure 应用程序函数我正在其中创建一个简单的 Azure 存储表和最多可保存 10000 行。

我尝试使用以下方法逐个创建实体

from azure.data.tables import TableClient, TableTransactionError
...
table_client.create_entity({...})

它可以工作,但速度很慢。

然后我尝试使用

from concurrent.futures import ProcessPoolExecutor as PoolExecutor

这大大加快了整个过程,但出于明显的原因,您无法在Azure应用程序功能中使用它

我也尝试过使用

table_client.upsert_entity(i)
...

table_client.submit_transaction(operations)

但还是很慢。

我终于尝试了



# Create a new batch
batch = TableBatch()

# Count how many items are stored in the batch
inBatch = 0

# Loop over all the data we want to insert
for x in dataToStore:

# Insert the entity into the batch
batch.insert_entity({
'PartitionKey': 'PARTITION1',
'RowKey': str(x['rowkey']),
'someKey': x['someValue'],
'someOtherKey': x['someOtherValue']
})

# Increment the batch item counter
inBatch += 1

# We can only send batches with up to 100 records
if inBatch > 99:
# Commit the batch (send to Azure)
table_service.commit_batch('tablename', batch)

# Reset the batch so it doesn't contain any old items
batch = TableBatch()
inBatch = 0

但这非常慢,而且根本不耐用。,

Azure 存储表声称可以快速保存大量数据。

有人知道怎么做吗?

最佳答案

一些观察。你没有提到你的测试实际花了多长时间。这很有用。

通常在写入表存储时,您会选择一个分布良好的分区 ID。但是,在您的示例中,您使用相同的分区 id = 'PARTITION1'。在这种情况下,您可以尝试使用 Cosmos 表存储 SDK 来批量处理您的实体并一次性加载它们。仅当批处理中的所有实体都具有相同的分区键时,您才能在 Cosmos 表存储 SDK 中使用批处理。

Documentation声明您最多可以加载 100 个实体或 4MB(以先满足者为准)。

我使用 Cosmos 表 python SDK 进行了快速测试,并一次性批量处理了 50 个实体。我能够在大约 35 秒内加载 1000 个实体。加载 10000 个实体花了我 4 分 55 秒。我不确定这是否适合您的要求,或者您期望更快的速度。另请注意,我的示例实体的大小比您的稍大。下面是我的代码,这看起来与你的非常相似。这应该很容易放入 Azure 函数中。

#USING COSMOS TABLE STORAGE API
from azure.cosmosdb.table import TableService, TableBatch
from datetime import datetime
from random import randrange
import random
import names
import json
from datetime import timedelta
import uuid


acc_name = '<storageacct_name>'
acc_key = 'xxxxxxxxxxxxxxxxx'
table_name='xxxxxxx'
Number_of_docs = 10000
d1 = datetime.strptime('1/1/2008 1:30 PM', '%m/%d/%Y %I:%M %p')
d2 = datetime.strptime('1/1/2009 4:50 AM', '%m/%d/%Y %I:%M %p')

service = TableService(account_name=acc_name,
account_key=acc_key)

def random_date(start, end):
"""
This function will return a random datetime between two datetime
objects.
"""
delta = end - start
int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
random_second = randrange(int_delta)
return start + timedelta(seconds=random_second)

def create_entity(id):
my_entity = {"PartitionKey" : "PARTITION1",
"RowKey" : str(id),
"employee_id" : str(id),
"first_name" : names.get_first_name(),
"last_name" : names.get_last_name(),
"regn_no" : "TEST00000000"+ str(id),
"start_date" : random_date(d1,d2),
"salary" : random.randint(12000, 2000000),
"inserted_at" : str(datetime.now())
}
return my_entity

starttime = datetime.utcnow()
print("Starting ingestion: ", starttime.strftime("%Y-%m-%d %H:%M:%S.%f"))
batch_no = 0
batch = TableBatch()
for i in range(Number_of_docs):
i = i + 1
batch_no = batch_no + 1
my_entity = create_entity(i)
batch.insert_or_merge_entity(my_entity)
if (batch_no > 50):
service.commit_batch(table_name, batch)
batch_no = 0
batch = TableBatch()
else:
batch_no = batch_no + 1

if (batch_no > 0):
service.commit_batch(table_name, batch)

endtime = datetime.utcnow()
print("\nrun_sample done :" + endtime.strftime("%Y-%m-%d %H:%M:%S.%f"))
print("Time taken :" + str(endtime-starttime))
print("Number of messages :" + str(Number_of_docs))

编辑:抱歉,我确实意识到 Cosmos 表 SDK 正在被 azure data-tabes API 取代,如 article 3 天前发布。所以我使用新的表存储SDK重写了这段代码并再次进行了测试。结果实际上更好。 3:55 对于 10000 个实体。您可以找到有关如何使用这个新 SDK 的更多示例 HERE .

#USING THE NEW TABLE STORAGE API
from azure.data.tables import TableClient
from datetime import datetime
from random import randrange
import random
import names
import json
from datetime import timedelta
import uuid


conn='xxxxxxxxxxxxxxxxxxxxx;EndpointSuffix=core.windows.net'
tablename='mytable'
table_client = TableClient.from_connection_string(conn_str=conn,table_name=tablename)
Number_of_docs = 10000
d1 = datetime.strptime('1/1/2008 1:30 PM', '%m/%d/%Y %I:%M %p')
d2 = datetime.strptime('1/1/2009 4:50 AM', '%m/%d/%Y %I:%M %p')

def random_date(start, end):
"""
This function will return a random datetime between two datetime
objects.
"""
delta = end - start
int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
random_second = randrange(int_delta)
return start + timedelta(seconds=random_second)

def create_entity(id):
my_entity = {"PartitionKey" : "PARTITION1",
"RowKey" : str(id),
"employee_id" : str(id),
"first_name" : names.get_first_name(),
"last_name" : names.get_last_name(),
"regn_no" : "TEST00000000"+ str(id),
"start_date" : random_date(d1,d2),
"salary" : random.randint(12000, 2000000),
"inserted_at" : str(datetime.now())
}
return my_entity

starttime = datetime.utcnow()
print("Starting ingestion: ", starttime.strftime("%Y-%m-%d %H:%M:%S.%f"))
batch_no = 0
operations = []
for i in range(Number_of_docs):
i = i + 1
batch_no = batch_no + 1
my_entity = create_entity(i)
operations.append(("upsert", my_entity))
if (batch_no > 50):
table_client.submit_transaction(operations)
batch_no = 0
operations = []
else:
batch_no = batch_no + 1

if (batch_no > 0):
service.commit_batch(table_name, batch)

endtime = datetime.utcnow()
print("\nrun_sample done :" + endtime.strftime("%Y-%m-%d %H:%M:%S.%f"))
print("Time taken :" + str(endtime-starttime))
print("Number of messages :" + str(Number_of_docs))

关于python - 如何使用 Python 通过 App Function 向 Azure 存储表插入数千行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70037447/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com