gpt4 book ai didi

clojure - 有资质的生产者消费者

转载 作者:行者123 更新时间:2023-12-02 09:25:05 25 4
gpt4 key购买 nike

我是 clojure 的新手,正在尝试了解如何正确使用其并发功能,因此任何批评/建议都会受到赞赏。所以我尝试在 clojure 中编写一个小型测试程序,其工作原理如下:

  1. 有 5 个生产者和 2 个消费者
  2. 生产者等待一段随机时间,然后将一个数字推送到共享队列中。
  3. 消费者应该在队列非空时立即从队列中取出一个数字,然后休眠一小段时间以模拟工作
  4. 当队列为空时,消费者应该阻塞
  5. 当队列中有超过 4 个项目时,生产者应该阻塞,以防止队列变得庞大

这是我对上述每个步骤的计划:

  1. 生产者和消费者将成为并不真正关心其状态的代理(只是零值或其他东西);我只是使用代理来发送“消费者”或“生产者”功能在某个时间执行。那么共享队列将是(defqueue(ref[]))。也许这应该是一个原子?
  2. 在“生产者”代理函数中,只需 (Thread/sleep (rand-int 1000)) 然后 (dosync (alterqueue conj (rand-int 100))) 推送到队列。
  3. 我正在考虑让消费者代理使用 add-watcher 来监视队列中的更改。但不确定这一点......它会唤醒消费者的任何变化,即使变化来自消费者拉动一些东西(可能使其变空)。也许在观察者函数中检查这一点就足够了。我看到的另一个问题是,如果所有消费者都很忙,那么当生产者向队列添加新内容时会发生什么?监视的事件是否会在某个消费者代理上排队或者消失?
  4. 见上文
  5. 我真的不知道该怎么做。我听说 clojure 的 seque 可能很有用,但我找不到足够的文档来说明如何使用它,而且我的初始测试似乎不起作用(抱歉,我身上不再有代码了)

最佳答案

这是我的看法。我特意只使用 Clojure 数据结构来看看效果如何。请注意,从 Java 工具箱中获取阻塞队列并在此处使用它是非常常见和惯用的;我认为代码很容易适应。 更新:我实际上确实将其改编为java.util.concurrent.LinkedBlockingQueue,见下文。

clojure.lang.PersistentQueue

调用(pro-con)开始测试运行;然后查看 output 的内容,看看是否发生了任何事情,并查看 queue-lengths 的内容,看看它们是否保持在给定的范围内。

更新:为了解释为什么我觉得需要使用下面的ensure(我在IRC上被问到这个问题),这是为了防止写入倾斜(参见维基百科)有关定义的文章Snapshot isolation)。如果我用@queue替换(确保队列),两个或多个生产者就可以检查队列的长度,发现它小于4,然后将其他项目放入队列中,并可能使队列的总长度超过 4,从而打破约束。同样,两个使用 @queue 的消费者可以接受相同的项目进行处理,然后从队列中弹出两个项目。 确保防止这两种情况发生。

(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)

(defn overseer
([] (overseer 20000))
([timeout]
(Thread/sleep timeout)
(swap! go-on? not)))

(defn queue-length-watch [_ _ _ new-queue-state]
(dosync (alter queue-lengths conj (count new-queue-state))))

(add-watch queue :queue-length-watch queue-length-watch)

(defn producer [tag]
(future
(while @go-on?
(if (dosync (let [l (count (ensure queue))]
(when (< l *max-queue-length*)
(alter queue conj tag)
true)))
(Thread/sleep (rand-int 2000))))))

(defn consumer []
(future
(while @go-on?
(Thread/sleep 100) ; don't look at the queue too often
(when-let [item (dosync (let [item (first (ensure queue))]
(alter queue pop)
item))]
(Thread/sleep (rand-int 500)) ; do stuff
(dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
(reset! go-on? true)
(dorun (map #(%1 %2)
(repeat 5 producer)
(iterate inc 0)))
(dorun (repeatedly 2 consumer))
(overseer))

java.util.concurrent.LinkedBlockingQueue

使用LinkedBlockingQueue编写的上述版本。请注意,代码的总体轮廓基本相同,但某些细节实际上稍微清晰一些。我从此版本中删除了 queue-lengths,因为 LBQ 为我们处理了该约束。

(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))

(defn overseer
([] (overseer 20000))
([timeout]
(Thread/sleep timeout)
(swap! go-on? not)))

(defn producer [tag]
(future
(while @go-on?
(.put queue tag)
(Thread/sleep (rand-int 2000)))))

(defn consumer []
(future
(while @go-on?
;; I'm using .poll on the next line so as not to block
;; indefinitely if we're done; note that this has the
;; side effect that nulls = nils on the queue will not
;; be handled; there's a number of other ways to go about
;; this if this is a problem, see docs on LinkedBlockingQueue
(when-let [item (.poll queue)]
(Thread/sleep (rand-int 500)) ; do stuff
(dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
(reset! go-on? true)
(dorun (map #(%1 %2)
(repeat 5 producer)
(iterate inc 0)))
(dorun (repeatedly 2 consumer))
(overseer))

关于clojure - 有资质的生产者消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2760017/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com