Продление автоматической блокировки службы Azure не работает с несколькими репликами потребителя.

У меня есть прослушиватель служебной шины Azure, разработанный с помощью библиотеки Spring Cloud Azure (com.azure.spring:spring-cloud-azure-starter-integration-servicebus:4.5.0).

TTL, равный 5 минутам, настроен на уровне очереди. Продолжительность обновления автоматической блокировки настроена на 10 минут на уровне потребителя.

Функция автоматического обновления блокировки работает нормально, если обработка сообщения занимает более 5 минут с одной репликой потребителя.

Если я увеличу количество реплик для потребителя до 2, вторая реплика подберет сообщение для обработки, пока первая реплика обновляет блокировку.

Может кто-нибудь, пожалуйста, помогите разобраться в проблеме и предложите необходимое решение.

Спасибо,

Ниже приведены детали проекта и кода.

      build.gradle
------------
implementation('org.springframework.boot:spring-boot-starter-web')
implementation('com.azure.spring:spring-cloud-azure-starter-integration-servicebus:4.5.0')
implementation group: 'com.azure', name: 'azure-storage-blob', version: '12.7.0'
implementation("com.splunk.logging:splunk-library-javalogging:1.8.0")
implementation group: 'com.opencsv', name: 'opencsv', version: '5.5.2'
implementation('org.springframework.boot:spring-boot-starter-data-mongodb')

Конфигурация

      private static final String QUEUE_1_LISTENER_CONTAINER = "queue-listener-container-queue-1";
private static final String QUEUE_2_LISTENER_CONTAINER = "queue-listener-container-queue-2";
private static final String QUEUE_3_LISTENER_CONTAINER = "queue-listener-container-queue-3";
private static final String QUEUE_4_LISTENER_CONTAINER = "queue-listener-container-queue-4";
private static final String QUEUE_5_LISTENER_CONTAINER = "queue-listener-container-queue-5";


private static final String INPUT_CHANNEL = "queue.input";

@Bean(QUEUE_1_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer messageListenerContainerqueue1(ServiceBusProcessorFactory processorFactory, @Value("${queue1.queue.name}") String queueName, @Value("${azure.servicebus.max.auto.lock.renew.duration}") int maxAutoLockRenewDuration) {
    ServiceBusContainerProperties containerProperties = getContainerProperties(queueName,maxAutoLockRenewDuration);
    return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}

@Bean(QUEUE_2_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer messageListenerContainerQueue2(ServiceBusProcessorFactory processorFactory, @Value("${queue2.queue.name}") String queueName, @Value("${azure.servicebus.max.auto.lock.renew.duration}") int maxAutoLockRenewDuration) {
    ServiceBusContainerProperties containerProperties = getContainerProperties(queueName, maxAutoLockRenewDuration);
    return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}

@Bean(QUEUE_3_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer messageListenerContainerQueue3(ServiceBusProcessorFactory processorFactory, @Value("${queue3.queue.name}") String queueName, @Value("${azure.servicebus.max.auto.lock.renew.duration}") int maxAutoLockRenewDuration) {
    ServiceBusContainerProperties containerProperties = getContainerProperties(queueName, maxAutoLockRenewDuration);
    return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}

@Bean(QUEUE_4_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer messageListenerContainerQueue4(ServiceBusProcessorFactory processorFactory, @Value("${queue4.queue.name}") String queueName, @Value("${azure.servicebus.max.auto.lock.renew.duration}") int maxAutoLockRenewDuration) {
    ServiceBusContainerProperties containerProperties = getContainerProperties(queueName, maxAutoLockRenewDuration);
    return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}

@Bean(QUEUE_5_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer messageListenerContainerQueue5(ServiceBusProcessorFactory processorFactory, @Value("${queue5.queue.name}") String queueName, @Value("${azure.servicebus.max.auto.lock.renew.duration}") int maxAutoLockRenewDuration) {
    ServiceBusContainerProperties containerProperties = getContainerProperties(queueName, maxAutoLockRenewDuration);
    return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}


@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapterQueue1(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
        @Qualifier(QUEUE_1_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
    ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}

@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapterQueue2(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
        @Qualifier(QUEUE_2_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
    ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}

@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapterQueue3(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
        @Qualifier(QUEUE_3_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
    ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}

@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapterQueue4(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
        @Qualifier(QUEUE_4_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
    ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}

@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapterQueue5(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
        @Qualifier(QUEUE_5_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
    ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}

@Bean(name = INPUT_CHANNEL)
public MessageChannel input() {
    return new DirectChannel();
}

private ServiceBusContainerProperties getContainerProperties(String queueName, int maxAutoLockRenewDuration) {
    ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
    containerProperties.setReceiveMode(ServiceBusReceiveMode.PEEK_LOCK);
    containerProperties.setMaxAutoLockRenewDuration(Duration.ofMinutes(maxAutoLockRenewDuration));
    containerProperties.setAutoComplete(true);
    containerProperties.setEntityName(queueName);
    return containerProperties;
}






listener
--------


 @ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload) {
    String message = new String(payload);
    try {
        AzureBlobCreatedMessage azureBlobCreatedMessage = new ObjectMapper().readValue(message, AzureBlobCreatedMessage.class);
        String blobURL = azureBlobCreatedMessage.getData().getUrl();
        BlobUrlParts parts = BlobUrlParts.parse(blobURL);
        String containerName = parts.getBlobContainerName();
        String fileName = parts.getBlobName();
        fileProcessor.processMessage(containerName, fileName);
    } catch (JsonProcessingException e) {
        log.error("Unable to parse json message received {} from", message);
    }
}

0 ответов

Другие вопросы по тегам