Spring Data Redis Reactive:将 StreamReceiver 转换为协程 Flow

Spring Data Redis Reactive: StreamReceiver to coroutines Flow

我正在编写一个通过 Spring Data Redis 使用 Redis Stream 的应用程序,spring-data-redis 底层使用 Lettuce。写入流是成功的:我可以用 redis-cli 直接在 Redis 中验证,消息确实已经存在。但在使用 StreamReceiver 从流中读取时,它只是部分可用:Flux 版本的测试可以通过,而协程(Flow)版本的测试失败。

为此,我实现了两个读取版本,它们的返回类型不同:

TestDataRedisRepository.kt内容:

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactive.asFlow
import org.slf4j.LoggerFactory
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.connection.stream.RecordId
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.connection.stream.StreamRecords
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.core.addAndAwait
import org.springframework.data.redis.core.trimAndAwait
import org.springframework.data.redis.stream.StreamReceiver
import org.springframework.stereotype.Repository
import reactor.core.publisher.Flux

/** Key of the Redis stream that [TestDataRedisRepository] writes to, reads from and trims. */
const val STREAM_KEY = "test-data"

/**
 * Repository that stores [TestData] entries in the Redis stream [STREAM_KEY] and reads them
 * back, offering both a Reactor ([Flux]) and a coroutine ([Flow]) variant of the read side.
 */
@Repository
class TestDataRedisRepository(
    val reactiveRedisTemplate: ReactiveRedisTemplate<String, TestData>,
    val streamReceiver: StreamReceiver<String, MapRecord<String, String, String>>
) {
    private val log = LoggerFactory.getLogger(this::class.java)

    /**
     * Adds every element of [entityStream] to the stream, one after another, and emits the
     * [RecordId] returned for each added record.
     */
    @FlowPreview
    fun saveAll(entityStream: Flow<TestData>): Flow<RecordId> =
        entityStream
            .map { entity -> toMapRecord(entity) }
            .flatMapConcat {
                log.info("Saving record: $it")
                val streamOps = reactiveRedisTemplate.opsForStream<String, TestData>()
                streamOps.add(it).asFlow()
            }

    /**
     * Adds a single entry to the stream and suspends until Redis has answered.
     *
     * @return the id of the newly added record
     */
    suspend fun save(TestData: TestData): RecordId {
        val record = toMapRecord(TestData)
        log.info("Saving record: $record")
        val streamOps = reactiveRedisTemplate.opsForStream<String, TestData>()
        return streamOps.addAndAwait(record)
    }

    /** Wraps the map form of [entity] in a record addressed to [STREAM_KEY]. */
    private fun toMapRecord(entity: TestData): MapRecord<String, String, String> {
        val fields = entity.toMap()
        return StreamRecords.newRecord().`in`(STREAM_KEY).ofMap(fields)
    }

    /**
     * Receives the records of [STREAM_KEY] starting from the beginning of the stream, logs every
     * signal and converts each record value back into [TestData].
     *
     * NOTE(review): the receiver is expected to keep listening, so the returned [Flux] presumably
     * never completes on its own; callers should cancel once they have what they need.
     */
    fun readAllAsFlux(): Flux<TestData> =
        streamReceiver
            .receive(StreamOffset.fromStart(STREAM_KEY))
            .doOnEach { log.info("Received stream record: $it") }
            .map { record -> record.value.fromMap() }

    /**
     * Coroutine counterpart of [readAllAsFlux]: the same receive/log/convert pipeline, exposed
     * as a [Flow].
     */
    fun readAllAsFlow(): Flow<TestData> = readAllAsFlux().asFlow()

    /** Trims the stream [STREAM_KEY] down to a length of 0. */
    suspend fun deleteAll() {
        val streamOps = reactiveRedisTemplate.opsForStream<String, TestData>()
        streamOps.trimAndAwait(STREAM_KEY, 0)
    }
}

