Is ManagedChannel thread-safe in a Kafka Streams Transformer?

Here is my Transformer:

public class DataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {

    private ManagedChannel channel;
    private InfoClient infoclient;
    private LRUCacheCollector<String, InfoResponse> cache;


    public DataEnricher() {}

    @Override
    public void init(ProcessorContext context) {
        channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
        infoclient = new InfoClient(channel);
    }

    @Override
    public KeyValue<byte[], EnrichedData> transform(byte[] key, EnrichedData request) {
        InfoResponse infoResponse = null;
        String someInfo = request.getSomeInfo();
        try {
            infoResponse = infoclient.getMoreInfo(someInfo);
        } catch (Exception e) {
            logger.warn("An exception has occurred during retrieval: {}", e.getMessage());
        }
        EnrichedData enrichedData = EnrichedDataBuilder.addExtraInfo(request, infoResponse);
        return new KeyValue<>(key, enrichedData);
    }

    @Override
    public KeyValue<byte[], EnrichedData> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {
        infoclient.shutdown();
    }
}

In Kafka Streams, each stream thread initializes its own copy of the stream topology, and that topology is then instantiated per ProcessorContext (i.e. per task, i.e. per partition). So won't init() be called for every partition, overwriting/leaking the channel, and since we have multiple threads, won't the creation of the channel/client even be racy? Is there any way to avoid that?

This is invoked in the run() method:

public KafkaStreams createStreams() {
    final Properties streamsConfiguration = new Properties();
    // other configuration is set up here
    streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);

    StreamsBuilder streamsBuilder = new StreamsBuilder();

    RequestJsonSerde requestSerde = new RequestJsonSerde();
    DataEnricher dataEnricher = new DataEnricher();
    // Get the stream of requests
    final KStream<byte[], EnrichedData> requestsStream = streamsBuilder
        .stream(requestsTopic, Consumed.with(Serdes.ByteArray(), requestSerde));
    final KStream<byte[], EnrichedData> enrichedRequestsStream = requestsStream
        .filter((key, request) -> Objects.nonNull(request))
        .transform(() -> dataEnricher);

    enrichedRequestsStream.to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray(), requestSerde));

    return new KafkaStreams(streamsBuilder.build(), new StreamsConfig(streamsConfiguration));
}

I assume the TransformerSupplier creates one Transformer instance per topology instantiation (i.e. per ProcessorContext), and therefore one channel instance per topology. In that case the channel is in no danger of being overwritten. I also assume your client.shutdown() shuts down its channel as well.
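As for the title question: gRPC documents ManagedChannel as thread-safe, so an alternative (a sketch not taken from the original answer; the InfoClient constructor and endpoint come from the question's code) is to build one channel per JVM and share it across all Transformer instances, with each task creating only its own client:

```java
public class DataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {

    // A ManagedChannel is thread-safe and multiplexes RPCs over its connections,
    // so a single channel per JVM can serve every stream thread/task.
    private static final ManagedChannel SHARED_CHANNEL =
        ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();

    private InfoClient infoclient;

    @Override
    public void init(ProcessorContext context) {
        // Per-task client; the underlying channel is shared.
        infoclient = new InfoClient(SHARED_CHANNEL);
    }

    // transform() as in the question ...

    @Override
    public void close() {
        // Do NOT shut down SHARED_CHANNEL here: other tasks may still be using it.
        // Shut it down once, when the whole KafkaStreams instance closes.
    }
}
```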

This is unrelated to ManagedChannel, but you have to provide a new DataEnricher instance for each ProcessorContext in your TransformerSupplier:

KStream.transform(DataEnricher::new);
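To see why `() -> dataEnricher` is a problem, here is a minimal, Kafka-free sketch (the `DataEnricher` class below is just a stand-in for the real one): a supplier that captures an existing object hands the same instance to every stream task, while a constructor reference yields a fresh instance per call.

```java
import java.util.function.Supplier;

public class SupplierDemo {
    // Stand-in for the real DataEnricher; no Kafka or gRPC dependency needed.
    static class DataEnricher {}

    public static void main(String[] args) {
        DataEnricher dataEnricher = new DataEnricher();

        // What the question's code does: every task receives the SAME instance.
        Supplier<DataEnricher> sharing = () -> dataEnricher;
        // What the answer recommends: each get() yields a NEW instance.
        Supplier<DataEnricher> fresh = DataEnricher::new;

        System.out.println(sharing.get() == sharing.get()); // true  -> state shared across tasks/threads
        System.out.println(fresh.get() == fresh.get());     // false -> one instance per task
    }
}
```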

I once ran into some Kafka Streams exceptions related to this; I'll try to recreate them.

And IMO, if you are not using punctuate to send additional records downstream and the output key is the same as the input record's key, you should use transformValues(), because transform() can trigger repartitioning once key-based operations such as aggregations or joins are applied downstream.
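If that applies here, the enricher could be rewritten as a ValueTransformer (a sketch derived from the question's class, reusing its InfoClient, EnrichedDataBuilder, and endpoint; transformValues() leaves the key untouched, so no repartitioning is flagged):

```java
public class DataValueEnricher implements ValueTransformer<EnrichedData, EnrichedData> {

    private ManagedChannel channel;
    private InfoClient infoclient;

    @Override
    public void init(ProcessorContext context) {
        channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
        infoclient = new InfoClient(channel);
    }

    @Override
    public EnrichedData transform(EnrichedData request) {
        InfoResponse infoResponse = null;
        try {
            infoResponse = infoclient.getMoreInfo(request.getSomeInfo());
        } catch (Exception e) {
            logger.warn("An exception has occurred during retrieval: {}", e.getMessage());
        }
        // Key is preserved automatically; only the value is replaced.
        return EnrichedDataBuilder.addExtraInfo(request, infoResponse);
    }

    @Override
    public void close() {
        channel.shutdown();
    }
}
```

Wired up the same way, with a fresh instance per task:

```java
requestsStream
    .filter((key, request) -> Objects.nonNull(request))
    .transformValues(DataValueEnricher::new);
```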