Spring Data Redis Reactive:StreamReceiver 到协程流
Spring Data Redis Reactive: StreamReceiver to coroutines Flow
我正在编写一个使用 Spring Data Redis 与 Redis Stream 配合使用的应用程序。我将 spring-data-redis
与 Lettuce 一起使用。我可以成功写入流,因为我可以通过 redis-cli 直接在 Redis 中对其进行验证,并且我看到消息在 Redis 中。当谈到使用 StreamReceiver
从流中读取时,它基本可以工作,但我的协程版本的测试失败了。
我实现了两个读取版本,它们的返回类型不同:
Flux<TestData>
。我使用 reactor-test
中的类来测试它,做法与 Spring Data Redis 团队类似。它工作正常:收到的条目被打印出来,测试也通过了。
Flow<TestData>
。我使用 FlowTurbine 对其进行了测试。尽管收到的条目被打印了出来,这个测试还是失败了:FlowTurbine 直接超时。我还尝试直接使用会阻塞的 Flow.toList()
而不是 Turbine 的 test
,但这样一来调用就会永远阻塞。我在处理 Flow 时大概做错了什么。我究竟错在哪里?又该如何修复?
TestDataRedisRepository.kt
内容:
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactive.asFlow
import org.slf4j.LoggerFactory
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.connection.stream.RecordId
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.connection.stream.StreamRecords
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.core.addAndAwait
import org.springframework.data.redis.core.trimAndAwait
import org.springframework.data.redis.stream.StreamReceiver
import org.springframework.stereotype.Repository
import reactor.core.publisher.Flux
const val STREAM_KEY = "test-data"
/**
 * Redis Stream backed repository for [TestData].
 *
 * Records are written through [reactiveRedisTemplate] and read back through [streamReceiver];
 * every operation targets the single stream named by [STREAM_KEY].
 */
@Repository
class TestDataRedisRepository(
    val reactiveRedisTemplate: ReactiveRedisTemplate<String, TestData>,
    val streamReceiver: StreamReceiver<String, MapRecord<String, String, String>>
) {
    private val log = LoggerFactory.getLogger(this::class.java)

    /**
     * Appends every element of [entityStream] to the stream, one element at a time.
     *
     * @param entityStream entities to persist
     * @return a flow of the record ids returned by each add operation
     */
    @FlowPreview
    fun saveAll(entityStream: Flow<TestData>): Flow<RecordId> =
        entityStream
            .map { toMapRecord(it) }
            .flatMapConcat { record ->
                log.info("Saving record: $record")
                reactiveRedisTemplate
                    .opsForStream<String, TestData>()
                    .add(record)
                    .asFlow()
            }

    /**
     * Appends a single entity to the stream and suspends until the add operation returns.
     *
     * The parameter keeps its original (type-shadowing) name so that callers using named
     * arguments continue to compile.
     *
     * @return the id of the written record
     */
    suspend fun save(TestData: TestData): RecordId {
        val record = toMapRecord(TestData)
        log.info("Saving record: $record")
        return reactiveRedisTemplate
            .opsForStream<String, TestData>()
            .addAndAwait(record)
    }

    /** Wraps [data] (converted with `toMap()`) in a map record addressed to [STREAM_KEY]. */
    private fun toMapRecord(data: TestData): MapRecord<String, String, String> =
        StreamRecords.newRecord()
            .`in`(STREAM_KEY)
            .ofMap(data.toMap())

    /**
     * Reads the stream from its start and maps each received record back to [TestData].
     *
     * NOTE(review): the receiver presumably keeps the subscription open waiting for new
     * records, so consumers should cancel instead of waiting for completion - confirm.
     */
    fun readAllAsFlux(): Flux<TestData> =
        streamReceiver
            .receive(StreamOffset.fromStart(STREAM_KEY))
            // doOnNext hands over the record itself. doOnEach would log the wrapping Signal and
            // would also fire for completion/error signals, which are not stream records.
            .doOnNext { log.info("Received stream record: $it") }
            .map { it.value.fromMap() }

    /**
     * Coroutine-friendly variant of [readAllAsFlux]; delegates to it so both read paths share
     * one pipeline.
     */
    fun readAllAsFlow(): Flow<TestData> = readAllAsFlux().asFlow()

    /** Trims the stream at [STREAM_KEY] with a count of 0, i.e. asks Redis to drop its records. */
    suspend fun deleteAll() {
        reactiveRedisTemplate
            .opsForStream<String, TestData>()
            .trimAndAwait(STREAM_KEY, 0)
    }
}
测试类 TestDataRedisRepositoryTest.kt
内容:
import app.cash.turbine.test
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import reactor.test.StepVerifier
import java.time.Duration
import java.time.Instant
import kotlin.time.ExperimentalTime
/**
 * Integration test that writes two [TestData] records to the stream and reads them back,
 * once through the Flux API and once through the Flow API.
 */
