kafka avro消费者:mysql十进制到java十进制

6jjcrrmo  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(412)

我正在尝试使用mysql表中包含3列的记录 (Axis, Price, lastname) 以及他们的数据类型 (int, decimal(14,4), varchar(50)) 分别。
我插入了一个包含以下数据的记录 (1, 5.0000, John) .
下面的java代码(使用confluent platform中mysql连接器创建的主题中的avro记录)读取decimal列:price,作为java.nio.heapbytebuffer类型,因此在接收到该列时无法获取该列的值。
有没有办法将接收到的数据提取或转换成java十进制或双精度数据类型?
以下是mysql连接器属性file:-

{
  "name": "mysql-source",
  "config": {
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
   "key.converter": "io.confluent.connect.avro.AvroConverter",
   "key.converter.schema.registry.url": "http://localhost:8081",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url": "http://localhost:8081",
   "incrementing.column.name": "Axis",
   "tasks.max": "1",
   "table.whitelist": "ticket",
   "mode": "incrementing",
   "topic.prefix": "mysql-",
   "name": "mysql-source",
   "validate.non.null": "false",
   "connection.url": "jdbc:mysql://localhost:3306/ticket? 
   user=user&password=password"
   }
}

这是你的名字code:-

public static void main(String[] args) throws InterruptedException, 
     IOException {

        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
        "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
        "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        String topic = "sql-ticket";
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));

try {
  while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
    for (ConsumerRecord<String, GenericRecord> record : records) {
      System.out.printf("value = %s \n", record.value().get("Price"));
    }
  }
} finally {
  consumer.close();
}

}

mbyulnm0

mbyulnm01#

好吧,我终于找到了解决办法。
这个 Heapbytebuffer 需要转换为 byte[] 数组,然后我用 BigInteger 它从创建的字节数组构造值,然后我创建了一个 BigDecimal 变量的值,我用 movePointLeft(4) 这是规模(在我的例子:4)和一切工作如预期。

ByteBuffer buf = (ByteBuffer) record.value().get(("Price"));
    byte[] arr = new byte[buf.remaining()];
    buf.get(arr);
    BigInteger bi =new BigInteger(1,arr);
    BigDecimal bd = new BigDecimal(bi).movePointLeft(4);
    System.out.println(bd);

下面是结果(左边是输出,右边是mysql):-

相关问题