gpt4 book ai didi

java - 如何将已发布的 DDS 内容附加到订阅者端的现有文件中?

转载 作者:太空宇宙 更新时间:2023-11-04 08:09:34 26 4
gpt4 key购买 nike

我已经创建了使用java实现的普通发布者和订阅者,其工作原理是按大小为1MB读取内容,总大小为5MB,并每1MB发布给订阅者。数据已成功发布。现在面临将内容附加到现有文件的问题。最后我只能找到文件中的最后1MB数据。所以请让我知道如何解决这个问题?我还附上了发布者和订阅者的源代码。

Publisher:

public class MessageDataPublisher {
static StringBuffer fileContent;
static RandomAccessFile randomAccessFile ;

public static void main(String[] args) throws IOException {
MessageDataPublisher msgObj=new MessageDataPublisher();

String fileToWrite="test.txt";
msgObj.towriteDDS(fileToWrite);
}


public void towriteDDS(String fileName) throws IOException{

DDSEntityManager mgr=new DDSEntityManager();
String partitionName="PARTICIPANT";



// create Domain Participant
mgr.createParticipant(partitionName);

// create Type
BinaryFileTypeSupport binary=new BinaryFileTypeSupport();
mgr.registerType(binary);


// create Topic
mgr.createTopic("Serials");

// create Publisher
mgr.createPublisher();

// create DataWriter
mgr.createWriter();

// Publish Events

DataWriter dwriter = mgr.getWriter();
BinaryFileDataWriter binaryWriter=BinaryFileDataWriterHelper.narrow(dwriter);


int bufferSize=1024*1024;


File readfile=new File(fileName);
FileInputStream is = new FileInputStream(readfile);
byte[] totalbytes = new byte[is.available()];
is.read(totalbytes);
byte[] readbyte = new byte[bufferSize];
BinaryFile binaryInstance;

int k=0;
for(int i=0;i<totalbytes.length;i++){
readbyte[k]=totalbytes[i];
k++;
if(k>(bufferSize-1)){
binaryInstance=new BinaryFile();
binaryInstance.name="sendpublisher.txt";
binaryInstance.contents=readbyte;
int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
ErrorHandler.checkStatus(status, "MsgDataWriter.write");

ErrorHandler.checkStatus(status, "MsgDataWriter.write");

k=0;
}

}
if(k < (bufferSize-1)){
byte[] remaingbyte = new byte[k];
for(int j=0;j<(k-1);j++){
remaingbyte[j]=readbyte[j];
}
binaryInstance=new BinaryFile();
binaryInstance.name="sendpublisher.txt";
binaryInstance.contents=remaingbyte;
int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
ErrorHandler.checkStatus(status, "MsgDataWriter.write");

}
is.close();


try {
Thread.sleep(4000);

} catch (InterruptedException e) {
e.printStackTrace();
}

// clean up
mgr.getPublisher().delete_datawriter(binaryWriter);
mgr.deletePublisher();
mgr.deleteTopic();
mgr.deleteParticipant();

}




}


Subscriber:


public class MessageDataSubscriber {
static RandomAccessFile randomAccessFile ;
public static void main(String[] args) throws IOException {
DDSEntityManager mgr = new DDSEntityManager();
String partitionName = "PARTICIPANT";

// create Domain Participant
mgr.createParticipant(partitionName);

// create Type
BinaryFileTypeSupport msgTS = new BinaryFileTypeSupport();
mgr.registerType(msgTS);

// create Topic
mgr.createTopic("Serials");

// create Subscriber
mgr.createSubscriber();

// create DataReader
mgr.createReader();

// Read Events
DataReader dreader = mgr.getReader();
BinaryFileDataReader binaryReader=BinaryFileDataReaderHelper.narrow(dreader);
BinaryFileSeqHolder binaryseq=new BinaryFileSeqHolder();
SampleInfoSeqHolder infoSeq = new SampleInfoSeqHolder();
boolean terminate = false;
int count = 0;

while (!terminate && count < 1500) {
// To run undefinitely
binaryReader.take(binaryseq, infoSeq, 10,
ANY_SAMPLE_STATE.value, ANY_VIEW_STATE.value,ANY_INSTANCE_STATE.value);
for (int i = 0; i < binaryseq.value.length; i++) {
toWrtieXML(binaryseq.value[i].contents);
terminate = true;
}

try
{
Thread.sleep(200);
}
catch(InterruptedException ie)
{
}
++count;

}
binaryReader.return_loan(binaryseq,infoSeq);

// clean up

mgr.getSubscriber().delete_datareader(binaryReader);
mgr.deleteSubscriber();
mgr.deleteTopic();
mgr.deleteParticipant();

}

private static void toWrtieXML(byte[] bytes) throws IOException {
// TODO Auto-generated method stub
File Writefile=new File("samplesubscriber.txt");
if(!Writefile.exists()){
randomAccessFile = new RandomAccessFile(Writefile, "rw");
randomAccessFile.write(bytes, 0, bytes.length);
randomAccessFile.close();
}
else{
randomAccessFile = new RandomAccessFile(Writefile, "rw");
long i=Writefile.length();
randomAccessFile.seek(i);
randomAccessFile.write(bytes, 0, bytes.length);
randomAccessFile.close();
}


}
}

提前致谢

最佳答案

很难对您的问题给出结论性答案,因为您的问题可能是由多种不同原因造成的。此外,一旦确定了问题的原因,您可能会有多种选择来缓解问题。

首先要看的是读者端。该代码在循环中执行 take(),每个 take 之间有 200 毫秒的暂停。根据 DataReader 上的 QoS 设置,您可能会遇到这样的情况:当应用程序 hibernate 200 毫秒时,DataReader 中的样本会被覆盖。如果您通过千兆位以太网执行此操作,那么典型的 DDS 产品将能够在该 sleep 期间执行这 5 个 1 MB 的 block ,这意味着您的默认单位缓冲区将在您的 sleep 期间被覆盖 4 次。

如果您使用 BinaryFileDataReader 的默认历史 QoS 设置(即 history.kind = KEEP_LASThistory.depth = 1),则可能会出现这种情况。将后者增加到一个更大的值,例如 20,将导致队列能够在您 sleep 时保存 20 个文件 block 。现在应该足够了。

如果这不能解决您的问题,可以探讨其他可能的原因。

关于java - 如何将已发布的 DDS 内容附加到订阅者端的现有文件中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11468167/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com