@FlowPreview
@ExperimentalTime
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SpringBootTest
internal class TestDataRedisRepositoryTest @Autowired constructor(
    private val testDataRedisRepository: TestDataRedisRepository
) {
    private val now: Instant = Instant.now()

    /** Clears the stream once before any test runs. */
    @BeforeAll
    fun setUp(): Unit = runBlocking { testDataRedisRepository.deleteAll() }

    /** Clears the stream after every test. */
    @AfterEach
    fun afterEach(): Unit = runBlocking { testDataRedisRepository.deleteAll() }

    @Test // passes
    fun `test Flux`() {
        runBlocking {
            val fixtures = flowOf(
                TestData(now.minusSeconds(1), "test2"),
                TestData(now, "test3")
            )
            testDataRedisRepository.saveAll(fixtures).toList()
        }

        // Verify the two records in order, then cancel explicitly instead of waiting for completion.
        StepVerifier.create(testDataRedisRepository.readAllAsFlux())
            .consumeNextWith { received ->
                assertThat(received).isEqualTo(TestData(now.minusSeconds(1), "test2"))
            }
            .consumeNextWith { received ->
                assertThat(received).isEqualTo(TestData(now, "test3"))
            }
            .thenCancel()
            .verify(Duration.ofSeconds(1))
    }

    @Test // fails
    fun `test Flow`() {
        runBlocking {
            val fixtures = flowOf(
                TestData(now.minusSeconds(1), "test2"),
                TestData(now, "test3")
            )
            testDataRedisRepository.saveAll(fixtures).toList()

            // val list = testDataRedisRepository.readAllAsFlow().toList() // this call blocks forever
            // FlowTurbine just times out
            val receivedFlow = testDataRedisRepository.readAllAsFlow()
            receivedFlow.test {
                assertThat(expectItem()).isEqualTo(TestData(now.minusSeconds(1), "test2"))
                assertThat(expectItem()).isEqualTo(TestData(now, "test3"))
                // NOTE(review): presumably the receiver flow never completes, so this expectation
                // is what times out - the Flux test cancels at this point instead. Confirm.
                expectComplete()
            }
        }
    }
}
我的 RedisConfig.kt
内容:
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.serializer.RedisSerializationContext
import org.springframework.data.redis.serializer.RedisSerializer
import org.springframework.data.redis.stream.StreamReceiver
/** Spring configuration declaring the Redis beans used to write to and read from the stream. */
@Configuration
class RedisConfig {

    /**
     * For writing to Redis Stream.
     *
     * Builds the template from the connection [factory] and the shared [serializationContext].
     */
    @Bean
    fun reactiveRedisTemplate(
        factory: ReactiveRedisConnectionFactory,
        serializationContext: RedisSerializationContext<String, TestData>
    ): ReactiveRedisTemplate<String, TestData> {
        return ReactiveRedisTemplate(factory, serializationContext)
    }

    /**
     * For reading from Redis Stream.
     *
     * Creates the receiver with options whose serializer is taken from [serializationContext].
     */
    @Bean
    fun streamReceiver(
        factory: ReactiveRedisConnectionFactory,
        serializationContext: RedisSerializationContext<String, TestData>
    ): StreamReceiver<String, MapRecord<String, String, String>> {
        val receiverOptions = StreamReceiver.StreamReceiverOptions.builder()
            .serializer(serializationContext)
            .build()
        return StreamReceiver.create(factory, receiverOptions)
    }

