Получение сбоя обратного вызова для производителя в rabbitmq, когда обратное давление срабатывает
Я хотел выяснить сообщения о сбоях для моих производителей rabbitmq, используя некоторые функции обратного вызова api. Я настроил rabbitmq с помощью [{rabbit, [{vm_memory_high_watermark, 0.001}]}]. и попытался отправить много сообщений, но все сообщения принимаются, и позже появляется TimeoutException, а сообщения не отправляются в очередьenter code here
Пожалуйста, скажите мне, как захватить это.
Код для отправки сообщения:
// #create-sink - producer
final Sink<ByteString, CompletionStage<Done>> amqpSink =
AmqpSink.createSimple(
AmqpSinkSettings.create(connectionProvider)
.withRoutingKey(AkkaConstants.queueName)
.withDeclaration(queueDeclaration));
// #run-sink
//final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
//Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer);
String filePath = "D:\\subrata\\code\\akkaAmqpTest-master\\akkaAmqpTest-master\\logs2\\dummy.txt";
Path path = Paths.get(filePath);
// List containing 78198 individual message
List<String> contents = Files.readAllLines(path);
System.out.println("********** file reading done ....");
int times = 5;
// Send 78198*times message to Queue [From console i can see 400000 number of messages being sent]
for(int i=0;i<times;i++) {
Source.from(contents).map(ByteString::fromString).runWith(amqpSink, materializer);
}
System.out.println("************* sending to queue is done");
1 ответ
К сожалению, в настоящее время это не поддерживается из коробки. В идеале производитель должен быть смоделирован как Flow
который будет отправлять все входящие сообщения брокеру AMQP и отправлять то же сообщение с результатом, если оно было успешно отправлено брокеру или нет. Есть билет для отслеживания этого возможного улучшения в системе отслеживания проблем Alpakka.