Akka streams - why is the context not dropped?
I'm reading from Kafka and came across some code that returns the Kafka connection details, but I don't understand how the context is shared. This is the class that sets up the Kafka connection:
import akka.actor.typed.ActorSystem;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.javadsl.SourceWithContext;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;

import java.time.Duration;

@Getter
@Slf4j
public final class KafkaSource<K, V> {

    private final SourceWithContext<ConsumerRecord<K, V>, ConsumerMessage.Committable, ?> commitableSource;

    @Builder
    private KafkaSource(final Deserializer<K> keyd, final Deserializer<V> valueDeserializer, final ActorSystem actorSystem) {
        final String kafkaBootstrapServers = "localhost:9092";
        final ConsumerSettings<K, V> kafkaConsumerSettings =
                ConsumerSettings.create(actorSystem, keyd, valueDeserializer)
                        .withBootstrapServers(kafkaBootstrapServers)
                        .withGroupId("testGroup12")
                        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                        .withStopTimeout(Duration.ofSeconds(5));

        final String topics = "request-topic";
        // identity mapContext: re-types the context from CommittableOffset
        // to its Committable supertype so it matches the field's declaration
        this.commitableSource = Consumer.sourceWithOffsetContext(kafkaConsumerSettings,
                Subscriptions.topics(topics)).mapContext(ctx -> ctx);
    }
}
This is the Akka Stream I wrote to process the data coming from Kafka:
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadFromKafka {

    private static ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) {
        final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");

        var ksource = KafkaSource.<String, String>builder()
                .actorSystem(actorSystem)
                .keyd(new StringDeserializer())
                .valueDeserializer(new StringDeserializer())
                .build();

        ksource.getCommitableSource()
                .map(ConsumerRecord::value)
                .map(x -> {
                    var mappedObject = objectMapper.readValue(x, RequestDto.class);
                    System.out.println("mappedObject is :" + mappedObject);
                    return mappedObject;
                })
                .log("error")
                .asSource()
                .map(pair -> pair.second().commitInternal())
                .run(actorSystem);
    }
}
The class being mapped to, RequestDto:
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.*;
import lombok.extern.jackson.Jacksonized;

import java.util.Date;

@Jacksonized
@AllArgsConstructor
@Getter
@Builder
@ToString
public class RequestDto {

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
    private final Date datePurchased;

    private String someOtherField;
}
Although ReadFromKafka works as expected, why is the ConsumerMessage.Committable from SourceWithContext<ConsumerRecord<K, V>, ConsumerMessage.Committable, ?> not dropped when executing:

.map(ConsumerRecord::value)
.map(x -> {
    var mappedObject = objectMapper.readValue(x, RequestDto.class);
    System.out.println("mappedObject is :" + mappedObject);
    return mappedObject;
})

.asSource() then makes the context available in the second position of a tuple, which is used to commit the offset with:

.map(pair -> pair.second().commitInternal())

I'm confused about how this works. It seems something implicit is happening behind the scenes that allows the context to be propagated throughout the stream?
A SourceWithContext<A, B, M> defines the stream operations it supports so that they only operate on the A part of each element. So if f is a function that takes an A and returns a C, .map(f) results in a SourceWithContext<C, B, M>.

Under the hood, it's a Source<Pair<A, B>, M>, and map could be defined as something like (apologies as always for the brutal Java):
private Source<Pair<A, B>, M> underlying;

public <C> SourceWithContext<C, B, M> map(Function<A, C> f) {
    Source<Pair<C, B>, M> src =
        underlying.map(pair -> {
            A a = pair.first();
            C c = f(a);
            // the context in pair.second() is passed through untouched
            return Pair.create(c, pair.second());
        });
    return SourceWithContext.fromPairs(src);
}
Note that f never sees the second part of the Pair. As long as every operation does the right thing with respect to the context, this just works.
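To see this concretely without Kafka, here is a small self-contained demo (the names and values are invented for illustration) where a Long stands in for the offset context. The map only ever sees the String value, yet every element comes out of asSource still paired with its context:

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceWithContext;

import java.util.List;

public class ContextDemo {

    public static void main(String[] args) {
        final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "demo");

        // Pretend the Long is an "offset" context attached to each String value.
        SourceWithContext<String, Long, ?> withContext =
                SourceWithContext.fromPairs(
                        Source.from(List.of(
                                Pair.create("a", 1L),
                                Pair.create("b", 2L))));

        withContext
                .map(String::toUpperCase) // only touches the value...
                .asSource()               // ...yet each element is still paired with its context
                .runForeach(pair ->
                        System.out.println(pair.first() + " -> offset " + pair.second()), system)
                .thenRun(system::terminate);
    }
}

Running this prints A -> offset 1 and B -> offset 2: the values were transformed, the contexts were not.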
There are some operations for which there is no clear "right thing" to do. One example is operations that can reorder elements.
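A hypothetical demo of why reordering has no obvious "right thing" here: with Kafka, committing an offset implicitly commits every earlier offset on that partition, so if elements finish out of order, committing a later element's offset first would also commit an earlier element whose processing may not have succeeded. A plain-Source sketch of the reordering itself:

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.javadsl.Source;

import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReorderDemo {

    public static void main(String[] args) {
        final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "demo");

        // Two elements tagged with "offsets" 1 and 2; the first takes longer.
        Source.from(List.of(Pair.create(300, 1L), Pair.create(0, 2L)))
                .mapAsyncUnordered(2, pair ->
                        CompletableFuture.supplyAsync(() -> {
                            try {
                                Thread.sleep(pair.first());
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            return pair.second();
                        }))
                // Likely prints offset 2 before offset 1: committing 2 first
                // would implicitly commit 1 as well, before its work finished.
                .runForeach(offset -> System.out.println("commit offset " + offset), system)
                .thenRun(system::terminate);
    }
}

This is why SourceWithContext exposes only operators for which context propagation has a well-defined meaning.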