Emitter <>. Send в SmallRye Reactive Messaging не отправляется в Kotlin через брокера AMQP с Quarkus
В настоящее время я пытаюсь написать "службу уведомлений" на основе Maven, Quarkus и SmallRye Reactive Messaging в Kotlin. В качестве основы у меня есть пример на Java, который отлично работает, и я пытался "перевести" его на Kotlin.
Я хочу, чтобы он работал следующим образом: я отправляю HTTP-запрос (например, GET http://localhost:8080/search/{word}), и система отправляет "слово" (здесь строка) в запросы очереди. 'брокера сообщений Artemis AMQP. Другая система подписывается на брокера сообщений и выбирает "слово" из очереди "запросы" по HTTP-запросу (например, GET http://localhost:8080/receiver).
Однако в Kotlin это не работает, и я предполагаю, что Emitter не отправляет "слово", в отличие от Java.
Вот код, который я использую:
Котлин
Отправка
import io.smallrye.reactive.messaging.annotations.Emitter
import io.smallrye.reactive.messaging.annotations.Stream
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.PathParam
@Path("/search")
class ExampleService {
@Stream("queries")
val queryEmitter: Emitter<String>? = null
@GET
@Path("/{word}")
fun search(@PathParam("word") word: String?): String {
println("about to send word: " + word!!)
if (word.isNotEmpty()) {
var qE=queryEmitter?.send(word)
println("Emitter return : $qE")
return word
}
return "word was empty"
}
}
Получение
import org.eclipse.microprofile.reactive.messaging.Incoming
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType
@Path("/receiver")
class AdsResource {
var word : String = "nothing happened so far"
@GET
@Produces(MediaType.TEXT_PLAIN)
fun getWords(): String {
return word
}
@Incoming("sink")
fun consume(message: String) {
println("got user query: $message")
word = message
}
}
А вот и Java- версия
Отправка
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.Stream;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@Path("/search")
public class SearchEndpoint {
@Stream("queries")
Emitter<String> queryEmitter;
@GET
@Path("/{word}")
public String search(@PathParam("word") String word) {
System.out.println("about to send word: " + word);
if (!word.isEmpty()) {
Emitter<String> qE = queryEmitter.send(word);
System.out.println("Emitter return: " + qE);
return word;
}
return "word was empty" ;
}
}
Получение
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@Path("/receiver")
public class AdsResource {
private String word = "";
@GET
public String getAd() {
System.out.println("got user query: " + word);
return word;
}
@Incoming("sink")
public void consume(String message) {
System.out.println("got user query: " + message);
word = message;
}
}
Здесь находятся файлы конфигурации application.properties для Kotlin и Java.
# Configures the AMQP broker credentials.
amqp-username=quarkus
amqp-password=quarkus
# Configure the AMQP connector to write to the `queries ` address
mp.messaging.outgoing.queries.connector=smallrye-amqp
mp.messaging.outgoing.queries.address=sink
mp.messaging.outgoing.queries.durable=true
# Configure the AMQP connector to read from the `queries ` queue
mp.messaging.incoming.sink.connector=smallrye-amqp
mp.messaging.incoming.sink.durable=true
Немного информации:
- Брокер сообщений AMQP, который я запустил через docker-compose, основан на этом руководстве.
- Реактивный обмен сообщениями Smallrye
Заранее спасибо и дайте мне знать, если я пропустил информацию.
1 ответ
Проблема сводится к тому, где Котлин добавляет @Stream
аннотация в байт-коде.
По сути, чтобы решить вашу проблему, вам необходимо заменить:
@Stream("queries")
с участием
@field: Stream("queries")