gpt4 book ai didi

java - 重新启动追加器时,历史记录队列尾部停止

转载 作者:行者123 更新时间:2023-11-30 05:53:04 26 4
gpt4 key购买 nike

下面的代码中,重新启动tailer进程就可以了。但是,重新启动追加器进程会导致尾部无法接收更多消息。有没有办法重新启动附加程序并保持 channel 打开?

编辑:下面是一个完整的类,我用它来一致地重现问题。环境:乌类图18Chronicle-queue-5.16.9.jar

1) java com.tradeplacer.util.IpcTest 生产者

2) java com.tradeplacer.util.IpcTest 消费者

3)杀死生产者

4)重启生产者

5)注意到消费者不再阅读任何内容

package com.tradeplacer.util;

import java.nio.ByteBuffer;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;

public class IpcTest {
private static final String DIR = "chronicle-test";

public static final void startProducer() {
new Thread() {
public void run() {
System.out.println("starting producer...");
ChronicleQueue queue = ChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
ExcerptAppender appender = queue.acquireAppender();
ByteBuffer ipcBuffer = ByteBuffer.allocate(8192);

for (int i = 0; i < Integer.MAX_VALUE; i++) {
ipcBuffer.clear();
ipcBuffer.put(("data" + i).getBytes());
Bytes<ByteBuffer> bbb = Bytes.wrapForWrite(ipcBuffer);
appender.writeBytes(bbb);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}

public static final void startConsumer() {
new Thread() {
public void run() {
System.out.println("starting consumer...");
ChronicleQueue queue = ChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
ExcerptTailer tailer = queue.createTailer().toEnd(); // skip to end, don't read old messages
Bytes bytes = Bytes.allocateDirect(8192);

while (true) {
try {
long ipcIndex = tailer.index();
boolean read = tailer.readBytes(bytes);
int len = bytes.length();
byte[] data = new byte[len];
bytes.read(data);
if (read) {
System.out.println("read " + data);
}
} catch (Exception e) {
e.printStackTrace();
}
}

}
}.start();
}

public static void main(final String[] args) {
if ("producer".equals(args[0]))
startProducer();
else
startConsumer();
}
}

最佳答案

我稍微修改了代码以减少对象创建。在最新版本5.17.1上,我可以多次重启生产者并且消费者不断读取数据。

注意:如果您要写入文本,writeText 方法可能是更好的选择。

如果您想编写更复杂的内容,我建议使用 Wire 或每个 MethodReader/MethodWriter,它们允许您进行接口(interface)方法调用。

package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.Pauser;

import java.nio.ByteBuffer;

public class IpcTest {
private static final String DIR = "chronicle-test";

public static final void startProducer() {
System.out.println("starting producer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
ExcerptAppender appender = queue.acquireAppender();
Bytes<ByteBuffer> bytes = Bytes.elasticByteBuffer(8192);
ByteBuffer ipcBuffer = bytes.underlyingObject();

for (int i = 0; i < Integer.MAX_VALUE; i++) {
ipcBuffer.clear();
ipcBuffer.put(("data" + i).getBytes());
bytes.readPositionRemaining(0, ipcBuffer.position());
appender.writeBytes(bytes);

Jvm.pause(1);
}
}

public static final void startConsumer() {
System.out.println("starting consumer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
ExcerptTailer tailer = queue.createTailer().toEnd(); // skip to end, don't read old messages
Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(8192);
Pauser pauser = Pauser.balanced();
while (true) {
try {
long ipcIndex = tailer.index();
bytes.clear();
boolean read = tailer.readBytes(bytes);
if (read) {
byte[] data = bytes.underlyingObject().array();
int len = (int) bytes.readRemaining();
System.out.println("read " + new String(data, 0, 0, len));
pauser.reset();
} else {
pauser.pause();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

public static void main(final String[] args) {
if ("producer".equals(args[0]))
startProducer();
else
startConsumer();
}
}

使用MethodReader/MethodWriter

public class IpcTest {

interface Hello {
void hello(String text);
}

private static final String DIR = "chronicle-test";

public static final void startProducer() {
System.out.println("starting producer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = queue.methodWriter(Hello.class);

for (int i = 0; i < Integer.MAX_VALUE; i++) {
hello.hello("data" + i);
Jvm.pause(1);
}
}

public static final void startConsumer() {
System.out.println("starting consumer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = text -> System.out.println("read " + text);
MethodReader reader = queue.createTailer().methodReader(hello);
Pauser pauser = Pauser.balanced();
while (true) {
if (reader.readOne()) {
pauser.reset();
} else {
pauser.pause();
}
}
}

public static void main(final String[] args) {
if ("producer".equals(args[0]))
startProducer();
else
startConsumer();
}
}

您可以将 DTO 与 AbstractMarshallable 一起使用,以提高序列化和反序列化的效率。

package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.MethodReader;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.AbstractMarshallable;

public class IpcTest {

static class Hi extends AbstractMarshallable {
String text;
int value;
}

interface Hello {
void hi(Hi hi);
}

private static final String DIR = "chronicle-test";

public static final void startProducer() {
System.out.println("starting producer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = queue.methodWriter(Hello.class);
Hi hi = new Hi();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
hi.text = "data";
hi.value = i;
hello.hi(hi);
Jvm.pause(1);
}
}

public static final void startConsumer() {
System.out.println("starting consumer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = text -> System.out.println("read " + text);
MethodReader reader = queue.createTailer().methodReader(hello);
Pauser pauser = Pauser.balanced();
while (true) {
if (reader.readOne()) {
pauser.reset();
} else {
pauser.pause();
}
}
}

public static void main(final String[] args) {
ClassAliasPool.CLASS_ALIASES.addAlias(Hi.class);
if ("producer".equals(args[0]))
startProducer();
else
startConsumer();
}
}

在这种情况下,消费者打印

....
read !Hi {
text: data,
value: 3862
}

read !Hi {
text: data,
value: 3863
}

read !Hi {
text: data,
value: 3864
}
....

关于java - 重新启动追加器时,历史记录队列尾部停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53606393/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com