这是我的变压器:
public class DataEnricher implements Transformer < byte[], EnrichedData, KeyValue < byte[], EnrichedData >> {
private ManagedChannel channel;
private InfoClient infoclient;
private LRUCacheCollector < String,
InfoResponse > cache;
public DataEnricher() {}
@Override
public void init(ProcessorContext context) {
channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
infoclient = new InfoClient(channel);
}
@Override
public KeyValue < byte[],
EnrichedData > transform(byte[] key, EnrichedData request) {
InfoResponse infoResponse = null;
String someInfo = request.getSomeInfo();
try {
infoResponse = infoclient.getMoreInfo(someInfo);
} catch (Exception e) {
logger.warn("An exception has occurred during retrieval.", e.getMessage());
}
EnrichedData enrichedData = EnrichedDataBuilder.addExtraInfo(request, infoResponse);
return new KeyValue < > (key, enrichedData);
}
@Override
public KeyValue < byte[],
DataEnricher > punctuate(long timestamp) {
return null;
}
@Override
public void close() {
client.shutdown();
}
}
在kafka流中,每个流线程初始化自己的流拓扑副本,然后根据processorcontext(即每个任务,即每个分区)示例化该拓扑。我也不会 init()
被调用并覆盖/泄漏每个分区的通道,因为我们有多个线程,甚至在创建 channel/client
? 有什么办法可以防止吗?
这叫做 run()
方法:
public KafkaStreams createStreams() {
final Properties streamsConfiguration = new Properties();
//other configuration is setup here
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
streamsConfiguration.put(
StreamsConfig.NUM_STREAM_THREADS_CONFIG,
3);
StreamsBuilder streamsBuilder = new StreamsBuilder();
RequestJsonSerde requestSerde = new RequestJsonSerde();
DataEnricher dataEnricher = new DataEnricher();
// Get the stream of requests
final KStream < byte[], EnrichedData > requestsStream = streamsBuilder
.stream(requestsTopic, Consumed.with(Serdes.ByteArray(), requestSerde));
final KStream < byte[], EnrichedData > enrichedRequestsStream = requestsStream
.filter((key, request) - > {
return Objects.nonNull(request);
}
.transform(() - > dataEnricher);
enrichedRequestsStream.to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray()));
return new KafkaStreams(streamsBuilder.build(), new StreamsConfig(streamsConfiguration));
}
2条答案
按热度按时间4zcjmb1e1#
我假设
TransformerSupplier
创建一个Transformer
每个拓扑的示例(或ProcessorContext
)因此有一个channel
每个拓扑。那样的话,就没有危险了channel
正在被覆盖。我也假设你client.shutdown()
也关闭了它的频道。efzxgjgh2#
与…无关
ManagedChannel
,但您必须为每个ProcessContext
在TransformerSupplier
.一旦我遇到一些与此相关的Kafka流异常,我将尝试重新创建它。
如果不使用标点符号向下游发送更多记录,并且新的键与应该使用的输入记录相同
transformValues()
原因transform()
当应用基于键的操作(如聚合、联接)时,可能会导致重新分区。