Как читать данные, сериализованные с помощью Chronicle Wire, из InputStream?

Некоторые данные сериализуются в выходной поток через Chronicle Wire.

Object m = ... ;
OutputStream out = ... ;

WireType.RAW                               //
        .apply(Bytes.elasticByteBuffer())  //
        .getValueOut().object(m)           //
        .bytes().copyTo(out)
;

Я хочу вернуть их из Inputstream.

InputStream in = ... ;

WireType.RAW
        .apply(Bytes.elasticByteBuffer())
        .getValueIn()
        ???
;

Object m = ???; // How to initialize m ?

Как читать мой начальный объект m от in?

1 ответ

Существует предположение, что вы будете иметь некоторое представление о том, как долго хранятся данные, и прочитаете их за один раз. Также предполагается, что вы захотите повторно использовать буферы, чтобы избежать создания мусора. Для минимизации задержки данные обычно считываются на / с каналов NIO.

У меня возникла проблема при создании этого примера. Улучшение поддержки Input/OutputStream и объектов, не относящихся к маршалу, https://github.com/OpenHFT/Chronicle-Wire/issues/111

Это должно делать то, что вы хотите эффективно, без создания мусора каждый раз.

package net.openhft.chronicle.wire;

import net.openhft.chronicle.bytes.Bytes;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class WireToOutputStream {
    private final Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(128);
    private final Wire wire;
    private final DataOutputStream dos;

    public WireToOutputStream(WireType wireType, OutputStream os) {
        wire = wireType.apply(bytes);
        dos = new DataOutputStream(os);
    }

    public Wire getWire() {
        wire.clear();
        return wire;
    }

    public void flush() throws IOException {
        int length = Math.toIntExact(bytes.readRemaining());
        dos.writeInt(length);
        dos.write(bytes.underlyingObject().array(), 0, length);
    }
}

package net.openhft.chronicle.wire;

import net.openhft.chronicle.bytes.Bytes;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StreamCorruptedException;
import java.nio.ByteBuffer;

public class InputStreamToWire {
    private final Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(128);
    private final Wire wire;
    private final DataInputStream dis;

    public InputStreamToWire(WireType wireType, InputStream is) {
        wire = wireType.apply(bytes);
        dis = new DataInputStream(is);
    }

    public Wire readOne() throws IOException {
        wire.clear();
        int length = dis.readInt();
        if (length < 0) throw new StreamCorruptedException();
        bytes.ensureCapacity(length);
        byte[] array = bytes.underlyingObject().array();
        dis.readFully(array, 0, length);
        bytes.readPositionRemaining(0, length);
        return wire;
    }
}

Затем вы можете сделать следующее

package net.openhft.chronicle.wire;

import net.openhft.chronicle.core.util.ObjectUtils;
import org.junit.Test;

import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;

import static org.junit.Assert.assertEquals;

public class WireToOutputStreamTest {
    @Test
    public void testVisSocket() throws IOException {
        ServerSocket ss = new ServerSocket(0);
        Socket s = new Socket("localhost", ss.getLocalPort());
        Socket s2 = ss.accept();
        WireToOutputStream wtos = new WireToOutputStream(WireType.RAW, s.getOutputStream());

        Wire wire = wtos.getWire();
        AnObject ao = new AnObject();
        ao.value = 12345;
        ao.text = "Hello";
        // write the type is needed.
        wire.getValueOut().typeLiteral(AnObject.class);
        Wires.writeMarshallable(ao, wire);
        wtos.flush();

        InputStreamToWire istw = new InputStreamToWire(WireType.RAW, s2.getInputStream());
        Wire wire2 = istw.readOne();
        Class type = wire2.getValueIn().typeLiteral();
        Object ao2 = ObjectUtils.newInstance(type);
        Wires.readMarshallable(ao2, wire2, true);
        System.out.println(ao2);
        ss.close();
        s.close();
        s2.close();
        assertEquals(ao.toString(), ao2.toString());
    }

    public static class AnObject implements Serializable {
        long value;
        String text;

        @Override
        public String toString() {
            return "AnObject{" +
                    "value=" + value +
                    ", text='" + text + '\'' +
                    '}';
        }
    }
}

Образец кода

 // On Sender side
 Object m = ... ;
 OutputStream out = ... ;

 WireToOutputStream wireToOutputStream = new 
 WireToOutputStream(WireType.TEXT, out);

 Wire wire = wireToOutputStream.getWire();
 wire.getValueOut().typeLiteral(m.getClass());
 Wires.writeMarshallable(m, wire);
 wireToOutputStream.flush();

 // On Receiver side
 InputStream in = ... ;

 InputStreamToWire inputStreamToWire = new InputStreamToWire(WireType.TEXT, in);

 Wire wire2 = inputStreamToWire.readOne();
 Class type = wire2.getValueIn().typeLiteral();
 Object m = ObjectUtils.newInstance(type);
 Wires.readMarshallable(m, wire2, true);

Этот код намного проще, если ваш DTO расширяется Marshallable но это будет работать независимо от того, расширяете ли вы интерфейс или нет. т.е. вам не нужно расширять Serializable.

Также, если вы знаете, какой будет тип, вам не нужно каждый раз писать это.

Приведенные выше вспомогательные классы были добавлены в последний SNAPSHOT.

Другие вопросы по тегам