- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
对于这篇便条的长度,我提前表示歉意。我花了相当多的时间来缩短它,这是我能得到的尽可能小的。
我有一个谜团,非常感谢您的帮助。这个谜团来自于我在 Clojure 中编写的 rxjava observer
的行为,它是通过从在线示例抄袭的几个简单的 observable
来实现的。
一个可观察对象同步向其观察者的 onNext 处理程序发送消息,而我所谓的有原则的观察者的行为符合预期。
另一个可观察对象通过 Clojure future
在另一个线程上异步执行相同的操作。完全相同的观察者不会捕获发布到其 onNext
的所有事件;它似乎只是在尾部丢失了随机数量的消息。
在等待 promise
d onCompleted
的等待到期和等待发送到 的所有事件的到期之间存在有意的竞争>代理
收集器。如果 promise
获胜,我预计 onCompleted
会看到 false
,并且 agent
中的队列可能很短。如果代理
获胜,我希望看到onCompleted
的true
以及来自代理
队列的所有消息。我不期望的一个结果是 onCompleted
的 true
以及来自 agent
的短队列。但是,墨菲不 sleep ,这正是我所看到的。我不知道垃圾收集是否有问题,或者 Clojure 的 STM 内部排队,或者我的愚蠢,或者完全是其他原因。
我在这里按照其独立形式的顺序呈现源代码,以便可以通过lein repl
直接运行它。有三个仪式需要解决:首先,leiningen 项目文件 project.clj
,它声明了对 Netflix rxjava 0.9.0
版本的依赖:
(defproject expt2 "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.5.1"]
[com.netflix.rxjava/rxjava-clojure "0.9.0"]]
:main expt2.core)
现在,命名空间和 Clojure 要求以及 Java 导入:
(ns expt2.core
(:require clojure.pprint)
(:refer-clojure :exclude [distinct])
(:import [rx Observable subscriptions.Subscriptions]))
最后,一个用于输出到控制台的宏:
(defmacro pdump [x]
`(let [x# ~x]
(do (println "----------------")
(clojure.pprint/pprint '~x)
(println "~~>")
(clojure.pprint/pprint x#)
(println "----------------")
x#)))
最后,我的观察者。我使用代理
来收集任何可观察对象的onNext
发送的消息。我使用 atom
来收集潜在的 onError
。我为 onCompleted
使用了 promise
,以便观察者外部的消费者可以等待它。
(defn- subscribe-collectors [obl]
(let [;; Keep a sequence of all values sent:
onNextCollector (agent [])
;; Only need one value if the observable errors out:
onErrorCollector (atom nil)
;; Use a promise for 'completed' so we can wait for it on
;; another thread:
onCompletedCollector (promise)]
(letfn [;; When observable sends a value, relay it to our agent"
(collect-next [item] (send onNextCollector (fn [state] (conj state item))))
;; If observable errors out, just set our exception;
(collect-error [excp] (reset! onErrorCollector excp))
;; When observable completes, deliver on the promise:
(collect-completed [ ] (deliver onCompletedCollector true))
;; In all cases, report out the back end with this:
(report-collectors [ ]
(pdump
;; Wait for everything that has been sent to the agent
;; to drain (presumably internal message queues):
{:onNext (do (await-for 1000 onNextCollector)
;; Then produce the results:
@onNextCollector)
;; If we ever saw an error, here it is:
:onError @onErrorCollector
;; Wait at most 1 second for the promise to complete;
;; if it does not complete, then produce 'false'.
;; I expect if this times out before the agent
;; times out to see an 'onCompleted' of 'false'.
:onCompleted (deref onCompletedCollector 1000 false)
}))]
;; Recognize that the observable 'obl' may run on another thread:
(-> obl
(.subscribe collect-next collect-error collect-completed))
;; Therefore, produce results that wait, with timeouts, on both
;; the completion event and on the draining of the (presumed)
;; message queue to the agent.
(report-collectors))))
现在,这是一个同步可观察对象。它将 25 条消息注入(inject)其观察者的 onNext 喉咙,然后调用他们的 onCompleted 。
(defn- customObservableBlocking []
(Observable/create
(fn [observer] ; This is the 'subscribe' method.
;; Send 25 strings to the observer's onNext:
(doseq [x (range 25)]
(-> observer (.onNext (str "SynchedValue_" x))))
; After sending all values, complete the sequence:
(-> observer .onCompleted)
; return a NoOpSubsription since this blocks and thus
; can't be unsubscribed (disposed):
(Subscriptions/empty))))
我们将观察者订阅到这个可观察的:
;;; The value of the following is the list of all 25 events:
(-> (customObservableBlocking)
(subscribe-collectors))
它按预期工作,我们在控制台上看到以下结果
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
:onError @onErrorCollector,
:onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
["SynchedValue_0"
"SynchedValue_1"
"SynchedValue_2"
"SynchedValue_3"
"SynchedValue_4"
"SynchedValue_5"
"SynchedValue_6"
"SynchedValue_7"
"SynchedValue_8"
"SynchedValue_9"
"SynchedValue_10"
"SynchedValue_11"
"SynchedValue_12"
"SynchedValue_13"
"SynchedValue_14"
"SynchedValue_15"
"SynchedValue_16"
"SynchedValue_17"
"SynchedValue_18"
"SynchedValue_19"
"SynchedValue_20"
"SynchedValue_21"
"SynchedValue_22"
"SynchedValue_23"
"SynchedValue_24"],
:onError nil,
:onCompleted true}
----------------
这是一个异步可观察对象,它仅在 future
的线程上执行完全相同的操作:
(defn- customObservableNonBlocking []
(Observable/create
(fn [observer] ; This is the 'subscribe' method
(let [f (future
;; On another thread, send 25 strings:
(doseq [x (range 25)]
(-> observer (.onNext (str "AsynchValue_" x))))
; After sending all values, complete the sequence:
(-> observer .onCompleted))]
; Return a disposable (unsubscribe) that cancels the future:
(Subscriptions/create #(future-cancel f))))))
;;; For unknown reasons, the following does not produce all 25 events:
(-> (customObservableNonBlocking)
(subscribe-collectors))
但是,令人惊讶的是,我们在控制台上看到的是:true
代表 onCompleted
,这意味着 promise
没有超时;但只有一些异步消息。我们看到的实际消息数量因运行而异,这意味着存在一些并发现象。感谢线索。
----------------
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
:onError @onErrorCollector,
:onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
["AsynchValue_0"
"AsynchValue_1"
"AsynchValue_2"
"AsynchValue_3"
"AsynchValue_4"
"AsynchValue_5"
"AsynchValue_6"],
:onError nil,
:onCompleted true}
----------------
最佳答案
代理上的 await-for
意味着阻塞当前线程,直到所有操作因此分派(dispatch)远(从这个线程或代理)到代理已经发生,这意味着可能会发生在您的等待结束后,仍然有其他一些线程可以向代理发送消息,即你的情况发生了什么。在代理上的等待结束并且您在映射中的 :onNext
键中取消引用其值后,您将等待完成的 promise ,该 promise 在等待后结果为 true,但在平均值中一些其他消息被发送到代理以收集到向量中的时间。
您可以通过将 :onCompleted
键作为映射中的第一个键来解决此问题,这基本上意味着等待完成,然后等待代理,因为到那时就不再有 send
对代理的调用可以在 as 已经收到 onCompleted 之后发生。
{:onCompleted (deref onCompletedCollector 1000 false)
:onNext (do (await-for 0 onNextCollector)
@onNextCollector)
:onError @onErrorCollector
}
关于clojure - rxjava 和 clojure 异步之谜 : futures promises and agents, 哦天哪,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16868460/
到目前为止,我尝试的任何方法都未能让 Firefox 在某些文本下划线。 根据 Google 结果,这个问题已为人所知并已记录了五年多......这不可能是真的......到底发生了什么? 有人知道任
考虑一个示例,其中方法是纯虚拟的,采用模板化类型的参数(从外部类型注入(inject)),并且该模板化类型是本地类型(在函数体中定义)。这种情况会导致 g++ 下的编译时错误。不可否认,这是一个极端案
更新“Web 应用程序”(从开发服务器“重新发布”到实时服务器)同时保留用户数据(例如存储在文件系统中的图像、视频和音频)的最佳方法是什么) 在 VS 2010 构建/发布设置中? 此外,在这些更新期
本人第一次发帖,如有错误请多多包涵。另外,这是我第一次广泛使用 jquery,所以再一次...不要打败我。 好吧,情况如下... 我有一个 WP 模板,它的 jquery 相当多(不是我的),我不想尝
我试图用鼠标在 Canvas 上画一个圆圈,但我的数学是错误的,我不知道如何修复它。我希望当您单击并拖动以形成圆圈时,圆圈的顶部(或底部)和圆圈的侧面与光标的十字准线对齐。 我有一个 fiddle s
大家好, 我确信我陷入了回调噩梦,试图从传递 {query} 的 Mongoos.count 获取简单值。我可以在回调中获取该值并在控制台中看到它正常,但是尝试将其从异步回调设置中取出来却让我感到困惑
box model应该很简单,但我就是不明白。 这是一个无意义的表格 Box model test Box 1 Box 2 及其 CSS body
背景:我正在研究 framework它基于现有的 Java 类模型生成 C++ 代码。因此,我无法更改下面提到的循环依赖。 给定: 父子类关系 父级包含子级列表 用户必须能够在运行时查找列表元素类型
F#一出来,我就要在异步/并行编程领域发财了。一个 answer to this question在描述 Tasks、Parallel LINQ 和 Reactive Framework 之间的差异方
我正在尝试了解 iPhone 操作系统上的 OpenGL 对象模型。我当前正在屏幕上渲染到几个不同的 UIView(基于 CAEAGLayer 构建)。我目前将其中每一个都使用单独的 EAGLCont
关于 Google map 、AJAX 和一些后端数据的快速问题。 我将如何创建使用我存储在数据库中的信息“实时”更新的谷歌地图? 我认为它在我脑海中运作的方式是。 数据库从用户那里收集带有地理标记的
简单地说,我正在尝试制作一款全屏游戏。 我尝试使用以下代码: GraphicsEnvironment ge = GraphicsEnvironment.getLocalGraphicsEnvironm
好的.....我已经完成了所有相关问题的阅读和一些 MSDN 文章,以及大约一天的谷歌搜索。 这个问题的当前“最先进”答案是什么: 我正在使用 VS 2008,C++ 非托管代码。我有一个包含很多 D
在开发内容管理系统时,我遇到了一些困难。回到我的数据模型,我注意到一些问题可能会随着时间的推移变得更加普遍。 即,我想维护用户记录修改的审计跟踪(更改日志)(甚至会记录用户记录修改)。由于包含任意数量
假设我有一个很大的对象列表(数千或数万),每个对象都带有一些标签。有数十或数百个可能的标签,它们的使用遵循典型的幂律:有些标签使用得非常频繁,但大多数很少见。事实上,除了最常见的几十个标签之外,所有标
和我一起回到三年前吧。我记得构建的 Web 控件通过 AJAX 动态插入到页面的 HTML 中,然后就地呈现。我们使用了 Prototype JavaScript 库和 XMLHTTP Request
这里很沮丧。我不是数据库管理员,但可以绕过。我正在针对 Progress OpenEdge 数据库编写一些 ODBC 查询,我们只能查看该数据库。很长一段时间以来都没有出现任何问题,直到最近他们更改了
我认为我面临着一种独特的情况,主要是因为我找不到任何可以帮助我解决这个问题的方法。我正在尝试在蒙版元素内添加视差效果。在元素可以......“视差?”之前......它必须滑入视野,在蒙面容器内。 在
您在 Rails 应用程序的 Selenium 测试中使用哪些数据?您是否从固定装置加载?使用现有的开发数据库?使用单独的(非 fixture )数据库? 我正在考虑我的选择。我有一个带有大型 Sel
我有一个 Ruby on Rails 项目(使用 git 进行版本控制),其中包含许多存在于各种公共(public) GitHub 存储库中的外部 JavaScript 依赖项。将这些依赖项包含在我的
我是一名优秀的程序员,十分优秀!