Как получить данные из определенной группы потребителей в концентраторе событий

Я не могу подписаться на определенную группу потребителей в концентраторе событий, я создал группу потребителей, но все же я получаю данные из группы потребителей $Default, которую я не хочу, потому что у меня несколько арендаторов, и я хочу дифференцироваться на основе группы потребителей,

этот

поэтому в соответствии с tenant-1234 и tenant-1235 на основе я хочу публиковать и подписывать данные, но когда я пытаюсь подписаться на основе, скажем, tenant-1234. Я также получаю данные из группы потребителей $Default, которые мне не нужны.

var maxEventTrigger: Long = Constants.maxEventTrigger.toLong;
val customEventhubParameters = EventHubsConf(connStr).setMaxEventsPerTrigger(maxEventTrigger);
customEventhubParameters.setConsumerGroup("tenant-1234")
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load();
logger.info("Data has been fetched from event hub successfully");
val messages = incomingStream.withColumn("Offset", $"offset".cast(LongType)).withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType)).withColumn("Timestamp", $"enqueuedTime".cast(LongType)).withColumn("Body", $"body".cast(StringType)).select("Offset", "Time (readable)", "Timestamp", "Body")

Приведенный выше код - это клиент tenant-1234, который все еще читает сообщение из группы потребителей $Default, но ему следует прочитать сообщение из группы потребителей tenant-1234.

Заранее спасибо!

0 ответов