gpt4 book ai didi

multithreading - 使用 Clojure core.async 限制进程

转载 作者:行者123 更新时间:2023-12-03 03:28:15 24 4
gpt4 key购买 nike

我正在尝试使用 clojure core.async channel 来限制内存密集型并发进程。每个进程将图像加载到内存中并应用水印。如果我尝试同时处理太多图像,则会收到 OOM 错误。

下面的模式似乎可行,但感觉有点不优雅。我的问题是,有没有更好的方法来使用 core.async 来做到这一点?或者,我应该使用 java 并发工具来代替(即创建一个固定大小的线程池等)。

下面代码中的基本概念是使用全局固定大小 channel tchan,它用于限制进入in-chan的内容,基本上限制了数量并发进程数达到 tchan 的大小。

在下面的代码中,process-images 是入口点。

(def tbuff (buffer 20))

(def tchan
"tchan is used to throttle the number of processes
tbuff is a fixed size buffer"
(chan tbuff))

(defn accum-results
"Accumulates the images in results-chan"
[n result-chan]
(let [chans [result-chan (timeout timeout-ms)]]
(loop [imgs-out []
remaining n]
(if (zero? remaining)
imgs-out
(let [[img-result _] (alts!! chans)]
(if (nil? img-result)
(do
(log/warn "Image processing timed out")
(go (dotimes [_ remaining] (<! tchan)))
imgs-out)
(do
(go (<! tchan))
(recur (conj imgs-out img-result) (dec remaining)))))))))

(defn process-images
"Concurrently watermarks a list of images
Images is a sequence of maps representing image info
Concurrently fetches each actual image and applies the watermark
Returns a map of image info map -> image input stream"
[images]
(let [num-imgs (count images)
in-chan (chan num-imgs)
out-chan (chan num-imgs)]
;; set up the image-map consumer
;; asynchronously process things found on in-chan
(go
(dotimes [_ num-imgs]
; block here on input images
(let [img-in (<! in-chan)]
(thread
(let [img-out (watermark/watermarked-image-is img-in)]
(>!! out-chan [img-in img-out]))))))
;; put images on in-chan
(go
(doseq [img images]
(>! tchan :x)
(>! in-chan img)))
;; accum results
(let [results (accum-results num-imgs out-chan)]
(log/info (format "Processed %s of %s images and tbuff is %s"
(count results) num-imgs (count tbuff)))
(into {} results))))

最佳答案

我相信这正是pipeline是为了。

这是一个例子:

user> (require '[clojure.core.async :refer [<! <!! chan go go-loop pipeline pipeline-blocking pipeline-async] :as async])

user> (let [output (chan)
input (async/to-chan (range 10))]
(go-loop [x (<! output)]
(println x))
(pipeline 4
output
(map #(do
(Thread/sleep (rand-int 200))
(println "starting" %)
(Thread/sleep 1000)
(println "finished" %)
(inc %)))
input))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x3f434b5a "clojure.core.async.impl.channels.ManyToManyChannel@3f434b5a"]
user> starting 0
starting 3
starting 1
starting 2
finished 0
1
finished 3
finished 1
finished 2
starting 4
starting 5
starting 6
finished 4
finished 5
finished 6

关于multithreading - 使用 Clojure core.async 限制进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37332588/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com