Обработка данных концентратора событий с использованием Python
Я использую Azure концентратор событий Python SDK для отправки и получения сообщений от концентратора событий по этой ссылке. https://github.com/Azure/azure-event-hubs-python/tree/develop. Я могу успешно отправлять и получать сообщения. Но как мне проанализировать сообщения и извлечь данные из объекта данных события. Пожалуйста, найдите код ниже.
import os
import sys
#import logging
from azure.eventhub import EventHubClient, Receiver, Offset
ADDRESS = 'sb://####.servicebus.windows.net/#####'
USER = '##########'
KEY = '##################################'
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "1"
total = 0
last_sn = -1
last_offset = "-1"
try:
if not ADDRESS:
raise ValueError("No EventHubs URL supplied.")
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000,
offset=OFFSET)
client.run()
try:
batched_events = receiver.receive(timeout=20)
except:
raise
finally:
client.stop()
for event_data in batched_events:
last_offset = event_data.offset.value
last_sn = event_data.sequence_number
total += 1
print("Partition {}, Received {}, sn={} offset={}".format(
PARTITION,
total,
last_sn,
last_offset))
except KeyboardInterrupt:
pass
если я пытаюсь просмотреть полученные event_data, я могу увидеть следующее сообщение. event_data <azure.eventhub.common.EventData at 0xd4f1358>
event_data.message
<uamqp.message.Message at 0xd4f1240>
Любая помощь по поводу того, как разобрать это сообщение для извлечения данных.
1 ответ
Согласно исходному коду в
https://github.com/Azure/azure-event-hubs-python/blob/master/azure/eventhub/common.py event_data.body должен вернуть вам тело сообщения.body
возвращает генератор. Чтобы получить доступ к отдельному сообщению по одному, используйте next
метод.
messages = event_data.body
next(messages) -> get 1st message
next(messages) -> get 2nd message
...
next(messages) -> get the last message
next(messages) -> StopIteration
StopIteration
исключение относится к концу итерации, больше не осталось сообщений для извлечения.
Если вы хотите получить все сообщения одновременно, вы можете использовать - list(event_data.body)
Используя эти потрясающие встроенные функции Python, вы могли бы найти ответ самостоятельно:
- Используйте встроенный Python
dir
функция, чтобы узнать, что все методы и значения поддерживает объект. Если бы вы должны были сделатьdir(event_data)
, вы увидите метод -body
перечисленных - Чтобы просмотреть документацию (конкретные строки документации) о любом методе / функции / классе, вы можете сделать,
print object.__doc__
, В вашем случае делатьprint event_data.body.__doc__
напечатать чтоbody
делает.
По состоянию на 1.1.0
Есть новые служебные методы для извлечения фактических данных сообщения:
body_as_str
body_as_json
Итак, что раньше было
import json
event_obj = json.loads(next(event_data.body).decode('UTF-8'))
Сейчас:
event_obj = event_data.body_as_json()
Для людей, использующих Event Hub версии 5.2.0 - последней на сегодняшний день (GitHub, справочная документация), она такая же, как версия 1.1.0, т.е.
body_as_str()
или же
body_as_json()
. Но клиент изменился - есть
EventHubProducerClient
и
EventHubConsumerClient
в новой версии. Чтобы распечатать тело полученного события:
from azure.eventhub import EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(
connection_str, consumer_group, eventhub_name=eventhub_name
)
def on_event_batch(partition_context, events):
partition_context.update_checkpoint()
for e in events:
print(e.body_as_str())
with client:
client.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)