Sorry if this is a duplicate, but I couldn't find an existing answer. I'm currently following this tutorial and adapting it to my needs, and I'm running into some issues; I'd like to know whether this is the right approach for me.
https://medium.com/@tomershaiman/tutorial-building-a-s3-parquet-datalake-without-a-single-line-of-code-5ea4455edc1e
What I want to do is very similar to what that tutorial describes.
tl;dr: I have a Kafka topic with data in Protobuf format. I want to create a stream that converts that Protobuf into a new topic in Avro format; once it's in Avro, a connector will consume it and dump it into an S3 bucket.
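For the last hop (Avro topic to S3), the connector can even be declared from ksqlDB itself when Kafka Connect is available to it. A minimal sketch, assuming the Confluent S3 sink connector is installed and with placeholder bucket and region names:

```sql
-- Sketch only: bucket name and region are placeholders,
-- and the S3 sink connector plugin must already be installed.
CREATE SINK CONNECTOR s3_sink WITH (
  'connector.class' = 'io.confluent.connect.s3.S3SinkConnector',
  'topics'          = 'SearchRequest_avro',
  's3.bucket.name'  = 'my-datalake-bucket',
  's3.region'       = 'eu-west-1',
  'storage.class'   = 'io.confluent.connect.s3.storage.S3Storage',
  'format.class'    = 'io.confluent.connect.s3.format.parquet.ParquetFormat',
  'flush.size'      = '1000'
);
```

The tutorial linked above writes Parquet; swapping `format.class` changes the output format on S3.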
Now suppose I have a Kafka topic SearchRequest_proto in Protobuf format, and I want to create a topic called SearchRequest_avro in Avro format.
For example, my proto is:
syntax = "proto3";

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}
My first question: does my SearchRequest ksqlDB stream need to declare all of the fields, like this?
CREATE STREAM SearchRequest_proto (query VARCHAR, page_number INT, result_per_page INT) WITH (KAFKA_TOPIC='SearchRequest_proto', VALUE_FORMAT='PROTOBUF');
Or can it be something like this instead:
CREATE STREAM SearchRequest_proto (query VARCHAR, page_number INT) WITH (KAFKA_TOPIC='SearchRequest_proto', VALUE_FORMAT='PROTOBUF');
I ask because my real proto is more complex, and I'm trying to test with only some of its fields rather than declaring all of them. The problem is that when I create my second stream from the first, nothing seems to actually come out of it:
CREATE STREAM SearchRequest_avro WITH (KAFKA_TOPIC='SearchRequest_avro', REPLICAS=1, PARTITIONS=1, VALUE_FORMAT='AVRO') AS SELECT * FROM SearchRequest_proto;
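When a CSAS like the one above doesn't seem to emit anything, ksqlDB's own introspection commands usually show whether the underlying query is running and whether it's hitting deserialization errors. A sketch, using the stream names from this example:

```sql
-- Show the schema, the underlying query, and its error/message counts:
DESCRIBE SearchRequest_avro EXTENDED;

-- Re-read the derived topic from the beginning:
SET 'auto.offset.reset' = 'earliest';
SELECT * FROM SearchRequest_avro EMIT CHANGES LIMIT 5;
```

The `DESCRIBE ... EXTENDED` output includes processing-error statistics, which is often the quickest way to see that the source records are failing to deserialize.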
After that, if I look at my Kafka consumer groups, I can see the second stream registered as a consumer. My first Protobuf topic contains messages, but somehow I can't even use PRINT on my topics to see what's flowing right now; I get this:
ksql> show streams;
Stream Name | Kafka Topic | Format
--------------------------------------------------------
OBJ_POS_AVRO | com.obj_pos_avro | AVRO
OBJ_POS_PROTO | com.obj_pos_proto | PROTOBUF
--------------------------------------------------------
ksql> print "com.obj_pos_proto";
Could not find topic 'com.obj_pos_proto', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive.
ksql> print "com.obj_pos_avro";
Could not find topic 'com.obj_pos_avro', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive.
My question comes from the fact that the consumer is registered but has no offsets. Since I didn't explicitly declare all the fields of my Protobuf as part of the stream, is that why it's failing? Or is it something else?
One additional point, if anyone knows (I googled a bit but found nothing): is there any way I can register my Protobufs with Schema Registry so the stream can read the schema automatically, without me having to specify all those fields?
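ksqlDB can do this when the Protobuf schema is already registered in Schema Registry under the topic's value subject: leave out the column list and the columns are inferred from the registered schema. A sketch, assuming the schema sits under the subject `com.obj_pos_proto-value`:

```sql
-- No explicit columns: ksqlDB pulls the schema from Schema Registry
CREATE STREAM SearchRequest_proto WITH (
    KAFKA_TOPIC  = 'com.obj_pos_proto',
    VALUE_FORMAT = 'PROTOBUF'
);
```

This only works if the records on the topic were produced with the Confluent Protobuf serializer, so that the schema is actually in the registry.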
Or is there any kind of library that can consume Protobufs and generate streams or Avro-format files?
Thanks for any feedback, and sorry for the long post. As you can probably tell, I don't know the Kafka ecosystem very well, so this is all new territory for me.
Edit: I did a quick test myself, and declaring fewer fields does work, so that's not the problem. However, I'm getting a serialization/deserialization error, which is surely a configuration issue on my side:
[2020-09-21 08:19:32,836] INFO KafkaProtobufDeserializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://confluent-schema-registry-svc:8081]
basic.auth.user.info = [hidden]
proxy.host =
specific.protobuf.value.type = class java.lang.Object
use.latest.version = false
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
derive.type = false
specific.protobuf.key.type = class java.lang.Object
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig)
[2020-09-21 08:19:32,836] INFO ProtobufDataConfig values:
schemas.cache.config = 1000
enhanced.protobuf.schema.support = false
(io.confluent.connect.protobuf.ProtobufDataConfig)
[2020-09-21 08:19:32,841] INFO JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = false
(org.apache.kafka.connect.json.JsonConverterConfig)
[2020-09-21 08:19:32,844] ERROR {"type":0,"deserializationError":{"errorMessage":"Error deserializing message from topic: com.obj_pos_proto","recordB64":null,"cause":["Failed to deserialize data for topic com.obj_pos_proto to Protobuf: ","Error deserializing Protobuf message for id -1","Unknown magic byte!"]},"recordProcessingError":null,"productionError":null} (processing.CSAS_TARGET_AVRO_3.KsqlTopic.Source.deserializer)
[2020-09-21 08:19:32,845] WARN Exception caught during Deserialization, taskId: 0_2, topic: com.obj_pos_proto, partition: 2, offset: 0 (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: com.obj_pos_proto
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic com.obj_pos_proto to Protobuf:
at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)
at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:45)
at io.confluent.ksql.serde.tls.ThreadLocalDeserializer.deserialize(ThreadLocalDeserializer.java:37)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:300)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:285)
at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:46)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
1 Answer
I believe I found the reason I was having this problem.
What I wanted to do was create a keyless stream: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/create-stream/#example
The reason it was failing is that my producer was never talking to my Schema Registry, so the records were not written in the registry's wire format (hence the "Unknown magic byte!" error), and every attempt to deserialize them failed.
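A quick way to verify the fix is to produce a test record through the Schema Registry wire format. Confluent Platform ships a console producer that registers the schema and prepends the magic byte for you; a sketch, with the broker address as a placeholder and the registry URL taken from the log above:

```shell
# Placeholder broker address; registers the schema and writes
# records in the Schema Registry wire format (magic byte + schema id).
kafka-protobuf-console-producer \
  --broker-list localhost:9092 \
  --topic com.obj_pos_proto \
  --property schema.registry.url=http://confluent-schema-registry-svc:8081 \
  --property value.schema='syntax = "proto3"; message SearchRequest { string query = 1; int32 page_number = 2; int32 result_per_page = 3; }'
```

Then type a record such as `{"query":"foo","page_number":1,"result_per_page":10}`; if ksqlDB can now PRINT the topic, the producer side was indeed the problem.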