测试类 TestDataRedisRepositoryTest.kt 的内容:

import app.cash.turbine.test
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import reactor.test.StepVerifier
import java.time.Duration
import java.time.Instant
import kotlin.time.ExperimentalTime

/**
 * Integration tests for [TestDataRedisRepository], run inside the Spring Boot test context.
 *
 * Both tests write two records and then read them back, once through the Reactor API and once
 * through the coroutine API. The stream is trimmed before the first test and after every test.
 */
@FlowPreview
@ExperimentalTime
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SpringBootTest
internal class TestDataRedisRepositoryTest @Autowired constructor(
    private val testDataRedisRepository: TestDataRedisRepository
) {
    private val now: Instant = Instant.now()

    /** Starts the test class from an empty stream. */
    @BeforeAll
    fun setUp() {
        runBlocking { testDataRedisRepository.deleteAll() }
    }

    /** Removes the records written by the previous test. */
    @AfterEach
    fun afterEach() {
        runBlocking {
            testDataRedisRepository.deleteAll()
        }
    }

    @Test
    fun `test Flux`() {
        runBlocking {
            testDataRedisRepository.saveAll(
                flowOf(
                    TestData(now.minusSeconds(1), "test2"),
                    TestData(now, "test3")
                )
            ).toList()
        }

        // The receiver keeps listening, so the verification cancels after the two expected items.
        testDataRedisRepository.readAllAsFlux().`as`(StepVerifier::create) //
            .consumeNextWith {
                assertThat(it).isEqualTo(TestData(now.minusSeconds(1), "test2"))
            }
            .consumeNextWith {
                assertThat(it).isEqualTo(TestData(now, "test3"))
            }
            .thenCancel()
            .verify(Duration.ofSeconds(1))
    }

    @Test
    fun `test Flow`() {
        runBlocking {
            testDataRedisRepository.saveAll(
                flowOf(
                    TestData(now.minusSeconds(1), "test2"),
                    TestData(now, "test3")
                )
            ).toList()

            // The flow returned by readAllAsFlow() does not complete on its own, so a terminal
            // operator such as toList() never returns and expectComplete() runs into Turbine's
            // timeout. Like the Flux test, assert the expected items and then cancel.
            testDataRedisRepository.readAllAsFlow()
                .test {
                    assertThat(expectItem())
                        .isEqualTo(TestData(now.minusSeconds(1), "test2"))
                    assertThat(expectItem())
                        .isEqualTo(TestData(now, "test3"))

                    cancelAndIgnoreRemainingEvents()
                }
        }
    }
}

我的RedisConfig.kt内容:

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.serializer.RedisSerializationContext
import org.springframework.data.redis.serializer.RedisSerializer
import org.springframework.data.redis.stream.StreamReceiver

/**
 * Spring configuration declaring the reactive Redis beans used to write to and read from
 * a Redis Stream.
 */
@Configuration
class RedisConfig {

    /**
     * Template used for writing to the Redis Stream.
     */
    @Bean
    fun reactiveRedisTemplate(
        factory: ReactiveRedisConnectionFactory,
        serializationContext: RedisSerializationContext<String, TestData>
    ): ReactiveRedisTemplate<String, TestData> {
        return ReactiveRedisTemplate(factory, serializationContext)
    }

    /**
     * Receiver used for reading from the Redis Stream; its options are built with the shared
     * [serializationContext].
     */
    @Bean
    fun streamReceiver(
        factory: ReactiveRedisConnectionFactory,
        serializationContext: RedisSerializationContext<String, TestData>
    ): StreamReceiver<String, MapRecord<String, String, String>> {
        val receiverOptions = StreamReceiver.StreamReceiverOptions.builder()
            .serializer(serializationContext)
            .build()
        return StreamReceiver.create(factory, receiverOptions)
    }

