I have an Apache Camel route that publishes Avro messages to an Apache Kafka topic. It only works when I set the producer property "serializerClass=kafka.serializer.StringEncoder". Otherwise I get
java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:84)
On the other side, I have a second Apache Camel route that should consume from the topic above, but it fails with
java.io.IOException: Invalid long encoding
    at org.apache.avro.io.BinaryDecoder.innerLongDecode(BinaryDecoder.java:217)
    at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:176)
    at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at org.apache.camel.dataformat.avro.AvroDataFormat.unmarshal(AvroDataFormat.java:133)
    at org.apache.camel.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:67)
Here is the Apache Camel consumer code I am using:
<route id="cassandra.publisher">
<from
uri="{{kafka.base.uri}}&topic=sensordata&groupId=Cassandra_ConsumerGroup&consumerId=CassandraConsumer_Instance_1&clientId=adapter2" />
<unmarshal>
<custom ref="avroSensorData" />
</unmarshal>
2 answers
wlwcrazw1#
To solve this, you have to give the Camel Kafka consumer a keyDeserializer and a valueDeserializer, like this:
&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&valueDeserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
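A likely reason the byte-array deserializer matters (this is my reading, the thread does not confirm it): Avro's binary encoding is not text, so if the payload passes through a String anywhere between producer and consumer, the bytes can change and the Avro decoder then fails on them. The sketch below uses only the JDK; the class name BinaryRoundTrip and the sample bytes are made up for illustration and are not part of Camel, Kafka, or Avro.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class; not part of Camel, Kafka, or Avro.
class BinaryRoundTrip {

    // Mimics a binary payload being treated as text:
    // bytes -> String -> bytes, using UTF-8 in both directions.
    static byte[] viaString(byte[] payload) {
        String asText = new String(payload, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up bytes standing in for an Avro binary record (assumption);
        // 0x80 and 0xFF are not valid UTF-8 on their own.
        byte[] binary = {0x02, (byte) 0x80, (byte) 0xFF, 0x10};
        byte[] mangled = viaString(binary);
        System.out.println("binary preserved: " + Arrays.equals(binary, mangled));

        // Plain ASCII text survives the same round trip unchanged.
        byte[] ascii = "abc".getBytes(StandardCharsets.UTF_8);
        System.out.println("ascii preserved:  " + Arrays.equals(ascii, viaString(ascii)));
    }
}
```

The binary array does not survive the round trip, while the ASCII one does. That is consistent with keeping the message value as byte[] on both sides (ByteArrayDeserializer on the consumer) and only handing it to the Avro data format afterwards.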
sulc1iza2#
http://camel.465427.n5.nabble.com/camel-kafka-component-td5749525.html#a5769561
This thread says that Apache Camel 2.16.0/2.15.3 will support various data types, not just String messages.
As promised, this has been fixed in Apache Camel 2.15.3 under CAMEL-8790 (https://issues.apache.org/jira/browse/camel-8790).