    /** Serialization context built from `RedisSerializer.string()`. */
    @Bean
    fun serializationContext(): RedisSerializationContext<String, TestData> {
        val contextBuilder = RedisSerializationContext.newSerializationContext<String, TestData>(
            RedisSerializer.string()
        )
        return contextBuilder.build()
    }
}
TestData.kt
import java.time.Instant
/** Immutable payload stored in the stream: a timestamp plus a text content. */
data class TestData(
    val instant: Instant,
    val content: String
)

/** Map key under which [TestData.instant] is stored. */
const val INSTANT = "instant"

/** Map key under which [TestData.content] is stored. */
const val CONTENT = "content"

/**
 * Flattens this entity into a string map: the instant is written with [Instant.toString],
 * the content is copied as-is.
 */
fun TestData.toMap(): Map<String, String> = mapOf(
    INSTANT to instant.toString(),
    CONTENT to content
)

/**
 * Rebuilds a [TestData] from a map produced by [toMap].
 *
 * @throws IllegalArgumentException if the [INSTANT] or [CONTENT] entry is missing
 * @throws java.time.format.DateTimeParseException if the [INSTANT] entry is not parseable by [Instant.parse]
 */
fun Map<String, String>.fromMap(): TestData {
    // Fail with a descriptive message instead of an anonymous NullPointerException.
    val rawInstant = requireNotNull(this[INSTANT]) { "Missing '$INSTANT' entry in record map: $this" }
    val rawContent = requireNotNull(this[CONTENT]) { "Missing '$CONTENT' entry in record map: $this" }
    return TestData(Instant.parse(rawInstant), rawContent)
}
Redis 运行在 Docker 容器中,使用默认端口。
为了完整起见,这里有 application.yaml
和 build.gradle.kts
:
spring:
  redis:
    host: localhost
    port: 6379
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
// Build plugins: Spring Boot, Spring dependency management, Kotlin JVM and the Kotlin Spring plugin.
plugins {
    id("org.springframework.boot") version "2.5.2"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version "1.5.20"
    kotlin("plugin.spring") version "1.5.20"
}

// Project coordinates.
group = "com.example"
version = "0.0.1-SNAPSHOT"

// Java source level used by the project.
java {
    sourceCompatibility = JavaVersion.VERSION_11
}

repositories {
    mavenCentral()
}

dependencies {
    // Application: reactive Redis starter, Reactor Kotlin extensions, Kotlin runtime, coroutine/Reactor bridge.
    implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")

    // Tests: Spring Boot test starter, reactor-test and Turbine.
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
    testImplementation("app.cash.turbine:turbine:0.5.2")
}

// Kotlin compilation: strict JSR-305 handling, JVM 11 bytecode.
tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "11"
    }
}

