gpt4 book ai didi

networking - 如何在 Clojure (/Java) 中稳健地发出大量并发 HTTPS 请求

转载 作者:行者123 更新时间:2023-12-03 08:17:27 24 4
gpt4 key购买 nike

我有一个输入流,我想制作 2 HTTPS在将结果传递给程序的另一部分之前,对每个请求进行网络请求。典型的吞吐量是每秒 50 个。

for each input:
HTTP request A
HTTP request B
pass event on with (A.body and B.body)

我正在使用 http-kit 客户端,默认是异步的。它返回一个 promise ,也可以接受一个回调。 Http-kit 使用 Java NIO(参见 herehere)

请求进入的速度,加上发出请求的时间,足够高,需要异步完成。

我尝试了 3 种方法:
  • 当一个事件进来时,把它放在一个 channel 上。数量go套路拉断 channel 。每个发出请求,通过 deref“阻止”goblock来自 HTTP 请求的 promise 。这是行不通的,因为我不认为 promise 与线程配合得很好。
  • 当有事件发生时,立即启动 future ,它“阻止”异步 promise 。这导致 非常 CPU使用率高。加上不知何故的网络资源匮乏。
  • 当事件进入时,触发 http-kit立即请求请求 A,传入一个回调,该回调产生请求 B,传递一个传递事件的回调。这会在几个小时后导致内存不足错误。

  • 这些都可以工作并处理一段时间的容量。他们最终都崩溃了。最近一次崩溃,大约 12 小时后:
    Mar 10, 2016 2:05:59 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
    WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending
    tasks!
    Mar 10, 2016 3:38:38 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
    WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Complete Status:
    Managed Threads: 3
    Active Threads: 1
    Active Tasks:
    com.mchange.v2.resourcepool.BasicResourcePool$1DestroyResourceTask@65d8b232 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0)
    Pending Tasks:
    com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@359acb0d
    Pool thread stack traces:
    Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main]
    com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:560)
    Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1,5,main]
    java.lang.Object.wait(Native Method)
    com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)
    Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2,5,main]
    java.lang.Object.wait(Native Method)
    com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)


    Thu Mar 10 04:38:34 UTC 2016 [client-loop] ERROR - select exception, should not happen
    java.lang.OutOfMemoryError: Java heap space
    at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77)
    at sun.security.ssl.OutputRecord.<init>(OutputRecord.java:76)
    at sun.security.ssl.EngineOutputRecord.<init>(EngineOutputRecord.java:65)
    at sun.security.ssl.HandshakeOutStream.<init>(HandshakeOutStream.java:63)
    at sun.security.ssl.Handshaker.activate(Handshaker.java:514)
    at sun.security.ssl.SSLEngineImpl.kickstartHandshake(SSLEngineImpl.java:717)
    at sun.security.ssl.SSLEngineImpl.beginHandshake(SSLEngineImpl.java:743)
    at org.httpkit.client.HttpClient.finishConnect(HttpClient.java:310)
    at org.httpkit.client.HttpClient.run(HttpClient.java:375)
    at java.lang.Thread.run(Thread.java:745)
    Mar 10, 2016 4:56:34 AM baleen.events invoke
    SEVERE: Thread error: Java heap space
    java.lang.OutOfMemoryError: Java heap space
    Mar 10, 2016 5:00:43 AM baleen.events invoke
    SEVERE: Thread error: Java heap space
    java.lang.OutOfMemoryError: Java heap space
    Mar 10, 2016 4:58:25 AM baleen.events invoke
    SEVERE: Thread error: Java heap space
    java.lang.OutOfMemoryError: Java heap space

    我不知道失败的原因是什么。可能是有太多的闭包被持有,或者逐渐的资源泄漏,或者线程饥饿。

    问题
  • 每秒发出 50 个 HTTP 请求,每个请求可能需要 200 毫秒,这意味着在任何给定时间可能有 100 个请求在运行,这听起来是不是负担过重?
  • 我如何以处理吞吐量且稳健的方式执行此操作?

  • 编辑

    YourKit 分析器告诉我,我有大约 2GB 的 char[] s 通过 org.httpkit.client.Handler s 通过 java.util.concurrent.FutureTask s 表明以某种方式保留了对旧处理程序(即请求)的引用。尝试使用回调的全部原因是为了避免这种情况(尽管它们可能会以某种方式陷入闭包)

    最佳答案

    1. Does making 50 HTTP requests per second, each of which might take 200ms, meaning that there might be 100 requests in flight at any given time, sound like an excessive burden?


    这在现代硬件上绝对不过分。

    1. How do I do this in a way that handles the throughput and is robust?


    您可以结合 core.async 管道和 http-kit 的回调来实现这一点。你真的不需要创建 go每个请求的例程(尽管这不应该受到伤害),因为您可以使用 async put!来自 http-kit 回调。

    对管道的每个步骤使用有界缓冲区来限制事件连接的数量,这将(至少)受到系统上可用的临时 TCP 端口数量的限制。

    这是一个小程序的示例,它执行类似于您所描述的操作。它从 channel 读取“事件”——在这种情况下,每个事件都是 ID“1”——并在 HTTP 服务上查找这些 ID。它从第一次调用中获取响应,查找 JSON key "next"并将其作为第 2 步的 URL 加入队列。最后,当该查找完成时,它会向 out 添加一个事件。 channel 哪一个 go例行监测报告统计数据。
    (ns concur-req.core
    (require [clojure.core.async :as async]
    [cheshire.core :refer [decode]]
    [org.httpkit.client :as http]))

    (defn url-of
    [id]
    ;; this service responds within 100-200ms
    (str "http://localhost:28080/" id ".json"))

    (defn retrieve-json-async
    [url c]
    (http/get url nil
    (fn [{body :body status :status :as resp}]
    (if (= 200 status)
    (async/put! c (decode body true))
    (println "ERROR:" resp))
    (async/close! c))))

    (defn run [parallelism stop-chan]
    (let [;; allocate half of the parallelism to each step
    step1-n (int (max (/ parallelism 2) 1))
    step2-n step1-n
    ;; buffer to take ids, transform them into urls
    step1-chan (async/chan step1-n (map url-of))
    ;; buffer for result of pulling urls from step1, xform by extracting :next url
    step2-chan (async/chan step2-n (map :next))
    ;; buffer to count completed results
    out-chan (async/chan 1 (map (constantly 1)))
    ;; for delivering the final result
    final-chan (async/chan)
    start-time (System/currentTimeMillis)]

    ;; process URLs from step1 and put the result in step2
    (async/pipeline-async step1-n step2-chan retrieve-json-async step1-chan)
    ;; process URLs from step2 and put the result in out
    (async/pipeline-async step2-n out-chan retrieve-json-async step2-chan)

    ;; keep the input channel full until stop-chan is closed.
    (async/go-loop []
    (let [[v c] (async/alts! [stop-chan [step1-chan "1"]])]
    (if (= c stop-chan)
    (async/close! step1-chan)
    (recur))))

    ;; count messages on out-chan until the pipeline is closed, printing
    ;; status message every second
    (async/go-loop [status-timer (async/timeout 1000) subt 0 accu 0]
    (let [[v c] (async/alts! [status-timer out-chan])]
    (cond (= c status-timer)
    (do (println subt "records...")
    (recur (async/timeout 1000) 0 (+ subt accu)))

    (nil? v)
    (async/>! final-chan (+ subt accu))

    :else
    (recur status-timer (+ v subt) accu))))

    ;; block until done, then emit final report.
    (let [final-total (async/<!! final-chan)
    elapsed-ms (- (System/currentTimeMillis) start-time)
    elapsed-s (/ elapsed-ms 1000.0)]
    (print (format "Processed %d records with parallelism %d in %.3f seconds (%d/sec)\n"
    final-total parallelism elapsed-s
    (int (/ final-total elapsed-s)))))))

    (defn run-for
    [seconds parallelism]
    (let [stop-chan (async/chan)]
    (future
    (Thread/sleep (* seconds 1000))
    (async/close! stop-chan))
    (run parallelism stop-chan)))

    (do
    ;; Warm up the connection pool, avoid somaxconn problems...
    (doseq [p (map #(* 20 (inc %)) (range 25))]
    (run-for 1 p))
    (run-for (* 60 60 6) 500))

    为了测试这一点,我设置了一个 HTTP 服务,该服务仅在 100-200 毫秒之间的随机时间休眠后才响应。然后我在我的 Macbook Pro 上运行了这个程序 6 个小时。

    并行度设置为 500 时,我平均每秒完成 1155 个事务(每秒完成 2310 个 HTTP 请求)。我相信通过一些调整(尤其是通过将 HTTP 服务移动到不同的机器上),这可能会更高。 JVM 内存在前 30 分钟内攀升至 1.5 GB,然后保持该大小。我正在使用 Oracle 的 64 位 1.8 JVM。

    关于networking - 如何在 Clojure (/Java) 中稳健地发出大量并发 HTTPS 请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35914186/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com