Akka Streams - why is the context not dropped?

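I am reading data from Kafka and came across code that returns the Kafka connection details, but I don't understand how the context is shared. This is the class that sets up the Kafka connection: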

import akka.actor.typed.ActorSystem;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.javadsl.SourceWithContext;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;

import java.time.Duration;

@Getter
@Slf4j
public final class KafkaSource<K, V> {

    private final SourceWithContext<ConsumerRecord<K, V>, ConsumerMessage.Committable, ?> commitableSource;

    @Builder
    private KafkaSource(final Deserializer<K> keyd, final Deserializer<V> valueDeserializer, final ActorSystem actorSystem) {

        final String kafkaBootstrapServers = "localhost:9092";

        final ConsumerSettings<K, V> kafkaConsumerSettings =
                ConsumerSettings.create(actorSystem, keyd, valueDeserializer)
                        .withBootstrapServers(kafkaBootstrapServers)
                        .withGroupId("testGroup12")
                        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                        .withStopTimeout(Duration.ofSeconds(5));

        final String topics = "request-topic";

        this.commitableSource = Consumer.sourceWithOffsetContext(kafkaConsumerSettings,
                Subscriptions.topics(topics)).mapContext(ctx -> ctx);
    }
}

This is the Akka Stream I wrote to process the data from Kafka:

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadFromKafka {

    private static ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String args[]) {

        final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");

        var ksource = KafkaSource.<String, String>builder()
                .actorSystem(actorSystem)
                .keyd(new StringDeserializer()).valueDeserializer(new StringDeserializer())
                .build();

        ksource.getCommitableSource()
                .map(ConsumerRecord::value)
                .map(x -> {
                            var mappedObject = objectMapper.readValue(x, RequestDto.class);
                            System.out.println("mappedObject is :" + mappedObject);
                            return mappedObject;
                        }
                )
                .log("error")
                .asSource()
                .map(pair -> pair.second().commitInternal())
                .run(actorSystem);
    }
}

The class being mapped to, RequestDto:

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.*;
import lombok.extern.jackson.Jacksonized;

import java.util.Date;

@Jacksonized
@AllArgsConstructor
@Getter
@Builder
@ToString
public class RequestDto {

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
    private final Date datePurchased;

    private String someOtherField;

}

ReadFromKafka works as expected, but why is the ConsumerMessage.Committable from SourceWithContext<ConsumerRecord<K, V>, ConsumerMessage.Committable, ?> not dropped when the following is executed:

.map(ConsumerRecord::value)
.map(x -> {
            var mappedObject = objectMapper.readValue(x, RequestDto.class);
            System.out.println("mappedObject is :" + mappedObject);
            return mappedObject;
        }
) 

.asSource() then exposes the context as the second element of the pair, and the offset is committed using:

.map(pair -> pair.second().commitInternal())

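I'm confused about how this works. It seems something implicit is happening in the background that allows the context to be propagated through the stream?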

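A SourceWithContext<A, B, M> defines the stream operations it supports so that they act only on the A part of each element.

So if f is a function that takes an A and returns a C, .map(f) results in a SourceWithContext<C, B, M>.

Under the hood it is a Source<Pair<A, B>, M>, and map could be defined as something like (apologies, as always, for atrocious Java):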

// Function is akka.japi.function.Function, Pair is akka.japi.Pair
private Source<Pair<A, B>, M> underlying;

public <C> SourceWithContext<C, B, M> map(Function<A, C> f) {
    Source<Pair<C, B>, M> src =
        underlying.map(pair -> {
            A a = pair.first();
            C c = f.apply(a);                      // f only ever sees the A part
            return Pair.create(c, pair.second());  // the context B is carried along untouched
        });
    return SourceWithContext.fromPairs(src);
}

Note that f never sees the second part of the Pair. As long as every operation does the right thing with respect to the context, it just works.
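
The same idea can be modelled without Akka at all. The following is a minimal sketch under stated assumptions, not Akka's implementation: WithContext, asPairs and WithContextDemo are hypothetical names, a plain List stands in for the stream, Map.Entry stands in for Pair, and the Long contexts are made-up offsets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy stand-in for SourceWithContext (hypothetical, not an Akka class):
// a list of (value, context) pairs where the key is the value and the
// entry's value is the context.
final class WithContext<A, Ctx> {
    private final List<Map.Entry<A, Ctx>> underlying;

    WithContext(List<Map.Entry<A, Ctx>> underlying) {
        this.underlying = underlying;
    }

    // f receives only the value; the context is copied across unchanged.
    <C> WithContext<C, Ctx> map(Function<A, C> f) {
        List<Map.Entry<C, Ctx>> out = new ArrayList<>();
        for (Map.Entry<A, Ctx> pair : underlying) {
            out.add(Map.entry(f.apply(pair.getKey()), pair.getValue()));
        }
        return new WithContext<>(out);
    }

    // Plays the role of asSource(): expose the pairs again.
    List<Map.Entry<A, Ctx>> asPairs() {
        return underlying;
    }
}

final class WithContextDemo {
    static List<Map.Entry<Integer, Long>> run() {
        WithContext<String, Long> source = new WithContext<>(List.of(
                Map.entry("a", 100L),
                Map.entry("bcd", 101L)));
        return source
                .map(String::toUpperCase) // value: String -> String
                .map(String::length)      // value: String -> Integer
                .asPairs();               // contexts are still 100 and 101
    }

    public static void main(String[] args) {
        for (Map.Entry<Integer, Long> pair : run()) {
            System.out.println("value=" + pair.getKey() + ", context=" + pair.getValue());
        }
    }
}
```

Neither of the two functions passed to map mentions the context, yet each offset is still attached to its element at the end, which is the behaviour observed in ReadFromKafka after .asSource().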

For some operations there is no clear "right thing" to do. One example is an operation that can reorder elements.
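
To illustrate one way reordering might matter, here is a small sketch that uses plain Java collections rather than Akka. ReorderDemo and contextsAfterSortingByValue are hypothetical names, and the Long contexts are assumed to be offsets that were handed out in increasing order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ReorderDemo {
    // Sort the pairs by value. Each context stays attached to its own value,
    // so the contexts come out in a different order than they went in.
    static List<Long> contextsAfterSortingByValue(List<Map.Entry<String, Long>> pairs) {
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(pairs);
        sorted.sort(Map.Entry.comparingByKey());
        List<Long> contexts = new ArrayList<>();
        for (Map.Entry<String, Long> pair : sorted) {
            contexts.add(pair.getValue());
        }
        return contexts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> pairs = List.of(
                Map.entry("pear", 100L),
                Map.entry("apple", 101L),
                Map.entry("fig", 102L));
        // prints [101, 102, 100]
        System.out.println(contextsAfterSortingByValue(pairs));
    }
}
```

If a later stage committed these contexts in arrival order, and a committed offset is taken to mean "everything up to here has been processed" (as with Kafka consumer offsets), then 102 would be committed before the element with offset 100 had reached that stage.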