Is ManagedChannel thread-safe in a Kafka Streams Transformer?
Here is my Transformer:
public class DataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {

    private ManagedChannel channel;
    private InfoClient infoclient;
    private LRUCacheCollector<String, InfoResponse> cache;

    public DataEnricher() {}

    @Override
    public void init(ProcessorContext context) {
        channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        infoclient = new InfoClient(channel);
    }

    @Override
    public KeyValue<byte[], EnrichedData> transform(byte[] key, EnrichedData request) {
        InfoResponse infoResponse = null;
        String someInfo = request.getSomeInfo();
        try {
            infoResponse = infoclient.getMoreInfo(someInfo);
        } catch (Exception e) {
            logger.warn("An exception has occurred during retrieval.", e);
        }
        EnrichedData enrichedData = EnrichedDataBuilder.addExtraInfo(request, infoResponse);
        return new KeyValue<>(key, enrichedData);
    }

    @Override
    public KeyValue<byte[], EnrichedData> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {
        infoclient.shutdown();
    }
}
In Kafka Streams, each stream thread initializes its own copy of the stream topology, and that topology is then instantiated per ProcessorContext (i.e., per task, i.e., per partition). So won't init() be called once per partition, overwriting/leaking the channel each time, and since we have multiple threads, won't they even race on the creation of the channel/client? Is there any way to avoid this?

This is invoked in the run() method:
public KafkaStreams createStreams() {
    final Properties streamsConfiguration = new Properties();
    // other configuration is set up here
    streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            WallclockTimestampExtractor.class.getName());
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    RequestJsonSerde requestSerde = new RequestJsonSerde();
    DataEnricher dataEnricher = new DataEnricher();
    // Get the stream of requests
    final KStream<byte[], EnrichedData> requestsStream = streamsBuilder
            .stream(requestsTopic, Consumed.with(Serdes.ByteArray(), requestSerde));
    final KStream<byte[], EnrichedData> enrichedRequestsStream = requestsStream
            .filter((key, request) -> Objects.nonNull(request))
            .transform(() -> dataEnricher);
    enrichedRequestsStream.to(enrichedRequestsTopic,
            Produced.with(Serdes.ByteArray(), requestSerde));
    return new KafkaStreams(streamsBuilder.build(), new StreamsConfig(streamsConfiguration));
}
I assume the TransformerSupplier creates one Transformer instance per topology (or per ProcessorContext), and therefore one channel instance per topology. In that case there is no danger of the channel being overwritten. I also assume that your client.shutdown() shuts down its channel as well.
Unrelated to ManagedChannel, but you must provide a new DataEnricher instance for each ProcessorContext in the TransformerSupplier:

KStream.transform(DataEnricher::new);

I once ran into a Kafka Streams exception related to this; I will try to recreate it.
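The difference matters because Kafka Streams calls the supplier once per task, so a supplier that captures a pre-built object hands the same instance to every task. A minimal plain-Java sketch of the two supplier shapes (no Kafka dependency; `SupplierDemo` and `Enricher` are illustrative stand-ins for your `DataEnricher`):

```java
import java.util.function.Supplier;

public class SupplierDemo {
    // Stand-in for DataEnricher: each stream task should get its own instance.
    static class Enricher {}

    public static void main(String[] args) {
        Enricher dataEnricher = new Enricher();
        Supplier<Enricher> capturing = () -> dataEnricher; // like .transform(() -> dataEnricher)
        Supplier<Enricher> fresh = Enricher::new;          // like .transform(DataEnricher::new)

        // Simulate two tasks each asking the supplier for a transformer:
        System.out.println(capturing.get() == capturing.get()); // true  -> tasks share one instance
        System.out.println(fresh.get() == fresh.get());         // false -> one instance per task
    }
}
```

With the capturing supplier, every task's init() reassigns the same object's channel field, which is exactly the overwrite/race the question worries about; the constructor-reference form sidesteps it.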
And IMO, if you are not using punctuate to forward additional records downstream and the output key is the same as the input record's, you should use transformValues(), because transform() can trigger a repartition when key-based operations such as aggregations or joins are applied downstream.
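A sketch of what that could look like for the enricher above, assuming the same enrichment logic moved into a ValueTransformer (names reuse the question's code; depending on your Kafka Streams version, ValueTransformer may also require a punctuate override):

```java
public class DataValueEnricher implements ValueTransformer<EnrichedData, EnrichedData> {

    private ManagedChannel channel;
    private InfoClient infoclient;

    @Override
    public void init(ProcessorContext context) {
        channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        infoclient = new InfoClient(channel);
    }

    @Override
    public EnrichedData transform(EnrichedData request) {
        // The key is never touched, so no repartition flag is set downstream.
        return EnrichedDataBuilder.addExtraInfo(
                request, infoclient.getMoreInfo(request.getSomeInfo()));
    }

    @Override
    public void close() {
        channel.shutdown();
    }
}

// Wiring: constructor reference gives each task its own instance.
requestsStream
    .filter((key, request) -> Objects.nonNull(request))
    .transformValues(DataValueEnricher::new)
    .to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray(), requestSerde));
```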