作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在尝试OSGi PushStream库时,我觉得它确实很慢。我创建了两个方法,它们使用PushStream来做相同的事情,另一个方法是简单的BlockingQueue(请参见下面的代码),结果如下:
Queue needs 3 milliseconds to process 1000 events.
PushStream needs 31331 milliseconds to process 1000 events.
public class TestPush{
@Test
public void testPushStream() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class).withQueuePolicy(QueuePolicyOption.BLOCK).build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
psp.createStream(source).onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
});
final Promise<Long> nbEvent = psp.createStream(source).count();
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
@Test
public void testBlockingQueue() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final Executor e = Executors.newFixedThreadPool(1);
final ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(32);
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
final Deferred<Integer> nbEvent = pf.deferred();
e.execute( () -> {
try {
Integer i = 0;
Integer last = 0;
do {
i = abq.take();
if (i == 0) {
startD.resolve(Instant.now());
} else if (i != -1) {
last = i;
}
}
while (i != -1);
endD.resolve(Instant.now());
nbEvent.resolve(last + 1);
}
catch (final InterruptedException exception) {
exception.printStackTrace();
}
});
for (int i = 0; i < 1000; i++) {
abq.put(i);
}
abq.put(-1);
System.out.println("Queue needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getPromise().getValue() + " events.");
}
}
最佳答案
这是一个有趣的问题:)
Why the PushStream is slower? What I am doing wrong?
psp.createStream(source)
创建流时,将使用32个元素的缓冲区和基于缓冲区大小的线性反压策略来设置该流,满时返回一秒钟,如果其中一项返回31毫秒。值得注意的是,每个元素31毫秒总计30秒!
@Test
public void testPushStream2() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
psp.buildStream(source).unbuffered().build().onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
});
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count();
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
PushStream needs 39 milliseconds to process 1000 events.
onClose()
处理程序。这为推送流管道增加了一个额外的阶段。实际上,您可以将onClose移动为promise的结果,从而减少了管道的长度(您只需要运行一次)。
@Test
public void testPushStream3() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
psp.buildStream(source).unbuffered().build().forEach((i) -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
});
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count()
.onResolve(() -> endD.resolve( Instant.now()));
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
PushStream needs 21 milliseconds to process 1000 events.
@Test
public void testPushStream4() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build()
.filter(i -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
return true;
})
.count()
.onResolve(() -> endD.resolve( Instant.now()));
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
PushStream needs 3 milliseconds to process 1000 events.
关于asynchronous - OSGi PushStream慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53692861/
我通读了我能找到的所有文档、论坛和示例,但找不到关于 pushstream module 如何实现的描述。在以下情况下表现: 我正在使用 nginx+pushstream 在 session 队列中为
我正在尝试实现每 500 毫秒发送大量数据(对象,而不是文件)的服务器。 经过一番阅读,服务器通过 http2 发送的事件似乎是最快的选择(由于 http2 是二进制协议(protocol),SSE
我是一名优秀的程序员,十分优秀!