Как добавить опубликованный контент DDS в существующий файл на стороне подписчика?

Я создал обычных издателей и подписчиков, реализованных с использованием java, который работает как чтение содержимого размером 1 МБ общего размера 5 МБ и публикуется на каждом 1 МБ подписчику. Данные успешно публикуются. Сейчас возникает проблема с добавлением содержимое существующего файла. Наконец, я смог найти только последние 1 МБ данных в файле. Пожалуйста, дайте мне знать, как решить эту проблему? а также я приложил исходный код для издателя и подписчика.

Publisher:

public class MessageDataPublisher {
    static StringBuffer fileContent;
    static RandomAccessFile randomAccessFile ;

    public static void main(String[] args) throws IOException {
        MessageDataPublisher msgObj=new MessageDataPublisher();

        String fileToWrite="test.txt";
        msgObj.towriteDDS(fileToWrite);
    }


    public void towriteDDS(String fileName) throws IOException{

        DDSEntityManager mgr=new DDSEntityManager();
        String partitionName="PARTICIPANT";



        // create Domain Participant
        mgr.createParticipant(partitionName);

        // create Type
        BinaryFileTypeSupport binary=new BinaryFileTypeSupport();
        mgr.registerType(binary);


        // create Topic
        mgr.createTopic("Serials");

        // create Publisher
        mgr.createPublisher();

        // create DataWriter
        mgr.createWriter();

        // Publish Events

        DataWriter dwriter = mgr.getWriter();
        BinaryFileDataWriter binaryWriter=BinaryFileDataWriterHelper.narrow(dwriter);


        int bufferSize=1024*1024;


        File readfile=new File(fileName);
        FileInputStream is = new FileInputStream(readfile);
        byte[] totalbytes = new byte[is.available()];
        is.read(totalbytes);
        byte[] readbyte = new byte[bufferSize];
        BinaryFile binaryInstance;

        int k=0;
        for(int i=0;i<totalbytes.length;i++){
            readbyte[k]=totalbytes[i];
            k++;
            if(k>(bufferSize-1)){
                binaryInstance=new BinaryFile();
                binaryInstance.name="sendpublisher.txt";
                binaryInstance.contents=readbyte;
                int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
                ErrorHandler.checkStatus(status, "MsgDataWriter.write");

                ErrorHandler.checkStatus(status, "MsgDataWriter.write");

                k=0;
                }

        }
        if(k < (bufferSize-1)){
            byte[] remaingbyte = new byte[k];                   
            for(int j=0;j<(k-1);j++){
                remaingbyte[j]=readbyte[j];
            }
            binaryInstance=new BinaryFile();
            binaryInstance.name="sendpublisher.txt";
            binaryInstance.contents=remaingbyte;
            int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
            ErrorHandler.checkStatus(status, "MsgDataWriter.write");

        }       
        is.close();


        try {
            Thread.sleep(4000);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // clean up
        mgr.getPublisher().delete_datawriter(binaryWriter);
        mgr.deletePublisher();
        mgr.deleteTopic();
        mgr.deleteParticipant();

    }




}


Subscriber:


public class MessageDataSubscriber {
    static RandomAccessFile randomAccessFile ;
    public static void main(String[] args) throws IOException {
        DDSEntityManager mgr = new DDSEntityManager();
        String partitionName = "PARTICIPANT";

        // create Domain Participant
        mgr.createParticipant(partitionName);

        // create Type
        BinaryFileTypeSupport msgTS = new BinaryFileTypeSupport();
        mgr.registerType(msgTS);

        // create Topic
        mgr.createTopic("Serials");

        // create Subscriber
        mgr.createSubscriber();

        // create DataReader
        mgr.createReader();

        // Read Events
        DataReader dreader = mgr.getReader();
        BinaryFileDataReader binaryReader=BinaryFileDataReaderHelper.narrow(dreader);
        BinaryFileSeqHolder binaryseq=new BinaryFileSeqHolder();
        SampleInfoSeqHolder infoSeq = new SampleInfoSeqHolder();
        boolean terminate = false;
        int count = 0;

        while (!terminate && count < 1500) {
             // To run undefinitely
            binaryReader.take(binaryseq, infoSeq, 10,
                    ANY_SAMPLE_STATE.value, ANY_VIEW_STATE.value,ANY_INSTANCE_STATE.value);
                for (int i = 0; i < binaryseq.value.length; i++) {
                    toWrtieXML(binaryseq.value[i].contents);
                    terminate = true;
            }

            try
            {
                Thread.sleep(200);
            }
            catch(InterruptedException ie)
            {
            }
            ++count;

        }
            binaryReader.return_loan(binaryseq,infoSeq);

        // clean up

        mgr.getSubscriber().delete_datareader(binaryReader);
        mgr.deleteSubscriber();
        mgr.deleteTopic();
        mgr.deleteParticipant();

    }

    private static void toWrtieXML(byte[] bytes) throws IOException {
        // TODO Auto-generated method stub
        File Writefile=new File("samplesubscriber.txt");
        if(!Writefile.exists()){
            randomAccessFile = new RandomAccessFile(Writefile, "rw");
            randomAccessFile.write(bytes, 0, bytes.length);
            randomAccessFile.close();
            }
            else{
                randomAccessFile = new RandomAccessFile(Writefile, "rw");
                long i=Writefile.length();
                randomAccessFile.seek(i);
                randomAccessFile.write(bytes, 0, bytes.length);
                randomAccessFile.close();
            }


    }
}

заранее спасибо

1 ответ

Решение

Трудно дать окончательный ответ на ваш вопрос, потому что ваша проблема может быть результатом нескольких разных причин. Кроме того, как только причина проблемы будет определена, у вас, вероятно, будет несколько вариантов ее устранения.

Первое место, чтобы посмотреть на стороне читателя. Код делает take() в петле с 200 миллисекундной паузой между каждым дублем. В зависимости от ваших настроек QoS в DataReader вы можете столкнуться с ситуацией, когда ваши образцы перезаписываются в DataReader, когда ваше приложение спит в течение 200 миллисекунд. Если вы делаете это через гигабитную сеть Ethernet, то типичный продукт DDS сможет обрабатывать эти 5 блоков по 1 мегабайту в течение этого периода ожидания, а это означает, что ваш стандартный одноместный буфер будет перезаписываться 4 раза во время сна.

Этот сценарий был бы вероятен, если бы вы использовали настройки QoS истории по умолчанию для вашего BinaryFileDataReader, что значит history.kind = KEEP_LAST а также history.depth = 1, Увеличение последнего до большего значения, например до 20, приведет к очереди, способной удерживать 20 фрагментов файла, пока вы спите. Этого должно быть достаточно на данный момент.

Если это не решит вашу проблему, другие возможные причины могут быть изучены.

Другие вопросы по тегам