作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个输入流,我想制作 2 HTTPS
在将结果传递给程序的另一部分之前,对每个请求进行网络请求。典型的吞吐量是每秒 50 个。
for each input:
HTTP request A
HTTP request B
pass event on with (A.body and B.body)
http-kit
客户端,默认是异步的。它返回一个 promise ,也可以接受一个回调。 Http-kit 使用 Java NIO(参见
here 和
here)
go
套路拉断 channel 。每个发出请求,通过 deref
“阻止”goblock来自 HTTP 请求的 promise 。这是行不通的,因为我不认为 promise 与线程配合得很好。 future
,它“阻止”异步 promise 。这导致 非常 CPU使用率高。加上不知何故的网络资源匮乏。 http-kit
立即请求请求 A,传入一个回调,该回调产生请求 B,传递一个传递事件的回调。这会在几个小时后导致内存不足错误。 Mar 10, 2016 2:05:59 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending
tasks!
Mar 10, 2016 3:38:38 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Complete Status:
Managed Threads: 3
Active Threads: 1
Active Tasks:
com.mchange.v2.resourcepool.BasicResourcePool$1DestroyResourceTask@65d8b232 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0)
Pending Tasks:
com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@359acb0d
Pool thread stack traces:
Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main]
com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:560)
Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1,5,main]
java.lang.Object.wait(Native Method)
com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)
Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2,5,main]
java.lang.Object.wait(Native Method)
com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)
Thu Mar 10 04:38:34 UTC 2016 [client-loop] ERROR - select exception, should not happen
java.lang.OutOfMemoryError: Java heap space
at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77)
at sun.security.ssl.OutputRecord.<init>(OutputRecord.java:76)
at sun.security.ssl.EngineOutputRecord.<init>(EngineOutputRecord.java:65)
at sun.security.ssl.HandshakeOutStream.<init>(HandshakeOutStream.java:63)
at sun.security.ssl.Handshaker.activate(Handshaker.java:514)
at sun.security.ssl.SSLEngineImpl.kickstartHandshake(SSLEngineImpl.java:717)
at sun.security.ssl.SSLEngineImpl.beginHandshake(SSLEngineImpl.java:743)
at org.httpkit.client.HttpClient.finishConnect(HttpClient.java:310)
at org.httpkit.client.HttpClient.run(HttpClient.java:375)
at java.lang.Thread.run(Thread.java:745)
Mar 10, 2016 4:56:34 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
Mar 10, 2016 5:00:43 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
Mar 10, 2016 4:58:25 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
char[]
s 通过
org.httpkit.client.Handler
s 通过
java.util.concurrent.FutureTask
s 表明以某种方式保留了对旧处理程序(即请求)的引用。尝试使用回调的全部原因是为了避免这种情况(尽管它们可能会以某种方式陷入闭包)
最佳答案
- Does making 50 HTTP requests per second, each of which might take 200ms, meaning that there might be 100 requests in flight at any given time, sound like an excessive burden?
- How do I do this in a way that handles the throughput and is robust?
go
每个请求的例程(尽管这不应该受到伤害),因为您可以使用 async
put!
来自 http-kit 回调。
"next"
并将其作为第 2 步的 URL 加入队列。最后,当该查找完成时,它会向
out
添加一个事件。 channel 哪一个
go
例行监测报告统计数据。
(ns concur-req.core
(require [clojure.core.async :as async]
[cheshire.core :refer [decode]]
[org.httpkit.client :as http]))
(defn url-of
[id]
;; this service responds within 100-200ms
(str "http://localhost:28080/" id ".json"))
(defn retrieve-json-async
[url c]
(http/get url nil
(fn [{body :body status :status :as resp}]
(if (= 200 status)
(async/put! c (decode body true))
(println "ERROR:" resp))
(async/close! c))))
(defn run [parallelism stop-chan]
(let [;; allocate half of the parallelism to each step
step1-n (int (max (/ parallelism 2) 1))
step2-n step1-n
;; buffer to take ids, transform them into urls
step1-chan (async/chan step1-n (map url-of))
;; buffer for result of pulling urls from step1, xform by extracting :next url
step2-chan (async/chan step2-n (map :next))
;; buffer to count completed results
out-chan (async/chan 1 (map (constantly 1)))
;; for delivering the final result
final-chan (async/chan)
start-time (System/currentTimeMillis)]
;; process URLs from step1 and put the result in step2
(async/pipeline-async step1-n step2-chan retrieve-json-async step1-chan)
;; process URLs from step2 and put the result in out
(async/pipeline-async step2-n out-chan retrieve-json-async step2-chan)
;; keep the input channel full until stop-chan is closed.
(async/go-loop []
(let [[v c] (async/alts! [stop-chan [step1-chan "1"]])]
(if (= c stop-chan)
(async/close! step1-chan)
(recur))))
;; count messages on out-chan until the pipeline is closed, printing
;; status message every second
(async/go-loop [status-timer (async/timeout 1000) subt 0 accu 0]
(let [[v c] (async/alts! [status-timer out-chan])]
(cond (= c status-timer)
(do (println subt "records...")
(recur (async/timeout 1000) 0 (+ subt accu)))
(nil? v)
(async/>! final-chan (+ subt accu))
:else
(recur status-timer (+ v subt) accu))))
;; block until done, then emit final report.
(let [final-total (async/<!! final-chan)
elapsed-ms (- (System/currentTimeMillis) start-time)
elapsed-s (/ elapsed-ms 1000.0)]
(print (format "Processed %d records with parallelism %d in %.3f seconds (%d/sec)\n"
final-total parallelism elapsed-s
(int (/ final-total elapsed-s)))))))
(defn run-for
[seconds parallelism]
(let [stop-chan (async/chan)]
(future
(Thread/sleep (* seconds 1000))
(async/close! stop-chan))
(run parallelism stop-chan)))
(do
;; Warm up the connection pool, avoid somaxconn problems...
(doseq [p (map #(* 20 (inc %)) (range 25))]
(run-for 1 p))
(run-for (* 60 60 6) 500))
关于networking - 如何在 Clojure (/Java) 中稳健地发出大量并发 HTTPS 请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35914186/
我是一名优秀的程序员,十分优秀!