Spring Integration отправляет сообщения Исполнителю в транзакции
У меня огромное количество сообщений из файлов CSV, которые затем отправляются в API с ограниченной скоростью. Я использую канал очереди, поддерживаемый хранилищем сообщений канала базы данных, чтобы сделать сообщения устойчивыми во время обработки. Я хочу максимально приблизиться к пределу скорости, поэтому мне нужно отправлять сообщения в API через несколько потоков.
У меня в голове было то, как это должно работать, что-то читает DB, видит, какие сообщения доступны, а затем делегирует каждое сообщение одному из потоков, которые будут обработаны в транзакции.
Но я не смог этого сделать, мне пришлось иметь транзакционный опросчик, который имеет пул потоков из N потоков, фиксированную скорость, скажем, 5 секунд, и максимальное количество сообщений на опрос 10 (что-то больше, чем то, что может быть обработано за 5 секунд)... который работает нормально, но имеет проблемы, когда не так много ожидающих сообщений (то есть, если бы было 10 сообщений, они были бы обработаны одним потоком), это не будет проблема на практике, потому что у нас будет 1000 сообщений. Но это кажется концептуально более сложным, чем, как я думал, это должно работать.
Возможно, я не очень хорошо объяснил это, но, похоже, что может быть распространенной проблемой, когда сообщения приходят быстро, но выходят медленнее?
1 ответ
Ваше решение действительно правильное, но вы должны думать, не сдвигайте сообщения в Exectuor
с этого пути вы выпрыгиваете за границы транзакции.
Тот факт, что у вас есть 10 сообщений, обработанных в одном и том же потоке, является подробностями реализации, и это выглядит так:
AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
int count = 0;
while (AbstractPollingEndpoint.this.initialized
&& (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
|| count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
try {
if (!Poller.this.pollingTask.call()) {
break;
}
count++;
}
Итак, мы опрашиваем сообщения до maxMessagesPerPoll
в той же теме.
Чтобы сделать его действительно более параллельным и в то же время сохранять транзакции, не теряйте сообщения, которые вы должны использовать fixedRate
:
/**
* Specify whether the periodic interval should be measured between the
* scheduled start times rather than between actual completion times.
* The latter, "fixed delay" behavior, is the default.
*/
public void setFixedRate(boolean fixedRate)
И увеличить количество нитей, используемых TaskScheduler
для голосования. Вы можете сделать это, объявив ThreadPoolTaskScheduler
боб с именем как IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME
переопределить файл по умолчанию с пулом как 10
, Или используйте глобальные свойства, чтобы просто переопределить размер пула в этом по умолчанию TaskScheduler
: https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/configuration.html