transformer kafka中managedchannel线程安全吗

jvidinwx  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(329)

这是我的变压器:

public class DataEnricher implements Transformer < byte[], EnrichedData, KeyValue < byte[], EnrichedData >> {

    private ManagedChannel channel;
    private InfoClient infoclient;
    private LRUCacheCollector < String,
    InfoResponse > cache;

    public DataEnricher() {}

    @Override
    public void init(ProcessorContext context) {
        channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
        infoclient = new InfoClient(channel);
    }

    @Override
    public KeyValue < byte[],
    EnrichedData > transform(byte[] key, EnrichedData request) {
        InfoResponse infoResponse = null;
        String someInfo = request.getSomeInfo();
        try {
            infoResponse = infoclient.getMoreInfo(someInfo);
        } catch (Exception e) {
            logger.warn("An exception has occurred during retrieval.", e.getMessage());
        }
        EnrichedData enrichedData = EnrichedDataBuilder.addExtraInfo(request, infoResponse);
        return new KeyValue < > (key, enrichedData);
    }

    @Override
    public KeyValue < byte[],
    DataEnricher > punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {
        client.shutdown();
    }
}

在kafka流中,每个流线程初始化自己的流拓扑副本,然后根据processorcontext(即每个任务,即每个分区)示例化该拓扑。我也不会 init() 被调用并覆盖/泄漏每个分区的通道,因为我们有多个线程,甚至在创建 channel/client ? 有什么办法可以防止吗?
这叫做 run() 方法:

public KafkaStreams createStreams() {
    final Properties streamsConfiguration = new Properties();
    //other configuration is setup here
    streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    streamsConfiguration.put(
        StreamsConfig.NUM_STREAM_THREADS_CONFIG,
        3);

    StreamsBuilder streamsBuilder = new StreamsBuilder();

    RequestJsonSerde requestSerde = new RequestJsonSerde();
    DataEnricher dataEnricher = new DataEnricher();
    // Get the stream of requests
    final KStream < byte[], EnrichedData > requestsStream = streamsBuilder
        .stream(requestsTopic, Consumed.with(Serdes.ByteArray(), requestSerde));
    final KStream < byte[], EnrichedData > enrichedRequestsStream = requestsStream
        .filter((key, request) - > {
            return Objects.nonNull(request);
        }
        .transform(() - > dataEnricher);

    enrichedRequestsStream.to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray()));

    return new KafkaStreams(streamsBuilder.build(), new StreamsConfig(streamsConfiguration));
}
4zcjmb1e

4zcjmb1e1#

我假设 TransformerSupplier 创建一个 Transformer 每个拓扑的示例(或 ProcessorContext )因此有一个 channel 每个拓扑。那样的话,就没有危险了 channel 正在被覆盖。我也假设你 client.shutdown() 也关闭了它的频道。

efzxgjgh

efzxgjgh2#

与…无关 ManagedChannel ,但您必须为每个 ProcessContextTransformerSupplier .

KStream.transform(DataEnricher::new);

一旦我遇到一些与此相关的Kafka流异常,我将尝试重新创建它。
如果不使用标点符号向下游发送更多记录,并且新的键与应该使用的输入记录相同 transformValues() 原因 transform() 当应用基于键的操作(如聚合、联接)时,可能会导致重新分区。

相关问题