消息中心上的kafka streams ktable配置错误

lokaqttq  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(279)

这个问题现在在MessageHub上解决了
我在Kafka创建ktable时遇到了一些问题。我对Kafka很陌生,这可能是我问题的根源,但我想我还是可以在这里问问。我有一个项目,我想保持不同的ID计数总发生的轨道。我正在使用ibmcloud上的messagehub来管理我的主题,到目前为止它工作得非常出色。
我有一个关于MessageHub的主题,它生成如下消息 {"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"} 目前,唯一相关的关键是id。
我的kafka代码以及streams配置如下所示:

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");    
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> Kstreams = builder.stream(myTopic);

KTable<String, Long> eventCount = Kstreams
        .flatMapValues(value -> getID(value)) //function that retrieves the ID
        .groupBy((key, value) -> value)
        .count();

运行代码时,出现以下错误:
线程“ktabletest-e2062d11-0b30-4ed0-82b0-00d83dcd9366->streamthread-1”org.apache.kafka.streams.errors.streamsexception中出现异常:无法创建主题ktabletest-kstream-aggregate-state-store-0000000003-repartition。
然后:
原因:java.util.concurrent.executionexception:org.apache.kafka.common.errors.policyviolationexception:无效配置:{segment.index.bytes=52428800,segment.bytes=52428800,cleanup.policy=delete,segment.ms=600000}。仅允许配置:[retention.ms,cleanup.policy]
我不知道为什么会发生这种错误,以及如何处理它。我构建kstream和ktable的方式是否有误?或者bluemix上的消息中心?
解决了的:
在我标记为正确的答案下面添加一个摘录。原来我的streamsconfig很好,而且(目前)消息中心方面存在一个问题,但有一个解决方法:
原来,MessageHub在使用kafka streams 1.1创建重新分区主题时出现了问题。在进行修复时,您需要手动创建主题ktabletest-kstream-aggregate-state-store-0000000003-repartition。它需要与您的输入主题(mytopic)一样多的分区,并将保留时间设置为最大值
非常感谢你的帮助!

q8l4jmvw

q8l4jmvw1#

MessageHub对创建主题时可以使用的配置有一些限制。
从您收到的policyviolationexception来看,您的streams应用程序似乎试图使用一些我们不允许的配置:
段索引字节
段.字节
段.ms
我猜您在流配置中的某个地方设置了这些,应该将它们删除。
请注意,您还需要设置 StreamsConfig.REPLICATION_FACTOR_CONFIG 在您的配置中设置为3,以使用我们文档中提到的MessageHub。

相关问题