
python - SQL optimization to speed up bulk inserts with Scrapy


In my previous post I asked how to record items in bulk with Scrapy. That question is here:

Buffered items and bulk insert to Mysql using scrapy

With @Alexander's help I can now keep 1000 items in a cache. The problem is that when the cached items are transferred to MySQL, they are written one by one. My only concern here is speed, and I believe it is caused by my insufficiently optimized SQL code.

The save logic on the SQL side is as follows:

Add the item to the products table, and if its product_id does not exist yet, also add it to the new_products table. (I run a script in the background that moves these rows out of new_products; that part works fine. In other words, at most about 50k rows are recorded in total.)

MySQL may be slowing down during the insert into the new_products table, because it checks whether the product_id already exists among the existing rows.
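(Aside, not part of the original post: in the pipeline.py below the existence check is actually done in Python, and save() re-runs SELECT DISTINCT product_id FROM products for every single item before inserting it. A quick way to see what that repeated query costs is a standalone sketch like the following; the connection settings are hypothetical.)

import time
import mysql.connector

# Hypothetical credentials, for illustration only.
cnx = mysql.connector.connect(host='localhost', user='user',
                              password='pass', database='db')
cursor = cnx.cursor()

start = time.perf_counter()
cursor.execute("SELECT DISTINCT product_id FROM products;")
ids = [r[0] for r in cursor.fetchall()]
print("fetched {} ids in {:.3f}s".format(len(ids), time.perf_counter() - start))
# The current save() repeats this fetch (plus a commit) once per item,
# so one flush of a 1000-item cache pays this cost roughly 1000 times.

cursor.close()
cnx.close()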

I would be very glad if you could suggest a way to save 1000 items to the database at once.

The pipeline.py I am using:

from __future__ import print_function
import logging
from scrapy import signals
from itemadapter import ItemAdapter
from mysql.connector import errorcode
from amazon_scraper.items import AmazonMobileDetailsItem
import mysql.connector


class AmazonScraperPipeline:
    table = 'products'
    table2 = 'new_products'
    conf = {
        'host': 'localhost',
        'user': 'xxxxxx',
        'password': 'xxxxxx',
        'database': 'xxxxxxx',
        'raise_on_warnings': True
    }

    def __init__(self, **kwargs):
        self._rows = []           # store rows temporarily
        self._cached_rows = 0     # number of cached rows
        self._cache_limit = 1000  # limit before saving to database
        self.cnx = self.mysql_connect()

    def open_spider(self, spider):
        print("spider open")

    def save_all(self):  # calls self.save method for all cached rows
        if len(self._rows) > 0:
            list(map(self.save, self._rows))
            self._cached_rows = 0  # reset the count
            self._rows = []        # reset the cache

    def cache_result(self, item):  # adds new row to cache
        self._rows.append(dict(item))
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit:  # checks if limit reached
            self.save_all()  # if it has been reached then save all rows

    def process_item(self, item, spider):
        print("Saving item into db ...")
        self.cache_result(item)  # cache this item
        return item

    def close_spider(self, spider):
        self.save_all()  # saves remaining rows once spider closes
        self.cnx.close()

    def mysql_connect(self):
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)

    def save(self, row):
        cursor = self.cnx.cursor()
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        existing_ids = [existing[0] for existing in cursor.fetchall()]
        create_query = ("INSERT INTO " + self.table +
                        "(rowid, date, listing_id, product_id, product_name, price, url) "
                        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        # data_user = (rowid, date, listing_id, product_id, product_name, price, url)
        # Insert new row
        cursor.execute(create_query, row)
        # lastRecordId = cursor.lastrowid

        # Make sure data is committed to the database
        # self.cnx.commit()
        # cursor.close()
        print("Item saved")

        product_id = row['product_id']
        if product_id not in existing_ids:
            create_query = ("INSERT INTO " + self.table2 +
                            "(product_rowid, date, listing_id, product_id, product_name, price, url) "
                            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
            # data_user = (rowid, date, listing_id, product_id, product_name, price, url)

            # new_cursor = self.cnx.cursor()
            cursor.execute(create_query, row)
            # lastRecordId = cursor.lastrowid
            # self.cnx.commit()
            # new_cursor.close()
            print("New Item saved")
        self.cnx.commit()

Best Answer

You can eliminate the first query in the save method by running it once at initialization, storing the result as an instance variable, and then updating it with new entries inside the save method. Another performance boost can come from using the executemany feature of the MySQL cursor, by passing all of the rows to the save method at once instead of one at a time.


class Pipeline:
    table = 'products'
    table2 = 'new_products'
    conf = {
        'host': 'localhost',
        'user': 'xxxxxx',
        'password': 'xxxxxx',
        'database': 'xxxxxxx',
        'raise_on_warnings': True
    }

    def __init__(self, **kwargs):
        self._rows = []             # store rows temporarily
        self._unique_products = []  # unique product rows
        self._cached_rows = 0       # number of cached rows
        self._cache_limit = 1000    # limit before saving to database
        self.cnx = self.mysql_connect()
        self.existing_ids = self.get_product_ids()

    def open_spider(self, spider):
        print("spider open")

    def save_all(self):  # calls self.save method for all cached rows
        if len(self._rows) > 0:
            self.save(self._rows, self._unique_products)
            self._cached_rows = 0  # reset the count
            self._rows = []        # reset the cache
            self._unique_products = []

    def process_item(self, item, spider):
        row = dict(item)
        product_id = row['product_id']
        if product_id not in self.existing_ids:
            self._unique_products.append(row)
            self.existing_ids.add(product_id)
        self._rows.append(row)
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit:  # checks if limit reached
            self.save_all()  # if it has been reached then save all rows
        return item

    def close_spider(self, spider):
        self.save_all()  # saves remaining rows once spider closes
        self.cnx.close()

    def mysql_connect(self):
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)

    def get_product_ids(self):
        cursor = self.cnx.cursor()
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        return set([row[0] for row in cursor.fetchall()])

    def save(self, rows, products):
        cursor = self.cnx.cursor()
        create_query = ("INSERT INTO " + self.table +
                        "(rowid, date, listing_id, product_id, product_name, price, url) "
                        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        # Insert new rows
        cursor.executemany(create_query, rows)
        # Make sure data is committed to the database
        self.cnx.commit()
        cursor.close()
        print("Item saved with ID: {}".format(cursor.lastrowid))

        create_query = ("INSERT INTO " + self.table2 +
                        "(product_rowid, date, listing_id, product_id, product_name, price, url) "
                        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        new_cursor = self.cnx.cursor()
        new_cursor.executemany(create_query, products)
        self.cnx.commit()
        new_cursor.close()
        print("New Item saved with ID: {}".format(new_cursor.lastrowid))

I am actually curious how much of a performance boost this gives, so please share the difference.

The original question about SQL optimization to speed up bulk inserts with Scrapy can be found on Stack Overflow: https://stackoverflow.com/questions/73695556/
