Ошибка "EndOfStreamException" во время выполнения сериализации MS Bond через ZeroMQ

Для начала стоит отметить, что в рамках одного решения F# сериализация и десериализация сообщений Bond работают нормально. Однако у меня возникают проблемы с правильной обработкой отправки и / или получения сообщения через ZeroMQ.

Существует ошибка во время выполнения на стороне подписчика следующей программы. Файл.bond определяется и компилируется компилятором Bond. Затем dll создается из C# для вызова из F# . У меня тогда есть две программы на F# . Один, который публикует сериализованные данные через сокет TCP, а другой - подписчик. Когда сообщение получено на подпрограмму, строка, которая пытается Unmarshal необработанные данные, является той, которая вызывает ошибку во время выполнения. Кто-нибудь может увидеть причину этого?

[РЕДАКТИРОВАТЬ] Согласно комментарию Федора, я внес изменения на стороне издателя, которая изменяет ошибку на стороне подписчика. Так что ошибка, скорее всего, связана с тем, как я упаковываю и распаковываю информацию.

Это файл.bond

namespace Examples

struct Record
{
    0: map<string, double> payload;
}

Вот издатель:

// publisher

open System
open Bond
open Bond.Protocols
open Bond.IO.Safe
open ZeroMQ

let ctx = new ZContext()
let publisher = new ZSocket(ctx, ZSocketType.PUB)
publisher.Bind("tcp://*:5556")

let src = new Examples.Record()
src.payload.Add("a", 1.)
src.payload.Add("b", 2.)

let output = new OutputBuffer()
let writer = new CompactBinaryWriter<OutputBuffer>(output)

while true do
    Marshal.To(writer, src)
    //let input = new InputBuffer(output.Data)
    //let byteArr = input.ReadBytes(int(input.Length - 1L))
    let updateFrame = new ZFrame(System.Text.Encoding.ASCII.GetString output.Data.Array)
    publisher.Send(updateFrame)

Вот подписчик:

// subscriber

open Bond
open Bond.Protocols
open Bond.IO.Safe
open System
open System.Text
open ZeroMQ

let ctx = new ZContext()
let subscriber = new ZSocket(ctx, ZSocketType.SUB)
subscriber.Connect("tcp://127.0.0.1:5556")
subscriber.SubscribeAll()

let output = new OutputBuffer()    
while true do    
    let received = subscriber.ReceiveFrame()
    let byteArr = Encoding.ASCII.GetBytes (received.ReadString())
    let arrSeg = ArraySegment<byte>(byteArr)
    let input = new InputBuffer(arrSeg)
    let dst = Unmarshal<Examples.Record>.From(input)
    for KeyValue(k, v) in dst.payload do
        printfn "%A %A" k v

1 ответ

Решение

На принимающей стороне, когда вы пытаетесь декодировать маршалированный Bond Compact Binary как строку ASCII, вы теряете часть полезной нагрузки. При маршалинге структуры, как Record для Compact Binary первые четыре байта полезной нагрузки 0x43 0x42 0x10 0x00, При чтении строки из ZFrame первый встроенныйNUL (0x00) это означает, что конец строки не зависит от размера кадра. Итак, читающая сторона видит только 0x43 0x42 0x10 вместо всей полезной нагрузки (29 байт, когда я тестировал).

Поскольку Compact Binary является двоичным протоколом, вы захотите использовать ZFrame конструктор, который принимает буфер на стороне издателя:

let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count)

На стороне подписчика вы захотите просто прочитать буфер:

let byteArr = received.Read()

Кроме того, на стороне издателя вы постоянно накапливаете данные в один и тот же OutputBuffer. Вы хотите сбросить output.Position до 0, прежде чем вы добавите следующую запись для повторного использования буфера вместо его увеличения:

while true do  
    Marshal.To(writer, src)
    let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count)output.Data.Array)
    publisher.Send(updateFrame)
    output.Position <- 0

Еще одна вещь, чтобы отметить: буфер по умолчанию, выделенный для OutputBuffer составляет 65 КБ. Подумайте об уменьшении, когда вы узнаете, насколько большими будут ваши полезные нагрузки.

NB: я отлаживал это в приложении C#, которое имело похожую семантику. Вот что я использовал:

namespace so_q_zmq
{
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Bond;
    using Bond.IO.Safe;
    using Bond.Protocols;
    using ZeroMQ;

    [Schema]
    class Record
    {
        [Id(0)]
        public Dictionary<string, double> payload = new Dictionary<string, double>();
    }

    class Program
    {
        static void Main(string[] args)
        {
            var pTask = Task.Run(() =>
            {
                try
                {
                    Publisher();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Publisher failed: {0}", ex);
                }
            });

            var sTask = Task.Run(() =>
            {
                try
                {
                    Subscriber();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Subscriber failed: {0}", ex);
                }
            });

            Task.WaitAll(pTask, sTask);
            Console.WriteLine("Done");
            Console.ReadLine();
        }

        static void Publisher()
        {
            var ctx = new ZContext();
            var publisher = new ZSocket(ctx, ZSocketType.PUB);
            publisher.Bind("tcp://127.0.0.1:12345");

            var src = new Record();
            src.payload.Add("a", 1.0);
            src.payload.Add("b", 2.0);

            var output = new OutputBuffer();
            var writer = new CompactBinaryWriter<OutputBuffer>(output);

            for (;;)
            {
                Marshal.To(writer, src);
                // INCORRECT:
                // var str = Encoding.ASCII.GetString(output.Data.Array);
                // var updateFrame = new ZFrame(str);
                var updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count);
                publisher.Send(updateFrame);
                output.Position = 0;
            }
        }

        static void Subscriber()
        {
            var ctx = new ZContext();
            var subscriber = new ZSocket(ctx, ZSocketType.SUB);
            subscriber.Connect("tcp://127.0.0.1:12345");
            subscriber.SubscribeAll();

            for (;;)
            {
                var received = subscriber.ReceiveFrame();
                // INCORRECT
                // var str = received.ReadString();
                // var byteArr = Encoding.ASCII.GetBytes(str);
                var byteArr = received.Read();
                var arrSeg = new ArraySegment<byte>(byteArr); // There's an InputBuffer ctor that takes a byte[] directly
                var input = new InputBuffer(arrSeg);
                var dst = Unmarshal<Record>.From(input);
                foreach (var kvp in dst.payload)
                {
                    Console.WriteLine("{0} {1}", kvp.Key, kvp.Value);
                }
            }
        }
    }
}
Другие вопросы по тегам