Продление автоматической блокировки службы Azure не работает с несколькими репликами потребителя.
У меня есть прослушиватель служебной шины Azure, разработанный с помощью библиотеки Spring Cloud Azure (com.azure.spring:spring-cloud-azure-starter-integration-servicebus:4.5.0).
TTL, равный 5 минутам, настроен на уровне очереди. Продолжительность обновления автоматической блокировки настроена на 10 минут на уровне потребителя.
Функция автоматического обновления блокировки работает нормально, если обработка сообщения занимает более 5 минут с одной репликой потребителя.
Если я увеличу количество реплик для потребителя до 2, вторая реплика подберет сообщение для обработки, пока первая реплика обновляет блокировку.
Может кто-нибудь, пожалуйста, помогите разобраться в проблеме и предложите необходимое решение.
Спасибо,
Ниже приведены детали проекта и кода.
build.gradle
------------
implementation('org.springframework.boot:spring-boot-starter-web')
implementation('com.azure.spring:spring-cloud-azure-starter-integration-servicebus:4.5.0')
implementation group: 'com.azure', name: 'azure-storage-blob', version: '12.7.0'
implementation("com.splunk.logging:splunk-library-javalogging:1.8.0")
implementation group: 'com.opencsv', name: 'opencsv', version: '5.5.2'
implementation('org.springframework.boot:spring-boot-starter-data-mongodb')
Конфигурация
private static final String QUEUE_1_LISTENER_CONTAINER = "queue-listener-container-queue-1";
private static final String QUEUE_2_LISTENER_CONTAINER = "queue-listener-container-queue-2";
private static final String QUEUE_3_LISTENER_CONTAINER = "queue-listener-container-queue-3";
private static final String QUEUE_4_LISTENER_CONTAINER = "queue-listener-container-queue-4";
private static final String QUEUE_5_LISTENER_CONTAINER = "queue-listener-container-queue-5";
private static final String INPUT_CHANNEL = "queue.input";
@Bean(QUEUE_1_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer messageListenerContainerqueue1(ServiceBusProcessorFactory processorFactory, @Value("${queue1.queue.name}") String queueName, @Value("${azure.servicebus.max.auto.lock.renew.duration}") int maxAutoLockRenewDuration) {
ServiceBusContainerProperties containerProperties = getContainerProperties(queueName,maxAutoLockRenewDuration);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}
@Bean(QUEUE_2_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer messageListenerContainerQueue2(ServiceBusProcessorFactory processorFactory, @Value("${queue2.queue.name}") String queueName, @Value("${azure.servicebus.max.auto.lock.renew.duration}") int maxAutoLockRenewDuration) {
ServiceBusContainerProperties containerProperties = getContainerProperties(queueName, maxAutoLockRenewDuration);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}
@Bean(QUEUE_3_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer messageListenerContainerQueue3(ServiceBusProcessorFactory processorFactory, @Value("${queue3.queue.name}") String queueName, @Value("${azure.servicebus.max.auto.lock.renew.duration}") int maxAutoLockRenewDuration) {
ServiceBusContainerProperties containerProperties = getContainerProperties(queueName, maxAutoLockRenewDuration);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}
@Bean(QUEUE_4_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer messageListenerContainerQueue4(ServiceBusProcessorFactory processorFactory, @Value("${queue4.queue.name}") String queueName, @Value("${azure.servicebus.max.auto.lock.renew.duration}") int maxAutoLockRenewDuration) {
ServiceBusContainerProperties containerProperties = getContainerProperties(queueName, maxAutoLockRenewDuration);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}
@Bean(QUEUE_5_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer messageListenerContainerQueue5(ServiceBusProcessorFactory processorFactory, @Value("${queue5.queue.name}") String queueName, @Value("${azure.servicebus.max.auto.lock.renew.duration}") int maxAutoLockRenewDuration) {
ServiceBusContainerProperties containerProperties = getContainerProperties(queueName, maxAutoLockRenewDuration);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapterQueue1(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier(QUEUE_1_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapterQueue2(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier(QUEUE_2_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapterQueue3(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier(QUEUE_3_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapterQueue4(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier(QUEUE_4_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapterQueue5(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier(QUEUE_5_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean(name = INPUT_CHANNEL)
public MessageChannel input() {
return new DirectChannel();
}
private ServiceBusContainerProperties getContainerProperties(String queueName, int maxAutoLockRenewDuration) {
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setReceiveMode(ServiceBusReceiveMode.PEEK_LOCK);
containerProperties.setMaxAutoLockRenewDuration(Duration.ofMinutes(maxAutoLockRenewDuration));
containerProperties.setAutoComplete(true);
containerProperties.setEntityName(queueName);
return containerProperties;
}
listener
--------
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload) {
String message = new String(payload);
try {
AzureBlobCreatedMessage azureBlobCreatedMessage = new ObjectMapper().readValue(message, AzureBlobCreatedMessage.class);
String blobURL = azureBlobCreatedMessage.getData().getUrl();
BlobUrlParts parts = BlobUrlParts.parse(blobURL);
String containerName = parts.getBlobContainerName();
String fileName = parts.getBlobName();
fileProcessor.processMessage(containerName, fileName);
} catch (JsonProcessingException e) {
log.error("Unable to parse json message received {} from", message);
}
}