// Run tests on the JUnit Platform.
tasks.withType<Test> {
    useJUnitPlatform()
}
如果测试需要检查多个值,则调用 toList()
会使 Flow 一直等待,直到源发出全部值,然后把这些值作为列表返回。请注意,这只适用于有限的数据流。
因此,在您的情况下,如果它是一个发出无限值的流,那么它将永远等待收集这些值,这就是您的测试被阻止的原因。
一种解决方案是从流中取出有限数量的项目,然后进行断言。例如,您可以执行类似以下代码的操作:
// Take only the first item; first() stops collecting (cancels the flow) once it has a value.
val firstItem = testDataRedisRepository.readAllAsFlow().first()
// Skip one item and take the next one, i.e. the second item.
// This calls readAllAsFlow() again, so it is a separate collection from the one above.
val secondItem = testDataRedisRepository.readAllAsFlow().drop(1).first()
// Take the first five items as a list; take(5) bounds the flow so toList() can return.
// NOTE(review): if the stream holds fewer than five records this presumably keeps waiting - adjust the count to the data.
val firstFiveItems = testDataRedisRepository.readAllAsFlow().take(5).toList()
更多场景可以参考 Android 开发者文档中的这篇文章:《测试 Kotlin Flow》。
我正在编写一个使用 Spring Data Redis 与 Redis Stream 配合使用的应用程序。我将 spring-data-redis
与 Lettuce 一起使用。我可以成功写入流,因为我可以通过 redis-cli 直接在 Redis 中对其进行验证,并且我看到消息在 Redis 中。当谈到使用 StreamReceiver
从流中读取时,它基本可以工作,但我的协程版本的测试失败了。
我实现了两个读取版本,它们的返回类型不同:
Flux<TestData>
。我使用reactor-test
中的类来测试它,做法与 Spring Data Redis 团队类似。它工作正常:收到的条目被打印出来,测试也通过了。Flow<TestData>
。我使用 FlowTurbine 对其进行了测试。尽管收到的条目被打印了出来,这个测试还是失败了:FlowTurbine 直接超时。我还尝试直接使用会阻塞的 Flow.toList()
而不是 Turbine 的test
,但这样一来调用就会永远阻塞。我在处理 Flow 时大概做错了什么。我究竟错在哪里?又该如何修复?
TestDataRedisRepository.kt
内容:
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactive.asFlow
import org.slf4j.LoggerFactory
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.connection.stream.RecordId
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.connection.stream.StreamRecords
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.core.addAndAwait
import org.springframework.data.redis.core.trimAndAwait
import org.springframework.data.redis.stream.StreamReceiver
import org.springframework.stereotype.Repository
import reactor.core.publisher.Flux
const val STREAM_KEY = "test-data"
/**
 * Redis Stream backed repository for [TestData].
 *
 * Records are written through [reactiveRedisTemplate] and read back through [streamReceiver];
 * every operation targets the single stream named by [STREAM_KEY].
 */
@Repository
class TestDataRedisRepository(
    val reactiveRedisTemplate: ReactiveRedisTemplate<String, TestData>,
    val streamReceiver: StreamReceiver<String, MapRecord<String, String, String>>
) {
    private val log = LoggerFactory.getLogger(this::class.java)

    /**
     * Appends every element of [entityStream] to the stream, one element at a time.
     *
     * @param entityStream entities to persist
     * @return a flow of the record ids returned by each add operation
     */
    @FlowPreview
    fun saveAll(entityStream: Flow<TestData>): Flow<RecordId> =
        entityStream
            .map { toMapRecord(it) }
            .flatMapConcat { record ->
                log.info("Saving record: $record")
                reactiveRedisTemplate
                    .opsForStream<String, TestData>()
                    .add(record)
                    .asFlow()
            }

    /**
     * Appends a single entity to the stream and suspends until the add operation returns.
     *
     * The parameter keeps its original (type-shadowing) name so that callers using named
     * arguments continue to compile.
     *
     * @return the id of the written record
     */
    suspend fun save(TestData: TestData): RecordId {
        val record = toMapRecord(TestData)
        log.info("Saving record: $record")
        return reactiveRedisTemplate
            .opsForStream<String, TestData>()
            .addAndAwait(record)
    }

    /** Wraps [data] (converted with `toMap()`) in a map record addressed to [STREAM_KEY]. */
    private fun toMapRecord(data: TestData): MapRecord<String, String, String> =
        StreamRecords.newRecord()
            .`in`(STREAM_KEY)
            .ofMap(data.toMap())

    /**
     * Reads the stream from its start and maps each received record back to [TestData].
     *
     * NOTE(review): the receiver presumably keeps the subscription open waiting for new
     * records, so consumers should cancel instead of waiting for completion - confirm.
     */
    fun readAllAsFlux(): Flux<TestData> =
        streamReceiver
            .receive(StreamOffset.fromStart(STREAM_KEY))
            // doOnNext hands over the record itself. doOnEach would log the wrapping Signal and
            // would also fire for completion/error signals, which are not stream records.
            .doOnNext { log.info("Received stream record: $it") }
            .map { it.value.fromMap() }

    /**
     * Coroutine-friendly variant of [readAllAsFlux]; delegates to it so both read paths share
     * one pipeline.
     */
    fun readAllAsFlow(): Flow<TestData> = readAllAsFlux().asFlow()

    /** Trims the stream at [STREAM_KEY] with a count of 0, i.e. asks Redis to drop its records. */
    suspend fun deleteAll() {
        reactiveRedisTemplate
            .opsForStream<String, TestData>()
            .trimAndAwait(STREAM_KEY, 0)
    }
}
测试类 TestDataRedisRepositoryTest.kt
内容:
import app.cash.turbine.test
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import reactor.test.StepVerifier
import java.time.Duration
import java.time.Instant
import kotlin.time.ExperimentalTime
/**
 * Integration test that writes two [TestData] records to the stream and reads them back,
 * once through the Flux API and once through the Flow API.
 */
