gpt4 book ai didi

multithreading - 何时使用非阻塞 >!/线程和阻塞>!!/带有 clojure core.async 的 goroutines

转载 作者:行者123 更新时间:2023-12-04 03:01:04 25 4
gpt4 key购买 nike

我正在编写一个 ETL 流程来从产品数据库中读取事件级数据,对其进行转换/聚合并写入分析数据仓库。我正在使用 clojure 的 core.async 库将这些进程分成并发执行的组件。这是我的代码的主要部分现在的样子

    (ns data-staging.main
(:require [clojure.core.async :as async])
(:use [clojure.core.match :only (match)]
[data-staging.map-vecs]
[data-staging.tables])
(:gen-class))

(def submissions (make-table "Submission" "Valid"))
(def photos (make-table "Photo"))
(def videos (make-table "Video"))
(def votes (make-table "Votes"))

;; define channels used for sequential data processing
(def chan-in (async/chan 100))
(def chan-out (async/chan 100))

(defn write-thread [table]
"infinitely loops between reading subsequent 10000 rows from
table and ouputting a vector of the rows(maps)
into 'chan-in'"
(while true
(let [next-rows (get-rows table)]
(async/>!! chan-in next-rows)
(set-max table (:max-id (last next-rows))))))

(defn aggregator []
"takes output from 'chan-in' and aggregates it by coupon_id, date.
then adds / drops any fields that are needed / not needed and inputs
into 'chan-out'"
(while true
(->>
(async/<!! chan-in)
aggregate
(async/>!! chan-out))))

(defn read-thread []
"reads data from chan out and interts into Analytics DB"
(while true
(upsert (async/<!! chan-out))))

(defn -main []
(async/thread (write-thread submissions))
(async/thread (write-thread photos))
(async/thread (write-thread videos))
(async/thread-call aggregator)
(async/thread-call read-thread))

如您所见,我将每个 os 组件放在自己的线程上并使用阻塞 >!!调用 channel 。感觉就像使用非阻塞 >!对于这个用例,调用和 go 例程可能会更好,特别是对于花费大部分时间执行 i/o 和等待产品数据库中的新行的数据库读取。是这样吗?如果是这样,实现它的最佳方法是什么?我对这两种方法之间的所有权衡以及究竟如何有效地使用 go 例程有点不清楚。此外,任何其他关于如何改进整体架构的建议将不胜感激!

最佳答案

就个人而言,我认为您在这里使用线程可能是正确的选择。 go-blocks 神奇的非阻塞性质来自“ parking ”,这是 core.async 的状态机使用的一种特殊的伪阻塞——但由于您的数据库调用真正阻塞而不是将状态机置于 parking 状态,你只是阻塞了 core.async 线程池中的一些线程。它确实取决于您的同步调用需要多长时间,所以这是基准可以提供信息的那种事情,但我强烈怀疑线程是这里的正确方法。

一个异常(exception)是您的聚合器功能。在我看来它可以被折叠到 chan-out 的定义中,如 (def chan-out (map< aggregate chan-in)) .

对于 go-blocks 与线程的一般概述,Martin Trojer 写了一篇很好的 examination of the two approaches在哪种情况下哪个更快。 Cliff's Notes 版本是 go-blocks 适合于调整已经异步的库以与 core.async 一起使用,而线程则适合于从同步部分中创建异步进程。例如,如果您的数据库有一个基于回调的 API,那么 go-blocks 将是一个绝对的胜利。但由于它是同步的,它们并不适合。

关于multithreading - 何时使用非阻塞 >!/线程和阻塞>!!/带有 clojure core.async 的 goroutines,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21445284/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com