Kafka的制作者无法挑选 schema.registry.url
在其属性中定义。
正如我们在下面的屏幕截图中看到的,schema注册表url是一个伪url
// variable which is being debugged
private KafkaProducer<K, V> kafkaFullAckProducer;
但在我的日志中,使用kafkaproducer发布消息失败,主机为http://0:8081
{"@timestamp":"2018-10-31T18:57:37.906+05:30","message":"Failed to send HTTP request to endpoint: http://0:8081/subjects/
这两个上述的证明是采取了一个单一的运行程序。我们可以清楚地看到,在eclipse调试期间,schmearegistry url的提示是123.1.1.1,但实际上是这样的http://0 in 我失败日志的案例。
因此,在我的其他环境中,我无法运行其他分配的schema.registry.url,因为它总是使用http://0
代码托管在一台计算机上,而模式注册表/代理托管在另一台计算机上。
注册表是在开发环境中启动的。/合流启动
我的生产商代码:
private KafkaProducer<K, V> kafkaFullAckProducer;
MonitoringConfig config;
public void produceWithFullAck(String brokerTopic, V genericRecord) throws Exception {
// key is null
ProducerRecord<K, V> record = new ProducerRecord<K, V>(brokerTopic, genericRecord);
try {
Future<RecordMetadata> futureHandle = this.kafkaFullAckProducer.send(record, (metadata, exception) -> {
if (metadata != null) {
log.info("Monitoring - Sent record(key=" + record.key() + " value=" + record.value()
+ " meta(partition=" + metadata.partition() + " offset=" + metadata.offset() + ")");
}
});
RecordMetadata recordMetadata = futureHandle.get();
} catch (Exception e) {
if (e.getCause() != null)
log.error("Monitoring - " + e.getCause().toString());
throw new RuntimeException(e.getMessage(), e.getCause());
} finally {
// initializer.getKafkaProducer().close();
}
}
@PostConstruct
private void initialize() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokerList());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// kafkaProps.put("value.serializer",
// "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomAvroSerializer.class.getName());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, Constants.COMPRESSION_TYPE_CONFIG);
props.put(ProducerConfig.RETRIES_CONFIG, config.getProducerRetryCount());
props.put("schema.registry.url", config.getSchemaRegistryUrl()); // the url is coming right here
props.put("acks", "all");
kafkaFullAckProducer = new KafkaProducer<K, V>(props);
}
monitoringconfig:公共类monitoringconfig{
@Value("${kafka.zookeeper.connect}")
private String zookeeperConnect;
@Value("${kafka.broker.list}")
private String brokerList;
@Value("${consumer.timeout.ms:1000}")
private String consumerTimeout;
@Value("${producer.retry.count:1}")
private String producerRetryCount;
@Value("${schema.registry.url}")
private String schemaRegistryUrl;
@Value("${consumer.enable:false}")
private boolean isConsumerEnabled;
@Value("${consumer.count.thread}")
private int totalConsumerThread;
}
应用程序属性:
kafka.zookeeper.connect=http://localhost:2181
kafka.broker.list=http://localhost:9092
consumer.timeout.ms=1000
producer.retry.count=1
schema.registry.url=http://123.1.1.1:8082
自定义avro序列化程序是我需要反对的东西,并使用这里讨论的方式,但我确信这不是这个问题的原因。
下面是hosts的详细信息:host1:有这个java服务和kafka连接,错误日志在这里。主机2:有kafka,schema registry和zookeper。
1条答案
按热度按时间k10s72fa1#
您正在使用自定义序列化程序,并且作为
Serializer
实现时,必须定义configure
接受Map的方法。在这个方法中,我猜你定义了
CachedSchemaRegistryClient
字段,但没有从配置Map中提取在生产者级别添加的url属性,因此它将默认使用其他localhost地址合流代码需要单步执行四个类,但是您将在这里看到序列化程序实现,然后查看单独的config类以及抽象序列化程序和serde父类。在上一篇文章中,我曾指出,我认为您不需要实际使用自定义avro序列化程序,因为您似乎在重做abstractkafkaserializer类的工作