@FlowPreview
@ExperimentalTime
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SpringBootTest
internal class TestDataRedisRepositoryTest @Autowired constructor(
    private val testDataRedisRepository: TestDataRedisRepository
) {
    private val now: Instant = Instant.now()

    /** Clears the stream once before any test runs. */
    @BeforeAll
    fun setUp(): Unit = runBlocking { testDataRedisRepository.deleteAll() }

    /** Clears the stream after every test. */
    @AfterEach
    fun afterEach(): Unit = runBlocking { testDataRedisRepository.deleteAll() }

    @Test // passes
    fun `test Flux`() {
        runBlocking {
            val fixtures = flowOf(
                TestData(now.minusSeconds(1), "test2"),
                TestData(now, "test3")
            )
            testDataRedisRepository.saveAll(fixtures).toList()
        }

        // Verify the two records in order, then cancel explicitly instead of waiting for completion.
        StepVerifier.create(testDataRedisRepository.readAllAsFlux())
            .consumeNextWith { received ->
                assertThat(received).isEqualTo(TestData(now.minusSeconds(1), "test2"))
            }
            .consumeNextWith { received ->
                assertThat(received).isEqualTo(TestData(now, "test3"))
            }
            .thenCancel()
            .verify(Duration.ofSeconds(1))
    }

    @Test // fails
    fun `test Flow`() {
        runBlocking {
            val fixtures = flowOf(
                TestData(now.minusSeconds(1), "test2"),
                TestData(now, "test3")
            )
            testDataRedisRepository.saveAll(fixtures).toList()

            // val list = testDataRedisRepository.readAllAsFlow().toList() // this call blocks forever
            // FlowTurbine just times out
            val receivedFlow = testDataRedisRepository.readAllAsFlow()
            receivedFlow.test {
                assertThat(expectItem()).isEqualTo(TestData(now.minusSeconds(1), "test2"))
                assertThat(expectItem()).isEqualTo(TestData(now, "test3"))
                // NOTE(review): presumably the receiver flow never completes, so this expectation
                // is what times out - the Flux test cancels at this point instead. Confirm.
                expectComplete()
            }
        }
    }
}
我的 RedisConfig.kt
内容:
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.serializer.RedisSerializationContext
import org.springframework.data.redis.serializer.RedisSerializer
import org.springframework.data.redis.stream.StreamReceiver
/** Spring configuration declaring the Redis beans used to write to and read from the stream. */
@Configuration
class RedisConfig {

    /**
     * For writing to Redis Stream.
     *
     * Builds the template from the connection [factory] and the shared [serializationContext].
     */
    @Bean
    fun reactiveRedisTemplate(
        factory: ReactiveRedisConnectionFactory,
        serializationContext: RedisSerializationContext<String, TestData>
    ): ReactiveRedisTemplate<String, TestData> {
        return ReactiveRedisTemplate(factory, serializationContext)
    }

    /**
     * For reading from Redis Stream.
     *
     * Creates the receiver with options whose serializer is taken from [serializationContext].
     */
    @Bean
    fun streamReceiver(
        factory: ReactiveRedisConnectionFactory,
        serializationContext: RedisSerializationContext<String, TestData>
    ): StreamReceiver<String, MapRecord<String, String, String>> {
        val receiverOptions = StreamReceiver.StreamReceiverOptions.builder()
            .serializer(serializationContext)
            .build()
        return StreamReceiver.create(factory, receiverOptions)
    }

