Обработка данных концентратора событий с использованием Python

Я использую Azure концентратор событий Python SDK для отправки и получения сообщений от концентратора событий по этой ссылке. https://github.com/Azure/azure-event-hubs-python/tree/develop. Я могу успешно отправлять и получать сообщения. Но как мне проанализировать сообщения и извлечь данные из объекта данных события. Пожалуйста, найдите код ниже.

import os
import sys
#import logging
from azure.eventhub import EventHubClient, Receiver, Offset

ADDRESS = 'sb://####.servicebus.windows.net/#####'
USER = '##########'
KEY = '##################################'
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "1"


total = 0
last_sn = -1
last_offset = "-1"

try:
  if not ADDRESS:
      raise ValueError("No EventHubs URL supplied.")
  client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
  receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, 
  offset=OFFSET)
  client.run()
  try:
      batched_events = receiver.receive(timeout=20)
  except:
      raise
  finally:
      client.stop()
  for event_data in batched_events:
      last_offset = event_data.offset.value
      last_sn = event_data.sequence_number
      total += 1
      print("Partition {}, Received {}, sn={} offset={}".format(
         PARTITION,
         total,
         last_sn,
         last_offset))

except KeyboardInterrupt:
   pass

если я пытаюсь просмотреть полученные event_data, я могу увидеть следующее сообщение. event_data <azure.eventhub.common.EventData at 0xd4f1358>event_data.message

<uamqp.message.Message at 0xd4f1240>

Любая помощь по поводу того, как разобрать это сообщение для извлечения данных.

1 ответ

Решение

Согласно исходному коду в
https://github.com/Azure/azure-event-hubs-python/blob/master/azure/eventhub/common.py event_data.body должен вернуть вам тело сообщения.
body возвращает генератор. Чтобы получить доступ к отдельному сообщению по одному, используйте next метод.

messages = event_data.body
next(messages) -> get 1st message
next(messages) -> get 2nd message
...
next(messages) -> get the last message
next(messages) -> StopIteration

StopIteration исключение относится к концу итерации, больше не осталось сообщений для извлечения.

Если вы хотите получить все сообщения одновременно, вы можете использовать - list(event_data.body)

Используя эти потрясающие встроенные функции Python, вы могли бы найти ответ самостоятельно:

  • Используйте встроенный Python dir функция, чтобы узнать, что все методы и значения поддерживает объект. Если бы вы должны были сделать dir(event_data), вы увидите метод - body перечисленных
  • Чтобы просмотреть документацию (конкретные строки документации) о любом методе / функции / классе, вы можете сделать, print object.__doc__, В вашем случае делать print event_data.body.__doc__ напечатать что body делает.

По состоянию на 1.1.0 Есть новые служебные методы для извлечения фактических данных сообщения:

  • body_as_str
  • body_as_json

Итак, что раньше было

import json
event_obj = json.loads(next(event_data.body).decode('UTF-8'))

Сейчас:

event_obj = event_data.body_as_json()

Для людей, использующих Event Hub версии 5.2.0 - последней на сегодняшний день (GitHub, справочная документация), она такая же, как версия 1.1.0, т.е. body_as_str() или же body_as_json(). Но клиент изменился - есть EventHubProducerClient и EventHubConsumerClientв новой версии. Чтобы распечатать тело полученного события:

from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubConsumerClient.from_connection_string(
        connection_str, consumer_group, eventhub_name=eventhub_name
    )

def on_event_batch(partition_context, events):
    partition_context.update_checkpoint()
    for e in events:
        print(e.body_as_str())

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
Другие вопросы по тегам