Как добавить опубликованный контент DDS в существующий файл на стороне подписчика?
Я создал обычных издателей и подписчиков, реализованных с использованием java, который работает как чтение содержимого размером 1 МБ общего размера 5 МБ и публикуется на каждом 1 МБ подписчику. Данные успешно публикуются. Сейчас возникает проблема с добавлением содержимое существующего файла. Наконец, я смог найти только последние 1 МБ данных в файле. Пожалуйста, дайте мне знать, как решить эту проблему? а также я приложил исходный код для издателя и подписчика.
Publisher:
public class MessageDataPublisher {
static StringBuffer fileContent;
static RandomAccessFile randomAccessFile ;
public static void main(String[] args) throws IOException {
MessageDataPublisher msgObj=new MessageDataPublisher();
String fileToWrite="test.txt";
msgObj.towriteDDS(fileToWrite);
}
public void towriteDDS(String fileName) throws IOException{
DDSEntityManager mgr=new DDSEntityManager();
String partitionName="PARTICIPANT";
// create Domain Participant
mgr.createParticipant(partitionName);
// create Type
BinaryFileTypeSupport binary=new BinaryFileTypeSupport();
mgr.registerType(binary);
// create Topic
mgr.createTopic("Serials");
// create Publisher
mgr.createPublisher();
// create DataWriter
mgr.createWriter();
// Publish Events
DataWriter dwriter = mgr.getWriter();
BinaryFileDataWriter binaryWriter=BinaryFileDataWriterHelper.narrow(dwriter);
int bufferSize=1024*1024;
File readfile=new File(fileName);
FileInputStream is = new FileInputStream(readfile);
byte[] totalbytes = new byte[is.available()];
is.read(totalbytes);
byte[] readbyte = new byte[bufferSize];
BinaryFile binaryInstance;
int k=0;
for(int i=0;i<totalbytes.length;i++){
readbyte[k]=totalbytes[i];
k++;
if(k>(bufferSize-1)){
binaryInstance=new BinaryFile();
binaryInstance.name="sendpublisher.txt";
binaryInstance.contents=readbyte;
int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
ErrorHandler.checkStatus(status, "MsgDataWriter.write");
ErrorHandler.checkStatus(status, "MsgDataWriter.write");
k=0;
}
}
if(k < (bufferSize-1)){
byte[] remaingbyte = new byte[k];
for(int j=0;j<(k-1);j++){
remaingbyte[j]=readbyte[j];
}
binaryInstance=new BinaryFile();
binaryInstance.name="sendpublisher.txt";
binaryInstance.contents=remaingbyte;
int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
ErrorHandler.checkStatus(status, "MsgDataWriter.write");
}
is.close();
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// clean up
mgr.getPublisher().delete_datawriter(binaryWriter);
mgr.deletePublisher();
mgr.deleteTopic();
mgr.deleteParticipant();
}
}
Subscriber:
public class MessageDataSubscriber {
static RandomAccessFile randomAccessFile ;
public static void main(String[] args) throws IOException {
DDSEntityManager mgr = new DDSEntityManager();
String partitionName = "PARTICIPANT";
// create Domain Participant
mgr.createParticipant(partitionName);
// create Type
BinaryFileTypeSupport msgTS = new BinaryFileTypeSupport();
mgr.registerType(msgTS);
// create Topic
mgr.createTopic("Serials");
// create Subscriber
mgr.createSubscriber();
// create DataReader
mgr.createReader();
// Read Events
DataReader dreader = mgr.getReader();
BinaryFileDataReader binaryReader=BinaryFileDataReaderHelper.narrow(dreader);
BinaryFileSeqHolder binaryseq=new BinaryFileSeqHolder();
SampleInfoSeqHolder infoSeq = new SampleInfoSeqHolder();
boolean terminate = false;
int count = 0;
while (!terminate && count < 1500) {
// To run undefinitely
binaryReader.take(binaryseq, infoSeq, 10,
ANY_SAMPLE_STATE.value, ANY_VIEW_STATE.value,ANY_INSTANCE_STATE.value);
for (int i = 0; i < binaryseq.value.length; i++) {
toWrtieXML(binaryseq.value[i].contents);
terminate = true;
}
try
{
Thread.sleep(200);
}
catch(InterruptedException ie)
{
}
++count;
}
binaryReader.return_loan(binaryseq,infoSeq);
// clean up
mgr.getSubscriber().delete_datareader(binaryReader);
mgr.deleteSubscriber();
mgr.deleteTopic();
mgr.deleteParticipant();
}
private static void toWrtieXML(byte[] bytes) throws IOException {
// TODO Auto-generated method stub
File Writefile=new File("samplesubscriber.txt");
if(!Writefile.exists()){
randomAccessFile = new RandomAccessFile(Writefile, "rw");
randomAccessFile.write(bytes, 0, bytes.length);
randomAccessFile.close();
}
else{
randomAccessFile = new RandomAccessFile(Writefile, "rw");
long i=Writefile.length();
randomAccessFile.seek(i);
randomAccessFile.write(bytes, 0, bytes.length);
randomAccessFile.close();
}
}
}
заранее спасибо
1 ответ
Трудно дать окончательный ответ на ваш вопрос, потому что ваша проблема может быть результатом нескольких разных причин. Кроме того, как только причина проблемы будет определена, у вас, вероятно, будет несколько вариантов ее устранения.
Первое место, чтобы посмотреть на стороне читателя. Код делает take()
в петле с 200 миллисекундной паузой между каждым дублем. В зависимости от ваших настроек QoS в DataReader вы можете столкнуться с ситуацией, когда ваши образцы перезаписываются в DataReader, когда ваше приложение спит в течение 200 миллисекунд. Если вы делаете это через гигабитную сеть Ethernet, то типичный продукт DDS сможет обрабатывать эти 5 блоков по 1 мегабайту в течение этого периода ожидания, а это означает, что ваш стандартный одноместный буфер будет перезаписываться 4 раза во время сна.
Этот сценарий был бы вероятен, если бы вы использовали настройки QoS истории по умолчанию для вашего BinaryFileDataReader
, что значит history.kind = KEEP_LAST
а также history.depth = 1
, Увеличение последнего до большего значения, например до 20, приведет к очереди, способной удерживать 20 фрагментов файла, пока вы спите. Этого должно быть достаточно на данный момент.
Если это не решит вашу проблему, другие возможные причины могут быть изучены.