    /** Serialization context built from `RedisSerializer.string()`. */
    @Bean
    fun serializationContext(): RedisSerializationContext<String, TestData> {
        val contextBuilder = RedisSerializationContext.newSerializationContext<String, TestData>(
            RedisSerializer.string()
        )
        return contextBuilder.build()
    }
}
TestData.kt
import java.time.Instant
/** Immutable payload stored in the stream: a timestamp plus a text content. */
data class TestData(
    val instant: Instant,
    val content: String
)

/** Map key under which [TestData.instant] is stored. */
const val INSTANT = "instant"

/** Map key under which [TestData.content] is stored. */
const val CONTENT = "content"

/**
 * Flattens this entity into a string map: the instant is written with [Instant.toString],
 * the content is copied as-is.
 */
fun TestData.toMap(): Map<String, String> = mapOf(
    INSTANT to instant.toString(),
    CONTENT to content
)

/**
 * Rebuilds a [TestData] from a map produced by [toMap].
 *
 * @throws IllegalArgumentException if the [INSTANT] or [CONTENT] entry is missing
 * @throws java.time.format.DateTimeParseException if the [INSTANT] entry is not parseable by [Instant.parse]
 */
fun Map<String, String>.fromMap(): TestData {
    // Fail with a descriptive message instead of an anonymous NullPointerException.
    val rawInstant = requireNotNull(this[INSTANT]) { "Missing '$INSTANT' entry in record map: $this" }
    val rawContent = requireNotNull(this[CONTENT]) { "Missing '$CONTENT' entry in record map: $this" }
    return TestData(Instant.parse(rawInstant), rawContent)
}
Redis 运行在 Docker 容器中,使用默认端口。
为了完整起见,这里有 application.yaml
和 build.gradle.kts
:
spring:
  redis:
    host: localhost
    port: 6379
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
// Build plugins: Spring Boot, Spring dependency management, Kotlin JVM and the Kotlin Spring plugin.
plugins {
    id("org.springframework.boot") version "2.5.2"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version "1.5.20"
    kotlin("plugin.spring") version "1.5.20"
}

// Project coordinates.
group = "com.example"
version = "0.0.1-SNAPSHOT"

// Java source level used by the project.
java {
    sourceCompatibility = JavaVersion.VERSION_11
}

repositories {
    mavenCentral()
}

dependencies {
    // Application: reactive Redis starter, Reactor Kotlin extensions, Kotlin runtime, coroutine/Reactor bridge.
    implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")

    // Tests: Spring Boot test starter, reactor-test and Turbine.
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
    testImplementation("app.cash.turbine:turbine:0.5.2")
}

// Kotlin compilation: strict JSR-305 handling, JVM 11 bytecode.
tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "11"
    }
}

// Run tests on the JUnit Platform.
tasks.withType<Test> {
    useJUnitPlatform()
}
如果测试需要检查多个值,则调用 toList()
会使 Flow 一直等待,直到源发出全部值,然后把这些值作为列表返回。请注意,这只适用于有限的数据流。
因此,在您的情况下,如果它是一个发出无限值的流,那么它将永远等待收集这些值,这就是您的测试被阻止的原因。
一种解决方案是从流中取出有限数量的项目,然后进行断言。例如,您可以执行类似以下代码的操作:
// Take only the first item; first() stops collecting (cancels the flow) once it has a value.
val firstItem = testDataRedisRepository.readAllAsFlow().first()
// Skip one item and take the next one, i.e. the second item.
// This calls readAllAsFlow() again, so it is a separate collection from the one above.
val secondItem = testDataRedisRepository.readAllAsFlow().drop(1).first()
// Take the first five items as a list; take(5) bounds the flow so toList() can return.
// NOTE(review): if the stream holds fewer than five records this presumably keeps waiting - adjust the count to the data.
val firstFiveItems = testDataRedisRepository.readAllAsFlow().take(5).toList()
更多场景可以参考 Android 开发者文档中的这篇文章:《测试 Kotlin Flow》。