Десериализовать файл Avro с помощью C#
Я не могу найти способ десериализации файла Apache Avro с помощью C#. Файл Avro - это файл, созданный функцией архивирования в концентраторах событий Microsoft Azure.
С Java я могу использовать Avro Tools из Apache для преобразования файла в JSON:
java -jar avro-tools-1.8.1.jar tojson --pretty inputfile > output.json
Используя пакет NuGet Microsoft.Hadoop.Avro я могу извлечь SequenceNumber
, Offset
а также EnqueuedTimeUtc
, но так как я не знаю, какой тип использовать для Body
исключение брошено. Я пробовал с Dictionary<string, object>
и другие виды.
static void Main(string[] args)
{
var fileName = "...";
using (Stream stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
{
using (var reader = AvroContainer.CreateReader<EventData>(stream))
{
using (var streamReader = new SequentialReader<EventData>(reader))
{
var record = streamReader.Objects.FirstOrDefault();
}
}
}
}
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
[DataMember(Name = "SequenceNumber")]
public long SequenceNumber { get; set; }
[DataMember(Name = "Offset")]
public string Offset { get; set; }
[DataMember(Name = "EnqueuedTimeUtc")]
public string EnqueuedTimeUtc { get; set; }
[DataMember(Name = "Body")]
public foo Body { get; set; }
// More properties...
}
Схема выглядит следующим образом:
{
"type": "record",
"name": "EventData",
"namespace": "Microsoft.ServiceBus.Messaging",
"fields": [
{
"name": "SequenceNumber",
"type": "long"
},
{
"name": "Offset",
"type": "string"
},
{
"name": "EnqueuedTimeUtc",
"type": "string"
},
{
"name": "SystemProperties",
"type": {
"type": "map",
"values": [ "long", "double", "string", "bytes" ]
}
},
{
"name": "Properties",
"type": {
"type": "map",
"values": [ "long", "double", "string", "bytes" ]
}
},
{
"name": "Body",
"type": [ "null", "bytes" ]
}
]
}
4 ответа
Мне удалось получить полный доступ к данным, используя dynamic
, Вот код для доступа к сырой body
данные, которые хранятся в виде массива байтов. В моем случае эти байты содержат кодировку JSON в кодировке UTF8, но, конечно, это зависит от того, как вы изначально создали свой EventData
экземпляры, которые вы опубликовали в концентраторе событий:
using (var reader = AvroContainer.CreateGenericReader(stream))
{
while (reader.MoveNext())
{
foreach (dynamic record in reader.Current.Objects)
{
var sequenceNumber = record.SequenceNumber;
var bodyText = Encoding.UTF8.GetString(record.Body);
Console.WriteLine($"{sequenceNumber}: {bodyText}");
}
}
}
Если кто-то может опубликовать решение со статической типизацией, я поддержу его, но, учитывая, что большая задержка в любой системе почти наверняка будет связана с подключением к BLOB-объектам Event Hub Archive, я не буду беспокоиться о парсинге производительности.:)
В этом Гисте показано, как десериализовать захват концентратора событий с помощью C# с помощью Microsoft.Hadoop.Avro2, который обладает преимуществом совместимости как с.NET Framework 4.5, так и с.NET Standard 1.6:
var connectionString = "<Azure event hub capture storage account connection string>";
var containerName = "<Azure event hub capture container name>";
var blobName = "<Azure event hub capture BLOB name (ends in .avro)>";
var storageAccount = CloudStorageAccount.Parse(connectionString);
var blobClient = storageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
var blob = container.GetBlockBlobReference(blobName);
using (var stream = blob.OpenRead())
using (var reader = AvroContainer.CreateGenericReader(stream))
while (reader.MoveNext())
foreach (dynamic result in reader.Current.Objects)
{
var record = new AvroEventData(result);
record.Dump();
}
public struct AvroEventData
{
public AvroEventData(dynamic record)
{
SequenceNumber = (long) record.SequenceNumber;
Offset = (string) record.Offset;
DateTime.TryParse((string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc);
EnqueuedTimeUtc = enqueuedTimeUtc;
SystemProperties = (Dictionary<string, object>) record.SystemProperties;
Properties = (Dictionary<string, object>) record.Properties;
Body = (byte[]) record.Body;
}
public long SequenceNumber { get; set; }
public string Offset { get; set; }
public DateTime EnqueuedTimeUtc { get; set; }
public Dictionary<string, object> SystemProperties { get; set; }
public Dictionary<string, object> Properties { get; set; }
public byte[] Body { get; set; }
}
Ссылки NuGet:
- Microsoft.Hadoop.Avro2 (работает 1.2.1)
- WindowsAzure.Storage (8.3.0 работает)
Пространства имен:
- Microsoft.Hadoop.Avro.Container
- Microsoft.WindowsAzure.Storage
Я наконец смог заставить это работать с библиотекой / структурой Apache C#.
Я застрял на некоторое время, потому что функция захвата в концентраторах событий Azure иногда выводит файл без содержимого сообщения. У меня, возможно, также была проблема с тем, как сообщения были первоначально сериализованы в объект EventData.
Приведенный ниже код предназначен для файла, сохраненного на диск из контейнера BLOB-объектов захвата.
var dataFileReader = DataFileReader<EventData>.OpenReader(file);
foreach (var record in dataFileReader.NextEntries)
{
// Do work on EventData object
}
Это также работает с использованием объекта GenericRecord.
var dataFileReader = DataFileReader<GenericRecord>.OpenReader(file);
Это потребовало некоторых усилий, чтобы выяснить. Однако теперь я согласен с тем, что эта функция захвата концентраторов событий Azure является отличной возможностью для резервного копирования всех событий. Я все еще чувствую, что они должны сделать формат необязательным, как они сделали с выводом задания Stream Analytic, но, возможно, я привыкну к Avro.
Я бы рекомендовал вам использовать https://github.com/AdrianStrugala/AvroConvert
А просто:
byte[] avroFileContent = File.ReadAllBytes(fileName);
var result = AvroConvert.Deserialize<EventData>(avroFileContent);
Сама библиотека должна была стать улучшением процесса разработки с использованием формата Avro. Вам даже не нужна схема или атрибуты вашей модели. (Я участник этой библиотеки)
Вы также можете использовать NullableSchema
атрибут для пометки тела как объединения байтов и нуля. Это позволит вам использовать строго типизированный интерфейс.
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
[DataMember(Name = "SequenceNumber")]
public long SequenceNumber { get; set; }
[DataMember(Name = "Offset")]
public string Offset { get; set; }
[DataMember(Name = "EnqueuedTimeUtc")]
public string EnqueuedTimeUtc { get; set; }
[DataMember(Name = "Body")]
[NullableSchema]
public foo Body { get; set; }
}
Ваши оставшиеся типы, я подозреваю, должны быть определены как:
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
[KnownType(typeof(Dictionary<string, object>))]
public class EventData
{
[DataMember]
public IDictionary<string, object> SystemProperties { get; set; }
[DataMember]
public IDictionary<string, object> Properties { get; set; }
[DataMember]
public byte[] Body { get; set; }
}
Даже если Body
это союз null
а также bytes
это карта к nullable
byte[]
,
В C# массивы всегда являются ссылочными типами, поэтому их можно null
и контракт выполнен.