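I am trying to write a Python model that can do some processing in a PostgreSQL database using the multiprocessing module and peewee.
In single-core mode the code works fine. However, when I try to run it on multiple cores, I get an SSL error.
I want to post the structure of my model in the hope that someone can suggest the right way to set it up. I have chosen an object-oriented approach in which I create one connection that is shared in a pool. To clarify what I have done, here is the source code I currently have.
I have three files: main.py, models.py and parser.py. Their contents are as follows.
models.py defines the peewee PostgreSQL table and connects to the Postgres server: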
import peewee as pw
from playhouse.pool import PooledPostgresqlExtDatabase

KVK_KEY = "id_number"
NAME_KEY = "name"
N_VOWELS_KEY = "n_vowels"

# initialise the data base
database = PooledPostgresqlExtDatabase(
    "testdb", user="postgres", host="localhost", port=5432, password="xxxx",
    max_connections=8, stale_timeout=300)


class BaseModel(pw.Model):
    class Meta:
        database = database
        only_save_dirty = True


# this class describes the format of the sql data base
class Company(BaseModel):
    id_number = pw.IntegerField(primary_key=True)
    name = pw.CharField(null=True)
    n_vowels = pw.IntegerField(default=-1)
    processor = pw.IntegerField(default=-1)


def connect_database(database_name, reset_database=False):
    """ connect the database """
    database.connect()
    if reset_database:
        database.drop_tables([Company])
    database.create_tables([Company])
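parser.py contains the CompanyParser class, which is the engine that does all the processing. It generates some artificial data and stores it in the PostgreSQL database. Its run method then processes the data already stored there: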
import pandas as pd
import numpy as np
import random
import string
import peewee as pw
from models import (Company, database, KVK_KEY, NAME_KEY)
import multiprocessing as mp

MAX_SQL_CHUNK = 1000

np.random.seed(0)


def random_name(size=8, chars=string.ascii_lowercase):
    """ Create a random character string of 'size' characters """
    return "".join(random.choice(chars) for _ in range(size))


def vowel_count(characters):
    """
    Count the number of vowels in the string 'characters' and return as an integer
    """
    count = 0
    for char in characters:
        if char in list("aeiou"):
            count += 1
    return count


class CompanyParser(mp.Process):

    def __init__(self, number_of_companies=100, i_proc=None,
                 number_of_procs=1,
                 first_id=None, last_id=None):
        if i_proc is not None and number_of_procs > 1:
            mp.Process.__init__(self)
        self.i_proc = i_proc
        self.number_of_procs = number_of_procs
        self.n_companies = number_of_companies
        self.data_df: pd.DataFrame = None
        self.first_id = first_id
        self.last_id = last_id

    def generate_data(self):
        """ Create a dataframe with fake company data and id's """
        id_list = np.random.randint(1000000, 9999999, self.n_companies)
        company_list = np.array([random_name() for _ in range(self.n_companies)])
        self.data_df = pd.DataFrame(data=np.vstack([id_list, company_list]).T,
                                    columns=[KVK_KEY, NAME_KEY])
        self.data_df.sort_values([KVK_KEY], inplace=True)

    def store_to_database(self):
        """
        Store the company data to a sql database
        """
        record_list = list(self.data_df.to_dict(orient="index").values())
        n_batch = int(len(record_list) / MAX_SQL_CHUNK) + 1

        with database.atomic():
            for cnt, batch in enumerate(pw.chunked(record_list, MAX_SQL_CHUNK)):
                print(f"writing {cnt}/{n_batch}")
                Company.insert_many(batch).execute()

    def run(self):
        print("Making query at {}".format(self.i_proc))
        query = (Company.
                 select().
                 where(Company.id_number.between(self.first_id, self.last_id)))
        print("Found {} companies".format(query.count()))

        for cnt, company in enumerate(query):
            print("Processing @ {} - {}: company {}/{}".format(self.i_proc, cnt,
                                                               company.id_number,
                                                               company.name))
            number_of_vowels = vowel_count(company.name)
            company.n_vowels = number_of_vowels
            company.processor = self.i_proc
            print(f"storing number of vowels: {number_of_vowels}")
            company.save()
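As an aside, `vowel_count` rebuilds `list("aeiou")` for every character it checks. The sketch below is a more compact equivalent and assumes the lowercase ASCII names produced by `random_name`. The sample names come from the log output further down, and the counts agree with the "storing number of vowels" lines there.

```python
VOWELS = frozenset("aeiou")  # built once, instead of once per character


def vowel_count(characters):
    """Count the number of lowercase vowels in `characters`."""
    # Booleans sum as 0/1, so this counts the characters that are in VOWELS.
    return sum(char in VOWELS for char in characters)


for name in ("wqrbgxiu", "lkbagrbc", "nsdjsqio"):
    print(name, vowel_count(name))  # prints 2, 1 and 2 vowels respectively
```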
Finally, my main script loads the classes defined in models.py and parser.py and starts the code:
from models import (Company, connect_database)
from parser import CompanyParser

number_of_processors = 2

connect_database(None, reset_database=True)

# init an object of the CompanyParser and use the create database
parser = CompanyParser()

company_ids = Company.select(Company.id_number)
parser.generate_data()
parser.store_to_database()

n_companies = company_ids.count()
n_comp_per_proc = int(n_companies / number_of_processors)
print("Found {} companies: {} per proc".format(n_companies, n_comp_per_proc))

for i_proc in range(number_of_processors):
    i_start = i_proc * n_comp_per_proc
    first_id = company_ids[i_start]
    last_id = company_ids[i_start + n_comp_per_proc - 1]
    print(f"Running proc {i_proc} for id {first_id} until id {last_id}")
    sub_parser = CompanyParser(first_id=first_id, last_id=last_id,
                               i_proc=i_proc,
                               number_of_procs=number_of_processors)
    if number_of_processors > 1:
        sub_parser.start()
    else:
        sub_parser.run()
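With number_of_processors = 1 this script works fine. It generates artificial data, stores it in the PostgreSQL database and processes it: it counts the vowels in each name and stores the result in the n_vowels column.
However, if I try to run it on 2 cores with number_of_processors = 2, I get the following error: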
/opt/miniconda3/bin/python /home/eelco/PycharmProjects/multiproc_peewee/main.py
writing 0/1
Found 100 companies: 50 per proc
Running proc 0 for id 1020737 until id 5295565
Running proc 1 for id 5302405 until id 9891087
Making query at 0
Found 50 companies
Processing @ 0 - 0: company 1020737/wqrbgxiu
storing number of vowels: 2
Making query at 1
Process CompanyParser-1:
Processing @ 0 - 1: company 1086107/lkbagrbc
storing number of vowels: 1
Processing @ 0 - 2: company 1298367/nsdjsqio
storing number of vowels: 2
Traceback (most recent call last):
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
cursor.execute(sql, params or ())
psycopg2.OperationalError: SSL error: sslv3 alert bad record mac
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/home/eelco/PycharmProjects/multiproc_peewee/parser.py", line 82, in run
company.save()
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 5748, in save
rows = self.update(**field_dict).where(self._pk_expr()).execute()
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
return method(self, database, *args, **kwargs)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1696, in execute
return self._execute(database)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2121, in _execute
cursor = database.execute(self)
File "/opt/miniconda3/lib/python3.7/site-packages/playhouse/postgres_ext.py", line 468, in execute
cursor = self.execute_sql(sql, params, commit=commit)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2721, in execute_sql
self.commit()
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2512, in __exit__
reraise(new_type, new_type(*exc_args), traceback)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 186, in reraise
raise value.with_traceback(tb)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
cursor.execute(sql, params or ())
peewee.OperationalError: SSL error: sslv3 alert bad record mac
Process CompanyParser-2:
Traceback (most recent call last):
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
cursor.execute(sql, params or ())
psycopg2.OperationalError: SSL error: decryption failed or bad record mac
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/home/eelco/PycharmProjects/multiproc_peewee/parser.py", line 72, in run
print("Found {} companies".format(query.count()))
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
return method(self, database, *args, **kwargs)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1881, in count
return Select([clone], [fn.COUNT(SQL('1'))]).scalar(database)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
return method(self, database, *args, **kwargs)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1866, in scalar
row = self.tuples().peek(database)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
return method(self, database, *args, **kwargs)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1853, in peek
rows = self.execute(database)[:n]
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
return method(self, database, *args, **kwargs)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1696, in execute
return self._execute(database)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1847, in _execute
cursor = database.execute(self)
File "/opt/miniconda3/lib/python3.7/site-packages/playhouse/postgres_ext.py", line 468, in execute
cursor = self.execute_sql(sql, params, commit=commit)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2721, in execute_sql
self.commit()
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2512, in __exit__
reraise(new_type, new_type(*exc_args), traceback)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 186, in reraise
raise value.with_traceback(tb)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
cursor.execute(sql, params or ())
peewee.OperationalError: SSL error: decryption failed or bad record mac
Process finished with exit code 0
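Somehow things go wrong as soon as the second process starts doing something with the database. Does anyone have a suggestion for getting this code to work? I have already tried the following
I hope someone can advise.
Regards, Eelco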
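Best answer
After searching the web today, I found the solution to my problem here: github.com/coleifer. As coleifer points out, you apparently have to set up all the forks before you start connecting to the database. I changed my code based on this idea, and it now works.
For those who are interested, I am posting my Python scripts again so you can see how I did it. I did not find many clear examples, so perhaps this helps someone else.
First, all database and peewee model setup has been moved into initialization functions. These are only called from the constructor of the CompanyParser class. models.py now looks like this: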
import peewee as pw
from playhouse.pool import PooledPostgresqlDatabase

KVK_KEY = "id_number"
NAME_KEY = "name"
N_VOWELS_KEY = "n_vowels"


def init_database():
    db = PooledPostgresqlDatabase(
        "testdb", user="postgres", host="localhost", port=5432, password="xxxxx",
        max_connections=8, stale_timeout=300)
    return db


def init_models(db, reset_tables=False):
    class BaseModel(pw.Model):
        class Meta:
            database = db

    # this class describes the format of the sql data base
    class Company(BaseModel):
        id_number = pw.IntegerField(primary_key=True)
        name = pw.CharField(null=True)
        n_vowels = pw.IntegerField(default=-1)
        processor = pw.IntegerField(default=-1)

    if db.is_closed():
        db.connect()
    if reset_tables and Company.table_exists():
        db.drop_tables([Company])
    db.create_tables([Company])
    return Company
The worker class CompanyParser is then defined in the parser.py script as follows:
import multiprocessing as mp
import random
import string

import numpy as np
import pandas as pd
import peewee as pw

from models import (KVK_KEY, NAME_KEY, init_database, init_models)

MAX_SQL_CHUNK = 1000

np.random.seed(0)


def random_name(size=32, chars=string.ascii_lowercase):
    """ Create a random character string of 'size' characters """
    return "".join(random.choice(chars) for _ in range(size))


def vowel_count(characters):
    """
    Count the number of vowels in the string 'characters' and return as an integer
    """
    count = 0
    for char in characters:
        if char in list("aeiou"):
            count += 1
    return count


class CompanyParser(mp.Process):

    def __init__(self, reset_tables=False,
                 number_of_companies=100, i_proc=None,
                 number_of_procs=1, first_id=None, last_id=None):
        if i_proc is not None and number_of_procs > 1:
            mp.Process.__init__(self)
        self.i_proc = i_proc
        self.reset_tables = reset_tables
        self.number_of_procs = number_of_procs
        self.n_companies = number_of_companies
        self.data_df: pd.DataFrame = None
        self.first_id = first_id
        self.last_id = last_id
        # initialise the database and models
        self.database = init_database()
        self.Company = init_models(self.database, reset_tables=self.reset_tables)

    def generate_data(self):
        """ Create a dataframe with fake company data and id's and return the array of id's"""
        id_list = np.random.randint(1000000, 9999999, self.n_companies)
        company_list = np.array([random_name() for _ in range(self.n_companies)])
        self.data_df = pd.DataFrame(data=np.vstack([id_list, company_list]).T,
                                    columns=[KVK_KEY, NAME_KEY])
        self.data_df.drop_duplicates([KVK_KEY], inplace=True)
        self.data_df.sort_values([KVK_KEY], inplace=True)
        return self.data_df[KVK_KEY].values

    def store_to_database(self):
        """
        Store the company data to a sql database
        """
        record_list = list(self.data_df.to_dict(orient="index").values())
        n_batch = int(len(record_list) / MAX_SQL_CHUNK) + 1

        with self.database.atomic():
            for cnt, batch in enumerate(pw.chunked(record_list, MAX_SQL_CHUNK)):
                print(f"writing {cnt}/{n_batch}")
                self.Company.insert_many(batch).execute()

    def run(self):
        query = (self.Company.
                 select().
                 where(self.Company.id_number.between(self.first_id, self.last_id)))
        for cnt, company in enumerate(query):
            print("Processing @ {} - {}: company {}/{}".format(self.i_proc, cnt, company.id_number,
                                                               company.name))
            number_of_vowels = vowel_count(company.name)
            company.n_vowels = number_of_vowels
            company.processor = self.i_proc
            try:
                company.save()
            except (pw.OperationalError, pw.InterfaceError) as err:
                print("failed save for {} {}: {}".format(self.i_proc, cnt, err))
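A small detail in `store_to_database`: `int(len(record_list) / MAX_SQL_CHUNK) + 1` over-counts by one when the number of records is an exact multiple of `MAX_SQL_CHUNK`. For example, 1000 records would print `writing 0/2` for a single batch. The sketch below uses `math.ceil` for the total and a 1-based counter. `chunked` and `batch_count` are hypothetical stdlib stand-ins (for `peewee.chunked` and the batch arithmetic), so the snippet runs without a database.

```python
import math
from itertools import islice

MAX_SQL_CHUNK = 1000


def chunked(records, size):
    """Yield successive lists of at most `size` records (stand-in for peewee.chunked)."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def batch_count(n_records, size=MAX_SQL_CHUNK):
    """Number of batches needed; an exact multiple of `size` gets no extra batch."""
    return math.ceil(n_records / size)


records = [{"id_number": i, "name": f"company{i}"} for i in range(2500)]
n_batch = batch_count(len(records))
for cnt, batch in enumerate(chunked(records, MAX_SQL_CHUNK), start=1):
    # In parser.py, Company.insert_many(batch).execute() would go here.
    print(f"writing {cnt}/{n_batch} ({len(batch)} records)")
```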
Finally, the main.py script that starts the processes:
from parser import CompanyParser
import time


def main():
    number_of_processors = 2
    number_of_companies = 10000

    parser = CompanyParser(number_of_companies=number_of_companies, reset_tables=True)
    company_ids = parser.generate_data()
    parser.store_to_database()

    n_companies = company_ids.size
    n_comp_per_proc = int(n_companies / number_of_processors)
    print("Found {} companies: {} per proc".format(n_companies, n_comp_per_proc))

    if not parser.database.is_closed():
        parser.database.close()

    processes = list()
    for i_proc in range(number_of_processors):
        i_start = i_proc * n_comp_per_proc
        first_id = company_ids[i_start]
        last_id = company_ids[i_start + n_comp_per_proc - 1]
        print(f"Running proc {i_proc} for id {first_id} until id {last_id}")
        sub_parser = CompanyParser(first_id=first_id, last_id=last_id, i_proc=i_proc,
                                   number_of_procs=number_of_processors)
        if number_of_processors > 1:
            sub_parser.start()
        else:
            sub_parser.run()
        processes.append(sub_parser)

    # this blocks the script until all processes are done;
    # join() is only valid for processes that were actually start()ed
    if number_of_processors > 1:
        for job in processes:
            job.join()

    # make sure all the connections are closed
    for i_proc in range(number_of_processors):
        db = processes[i_proc].database
        if not db.is_closed():
            db.close()

    print("Goodbye!")


if __name__ == "__main__":
    start = time.time()
    main()
    duration = time.time() - start
    print(f"Done in {duration} s")
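`int(n_companies / number_of_processors)` discards the remainder. When the number of ids is not divisible by the number of processes, the last few ids are never assigned to any worker. With 101 ids and 2 processes, each worker gets 50 ids and one id is skipped. Because `drop_duplicates` can remove ids, `company_ids.size` is not guaranteed to equal `number_of_companies`. Below is a sketch of a splitter that spreads the remainder over the first workers. `split_id_ranges` is a hypothetical helper, not part of the original code.

```python
def split_id_ranges(sorted_ids, n_procs):
    """Split sorted ids into contiguous (first_id, last_id) ranges, one per worker.

    Every id lands in exactly one range; the first len(sorted_ids) % n_procs
    ranges get one extra id. Workers that would receive no ids are left out.
    """
    base, extra = divmod(len(sorted_ids), n_procs)
    ranges = []
    start = 0
    for i_proc in range(n_procs):
        size = base + (1 if i_proc < extra else 0)
        if size == 0:
            continue
        ranges.append((sorted_ids[start], sorted_ids[start + size - 1]))
        start += size
    return ranges


print(split_id_ranges(list(range(1, 102)), 2))  # [(1, 51), (52, 101)]
```

In `main()`, the loop could then iterate over `enumerate(split_id_ranges(company_ids, number_of_processors))` and pass each pair as `first_id`/`last_id`.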
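As you can see, the database connection is set up per process inside the class. This example works and is a complete example of multiprocessing + peewee + PostgreSQL. I hope it helps others. If you have any comments or suggestions for improvement, please let me know.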
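One caveat on "per process": `CompanyParser.__init__` runs in the parent and calls `init_models()`, which calls `db.connect()`. Each worker's connection is therefore opened before `start()` forks. This works here because every sub-parser creates its own database object, and the parent does not touch it again until after `join()`.

A stricter reading of coleifer's advice is to keep only plain data in `__init__` and open the connection inside `run()`, which executes in the child. The sketch below shows that pattern under some assumptions:

- The standard-library `sqlite3` module stands in for PostgreSQL/peewee, so the sketch runs without a server.
- `Worker` and `run_demo` are hypothetical names.
- It assumes a POSIX system, because it explicitly uses the `fork` start method, as the Linux setup in the question does.

```python
import multiprocessing as mp
import os
import sqlite3
import tempfile

VOWELS = frozenset("aeiou")
FORK = mp.get_context("fork")  # POSIX only; mirrors the Linux setup in the question


class Worker(FORK.Process):
    """Hypothetical worker: __init__ stores plain data only, run() opens the connection."""

    def __init__(self, db_path, first_id, last_id, i_proc):
        super().__init__()
        self.db_path = db_path
        self.first_id = first_id
        self.last_id = last_id
        self.i_proc = i_proc

    def run(self):
        # run() executes in the child, so this connection belongs to this process alone.
        conn = sqlite3.connect(self.db_path, timeout=30, isolation_level=None)
        try:
            # BEGIN IMMEDIATE takes SQLite's write lock up front, so concurrent
            # workers wait for each other instead of failing with "database is locked".
            conn.execute("BEGIN IMMEDIATE")
            rows = conn.execute(
                "SELECT id_number, name FROM company WHERE id_number BETWEEN ? AND ?",
                (self.first_id, self.last_id),
            ).fetchall()
            for id_number, name in rows:
                n_vowels = sum(char in VOWELS for char in name)
                conn.execute(
                    "UPDATE company SET n_vowels = ?, processor = ? WHERE id_number = ?",
                    (n_vowels, self.i_proc, id_number),
                )
            conn.execute("COMMIT")
        finally:
            conn.close()


def run_demo():
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.db")

        # Parent: create and fill the table, then close its connection *before* forking.
        conn = sqlite3.connect(db_path)
        conn.execute(
            "CREATE TABLE company (id_number INTEGER PRIMARY KEY, name TEXT, "
            "n_vowels INTEGER DEFAULT -1, processor INTEGER DEFAULT -1)"
        )
        conn.executemany(
            "INSERT INTO company (id_number, name) VALUES (?, ?)",
            [(1, "wqrbgxiu"), (2, "lkbagrbc"), (3, "nsdjsqio"), (4, "aeiou")],
        )
        conn.commit()
        conn.close()

        workers = [Worker(db_path, 1, 2, 0), Worker(db_path, 3, 4, 1)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        # Parent again: a fresh connection, opened after the children have finished.
        conn = sqlite3.connect(db_path)
        rows = conn.execute(
            "SELECT id_number, n_vowels, processor FROM company ORDER BY id_number"
        ).fetchall()
        conn.close()
        return rows, [worker.exitcode for worker in workers]


if __name__ == "__main__":
    print(run_demo())
```

The parent only ever uses connections it opened itself, either before any `start()` or after all `join()` calls. Each child opens and closes its own connection inside `run()`, so no connection object is shared across the fork.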
For "python multiprocessing + peewee + postgresql fails with SSL error", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/54317651/