Apache·Kafka——Kafka制作人总是在挑选localhost:8081 for javaapi中的模式注册

lndjwyie  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(252)

Kafka的制作者无法挑选 schema.registry.url 在其属性中定义。
正如我们在下面的屏幕截图中看到的,schema注册表url是一个伪url

// variable which is being debugged
private KafkaProducer<K, V> kafkaFullAckProducer;


但在我的日志中,使用kafkaproducer发布消息失败,主机为http://0:8081

{"@timestamp":"2018-10-31T18:57:37.906+05:30","message":"Failed to send HTTP request to endpoint: http://0:8081/subjects/

这两个上述的证明是采取了一个单一的运行程序。我们可以清楚地看到,在eclipse调试期间,schmearegistry url的提示是123.1.1.1,但实际上是这样的http://0 in 我失败日志的案例。
因此,在我的其他环境中,我无法运行其他分配的schema.registry.url,因为它总是使用http://0
代码托管在一台计算机上,而模式注册表/代理托管在另一台计算机上。
注册表是在开发环境中启动的。/合流启动
我的生产商代码:

private KafkaProducer<K, V> kafkaFullAckProducer;
MonitoringConfig config;

public void produceWithFullAck(String brokerTopic, V genericRecord) throws Exception {
    // key is null
    ProducerRecord<K, V> record = new ProducerRecord<K, V>(brokerTopic, genericRecord);
    try {
        Future<RecordMetadata> futureHandle = this.kafkaFullAckProducer.send(record, (metadata, exception) -> {

            if (metadata != null) {
                log.info("Monitoring - Sent record(key=" + record.key() + " value=" + record.value()
                        + " meta(partition=" + metadata.partition() + " offset=" + metadata.offset() + ")");
            }
        });
        RecordMetadata recordMetadata = futureHandle.get();

    } catch (Exception e) {
        if (e.getCause() != null)
            log.error("Monitoring - " + e.getCause().toString());
        throw new RuntimeException(e.getMessage(), e.getCause());
    } finally {
        // initializer.getKafkaProducer().close();
    }
}

@PostConstruct
private void initialize() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokerList());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    // kafkaProps.put("value.serializer",
    // "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomAvroSerializer.class.getName());
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, Constants.COMPRESSION_TYPE_CONFIG);
    props.put(ProducerConfig.RETRIES_CONFIG, config.getProducerRetryCount());
    props.put("schema.registry.url", config.getSchemaRegistryUrl()); // the url is coming right here 
    props.put("acks", "all");
    kafkaFullAckProducer = new KafkaProducer<K, V>(props);
}

monitoringconfig:公共类monitoringconfig{

@Value("${kafka.zookeeper.connect}")
    private String zookeeperConnect;

    @Value("${kafka.broker.list}")
    private String brokerList;

    @Value("${consumer.timeout.ms:1000}")
    private String consumerTimeout;

    @Value("${producer.retry.count:1}")
    private String producerRetryCount;

    @Value("${schema.registry.url}")
    private String schemaRegistryUrl;

    @Value("${consumer.enable:false}")
    private boolean isConsumerEnabled;

    @Value("${consumer.count.thread}")
    private int totalConsumerThread;
}

应用程序属性:

kafka.zookeeper.connect=http://localhost:2181
kafka.broker.list=http://localhost:9092
consumer.timeout.ms=1000
producer.retry.count=1
schema.registry.url=http://123.1.1.1:8082

自定义avro序列化程序是我需要反对的东西,并使用这里讨论的方式,但我确信这不是这个问题的原因。
下面是hosts的详细信息:host1:有这个java服务和kafka连接,错误日志在这里。主机2:有kafka,schema registry和zookeper。

k10s72fa

k10s72fa1#

您正在使用自定义序列化程序,并且作为 Serializer 实现时,必须定义 configure 接受Map的方法。
在这个方法中,我猜你定义了 CachedSchemaRegistryClient 字段,但没有从配置Map中提取在生产者级别添加的url属性,因此它将默认使用其他localhost地址
合流代码需要单步执行四个类,但是您将在这里看到序列化程序实现,然后查看单独的config类以及抽象序列化程序和serde父类。在上一篇文章中,我曾指出,我认为您不需要实际使用自定义avro序列化程序,因为您似乎在重做abstractkafkaserializer类的工作

相关问题