    /**
     * Serialization context shared by the template and the receiver, created with the string
     * serializer as its default serializer.
     */
    @Bean
    fun serializationContext(): RedisSerializationContext<String, TestData> {
        val contextBuilder = RedisSerializationContext.newSerializationContext<String, TestData>(
            RedisSerializer.string()
        )
        return contextBuilder.build()
    }
}

TestData.kt

import java.time.Instant

/**
 * Value object pairing a timestamp with a text payload.
 *
 * @property instant the point in time attached to the payload
 * @property content the text payload
 */
data class TestData(
    val instant: Instant,
    val content: String
)

/** Map key under which [TestData.instant] is stored by [toMap]. */
const val INSTANT = "instant"

/** Map key under which [TestData.content] is stored by [toMap]. */
const val CONTENT = "content"

/**
 * Converts this [TestData] into a string map with the keys [INSTANT] and [CONTENT].
 *
 * The instant is written with [Instant.toString], i.e. in ISO-8601 form, so that
 * [fromMap] can parse it back.
 */
fun TestData.toMap(): Map<String, String> = mapOf(
    INSTANT to instant.toString(),
    CONTENT to content
)

/**
 * Rebuilds a [TestData] from a map produced by [toMap].
 *
 * @throws IllegalArgumentException if the [INSTANT] or [CONTENT] entry is missing
 * @throws java.time.format.DateTimeParseException if the [INSTANT] entry cannot be parsed
 */
fun Map<String, String>.fromMap(): TestData {
    // Fail with a message naming the missing key instead of a bare NullPointerException.
    val parsedInstant = Instant.parse(
        requireNotNull(this[INSTANT]) { "Missing '$INSTANT' entry; present keys: $keys" }
    )
    val rawContent = requireNotNull(this[CONTENT]) { "Missing '$CONTENT' entry; present keys: $keys" }
    return TestData(parsedInstant, rawContent)
}

Redis 运行在 Docker 容器中,使用默认端口。为了完整起见,下面是 application.yaml 和 build.gradle.kts:

# Connection settings for the Redis instance used by the application.
spring:
  redis:
    host: localhost
    port: 6379
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

// Build plugins: Spring Boot, Spring dependency management, Kotlin/JVM and the Kotlin Spring plugin.
plugins {
    id("org.springframework.boot") version "2.5.2"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version "1.5.20"
    kotlin("plugin.spring") version "1.5.20"
}

// Project coordinates and Java source level.
group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
    mavenCentral()
}

dependencies {
    // Reactive Redis access plus the Kotlin/coroutine bridges for Reactor.
    implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")

    // Test stack: Spring Boot test support, Reactor's StepVerifier and Turbine for Flow assertions.
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
    testImplementation("app.cash.turbine:turbine:0.5.2")
}

// Kotlin compiler settings: strict JSR-305 nullability handling, JVM 11 bytecode.
tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "11"
    }
}

// Run tests on the JUnit Platform (JUnit 5).
tasks.withType<Test> {
    useJUnitPlatform()
}

如果测试需要检查多个值,调用 toList() 会使 Flow 一直等待,直到源发出全部的值,然后把这些值作为列表返回。请注意,这只适用于有限的数据流。

因此,在您的情况下,如果它是一个会无限发出值的流,toList() 就会永远等待收集这些值,这就是您的测试被阻塞的原因。

一种解决方案是从流中取出有限数量的项目,然后进行断言。例如,您可以执行类似以下代码的操作:

// Each statement below calls readAllAsFlow() again and therefore starts its own collection.

// Take the first item, then cancel the collection
val firstItem = testDataRedisRepository.readAllAsFlow().first()

// Skip the first item and take the second one, then cancel the collection
val secondItem = testDataRedisRepository.readAllAsFlow().drop(1).first()

// Take the first five items as a list; this suspends until five items have been emitted
val firstFiveItems = testDataRedisRepository.readAllAsFlow().take(5).toList()

更多场景可以参考 Android 开发者文档中的这篇文章:《测试 Kotlin Flow》。