- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
申请要求:
someObservable.
...incantations, other observables...
.subscribe ((EventA a) -> raiseAlertForMissingB (a));
A stream A1----A2----------A3------------------A4--------------A5---------
B stream ------------B1------------B3---------------B4------------B5-----
(A2 TIMEOUT)
merged ------------A1B1----------A3B3------A2??---A4B4----------A5B5---
A stream A6----A7----------A8----------------------
B stream ------------B7------------B6-------B8-----
merged ------------A7B7----------A6B6-----A8B8---
public class Test06Enigmativity {
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 4_000;
private static final int[] bOrder = {
0, 1, 2,
4,
3, // out of order
6,
5, // out of order
7, 8,
10, 11, 12, 13, 14,
9, // out of order
15
};
private final long startTime = System.currentTimeMillis ();
public static void main (final String[] args) {
Test06Enigmativity app = new Test06Enigmativity ();
app.runEnigmativity ();
}
private void runEnigmativity () {
Observable<Long> aStream =
Observable.interval (A_PERIOD, TimeUnit.MILLISECONDS)
.doOnNext (seq -> {
output (" A%s", seq);
}).take (bOrder.length);
Observable<Long> bStream =
Observable.interval (B_PERIOD, TimeUnit.MILLISECONDS)
.map (seq -> {
long aId = (long) bOrder[seq.intValue ()];
output (" B%s", aId);
return aId;
})
.take (bOrder.length);
monitorEnigmativity (aStream, bStream, TIMEOUT)
.subscribe (this::output);
try {
Thread.sleep (60_000);
} catch (InterruptedException e) {
}
}
private Observable<String> monitorEnigmativity (Observable<Long> aStream, Observable<Long> bStream, int thresholdMsec) {
return Observable.create (subscriber ->
bStream.publish (pb ->
Observable.merge (
aStream.map (ax ->
pb
.filter (pbx -> pbx.equals (ax))
.take (1)
.timeout (thresholdMsec, TimeUnit.MILLISECONDS, Observable.defer (
() -> {
output (" timeout on B%s", ax);
return Observable.just (-1L);
}
)).map (pbx -> String.format ("%s,%s", ax, pbx))
)
)
).subscribe (subscriber::onNext)
);
}
private void output (String format, Object... args) {
System.out.printf ("tid:%3d %4dms %s%n", Thread.currentThread ().getId (), System.currentTimeMillis () - startTime,
String.format (format, args));
}
}
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 4_000;
tid: 12 511ms A0
tid: 15 626ms B0
tid: 15 626ms 0,0
tid: 12 907ms A1
tid: 15 1126ms B1
tid: 15 1126ms 1,1
tid: 12 1307ms A2
tid: 15 1626ms B2
tid: 15 1626ms 2,2
tid: 12 1707ms A3
tid: 12 2107ms A4
tid: 15 2126ms B4
tid: 15 2126ms 4,4
tid: 12 2507ms A5
tid: 15 2625ms B3
tid: 15 2625ms 3,3
tid: 12 2907ms A6
tid: 15 3126ms B6
tid: 15 3126ms 6,6
tid: 12 3307ms A7
tid: 15 3626ms B5
tid: 15 3626ms 5,5
tid: 12 3707ms A8
tid: 12 4107ms A9
tid: 15 4126ms B7
tid: 15 4126ms 7,7
tid: 12 4507ms A10
tid: 15 4626ms B8
tid: 15 4626ms 8,8
tid: 12 4908ms A11
tid: 15 5127ms B10
tid: 15 5128ms 10,10
tid: 12 5307ms A12
tid: 15 5626ms B11
tid: 15 5626ms 11,11
tid: 12 5707ms A13
tid: 12 6107ms A14
tid: 15 6126ms B12
tid: 15 6126ms 12,12
tid: 12 6507ms A15
tid: 15 6626ms B13
tid: 15 6626ms 13,13
tid: 13 7109ms timeout on B9
tid: 13 7114ms 9,-1
tid: 15 7126ms B14
tid: 15 7126ms 14,14
tid: 15 7625ms B9
tid: 15 8126ms B15
tid: 15 8126ms 15,15
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 410;
private static final int TIMEOUT = 2_000;
tid: 12 509ms A0
tid: 15 538ms B0
tid: 12 905ms A1
tid: 15 948ms B1
tid: 15 948ms 1,1
tid: 12 1305ms A2
tid: 15 1358ms B2
tid: 15 1358ms 2,2
tid: 12 1706ms A3
tid: 15 1768ms B4
tid: 12 2105ms A4
tid: 15 2178ms B3
tid: 15 2178ms 3,3
tid: 12 2505ms A5
tid: 16 2538ms timeout on B0
tid: 16 2544ms 0,-1
tid: 15 2588ms B6
tid: 12 2905ms A6
tid: 15 2998ms B5
tid: 15 2998ms 5,5
tid: 12 3305ms A7
tid: 15 3408ms B7
tid: 15 3408ms 7,7
tid: 12 3705ms A8
tid: 15 3817ms B8
tid: 15 3817ms 8,8
tid: 12 4105ms A9
tid: 14 4106ms timeout on B4
tid: 14 4106ms 4,-1
tid: 15 4228ms B10
tid: 12 4505ms A10
tid: 15 4638ms B11
tid: 12 4905ms A11
tid: 16 4906ms timeout on B6
tid: 16 4906ms 6,-1
tid: 15 5048ms B12
tid: 12 5305ms A12
tid: 15 5457ms B13
tid: 12 5705ms A13
tid: 15 5868ms B14
tid: 12 6106ms A14
tid: 13 6107ms timeout on B9
tid: 13 6107ms 9,-1
tid: 15 6279ms B9
tid: 14 6510ms timeout on B10
tid: 12 6510ms A15
tid: 14 6510ms 10,-1
tid: 15 6688ms B15
tid: 15 6688ms 15,15
package test;
import rx.Observable;
import java.util.concurrent.TimeUnit;
public class Test06Supertopi {
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 3_000;
private static final int[] bOrder = {
0, 1, 2,
4,
3, // out of order
6,
5, // out of order
7, 8,
10, 11, 12, 13, 14,
9, // out of order
15
};
private final long startTime = System.currentTimeMillis ();
public static void main (final String[] args) {
Test06Supertopi app = new Test06Supertopi ();
app.runSupertopi ();
}
private void runSupertopi () {
Observable<Long> aStream =
Observable.interval (A_PERIOD, TimeUnit.MILLISECONDS)
.doOnNext (seq -> {
output (" A%s", seq);
}).take (bOrder.length);
Observable<Long> bStream =
Observable.interval (B_PERIOD, TimeUnit.MILLISECONDS)
.map (seq -> {
long aId = (long) bOrder[seq.intValue ()];
output (" B%s", aId);
return aId;
})
.take (bOrder.length);
monitorSupertopi (aStream, bStream, TIMEOUT)
.subscribe (this::output);
try {
Thread.sleep (60_000);
} catch (InterruptedException e) {
}
}
private Observable<String> monitorSupertopi (Observable<Long> aStream, Observable<Long> bStream, int thresholdMsec) {
return Observable.create (subscriber -> {
Observable<Long> a = aStream.publish ().refCount ();
Observable<Long> b = bStream.publish ().refCount ();
a.subscribe ((Long aId) -> {
Observable.merge (
Observable.timer (thresholdMsec, TimeUnit.MILLISECONDS)
.doOnNext (x -> {
output (" timeout on B%s", aId);
})
.map (x -> String.format ("%s,%s", aId, -1L)),
b.filter ((Long j) -> j.equals (aId))
.map ((Long pbx) -> String.format ("%s,%s", aId, pbx))
).take (1)
.subscribe (subscriber::onNext);
});
});
}
private void output (String format, Object... args) {
System.out.printf ("tid:%3d %4dms %s%n", Thread.currentThread ().getId (), System.currentTimeMillis () - startTime,
String.format (format, args));
}
}
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 4_000;
tid: 14 522ms A0
tid: 14 922ms A1
tid: 16 1054ms B0
tid: 16 1055ms 0,0
tid: 14 1322ms A2
tid: 16 1555ms B1
tid: 16 1555ms 1,1
tid: 14 1721ms A3
tid: 16 2055ms B2
tid: 16 2055ms 2,2
tid: 14 2122ms A4
tid: 14 2522ms A5
tid: 16 2555ms B4
tid: 16 2555ms 4,4
tid: 14 2922ms A6
tid: 16 3055ms B3
tid: 16 3055ms 3,3
tid: 14 3322ms A7
tid: 16 3555ms B6
tid: 16 3555ms 6,6
tid: 14 3722ms A8
tid: 16 4055ms B5
tid: 16 4055ms 5,5
tid: 14 4122ms A9
tid: 14 4522ms A10
tid: 16 4555ms B7
tid: 16 4555ms 7,7
tid: 14 4922ms A11
tid: 16 5055ms B8
tid: 16 5055ms 8,8
tid: 14 5322ms A12
tid: 16 5555ms B10
tid: 16 5556ms 10,10
tid: 14 5723ms A13
tid: 16 6056ms B11
tid: 16 6057ms 11,11
tid: 14 6122ms A14
tid: 14 6522ms A15
tid: 16 6555ms B12
tid: 16 6555ms 12,12
tid: 16 7055ms B13
tid: 16 7055ms 13,13
tid: 13 7125ms timeout on B9
tid: 13 7125ms 9,-1
tid: 16 7555ms B14
tid: 16 7555ms 14,14
tid: 16 8055ms B9
tid: 16 8555ms B15
tid: 16 8555ms 15,15
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 410;
private static final int TIMEOUT = 2_000;
tid: 14 539ms A0
tid: 14 939ms A1
tid: 16 983ms B0
tid: 16 983ms 0,0
tid: 14 1339ms A2
tid: 16 1393ms B1
tid: 16 1393ms 1,1
tid: 14 1739ms A3
tid: 16 1803ms B2
tid: 16 1803ms 2,2
tid: 14 2139ms A4
tid: 16 2213ms B4
tid: 16 2213ms 4,4
tid: 14 2539ms A5
tid: 16 2623ms B3
tid: 16 2623ms 3,3
tid: 14 2939ms A6
tid: 16 3032ms B6
tid: 16 3032ms 6,6
tid: 14 3339ms A7
tid: 16 3443ms B5
tid: 16 3443ms 5,5
tid: 14 3739ms A8
tid: 16 3852ms B7
tid: 16 3852ms 7,7
tid: 14 4139ms A9
tid: 16 4263ms B8
tid: 16 4263ms 8,8
tid: 14 4539ms A10
tid: 16 4672ms B10
tid: 16 4672ms 10,10
tid: 14 4939ms A11
tid: 16 5083ms B11
tid: 16 5083ms 11,11
tid: 14 5339ms A12
tid: 16 5493ms B12
tid: 16 5493ms 12,12
tid: 14 5739ms A13
tid: 16 5903ms B13
tid: 16 5903ms 13,13
tid: 14 6139ms A14
tid: 13 6140ms timeout on B9
tid: 13 6140ms 9,-1
tid: 16 6313ms B14
tid: 16 6313ms 14,14
tid: 14 6539ms A15
tid: 14 6950ms B0
tid: 14 7360ms B1
tid: 14 7770ms B2
tid: 14 8180ms B4
tid: 13 8540ms timeout on B15
tid: 13 8540ms 15,-1
package test;
import rx.Observable;
import rx.subjects.PublishSubject;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Test06LeeCampbell {
private static final int TIMEOUT = 500;
class ScheduledEvent {
final String type;
final long aId;
final long atMsec;
volatile boolean expired;
public ScheduledEvent (long atMsec, String type, long aId) {
this.atMsec = atMsec;
this.type = type;
this.aId = aId;
}
}
ScheduledEvent[] scheduledEvents = {
new ScheduledEvent (10, "A", 0),
new ScheduledEvent (90, "B", 0),
new ScheduledEvent (110, "A", 1),
new ScheduledEvent (140, "B", 1),
new ScheduledEvent (190, "A", 2),
new ScheduledEvent (270, "B", 2),
new ScheduledEvent (310, "A", 3),
new ScheduledEvent (410, "A", 4),
new ScheduledEvent (440, "B", 4),
new ScheduledEvent (480, "B", 3),
new ScheduledEvent (510, "A", 5),
new ScheduledEvent (610, "A", 6),
//new ScheduledEvent (670, "B", 6),
new ScheduledEvent (710, "A", 7),
new ScheduledEvent (810, "A", 8),
new ScheduledEvent (860, "B", 7),
new ScheduledEvent (880, "B", 8),
new ScheduledEvent (910, "A", 9),
new ScheduledEvent (1100, "A", 10),
new ScheduledEvent (1110, "A", 11),
new ScheduledEvent (1120, "A", 12),
new ScheduledEvent (1130, "A", 13),
new ScheduledEvent (1140, "A", 14),
//new ScheduledEvent (1200, "B", 10),
//new ScheduledEvent (1210, "B", 11),
//new ScheduledEvent (1220, "B", 12),
new ScheduledEvent (1230, "B", 13),
new ScheduledEvent (1240, "B", 14),
new ScheduledEvent (1390, "B", 9),
new ScheduledEvent (1450, "A", 15),
new ScheduledEvent (3290, "B", 5),
new ScheduledEvent (3350, "B", 15)
};
private final long startTime = System.currentTimeMillis ();
public static void main (final String[] args) {
Test06LeeCampbell app = new Test06LeeCampbell ();
app.runLeeCampbell ();
}
private void runLeeCampbell () {
Observable<Long> aStream =
getCrudeSequencer ("A")
.doOnNext (seq -> {
output (" A%s", seq);
});
Observable<Long> bStreamCold =
getCrudeSequencer ("B")
.doOnNext (seq -> {
output (" B%s", seq);
});
PublishSubject<Long> bStream = PublishSubject.create ();
bStreamCold.subscribe (bStream);
monitorLeeCampbell (aStream, bStream, TIMEOUT)
.subscribe (this::output);
pause (10_000);
}
private Observable<String> monitorLeeCampbell (Observable<Long> aStream, Observable<Long> bStream, int thresholdMsec) {
return aStream.flatMap (a ->
bStream.filter (b -> b.equals (a))
.map (b -> String.format ("%s,%s", a, b))
.take (1)
.timeout (thresholdMsec, TimeUnit.MILLISECONDS)
.onErrorResumeNext (
throwable -> {
output (" timeout on B%s", a);
if (!(throwable instanceof TimeoutException)) {
throw new RuntimeException (throwable);
}
return Observable.just (String.format ("%s,%s", a, -1L));
}
)
);
}
private void output (String format, Object... args) {
System.out.printf ("tid:%3d %4dms %s%n", Thread.currentThread ().getId (), getElapsedMsec (),
String.format (format, args));
}
private long getElapsedMsec () {
return System.currentTimeMillis () - startTime;
}
private Observable<Long> getCrudeSequencer (String name) {
return Observable.create (subscriber ->
new Thread (() -> {
for (ScheduledEvent se : scheduledEvents) {
if (se.type.equals (name)) {
while (getElapsedMsec () < se.atMsec) {
pause (1);
}
subscriber.onNext (Long.valueOf (se.aId));
se.expired = true;
} else {
// Timing is not reliable for sequencing two threads
while (!se.expired) {
pause (1);
}
}
}
subscriber.onCompleted ();
}).start ()
);
}
private static void pause (final int millis) {
try {
Thread.sleep (millis);
} catch (InterruptedException e) {
}
}
}
tid: 12 90ms A0
tid: 11 157ms B0
tid: 11 157ms 0,0
tid: 12 159ms A1
tid: 11 160ms B1
tid: 11 160ms 1,1
tid: 12 190ms A2
tid: 11 270ms B2
tid: 11 270ms 2,2
tid: 12 310ms A3
tid: 12 410ms A4
tid: 11 440ms B4
tid: 11 440ms 4,4
tid: 11 480ms B3
tid: 11 480ms 3,3
tid: 12 510ms A5
tid: 12 610ms A6
tid: 12 710ms A7
tid: 12 810ms A8
tid: 11 860ms B7
tid: 11 860ms 7,7
tid: 11 880ms B8
tid: 11 880ms 8,8
tid: 12 910ms A9
tid: 15 1011ms timeout on B5
tid: 15 1012ms 5,-1
tid: 12 1100ms A10
tid: 12 1110ms A11
tid: 16 1111ms timeout on B6
tid: 16 1111ms 6,-1
tid: 12 1120ms A12
tid: 12 1130ms A13
tid: 12 1140ms A14
tid: 11 1230ms B13
tid: 11 1230ms 13,13
tid: 11 1240ms B14
tid: 11 1240ms 14,14
tid: 11 1390ms B9
tid: 11 1390ms 9,9
tid: 12 1450ms A15
tid: 14 1601ms timeout on B10
tid: 14 1601ms 10,-1
tid: 15 1611ms timeout on B11
tid: 15 1611ms 11,-1
tid: 16 1621ms timeout on B12
tid: 16 1621ms 12,-1
tid: 19 1951ms timeout on B15
tid: 19 1951ms 15,-1
tid: 11 3290ms B5
tid: 11 3350ms B15
最佳答案
假设:
AStream.SelectMany(a =>
BStream.Where(b => b == a)
.Select(b => new MatchMade(a, b))
.Take(1)
.Timeout(matchTimeout)
.Catch<TimeoutException>(ex=>Observable.Return(new NoMatchMade(a)))
)
关于system.reactive - 简单命令式任务的响应式(Reactive)方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35529284/
我刚刚更新了 Ruby,现在我在尝试启动 compass 时遇到以下错误: Encoding::CompatibilityError on line ["28"] of /usr/local/Cell
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 6 年前。
关闭。这个问题需要debugging details .它目前不接受答案。 编辑问题以包含 desired behavior, a specific problem or error, and th
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我正在尝试在我的 iOS 应用程序中开发可折叠/ Accordion 式的功能。这将是您可以在网站上找到的典型 FAQ 类型功能。我想点击标题,然后显示详细信息。 因为这是帮助部分,只有几个项目,我认
我正在尝试设计一个基于 REST 的 Web 服务来与我正在开发的农场动物管理系统进行交互。 为了详细说明问题,我收藏了动物 属于一个农场。每只动物都有自己的信息——例如姓名、身份证号、品种年龄等。因
我有 3 种不同的表单,其中复选框数量不同,每个部分基本上代表一个表单,因此当用户选择该部分中的复选框时,它会显示他们在该部分的总金额中 checkout 了多少 HTML
我有一份 32 页的 PDF 版家谱。与其将家谱全部放在一个非常大的 PDF 页面上(这是我想要的),不如将其格式化为一组 8 个单独的美国信纸大小的页面应该在整个宽度上缝合; 4 行这样就完成了树。
指SASS implementation for Java? : 在 Maven 目标编译包中自动编译 compass-style.org 样式表的最佳方法是什么? 我不想发送太多的自编译库,也不想通
鉴于以下 XAML... 我正在寻找一种绑定(bind) ComboBox、Button 和 Command 的方法,以便当 ComboBox 的值更改时,在 Command 上调用 CanExe
在玩具应用程序中,我有一个显示所有帖子标题的“帖子”模板。当您单击每个标题时,我不想直接进入“显示” View ,而是直接内联展开该帖子的其余内容。 我考虑过让 postRoute 重用 postsR
我需要一些使用 Twitter Bootstrap 或其他响应式框架的自定义 Swagger-UI 实现。需要在我的移动设备上使用这样的 UI 测试我的 API,但 swagger-ui 不能很好地扩
我正在做一个项目,我真的在尝试编写面向对象的 JavaScript 代码。我刚刚开始阅读Douglas Crockford's JavaScript: The Good Parts我很快开始意识到用
在 C# 中,我通过执行以下操作来加密文本数据(请注意我正在以 block ( block )的形式加密数据): public string EncryptData(string pu
我正在构建一个社交网站,该网站将向全世界公开 REST API (WCF WebAPI),以便任何开发人员都能够为该网站创建客户端应用程序、将其与其他服务集成等。 我想为 API 实现 Faceboo
我是一名优秀的程序员,十分优秀!