Подключение EventHubConsumerClient к Azure IoTHub не работает с управляемой идентификацией

Я пытаюсь использовать сообщения IoTHub с помощью EventHubConsumerClient в python. (Я также пробовал csharp EventProcessorClient, похоже, с той же проблемой)

Я запускаю EventHubConsumerClient на виртуальной машине.

Настройка:

  • Центр Интернета вещей доступен из Интернета.
  • Пользовательский идентификатор, добавленный в виртуальную машину.
  • Назначенное пользователем удостоверение имеет роль «Читатель данных Центра Интернета вещей» с областью действия, установленной на Центр Интернета вещей.
  • BlobCheckpointStore подключен к хранилищу BLOB-объектов, аутентифицирован с помощью управляемого удостоверения (я проверил, работает)
  • Регион Западная Европа

Все работает нормально, если я использую строку подключения концентратора событий для подключения к Центру Интернета вещей. Но когда я использую учетные данные управляемой идентификации, я получаю следующую ошибку:

      evtconsumer    | INFO:uamqp.c_uamqp:Token put complete with result: 3, status: 401, description: b'InvalidIssuer: Token issuer is invalid. TrackingId:0315ff67-60c5-4bb2-ba6d-160f45eb91eb, SystemTracker:NoSystemTracker, Timestamp:2021-09-15T10:30:05', connection: b'a44766f1-5d50-4d61-958e-52f7529315a4'
evtconsumer    | INFO:uamqp.authentication.cbs_auth:Authentication status: 401, description: b'InvalidIssuer: Token issuer is invalid. TrackingId:0315ff67-60c5-4bb2-ba6d-160f45eb91eb, SystemTracker:NoSystemTracker, Timestamp:2021-09-15T10:30:05'
evtconsumer    | INFO:uamqp.authentication.cbs_auth:Authentication Put-Token failed. Retrying.

Мой код:

      import logging
import os
import sys
from azure.identity import ManagedIdentityCredential
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

logging.basicConfig(stream = sys.stdout, level = logging.INFO)
_logger = logging.getLogger()

azure_client_id = os.getenv("AZURE_CLIENT_ID")
evthubnamespace = os.getenv("IOTHUB_EVTHUB_FULLY_QUALIFIED_NAMESPACE")
evthubname = os.getenv("IOTHUB_EVTHUB_NAME")
evthubconnectionstring = os.getenv("IOTHUB_EVTHUB_CONNECTION_STRING")
blob_account_url = os.getenv("BLOB_ACCOUNT_URL")
blob_container_name = os.getenv("BLOB_CONTAINER_NAME")

# for toggling between authentication methods:
use_connection_string = os.getenv("USE_CONNECTION_STRING") == "true"

credential = ManagedIdentityCredential(client_id=azure_client_id)

def on_event(partition_context, event):
    # Print the event data.
    _logger.info("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    partition_context.update_checkpoint(event)

def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore(
        credential=credential,
        blob_account_url=blob_account_url,
        container_name=blob_container_name)

    if use_connection_string:
        # this works fine
        _logger.info("Using connection string")
        client = EventHubConsumerClient.from_connection_string(
            evthubconnectionstring,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store)
    else:
        # This causes errors
        _logger.info(f"Using managed identity. fully_qualified_namespace: {evthubnamespace} eventhub_name: {evthubname}")
        client = EventHubConsumerClient(
            fully_qualified_namespace=evthubnamespace,
            eventhub_name=evthubname,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential)
    with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        client.receive(on_event=on_event)

if __name__ == '__main__':
    main()

У меня совсем нет идей по этому поводу. Кажется, что интерфейс концентратора событий AMQP концентратора IoT не принимает токены, сгенерированные из учетных данных управляемой идентификации?

0 ответов