ksqldb流是否要求两个模式具有相同数量的字段?

cuxqih21  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(287)

对不起,如果这是明确的文件,但我没有找到它。我目前正在遵循这个教程,并调整它以适应我的需要,我面临着一些问题,我想知道这是否是正确的适合我。
https://medium.com/@tomershaiman/tutorial-building-a-s3-parquet-datalake-without-a-single-line-of-code-5ea4455edc1e我想做的和想象我现在的情况非常相似。
tl/dr:我在kafka中有一个protobuf格式的主题数据,我想创建一个流,将这个protobuf转换成avro格式的新主题,一旦它变成avro格式,我就会有一个连接器,它将使用它并将它转储到s3 bucket中。
现在假设我有一个kafka主题searchrequest\u proto,格式为protobuf,然后我想创建一个名为searchrequest\u avro的主题,格式为avro
例如,我的原型是

syntax = "proto3";

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}

我的第一个问题是我的searchrequest ksql流需要所有的字段还是我可以这样做?

CREATE STREAM SearchRequest_proto (query VARCHAR, page_number INT, result_per_page INT) WITH (KAFKA_TOPIC='SearchRequest_proto', VALUE_FORMAT='PROTOBUF');

或者可以给我这样的东西:

CREATE STREAM SearchRequest_proto (query VARCHAR, page_number INT) WITH (KAFKA_TOPIC='SearchRequest_proto', VALUE_FORMAT='PROTOBUF');

我的问题来了,因为我有一个更复杂的原型,我正在尝试测试只是使用一些领域,不需要做所有的,当我创建我的第二个流出来的第一个似乎没有真正出来。

CREATE STREAM SearchRequest_avro WITH (KAFKA_TOPIC='SearchRequest_avro', REPLICAS=1, PARTITIONS=1, VALUE_FORMAT='AVRO') AS SELECT * FROM SearchRequest_proto;

之后,如果我进入我的Kafka消费者组,我可以看到第二个流注册为Kafka消费者。我的第一个protobuf主题包含消息,但不知怎么的,我甚至不能在我的主题上使用打印现在显示的东西,我得到这个消息:

ksql> show streams;

 Stream Name   | Kafka Topic                 | Format   
--------------------------------------------------------
 OBJ_POS_AVRO  | com.obj_pos_avro            | AVRO     
 OBJ_POS_PROTO | com.obj_pos_proto           | PROTOBUF 
--------------------------------------------------------
ksql> print "com.obj_pos_proto";
Could not find topic 'com.obj_pos_proto', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive.
ksql> print "com.obj_pos_avro";
Could not find topic 'com.obj_pos_avro', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive.

我的问题来了,因为我看到消费者注册,但没有任何抵消,我想知道,因为我没有隐式声明所有的领域在我的protobuf作为流的一部分,如果它是失败的,因为这?或者是别的什么。
另外一点,如果有人知道,我做了一些谷歌,但没有找到,有没有任何方法,我可以使用模式注册表注册我的protobufs,使流将能够自动读取它,而不需要指定所有这些领域?
或者任何一种可以使用protobufs并生成流或avro格式文件的库?
谢谢你的反馈,很抱歉发了这么长的帖子,你也可以想象我对Kafka的主题不是很了解,所以对我来说这是一个新的主题
编辑:我自己做了一个快速测试,它确实支持更少的字段,所以这不是问题。但是,我在序列化反序列化时遇到了一个错误,这肯定是我的配置问题:

[2020-09-21 08:19:32,836] INFO KafkaProtobufDeserializerConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://confluent-schema-registry-svc:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    specific.protobuf.value.type = class java.lang.Object
    use.latest.version = false
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    derive.type = false
    specific.protobuf.key.type = class java.lang.Object
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig)
[2020-09-21 08:19:32,836] INFO ProtobufDataConfig values: 
    schemas.cache.config = 1000
    enhanced.protobuf.schema.support = false
 (io.confluent.connect.protobuf.ProtobufDataConfig)
[2020-09-21 08:19:32,841] INFO JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false
 (org.apache.kafka.connect.json.JsonConverterConfig)
[2020-09-21 08:19:32,844] ERROR {"type":0,"deserializationError":{"errorMessage":"Error deserializing message from topic: com.obj_pos_proto","recordB64":null,"cause":["Failed to deserialize data for topic com.obj_pos_proto to Protobuf: ","Error deserializing Protobuf message for id -1","Unknown magic byte!"]},"recordProcessingError":null,"productionError":null} (processing.CSAS_TARGET_AVRO_3.KsqlTopic.Source.deserializer)
[2020-09-21 08:19:32,845] WARN Exception caught during Deserialization, taskId: 0_2, topic: com.obj_pos_proto, partition: 2, offset: 0 (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: com.obj_pos_proto
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic com.obj_pos_proto to Protobuf: 
    at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)
    at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:45)
    at io.confluent.ksql.serde.tls.ThreadLocalDeserializer.deserialize(ThreadLocalDeserializer.java:37)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:300)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:285)
    at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:46)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
bvpmtnay

bvpmtnay1#

我相信我找到了我有这个问题的原因。
我想做的是创建一个无键流https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/create-stream/#example

-- keyless stream, with value columns loaded from Schema Registry:
CREATE STREAM pageviews WITH (
    KAFKA_TOPIC = 'keyless-pageviews-topic',
    VALUE_FORMAT = 'JSON'
  );

失败的原因是我的生产者没有联系到我的模式注册表,所以当我试图反序列化数据时,它总是会失败,因为注册表没有真正正常工作

相关问题