- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在 dask 1.1.1(最新版本)上,我已经使用以下命令在命令行中启动了一个 dask 调度程序:
$ dask-scheduler --port 9796 --bokeh-port 9797 --bokeh-prefix my_project
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://10.1.0.107:9796
distributed.scheduler - INFO - bokeh at: :9797
distributed.scheduler - INFO - Local Directory: /tmp/scheduler-pdnwslep
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Register tcp://10.1.25.4:36310
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.1.25.4:36310
distributed.core - INFO - Starting established connection
from dask.distributed import Client
c = Client('10.1.0.107:9796', set_as_default=False)
...
File "/root/anaconda3/lib/python3.7/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
tornado.gen.TimeoutError: Timeout
During handling of the above exception, another exception occurred:
...
File "/root/anaconda3/lib/python3.7/site-packages/distributed/comm/core.py", line 195, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://10.1.0.107:9796' after 10 s: connect() didn't finish in time
最佳答案
(见有问题的评论)
dask 的包装器主要用于在我们的特定配置中进行烘焙,并使其易于在我们的系统中使用 docker 容器:
''' daskwrapper: easy access to distributed computing '''
import webbrowser
from dask.distributed import Client as DaskClient
from . import config
scheduler_config = { # from yaml
"scheduler_hostname": "schedulermachine.corpdomain.com"
"scheduler_ip": "10.0.0.1"}
worker_config = { # from yaml
"environments": {
"generic": {
"scheduler_port": 9796,
"dashboard_port": 9797,
"worker_port": 67176}}}
class Client():
def __init__(self, environment: str):
(
self.scheduler_hostname,
self.scheduler_port,
self.dashboard_port,
self.scheduler_address) = self.get_scheduler_details(environment)
self.client = DaskClient(self.scheduler_address, asynchronous=False)
def get_scheduler_details(self, environment: str) -> tuple:
''' gets it from a map of availble docker images... '''
envs = worker_config['environments']
return (
scheduler_config['scheduler_hostname'],
envs[environment]['scheduler_port'],
envs[environment]['dashboard_port'],
(
f"{scheduler_config['scheduler_hostname']}:"
f"{str(envs[environment]['scheduler_port'])}"))
def open_status(self):
webbrowser.open_new_tab(self.get_status())
def get_status(self):
return f'http://{self.scheduler_hostname}:{self.dashboard_port}/status'
def get_async_client(self):
''' returns a client instance so the user can use it directly '''
return DaskClient(self.scheduler_address, asynchronous=True)
def get(self, workflow: dict, tasks: 'str|list'):
return self.client.get(workflow, tasks)
async def submit(self, function: callable, args: list):
''' saved as example dask api '''
if not isinstance(args, list) and not isinstance(args, tuple):
args = [args]
async with DaskClient(self.scheduler_address, asynchronous=True) as client:
future = client.submit(function, *args)
result = await future
return result
def close(self):
return self.client.close()
那是客户端,它是这样使用的:
from daskwrapper import Client
dag = {'some_task': (some_task_function, )}
workers = Client(environment='some_environment')
workers.get(workflow=dag, tasks='some_task')
workers.close()
调度程序是这样启动的:
def start():
def start_scheduler(port, dashboard_port):
async def f():
s = Scheduler(
port=port,
dashboard_address=f"0.0.0.0:{dashboard_port}")
s = await s
await s.finished()
asyncio.get_event_loop().run_until_complete(f())
worker_config = configs.get(repo='spartan_worker')
envs = worker_config['environments']
for key, value in envs.items():
port = value['scheduler_port']
dashboard_port = str(value['dashboard_port'])
thread = Thread(
target=start_scheduler,
args=(port, dashboard_port))
thread.start()
和 worker :
def start(
scheduler_address: str,
scheduler_port: int,
worker_address: str,
worker_port: int
):
async def f(scheduler_address):
w = await Worker(
scheduler_address,
port=worker_port,
contact_address=f'{worker_address}:{worker_port}')
await w.finished()
asyncio.get_event_loop().run_until_complete(f(
f'tcp://{scheduler_address}:{str(scheduler_port)}'))
这可能不会直接帮助你解决这个问题,但我相信自从我们对它进行 dockerized 之后,我们不再有那个问题了。这里缺少很多东西,但这是基础知识,并且可能有更好的方法可以在分布式计算上获得专门的环境以方便使用,但这符合我们的需求。
关于python-3.x - Dask Client 无法连接到 dask-scheduler,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54678335/
有人可以看看我对 Quartz xml 的简单测试(每秒触发一次)并给我一个线索,为什么没有作业被添加到 sheduler 中?基本上我希望每秒触发“SimpleJob”类,我可以确定正在传递哪个作业
我创建了一个 Akka 的调度程序来每天按固定时间发送邮件(例如每天早上 6:00)。那么 Actor 怎么称呼呢?我的意思是我应该使用什么逻辑?谢谢。 最佳答案 只是计算现在和下一个下午 6 点之间
我正在使用 Quartz 调度,更具体地说是一个设置为每周每天晚上 10 点醒来的 cron 触发器。 我接触的另一个小组正在询问调度程序在一天中将唤醒多少次以检查它是否需要运行作业。晚上 10 点作
出现这些错误: 2018-01-22 18:00:59,797 [ServerService Thread Pool -- 79] ERROR org.quartz.ee.servlet.Quartz
出现这些错误: 2018-01-22 18:00:59,797 [ServerService Thread Pool -- 79] ERROR org.quartz.ee.servlet.Quartz
我对 Quartz Scheduler 工作线程有疑问。我创建了一个调度程序任务,它将每 3 小时执行一次。我创建了一份工作和一个触发器。当我执行这个调度程序时,我观察到一个奇怪的行为,同一个作业被分
我正在为我的网络应用程序实现 Quartz 调度程序。 我必须每周安排周一、周二重复 3 周 Quartz Scheduler 中的两种方式, 1)简单触发器: Trigger trigger = n
我正在使用 airbnb 的 Airflow ,我创建了一个简单的任务,如下所示。但是,即使我将间隔设置为每小时或任何其他间隔,调度程序仍会继续运行任务。我注意到的另一件事是,如果我将调度间隔设置为“
嗨,我是 Quartz Scheduler 的新手,我是第一次实现它。我想知道调度程序的开始调用是否会执行暂停的作业?或 暂停的作业只能通过恢复调用而不是其他任何方式来激活。请帮助我。 最佳答案 首先
如果我有一个运行着一堆触发器的 Quartz 调度程序,并且我想清除所有触发器,那么最好如何做到这一点? 我考虑过迭代组和名称,随时调用取消安排,但是当有数千个触发器到位时,这似乎非常慢(取消安排 1
嗨,我是 Quartz Scheduler 的新手,我是第一次实现它。我想知道调度程序的开始调用是否会执行暂停的作业?或 暂停的作业只能通过恢复调用而不是其他任何方式来激活。请帮助我。 最佳答案 首先
我在这里遇到了很多问题。我使用 ocLazyLoader 来加载完整的日历并且它运行良好,但是每当我尝试包含 fullCalendar-scheduler 时我在 JavaScript 中遇到这个错误
我最近在 Tardos 和 Kleinberg 的算法设计的第 4 章中阅读了有关间隔调度算法的内容。为间隔调度问题提供的解决方案是这样的: Sort the n intervals based on
如果一个进程被硬件中断(第一级中断处理程序)中断,那么 CPU 调度程序是否意识到这一点(例如,调度程序是否独立于被中断的进程计算硬件中断的执行时间)? 更多详情:我正在尝试解决以下问题:htop 中
为什么它们用于不同类型的任务?在处理计算任务与 io 任务时,它们有何不同? Schedulers.computation( ) - meant for computational work such
我在 couchbase 中使用 Observables。 Schedulers.io() 和 Schedulers.computation() 之间有什么区别? 最佳答案 RxJava调度器简介。
我遇到了一个可观察的问题: 在服务中我有一个函数(在 edit.component 中): public patchOne(entity: Tier): Observable { const
我正在研究 Flux 和 Mono,并在多线程环境中使用它们,并使用提供工作线程的 Schedular。 有很多选项可以使用 elastic、parallel 和 newElastic 来启动 Sch
FullCalendar 有一个名为 Scheduler 的附加组件,我正尝试将其与 PrimeNG-Schedule 组件一起使用。查看 PrimeNG 文档,有一个“选项”属性,我可以使用它向 F
我搜索了有关如何使用 Mass Transit 的 Quartz 集成 (https://github.com/MassTransit/MassTransit-Quartz) 的示例实现或博客文章。
我是一名优秀的程序员,十分优秀!