Как получить все сообщения из очереди SQS
Я использую SQS для чтения данных. Но я не уверен, как читать все данные из очереди.
public List<Customer> getMessage() {
int numberOfMessages= getMessageCount();
System.out.println(numberOfMessages);
int count=0;
while(count<10) {
System.out.println("Messages remaining in the queue-
>>>"+numberOfMessages);
System.out.println("Recieving Messages from the Queue: ");
final ReceiveMessageRequest receiveMessageRequest =
new ReceiveMessageRequest(queueURL)
.withMaxNumberOfMessages(10)
.withWaitTimeSeconds(20);
final List<com.amazonaws.services.sqs.model.Message> customers =
sqs.receiveMessage(receiveMessageRequest).getMessages();
for(com.amazonaws.services.sqs.model.Message cust: customers) {
System.out.println("Current message number->>>>>"+(count+1));
System.out.println(cust.getBody());
sqs.deleteMessage(new DeleteMessageRequest(queueURL,
cust.getReceiptHandle()));
count++;
}
//numberOfMessages=getMessageCount();
}
return null;
}
public int getMessageCount() {
Set<String> attrs = new HashSet<String>();
attrs.add("ApproximateNumberOfMessages");
CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName("sampleQueueSharma");
GetQueueAttributesRequest a = new GetQueueAttributesRequest().withQueueUrl(sqs.createQueue(createQueueRequest).getQueueUrl()).withAttributeNames(attrs);
Map<String,String> result = sqs.getQueueAttributes(a).getAttributes();
int num = Integer.parseInt(result.get("ApproximateNumberOfMessages"));
return num;
}
Я читаю данные таким образом, но это кажется неправильным.
Я тоже пробовал заменить while(count<10)
с while(numberOfMessages>0)
и раскомментируя numberOfMessages=getMessageCount()
эта строка, но при этом код выполняется бесконечно. Кажется, что он всегда возвращает значение больше 1.
Может ли кто-нибудь помочь мне с этим?
2 ответа
Используется ListIterator в сообщениях
List<com.amazonaws.services.sqs.model.Message> messages = amazonSQS.receiveMessage(receiveMessageRequest).getMessages();
ListIterator<Message> messageListIterator = messages.listIterator();
List<String> message=new ArrayList<>();
while (messageListIterator.hasNext()){
Message msg= messageListIterator.next();
message.add(msg.getBody());
}
Сначала несколько заметок:
- С помощью
count
как вы, вы читаете только около 10 сообщений (может быть немного больше из-за пакетирования). Вы, вероятно, не хотите использовать это после простой стадии проверки концепции - С помощью
while (numberOfMessages > 0)
вы будете читать до тех пор, пока SQS приблизит счетчик сообщений, что у него есть сообщения. Обратите внимание, что это приблизительное значение, поэтому не следует полагаться на его точность (в конечном итоге это будет согласованно). - Ваш
getMessageCount()
Похоже, что метод пытается воссоздать очередь каждый раз, когда он вызывается - пока это будет работать, вам не нужно этого делать. Создайте его один раз и просто используйте.
На основании кода, который я вижу, getMessageCount()
вернется > 1
если (а) у вас просто куча сообщений, (б) кто-то постоянно добавляет сообщения в очередь (или (в) если вы не удаляете их должным образом, но делаете это).
Я бы предложил следующие модификации вашего кода:
- Зарегистрируйте результат
getMessageCount()
каждый раз это называется. Это даст вам указание, если вы помещаете сообщения в свою очередь быстрее, чем вы можете их обработать, или у вас есть источник сообщений, который никогда не закончится. - Зарегистрируйте количество сообщений, полученных вашим
ReceiveMessageRequest
, Это даст вам знать, что вы действительно обрабатываете сообщения. - Вместо того, чтобы основывать свой поток управления на значении
getMessageCount()
звони покаReceiveMessageRequest
результат (с waitTimeSeconds=20) возвращает 0 сообщений - это гарантия того, что ваша очередь в этот момент пуста (вместо приблизительного значения).