Подключение EventHubConsumerClient к Azure IoTHub не работает с управляемой идентификацией
Я пытаюсь использовать сообщения IoTHub с помощью EventHubConsumerClient в python. (Я также пробовал csharp EventProcessorClient, похоже, с той же проблемой)
Я запускаю EventHubConsumerClient на виртуальной машине.
Настройка:
- Центр Интернета вещей доступен из Интернета.
- Пользовательский идентификатор, добавленный в виртуальную машину.
- Назначенное пользователем удостоверение имеет роль «Читатель данных Центра Интернета вещей» с областью действия, установленной на Центр Интернета вещей.
- BlobCheckpointStore подключен к хранилищу BLOB-объектов, аутентифицирован с помощью управляемого удостоверения (я проверил, работает)
- Регион Западная Европа
Все работает нормально, если я использую строку подключения концентратора событий для подключения к Центру Интернета вещей. Но когда я использую учетные данные управляемой идентификации, я получаю следующую ошибку:
evtconsumer | INFO:uamqp.c_uamqp:Token put complete with result: 3, status: 401, description: b'InvalidIssuer: Token issuer is invalid. TrackingId:0315ff67-60c5-4bb2-ba6d-160f45eb91eb, SystemTracker:NoSystemTracker, Timestamp:2021-09-15T10:30:05', connection: b'a44766f1-5d50-4d61-958e-52f7529315a4'
evtconsumer | INFO:uamqp.authentication.cbs_auth:Authentication status: 401, description: b'InvalidIssuer: Token issuer is invalid. TrackingId:0315ff67-60c5-4bb2-ba6d-160f45eb91eb, SystemTracker:NoSystemTracker, Timestamp:2021-09-15T10:30:05'
evtconsumer | INFO:uamqp.authentication.cbs_auth:Authentication Put-Token failed. Retrying.
Мой код:
import logging
import os
import sys
from azure.identity import ManagedIdentityCredential
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
logging.basicConfig(stream = sys.stdout, level = logging.INFO)
_logger = logging.getLogger()
azure_client_id = os.getenv("AZURE_CLIENT_ID")
evthubnamespace = os.getenv("IOTHUB_EVTHUB_FULLY_QUALIFIED_NAMESPACE")
evthubname = os.getenv("IOTHUB_EVTHUB_NAME")
evthubconnectionstring = os.getenv("IOTHUB_EVTHUB_CONNECTION_STRING")
blob_account_url = os.getenv("BLOB_ACCOUNT_URL")
blob_container_name = os.getenv("BLOB_CONTAINER_NAME")
# for toggling between authentication methods:
use_connection_string = os.getenv("USE_CONNECTION_STRING") == "true"
credential = ManagedIdentityCredential(client_id=azure_client_id)
def on_event(partition_context, event):
# Print the event data.
_logger.info("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
# Update the checkpoint so that the program doesn't read the events
# that it has already read when you run it next time.
partition_context.update_checkpoint(event)
def main():
# Create an Azure blob checkpoint store to store the checkpoints.
checkpoint_store = BlobCheckpointStore(
credential=credential,
blob_account_url=blob_account_url,
container_name=blob_container_name)
if use_connection_string:
# this works fine
_logger.info("Using connection string")
client = EventHubConsumerClient.from_connection_string(
evthubconnectionstring,
consumer_group="$Default",
checkpoint_store=checkpoint_store)
else:
# This causes errors
_logger.info(f"Using managed identity. fully_qualified_namespace: {evthubnamespace} eventhub_name: {evthubname}")
client = EventHubConsumerClient(
fully_qualified_namespace=evthubnamespace,
eventhub_name=evthubname,
consumer_group="$Default",
checkpoint_store=checkpoint_store,
credential=credential)
with client:
# Call the receive method. Read from the beginning of the partition (starting_position: "-1")
client.receive(on_event=on_event)
if __name__ == '__main__':
main()
У меня совсем нет идей по этому поводу. Кажется, что интерфейс концентратора событий AMQP концентратора IoT не принимает токены, сгенерированные из учетных данных управляемой идентификации?