- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我试图弄清楚如何在 GCP 上使用 GKE 上的 helm 部署的 Dask 集群中添加安全层,这将强制用户将证书和 key 文件输入到安全对象中,如本文档 [1] 中所述.不幸的是,调度程序 pod 崩溃时出现超时错误。调查日志后,错误如下:
Traceback (most recent call last):
File "/opt/conda/bin/dask-scheduler", line 10, in <module>
sys.exit(go())
File "/opt/conda/lib/python3.7/site-packages/distributed/cli/dask_scheduler.py", line 226, in go
main()
File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 764, in __call__
return self.main(*args, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 717, in main
rv = self.invoke(ctx)
File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 956, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 555, in invoke
return callback(*args, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/distributed/cli/dask_scheduler.py", line 206, in main
**kwargs
File "/opt/conda/lib/python3.7/site-packages/distributed/scheduler.py", line 1143, in __init__
self.connection_args = self.security.get_connection_args("scheduler")
File "/opt/conda/lib/python3.7/site-packages/distributed/security.py", line 224, in get_connection_args
"ssl_context": self._get_tls_context(tls, ssl.Purpose.SERVER_AUTH),
File "/opt/conda/lib/python3.7/site-packages/distributed/security.py", line 187, in _get_tls_context
ctx = ssl.create_default_context(purpose=purpose, cafile=ca)
File "/opt/conda/lib/python3.7/ssl.py", line 584, in create_default_context
context.load_verify_locations(cafile, capath, cadata)
FileNotFoundError: [Errno 2] No such file or directory
Helm Config Yaml 文件如下:
scheduler:
allowed-failures: 5
env:
- name: DASK_DISTRIBUTED__COMM__DEFAULT_SCHEME
value: "tls"
- name: DASK_DISTRIBUTED__COMM__REQUIRE_ENCRYPTION
value: "true"
- name: DASK_DISTRIBUTED__COMM__TLS__CA_FILE
value: "myca.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__SCHEDULER__KEY
value: "mykey.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__SCHEDULER__CERT
value: "myca.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__WORKER__KEY
value: "mykey.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__WORKER__CERT
value: "myca.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__CLIENT__KEY
value: "mykey.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__CLIENT__CERT
value: "myca.pem"
我按如下方式创建 key 和证书文件:
openssl req -newkey rsa:4096 -nodes -sha256 -x509 -days 3650 -nodes -out myca.pem -keyout mykey.pem
这是一个最小的完整可验证示例:
import dask.dataframe as dd
from dask.distributed import Client
from distributed.security import Security
sec = Security(tls_ca_file='myca.pem',
tls_client_cert='myca.pem',
tls_client_key='mykey.pem',
require_encryption=True)
with Client("tls://<scheduler_ip>:8786", security=sec) as dask_client:
ddf = dd.read_csv('gs://<bucket_name>/my_file.csv',
engine='python',
error_bad_lines=False,
encoding="utf-8",
assume_missing=True
)
print(ddf.shape[0].compute())
[1]
https://distributed.dask.org/en/latest/tls.html
最佳答案
我解决了这个问题。 Dask 工作人员和调度程序都需要在配置中包含证书文件。此外,我们还需要在 dockerfile 中烘焙证书。请参阅下面的完整配置:
Dockerfile
FROM daskdev/dask
RUN conda install --yes \
-c conda-forge \
python==3.7
ADD certs /certs/
ENTRYPOINT ["tini", "-g", "--", "/usr/bin/prepare.sh"]
Helm 配置
worker:
name: worker
image:
repository: "gcr.io/PROJECT_ID/mydask"
tag: "latest"
env:
- name: DASK_DISTRIBUTED__COMM__DEFAULT_SCHEME
value: "tls"
- name: DASK_DISTRIBUTED__COMM__REQUIRE_ENCRYPTION
value: "true"
- name: DASK_DISTRIBUTED__COMM__TLS__CA_FILE
value: "certs/myca.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__SCHEDULER__KEY
value: "certs/mykey.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__SCHEDULER__CERT
value: "certs/myca.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__WORKER__KEY
value: "certs/mykey.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__WORKER__CERT
value: "certs/myca.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__CLIENT__KEY
value: "certs/mykey.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__CLIENT__CERT
value: "certs/myca.pem"
scheduler:
name: scheduler
image:
repository: "gcr.io/PROJECT_ID/mydask"
tag: "latest"
env:
- name: DASK_DISTRIBUTED__COMM__DEFAULT_SCHEME
value: "tls"
- name: DASK_DISTRIBUTED__COMM__REQUIRE_ENCRYPTION
value: "true"
- name: DASK_DISTRIBUTED__COMM__TLS__CA_FILE
value: "certs/myca.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__SCHEDULER__KEY
value: "certs/mykey.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__SCHEDULER__CERT
value: "certs/myca.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__WORKER__KEY
value: "certs/mykey.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__WORKER__CERT
value: "certs/myca.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__CLIENT__KEY
value: "certs/mykey.pem"
- name: DASK_DISTRIBUTED__COMM__TLS__CLIENT__CERT
value: "certs/myca.pem"
关于python - Dask:如何向 Dask 集群添加安全性 (TLS/SSL)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63124109/
如果我有一个依赖于某些全局或其他常量的函数,如下所示: x = 123 def f(partition): return partition + x # note that x is def
我们可以通过哪些方式在 Dask Arrays 中执行项目分配?即使是一个非常简单的项目分配,如:a[0] = 2 不起作用。 最佳答案 正确的。这是文档中提到的第一个限制。 通常,涉及 for 循环
[mapr@impetus-i0057 latest_code_deepak]$ dask-worker 172.26.32.37:8786 distributed.nanny - INFO -
我正在构建一个 FastAPI 应用程序,它将为 Dask 数组的 block 提供服务。我想利用 FastAPI's asynchronous functionality旁边Dask-distrib
在延迟数据帧处理的几个阶段之后,我需要在保存数据帧之前对其进行重新分区。但是,.repartition() 方法要求我知道分区的数量(而不是分区的大小),这取决于处理后数据的大小,这是未知的。 我想我
我正在努力转换 dask.bag将字典放入 dask.delayed pandas.DataFrames进入决赛 dask.dataframe 我有一个函数 (make_dict) 将文件读入一个相当
我正在尝试使用 dask_cudf/dask 读取单个大型 parquet 文件(大小 > gpu_size),但它目前正在读取它到一个分区中,我猜这是从文档字符串推断出的预期行为: dask.dat
当启动一个 dask 分布式本地集群时,您可以为 dashboard_address 设置一个随机端口或地址。 如果稍后获取scheduler对象。有没有办法提取仪表板的地址。 我有这个: clust
我有一个 dask 数据框,由 parquet 支持。它有 1.31 亿行,当我对整个帧执行一些基本操作时,它们需要几分钟。 df = dd.read_parquet('data_*.pqt') un
我正在使用 24 个 vCPU 的谷歌云计算实例。运行代码如下 import dask.dataframe as dd from distributed import Client client =
我正在尝试在多台机器上分发一个大型 Dask 数据帧,以便(稍后)在数据帧上进行分布式计算。我为此使用了 dask-distributed。 我看到的所有 dask 分布式示例/文档都是从网络资源(h
我在 Django 服务器后面使用 Dask,这里总结了我的基本设置:https://github.com/MoonVision/django-dask-demo/可以在这里找到 Dask 客户端:h
我有以下格式的 Dask DataFrame: date hour device param value 20190701 21 dev_01 att_1 0.00
我正在尝试使用 dask 而不是 Pandas,因为我有 2.6gb csv 文件。 我加载它,我想删除一列。但似乎无论是 drop 方法 df.drop('column') 或切片 df[ : ,
我有一个比我的内存大得多的文本文件。我想按字典顺序对该文件的行进行排序。我知道如何手动完成: 分成适合内存的块 对块进行排序 合并块 我想用 dask 来做。我认为处理大量数据将是 dask 的一个用
使用 Dask 的分布式调度程序时,我有一个正在远程工作人员上运行的任务,我想停止该任务。 我该如何阻止?我知道取消方法,但如果任务已经开始执行,这似乎不起作用。 最佳答案 如果它还没有运行 如果任务
我需要将一个非常大的 dask.bag 的元素提交到一个非线程安全的存储区,即我需要类似的东西 for x in dbag: store.add(x) 我无法使用compute,因为包太大,无
如果我有一个已经索引的 Dask 数据框 >>> A.divisions (None, None) >>> A.npartitions 1 我想设置分区,到目前为止我正在做 A.reset_index
根据 this回答,如果 Dask 知道数据帧的索引已排序,则 Dask 数据帧可以执行智能索引。 如果索引已排序,我如何让 Dask 知道? 在我的具体情况下,我正在做这样的事情: for sour
我想从具有特定数量的工作人员的 python 启动本地集群,然后将客户端连接到它。 cluster = LocalCluster(n_workers=8, ip='127.0.0.1') client
我是一名优秀的程序员,十分优秀!