SmallRye Reactive Messaging 的 Emitter<>.send 不会通过带有 Quarkus 的 AMQP 代理在 Kotlin 中发送
SmallRye Reactive Messaging's Emitter<>.send doesn't send in Kotlin via AMQP broker with Quarkus
目前,我正在尝试在 Kotlin 中编写基于 Maven、Quarkus 和 SmallRye Reactive Messaging 的 'notification service'。
作为基础,我在 Java 中有一个工作正常的示例,我试图将它 "translate" 到 Kotlin 中。
我希望它工作的方式是,我发送一个 HTTP 请求(例如 GET http://localhost:8080/search/{word}),系统将 'word'(这里是一个字符串)发送到队列'queries' 的 Artemis AMQP 消息代理。
另一个系统订阅消息代理并根据 HTTP 请求(例如 GET http://localhost:8080/receiver)在队列 'queries' 中获取 'word'。
然而,在 Kotlin 中,它不起作用,我最好的猜测是,与 Java 不同,发射器不会发送 'word'。
这里是我使用的代码:
科特林
发送中
import io.smallrye.reactive.messaging.annotations.Emitter
import io.smallrye.reactive.messaging.annotations.Stream
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.PathParam
@Path("/search")
class ExampleService {
@Stream("queries")
val queryEmitter: Emitter<String>? = null
@GET
@Path("/{word}")
fun search(@PathParam("word") word: String?): String {
println("about to send word: " + word!!)
if (word.isNotEmpty()) {
var qE=queryEmitter?.send(word)
println("Emitter return : $qE")
return word
}
return "word was empty"
}
}
接收
import org.eclipse.microprofile.reactive.messaging.Incoming
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType
@Path("/receiver")
class AdsResource {
var word : String = "nothing happened so far"
@GET
@Produces(MediaType.TEXT_PLAIN)
fun getWords(): String {
return word
}
@Incoming("sink")
fun consume(message: String) {
println("got user query: $message")
word = message
}
}
这里是 Java 版本
发送中
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.Stream;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@Path("/search")
public class SearchEndpoint {
@Stream("queries")
Emitter<String> queryEmitter;
@GET
@Path("/{word}")
public String search(@PathParam("word") String word) {
System.out.println("about to send word: " + word);
if (!word.isEmpty()) {
Emitter<String> qE = queryEmitter.send(word);
System.out.println("Emitter return: " + qE);
return word;
}
return "word was empty" ;
}
}
接收
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@Path("/receiver")
public class AdsResource {
private String word = "";
@GET
public String getAd() {
System.out.println("got user query: " + word);
return word;
}
@Incoming("sink")
public void consume(String message) {
System.out.println("got user query: " + message);
word = message;
}
}
此处为 Kotlin 和 Java
的配置文件“application.properties”
# Configures the AMQP broker credentials.
amqp-username=quarkus
amqp-password=quarkus
# Configure the AMQP connector to write to the `queries ` address
mp.messaging.outgoing.queries.connector=smallrye-amqp
mp.messaging.outgoing.queries.address=sink
mp.messaging.outgoing.queries.durable=true
# Configure the AMQP connector to read from the `queries ` queue
mp.messaging.incoming.sink.connector=smallrye-amqp
mp.messaging.incoming.sink.durable=true
一些信息:
- 我 运行 到 docker 的 AMQP 消息代理-基于此 guide.
编写
- Smallrye Reactive Messaging
提前致谢,如果我错过了提供信息,请告诉我。
问题归结为 Kotlin 在字节码中添加 @Stream
注释的位置。
基本上要解决您的问题,您需要更换:
@Stream("queries")
和
@field: Stream("queries")
目前,我正在尝试在 Kotlin 中编写基于 Maven、Quarkus 和 SmallRye Reactive Messaging 的 'notification service'。 作为基础,我在 Java 中有一个工作正常的示例,我试图将它 "translate" 到 Kotlin 中。
我希望它工作的方式是,我发送一个 HTTP 请求(例如 GET http://localhost:8080/search/{word}),系统将 'word'(这里是一个字符串)发送到队列'queries' 的 Artemis AMQP 消息代理。 另一个系统订阅消息代理并根据 HTTP 请求(例如 GET http://localhost:8080/receiver)在队列 'queries' 中获取 'word'。
然而,在 Kotlin 中,它不起作用,我最好的猜测是,与 Java 不同,发射器不会发送 'word'。
这里是我使用的代码:
科特林
发送中
import io.smallrye.reactive.messaging.annotations.Emitter
import io.smallrye.reactive.messaging.annotations.Stream
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.PathParam
@Path("/search")
class ExampleService {
@Stream("queries")
val queryEmitter: Emitter<String>? = null
@GET
@Path("/{word}")
fun search(@PathParam("word") word: String?): String {
println("about to send word: " + word!!)
if (word.isNotEmpty()) {
var qE=queryEmitter?.send(word)
println("Emitter return : $qE")
return word
}
return "word was empty"
}
}
接收
import org.eclipse.microprofile.reactive.messaging.Incoming
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType
@Path("/receiver")
class AdsResource {
var word : String = "nothing happened so far"
@GET
@Produces(MediaType.TEXT_PLAIN)
fun getWords(): String {
return word
}
@Incoming("sink")
fun consume(message: String) {
println("got user query: $message")
word = message
}
}
这里是 Java 版本
发送中
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.Stream;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@Path("/search")
public class SearchEndpoint {
@Stream("queries")
Emitter<String> queryEmitter;
@GET
@Path("/{word}")
public String search(@PathParam("word") String word) {
System.out.println("about to send word: " + word);
if (!word.isEmpty()) {
Emitter<String> qE = queryEmitter.send(word);
System.out.println("Emitter return: " + qE);
return word;
}
return "word was empty" ;
}
}
接收
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@Path("/receiver")
public class AdsResource {
private String word = "";
@GET
public String getAd() {
System.out.println("got user query: " + word);
return word;
}
@Incoming("sink")
public void consume(String message) {
System.out.println("got user query: " + message);
word = message;
}
}
此处为 Kotlin 和 Java
的配置文件“application.properties”# Configures the AMQP broker credentials.
amqp-username=quarkus
amqp-password=quarkus
# Configure the AMQP connector to write to the `queries ` address
mp.messaging.outgoing.queries.connector=smallrye-amqp
mp.messaging.outgoing.queries.address=sink
mp.messaging.outgoing.queries.durable=true
# Configure the AMQP connector to read from the `queries ` queue
mp.messaging.incoming.sink.connector=smallrye-amqp
mp.messaging.incoming.sink.durable=true
一些信息:
- 我 运行 到 docker 的 AMQP 消息代理-基于此 guide. 编写
- Smallrye Reactive Messaging
提前致谢,如果我错过了提供信息,请告诉我。
问题归结为 Kotlin 在字节码中添加 @Stream
注释的位置。
基本上要解决您的问题,您需要更换:
@Stream("queries")
和
@field: Stream("queries")