- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在 Elixir 中建立一个工作队列作为学术练习。目前,我的工作人员必须在创建队列时手动注册自己(参见 MyQuestion.Worker.start_link
)。
我希望我的主管在创建/重新启动时将可用的工作人员注册到队列中,因为这似乎有助于测试工作人员并最大限度地减少耦合。
有没有办法做我在 MyQuestion.Supervisor
中的代码中描述的事情?
defmodule MyQuestion.Supervisor do
use Supervisor
def start_link do
supervisor = Supervisor.start_link(__MODULE__, :ok)
end
def init(:ok) do
children = [
worker(MyQuestion.JobQueue, []),
worker(MyQuestion.Worker, [], id: :worker_0),
worker(MyQuestion.Worker, [], id: :worker_1)]
supervise(children, strategy: :rest_for_one)
end
# LOOKING FOR SOMETHING LIKE THIS
# on worker spawn, I want to add the worker to the queue
def child_spawned(pid, {MyQuestion.Worker, _, _}) do
# add worker to queue
MyQuestion.JobQueue.add_new_worker(pid)
end
# LOOKING FOR SOMETHING LIKE THIS
# I want some way to do the following (imagine the callback existed)
def child_terminated(pid, reason, state)
# with this information I could tell the job queue to mark
# the job associated with the pid as failed and to retry
# or maybe extract the job id from the worker state, etc.
MyQuestion.JobQueue.remove_worker(pid)
MyQuestion.JobQueue.restart_job_for_failed_worker(pid)
end
end
defmodule MyQuestion.JobQueue do
def start_link do
Agent.start_link(fn -> [] end, name: __MODULE__)
end
def new_worker(pid) do
# register pid with agent state in available worker list, etc.
end
def add_job(job_description) do
# find idle worker and run job
<... snip ...>
end
<... snip ...>
end
defmodule MyQuestion.Worker do
use GenServer
def start_link do
# start worker
{:ok, worker} = GenServer.start_link(__MODULE__, [])
# Now we have a worker pid, so we can register that pid with the queue
# I wish this could be in the supervisor or else where.
MyQuestion.JobQueue.add_new_worker(worker)
# must return gen server's start link
{:ok, worker}
end
<... snip ...>
end
最佳答案
他们的关键是调用 Process.monitor(pid)
的组合。 – 然后您将接到 handle_info
的电话– 并手动调用 Supervisor.start_child
这给了你pid。
我之前曾尝试使用 handle_info
但永远无法调用它。 Process.monitor(pid)
必须从您希望接收通知的同一进程中调用,因此您必须从 handle_call
内部调用它函数将监视器与您的服务器进程相关联。可能有一个函数可以将代码作为另一个进程运行(即 run_from_process(job_queue_pid, fn -> Process.monitor(pid_to_monitor) end)
),但我找不到任何东西。
附件是一个非常幼稚的作业队列实现。我只在 Elixir 呆了一天,所以代码既困惑又不惯用,但我附上它是因为似乎缺乏围绕该主题的示例代码。
看HeavyIndustry.JobQueue
, handle_info
, create_new_worker
.这段代码有一个明显的问题:它能够在工作人员崩溃时重新启动它们,但它无法从该代码启动下一个作业的队列(由于需要 GenServer.call
内的 handle_info
,这使我们陷入僵局)。我认为您可以通过将启 Action 业的进程与跟踪作业的进程分开来解决此问题。如果您运行示例代码,您会注意到它最终会停止运行作业,即使队列中还有一个作业(:crash
作业)。
defmodule HeavyIndustry.Supervisor do
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__, :ok)
end
def init(:ok) do
# default to supervising nothing, we will add
supervise([], strategy: :one_for_one)
end
def create_children(supervisor, worker_count) do
# create the job queue. defaults to no workers
Supervisor.start_child(supervisor, worker(HeavyIndustry.JobQueue, [[supervisor, worker_count]]))
end
end
defmodule HeavyIndustry.JobQueue do
use GenServer
@job_queue_name __MODULE__
def start_link(args, _) do
GenServer.start_link(__MODULE__, args, name: @job_queue_name)
end
def init([supervisor, n]) do
# set some default state
state = %{
supervisor: supervisor,
max_workers: n,
jobs: [],
workers: %{
idle: [],
busy: []
}
}
{:ok, state}
end
def setup() do
# we want to be aware of worker failures. we hook into this by calling
# Process.monitor(pid), but this links the calling process with the monitored
# process. To make sure the calls come to US and not the process that called
# setup, we create the workers by passing a message to our server process
state = GenServer.call(@job_queue_name, :setup)
# gross passing the whole state back here to monitor but the monitoring must
# be started from the server process and we can't call GenServer.call from
# inside the :setup call else we deadlock.
workers = state.workers.idle
GenServer.call(@job_queue_name, {:monitor_pids, workers})
end
def add_job(from, job) do
# add job to queue
{:ok, our_job_id} = GenServer.call(@job_queue_name, {:create_job, %{job: job, reply_to: from}})
# try to run the next job
case GenServer.call(@job_queue_name, :start_next_job) do
# started our job
{:ok, started_job_id = ^our_job_id} -> {:ok, :started}
# started *a* job
{:ok, _} -> {:ok, :pending}
# couldnt start any job but its ok...
{:error, :no_idle_workers} -> {:ok, :pending}
# something fell over...
{:error, e} -> {:error, e}
# yeah I know this is bad.
_ -> {:ok}
end
end
def start_next_job do
GenServer.call(@job_queue_name, :start_next_job)
end
##
# Internal API
##
def handle_call(:setup, _, state) do
workers = Enum.map(0..(state.max_workers-1), fn (n) ->
{:ok, pid} = start_new_worker(state.supervisor)
pid
end)
state = %{state | workers: %{state.workers | idle: workers}}
{:reply, state, state}
end
defp start_new_worker(supervisor) do
spec = Supervisor.Spec.worker(HeavyIndustry.Worker, [], id: :"Worker.#{:os.system_time}", restart: :temporary)
# start worker
Supervisor.start_child(supervisor, spec)
end
def handle_call({:monitor_pids, list}, _, state) do
Enum.each(list, &Process.monitor(&1))
{:reply, :ok, state}
end
def handle_call({:create_job, job}, from, state) do
job = %{
job: job.job,
reply_to: job.reply_to,
id: :os.system_time, # id for task
status: :pending, # start pending, go active, then remove
pid: nil
}
# add new job to jobs list
state = %{state | jobs: state.jobs ++ [job]}
{:reply, {:ok, job.id}, state}
end
def handle_call(:start_next_job, _, state) do
IO.puts "==> Start Next Job"
IO.inspect state
IO.puts "=================="
reply = case {find_idle_worker(state.workers), find_next_job(state.jobs)} do
{{:error, :no_idle_workers}, _} ->
# no workers for job, doesnt matter if we have a job
{:error, :no_idle_workers}
{_, nil} ->
# no job, doesnt matter if we have a worker
{:error, :no_more_jobs}
{{:ok, worker}, job} ->
# have worker, have job, do work
# update state to set job active and worker busy
jobs = state.jobs -- [job]
job = %{job | status: :active, pid: worker}
jobs = jobs ++ [job]
idle = state.workers.idle -- [worker]
busy = state.workers.busy ++ [worker]
state = %{state | jobs: jobs, workers: %{idle: idle, busy: busy}}
{:ok, task_id} = Task.start(fn ->
result = GenServer.call(worker, job.job)
remove_job(job)
free_worker(worker)
send job.reply_to, %{answer: result, job: job.job}
start_next_job
end)
{:ok, job.id}
end
{:reply, reply, state}
end
defp find_idle_worker(workers) do
case workers do
%{idle: [], busy: _} -> {:error, :no_idle_workers}
%{idle: [worker | idle], busy: busy} -> {:ok, worker}
end
end
defp find_next_job(jobs) do
jobs |> Enum.find(&(&1.status == :pending))
end
defp free_worker(worker) do
GenServer.call(@job_queue_name, {:free_worker, worker})
end
defp remove_job(job) do
GenServer.call(@job_queue_name, {:remove_job, job})
end
def handle_call({:free_worker, worker}, from, state) do
idle = state.workers.idle ++ [worker]
busy = state.workers.busy -- [worker]
{:reply, :ok, %{state | workers: %{idle: idle, busy: busy}}}
end
def handle_call({:remove_job, job}, from, state) do
jobs = state.jobs -- [job]
{:reply, :ok, %{state | jobs: jobs}}
end
def handle_info(msg = {reason, ref, :process, pid, _reason}, state) do
IO.puts "Worker collapsed: #{reason} #{inspect pid}, clear and restart job"
# find job for collapsed worker
# set job to pending again
job = Enum.find(state.jobs, &(&1.pid == pid))
fixed_job = %{job | status: :pending, pid: nil}
jobs = (state.jobs -- [job]) ++ [fixed_job]
# remote worker from lists
idle = state.workers.idle -- [pid]
busy = state.workers.busy -- [pid]
# start new worker
{:ok, pid} = start_new_worker(state.supervisor)
# add worker from lists
idle = state.workers.idle ++ [pid]
# cant call GenServer.call from here to monitor pid,
# so duplicate the code a bit...
Process.monitor(pid)
# update state
state = %{state | jobs: jobs, workers: %{idle: idle, busy: busy}}
{:noreply, state}
end
end
defmodule HeavyIndustry.Worker do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, :ok)
end
def init(:ok) do
# workers have no persistent state
IO.puts "==> Worker up! #{inspect self}"
{:ok, nil}
end
def handle_call({:sum, list}, from, _) do
sum = Enum.reduce(list, fn (n, acc) -> acc + n end)
{:reply, sum, nil}
end
def handle_call({:fib, n}, from, _) do
sum = fib_calc(n)
{:reply, sum, nil}
end
def handle_call({:stop}, from, state) do
{:stop, "my-stop-reason", "my-stop-reply", state}
end
def handle_call({:crash}, from, _) do
{:reply, "this will crash" ++ 1234, nil}
end
def handle_call({:timeout}, from, _) do
:timer.sleep 10000
{:reply, "this will timeout", nil}
end
# Slow fib
defp fib_calc(0), do: 0
defp fib_calc(1), do: 1
defp fib_calc(n), do: fib_calc(n-1) + fib_calc(n-2)
end
defmodule Looper do
def start do
{:ok, pid} = HeavyIndustry.Supervisor.start_link
{:ok, job_queue} = HeavyIndustry.Supervisor.create_children(pid, 2)
HeavyIndustry.JobQueue.setup()
add_jobs
loop
end
def add_jobs do
jobs = [
{:sum, [100, 200, 300]},
{:crash},
{:fib, 35},
{:fib, 35},
{:sum, [88, 88, 99]},
{:fib, 35},
{:fib, 35},
{:fib, 35},
{:sum, 0..100},
# {:stop}, # stop not really a failure
{:sum, [88, 88, 99]},
# {:timeout},
{:sum, [-1]}
]
Enum.each(jobs, fn (job) ->
IO.puts "~~~~> Add job: #{inspect job}"
case HeavyIndustry.JobQueue.add_job(self, job) do
{:ok, :started} -> IO.puts "~~~~> Started job immediately"
{:ok, :pending} -> IO.puts "~~~~> Job in queue"
val -> IO.puts "~~~~> ... val: #{inspect val}"
end
end)
end
def loop do
receive do
value ->
IO.puts "~~~~> Received: #{inspect value}"
loop
end
end
end
Looper.start
关于elixir - 检测 Elixir/OTP 主 pipe 生成和终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33187041/
我正在尝试使用以下 keytool 命令为我的应用程序生成 keystore : keytool -genkey -alias tomcat -keystore tomcat.keystore -ke
编辑:在西里尔正确解决问题后,我注意到只需将生成轴的函数放在用于生成标签的函数下面就可以解决问题。 我几乎读完了 O'Reilly 书中关于 D3.js 的教程,并在倒数第二页上制作了散点图,但是当添
虽然使用 GraphiQL 效果很好,但我的老板要求我实现一个用户界面,用户可以在其中通过 UI 元素(例如复选框、映射关系)检查呈现给他们的元素并获取数据,这样做将为该人生成 graphql 输入,
我尝试在 Netbean 6.8 中使用 ws-import 生成 Java 类。我想重新生成 jax-ws,因为在 ebay.api.paypalapi 包中发现了一个错误(我认为该错误是由于 Pa
我有一个 perl 脚本,它获取系统日期并将该日期写入文件名。 系统日期被分配给 TRH1 变量,然后它被设置为一个文件名。 $TRH1 =`date + %Y%m%d%H%M`; print "TR
我是 Haskell 的新手,需要帮助。我正在尝试构建一种必须具有某种唯一性的新数据类型,因此我决定使用 UUID 作为唯一标识符: data MyType = MyType { uuid ::
我制作了一个脚本,它可以根据 Mysql 数据库中的一些表生成 XML。 该脚本在 PHP 中运行。 public function getRawMaterials($apiKey, $format
所以这是我的项目中的一个问题。 In this task, we will use OpenSSL to generate digital signatures. Please prepare a f
我在 SAS LIFEREG 中有一个加速故障时间模型,我想绘制它。因为 SAS 在绘图方面非常糟糕,我想实际重新生成 R 中曲线的数据并将它们绘制在那里。 SAS 提出了一个尺度(在指数分布固定为
我正在为 Django 后端制作一个样板,并且我需要能够使它到达下一个下载它的人显然无法访问我的 secret key 的地方,或者拥有不同的 key 。我一直在研究一些选项,并在这个过程中进行了实验
我正在创建一个生成采购订单的应用程序。我可以根据用户输入的详细信息创建文本文件。我想生成一个看起来比普通文本文件好得多的 Excel。有没有可以在我的应用程序中使用的开源库? 最佳答案 目前还没有任何
我正在尝试使用 ScalaCheck 为 BST 创建一个 Gen,但是当我调用 .sample 方法时,它给了我 java.lang.NullPointerException。我哪里错了? seal
已关闭。此问题需要 debugging details 。目前不接受答案。 编辑问题以包含 desired behavior, a specific problem or error, and the
我尝试编写一些代码,例如(在verilog中): parameter N = 128; if (encoder_in[0] == 1) begin 23 binary_out = 1;
我正忙于在 Grails 项目中进行从 MySQL 到 Postgres 的相当复杂的数据迁移。 我正在使用 GORM 在 PostGres 中生成模式,然后执行 MySQL -> mysqldump
如何使用纯 XSLT 生成 UUID?基本上是寻找一种使用 XSLT 创建独特序列的方法。该序列可以是任意长度。 我正在使用 XSLT 2.0。 最佳答案 这是一个good example 。基本上,
我尝试安装.app文件,但是当我安装并单击“同步”(在iTunes中)时,我开始在设备上开始安装,然后停止,这是一个问题,我不知道在哪里,但我看到了我无法解决的奇怪的事情: 最佳答案 似乎您没有在Xc
自从我生成 JavaDocs 以来已经有一段时间了,我确信这些选项在过去 10 年左右的时间里已经得到了改进。 我能否得到一些有关生成器的建议,该生成器将输出类似于 .Net 文档结构的 JavaDo
我想学习如何生成 PDF,我不想使用任何第三方工具,我想自己用代码创建它。到目前为止,我所看到的唯一示例是我通过在第 3 方 dll 上打开反射器查看的代码,以查看发生了什么。不幸的是,到目前为止我看
我正在从 Epplus 库生成 excel 条形图。 这是我成功生成的。 我的 table 是这样的 Mumbai Delhi Financial D
我是一名优秀的程序员,十分优秀!