- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想并行处理数据库表中列出的任务。不寻找工作代码。
使用 Python 3.6、psycopg2.7.6、PostgreSQL 11
D 包含要处理的数据表和一个 tasks
表。 T ssh
的用户进入 P,可以在其中发出以下命令:
python -m core.utils.task
这个 task.py
脚本本质上是一个 while
循环,它从 tasks
表中获取一个任务 t
在 D 上,状态为"new",直到没有新任务为止。任务 t
基本上是另一个名为 do_something(t)
的函数的一组参数。 do_something(t)
本身将与 D 建立许多连接以获取需要处理的数据,并在任务完成后将其设置为“完成”状态——while
循环重新开始并获得一个新任务。
为了多次运行 python -m core.utils.task
,我打开了多个 ssh
连接。不太好,我知道; threading
或 multiprocessing
会更好。但他只是为了测试我是否可以运行上述命令两次。
有一个管理所有数据库交互的脚本,称为pgsql.py
,需要它来获取任务,然后通过do_something(t)
。我从 this SE post 改编了一个单例模式.
任务.py
import mymodule
import pgsql
def main():
while True:
r, c = pgsql.SQL.select_task() # rows and columns
task = dotdict(dict(zip(c, r[0])))
mymodule.do_something(task)
if __name__ == "__main__":
main()
我的模块.py
import pgsql
def do_something(t):
input = pgsql.SQL.get_images(t.table,t.schema,t.image_id,t.image_directory)
some_other_function(input)
pgsql.SQL.task_status(t.task_id,'done')
pgsql.py
import psycopg2 as pg
class Postgres(object):
"""Adapted from https://softwareengineering.stackexchange.com/a/358061/348371"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = object.__new__(cls)
db_config = {'dbname': 'dev01', 'host': 'XXXXXXXX',
'password': 'YYYYY', 'port': 5432, 'user': 'admin'}
try:
print('connecting to PostgreSQL database...')
connection = Postgres._instance.connection = pg.connect(**db_config)
connection.set_session(isolation_level='READ COMMITTED', autocommit=True)
except Exception as error:
print('Error: connection not established {}'.format(error))
Postgres._instance = None
else:
print('connection established')
return cls._instance
def __init__(self):
self.connection = self._instance.connection
def query(self, query):
try:
with self.connection.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
cols = [desc[0] for desc in cur.description]
except Exception as error:
print('error execting query "{}", error: {}'.format(query, error))
return None
else:
return rows, cols
def __del__(self):
self.connection.close()
db = Postgres()
class SQL():
def select_task():
s = """
UPDATE schema.tasks
SET status = 'ready'
WHERE task_id = ( SELECT task_id
FROM schema.tasks
WHERE tasks.status = 'new'
LIMIT 1)
RETURNING *
;
""".format(m=mode)
return Postgres.query(db, s)
def task_status(id,status):
s = """
UPDATE
schema.tasks
SET
status = '{s}'
WHERE
tasks.task_id = '{id}'
;
""".format(s=status,
id=id)
return Postgres.query(db, s)
这适用于一个 ssh
连接。任务从数据库中检索并处理,完成后任务设置为“完成”。一旦我在第二个终端中打开第二个 ssh
连接以运行 python -m core.utils.task
(也就是说,并行)任务的完全相同的行表在两者中都被处理 - 忽略它们已被更新。
您有什么建议可以让它发挥作用?有数百万个任务,我需要并行运行它们。在实现 threading
或 multiprocessing
之前,我想先用多个 ssh
连接测试它,坏主意吗?我在 psycopg2
的 set_session()
中尝试了 isolation levels
和 autocommit
设置,但没有成功。我检查了数据库服务器中的 session ,可以看到 python -m core.utils.task
的每个进程都有自己的 PID,只连接一次,就像这种单例模式应该起作用一样。非常感谢任何想法或指示如何处理这个问题!
最佳答案
主要问题是执行一项任务不是原子操作。因此,在不同的 ssh session 中,同一个任务可以被处理多次。
In this implementation, you can try to use an
"INPROGRESS"
status for task so as not to retrieve tasks that are already being processed (with"INPROGRESS"
status). But be sure to use autocommit.
但我会使用线程和数据库连接池来实现它。并将使用 OFFSET
和 LIMIT
批量提取任务。 do_something
、select_task
和 task_status
函数将实现批量任务。
此外,没有必要将 Postgres
类实现为单例。
修改(见下面的评论)
FOR UPDATE SKIP LOCKED
添加到当前实现中的 SQL 查询(参见 url )。ThreadPoolExecutor
和 PersistentConnectionPool
来实现。关于python - 使用 UPDATE ... RETURNING 的多个数据库连接,似乎不更新任务表中的行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58338994/
我有一台 MySQL 服务器和一台 PostgreSQL 服务器。 需要从多个表中复制或重新插入一组数据 MySQL 流式传输/同步到 PostgreSQL 表。 这种复制可以基于时间(Sync)或事
如果两个表的 id 彼此相等,我尝试从一个表中获取数据。这是我使用的代码: SELECT id_to , email_to , name_to , status_to
我有一个 Excel 工作表。顶行对应于列名称,而连续的行每行代表一个条目。 如何将此 Excel 工作表转换为 SQL 表? 我使用的是 SQL Server 2005。 最佳答案 这取决于您使用哪
我想合并两个 Django 模型并创建一个模型。让我们假设我有第一个表表 A,其中包含一些列和数据。 Table A -------------- col1 col2 col3 col
我有两个表:table1,table2,如下所示 table1: id name 1 tamil 2 english 3 maths 4 science table2: p
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 1 年前。 Improve th
下面两个语句有什么区别? newTable = orginalTable 或 newTable.data(originalTable) 我怀疑 .data() 方法具有性能优势,因为它在标准 AX 中
我有一个表,我没有在其中显式定义主键,它并不是真正需要的功能......但是一位同事建议我添加一个列作为唯一主键以随着数据库的增长提高性能...... 谁能解释一下这是如何提高性能的? 没有使用索引(
如何将表“产品”中的产品记录与其不同表“图像”中的图像相关联? 我正在对产品 ID 使用自动增量。 我觉得不可能进行关联,因为产品 ID 是自动递增的,因此在插入期间不可用! 如何插入新产品,获取产品
我有一个 sql 表,其中包含关键字和出现次数,如下所示(尽管出现次数并不重要): ____________ dog | 3 | ____________ rat | 7 | ____
是否可以使用目标表中的LAST_INSERT_ID更新源表? INSERT INTO `target` SELECT `a`, `b` FROM `source` 目标表有一个自动增量键id,我想将其
我正在重建一个搜索查询,因为它在“我看到的”中变得多余,我想知道什么 (albums_artists, artists) ( ) does in join? is it for boosting pe
以下是我使用 mysqldump 备份数据库的开关: /usr/bin/mysqldump -u **** --password=**** --single-transaction --databas
我试图获取 MySQL 表中的所有行并将它们放入 HTML 表中: Exam ID Status Assigned Examiner
如何查询名为 photos 的表中的所有记录,并知道当前用户使用单个查询将哪些结果照片添加为书签? 这是我的表格: -- -- Table structure for table `photos` -
我的网站都在 InnoDB 表上运行,目前为止运行良好。现在我想知道在我的网站上实时发生了什么,所以我将每个页面浏览量(页面、引荐来源网址、IP、主机名等)存储在 InnoDB 表中。每秒大约有 10
我在想我会为 mysql 准备两个表。一个用于存储登录信息,另一个用于存储送货地址。这是传统方式还是所有内容都存储在一张表中? 对于两个表...有没有办法自动将表 A 的列复制到表 B,以便我可以引用
我不是程序员,我从这个表格中阅读了很多关于如何解决我的问题的内容,但我的搜索效果不好 我有两张 table 表 1:成员 id*| name | surname -------------------
我知道如何在 ASP.NET 中显示真实表,例如 public ActionResult Index() { var s = db.StaffInfoDBSet.ToList(); r
我正在尝试运行以下查询: "insert into visits set source = 'http://google.com' and country = 'en' and ref = '1234
我是一名优秀的程序员,十分优秀!