如何使用 micronaut 流式传输来自 JPA 的数据流?
How can a stream of data from JPA be streamed with micronaut?
我目前开始使用 micronaut 和 kotlin。我的 JPA 查询产生了大约 100 万个结果。
这些结果我想从这个 micronaut 服务流式传输到另一个。
我的查询 returns allQuery.resultStream
类型 java.util.stream
。
发送服务的控制者:
@Get("/test{value1,value2,value3}")
fun getTestObjects(
value1: String,
value2: String,
value3: String
): Stream<TestObject> {
val entries = testRepository.findAllWhere(value1, value2, value3)
return entries
}
接受服务的客户:
@Get("/data/test{value1,value2,value3}")
override fun getTestObjects(alue1: String,
value2: String,
value3: String) : Stream<TestObject>
JPA 查询如下所示:
val cb = entityManager.criteriaBuilder
val cq = cb.createQuery(TestObject::class.java)
val rootEntry = cq.from(TestObject::class.java)
val predicates = mutableListOf<Predicate>()
predicates.add(cb.like(rootEntry.get<String>("value1"), value1))
predicates.add(cb.equal(rootEntry.get<String>("value2"), value2))
predicates.add(cb.equal(rootEntry.get<Int>("value3"), value3))
val cqAllWhere = cq.select(rootEntry)
.where(cb.or(*predicates.toTypedArray()))
val allQuery = entityManager.createQuery(cqAllWhere)
val entries = allQuery.resultStream
return entries
我的预期输出将是一种具有回推规则的 Flowable,并且没有发送服务首先将所有对象放入内存,因为那么多内存将不可用。
基本上您只需要创建一个 Flowable
并在项目可用时发出它们。
return Flowable.create(emitter -> {
//loop through result set
//for each item
emitter.onNext(item);
//If you encounter an error
emitter.onError(...);
//When you're done
emitter.onComplete();
})
我目前开始使用 micronaut 和 kotlin。我的 JPA 查询产生了大约 100 万个结果。 这些结果我想从这个 micronaut 服务流式传输到另一个。
我的查询 returns allQuery.resultStream
类型 java.util.stream
。
发送服务的控制者:
@Get("/test{value1,value2,value3}")
fun getTestObjects(
value1: String,
value2: String,
value3: String
): Stream<TestObject> {
val entries = testRepository.findAllWhere(value1, value2, value3)
return entries
}
接受服务的客户:
@Get("/data/test{value1,value2,value3}")
override fun getTestObjects(alue1: String,
value2: String,
value3: String) : Stream<TestObject>
JPA 查询如下所示:
val cb = entityManager.criteriaBuilder
val cq = cb.createQuery(TestObject::class.java)
val rootEntry = cq.from(TestObject::class.java)
val predicates = mutableListOf<Predicate>()
predicates.add(cb.like(rootEntry.get<String>("value1"), value1))
predicates.add(cb.equal(rootEntry.get<String>("value2"), value2))
predicates.add(cb.equal(rootEntry.get<Int>("value3"), value3))
val cqAllWhere = cq.select(rootEntry)
.where(cb.or(*predicates.toTypedArray()))
val allQuery = entityManager.createQuery(cqAllWhere)
val entries = allQuery.resultStream
return entries
我的预期输出将是一种具有回推规则的 Flowable,并且没有发送服务首先将所有对象放入内存,因为那么多内存将不可用。
基本上您只需要创建一个 Flowable
并在项目可用时发出它们。
return Flowable.create(emitter -> {
//loop through result set
//for each item
emitter.onNext(item);
//If you encounter an error
emitter.onError(...);
//When you're done
emitter.onComplete();
})