kafka streams jdbc源代码长时间不兼容

ycggw6v2  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(271)

问题:设置kafka管道后,使用带有avro序列化程序和反序列化程序的kafka connect jdbc源将数据拉入,一旦我尝试使用kafka streams java应用程序将数据读入kstream,就会出现以下错误。
org.apache.kafka.common.errors.serializationexception:longdeserializer接收的数据大小不是8
我试着尽可能地遵循现有的例子,但是有些事情没有意义。我将在下面列出所有代码/附加信息,但这里有几个问题。。。
我目前在理解上最大的差距之一是avro记录的“关键”是什么?我(在运行时)出错的那一行与这样一个事实有关:我告诉kstream键是一个long,但是当检索avro记录时,长度小于8(long类型的预期长度)。
当我设置jdbc源代码时,没有任何东西可以标识出密钥是什么—而且我在文档中也没有看到任何东西可以让我相信我可以指定密钥,尽管我已经尝试:

curl -X POST \
  -H "Content-Type: application/json" \
  --data 'see next code block for formatted data'  \
http://localhost:8083/connectors

// This is the data chunk used above but in a string - broke it apart for readability here
{
    "name": "source-jdbc-ldw_applications",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": 1,
        "connection.url": "jdbc:sqlserver://dbserver;databaseName=dbname;user=kafkareader;password=kafkareader;",
        "mode": "incrementing",
        "incrementing.column.name": "ApplicationID",
        "topic.prefix": "source-jdbc-",
        "poll.interval.ms": 30000,
        "table.whitelist": "LDW_Applications",
        "transforms": "setSchema",
        "transforms.setSchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
        "transforms.setSchema.schema.name": "com.mycompany.avro.Application",
        "transforms.setSchema.schema.version": "1"
    }
}

通过以上步骤,我可以通过运行

curl http://localhost:8081/subjects/source-jdbc-LDW_Applications-value/versions/1 |jq

下面是它的输出:

{
    "subject": "source-jdbc-LDW_Applications-value",
    "version": 1,
    "id": 9,
    "schema": "{\"type\":\"record\",\"name\":\"Application\",\"namespace\":\"com.baydynamics.avro\",\"fields\":[{\"name\":\"ApplicationID\",\"type\":\"long\"},{\"name\":\"Name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"Description\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"Group\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"OwnerUserID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"RiskScore\",\"type\":[\"null\",{\"type\":\"int\",\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"RiskRating\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ServiceLevelTierID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"LossPotentialID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ConfidentialityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"IntegrityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"AvailabilityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ApplicationCategoryID\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.version\":1,\"connect.name\":\"com.baydynamics.avro.Application\"}"
}

想让这个模式更漂亮一点:

{
"type":"record",
"name":"Application",
"namespace":"com.baydynamics.avro",
"fields":[
    {
        "name":"ApplicationID",
        "type":"long"
    },
    {
        "name":"Name",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"Description",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"Group",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"OwnerUserID",
        "type":[
            "null",
            "long"
        ],
        "default":null
    },
    {
        "name":"RiskScore",
        "type":[
            "null",
            {
            "type":"int",
            "connect.type":"int16"
            }
        ],
        "default":null
    },
    {
        "name":"RiskRating",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"ServiceLevelTierID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"LossPotentialID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"ConfidentialityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"IntegrityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"AvailabilityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"ApplicationCategoryID",
        "type":[
            "null",
            "long"
        ],
        "default":null
    }
],
"connect.version":1,
"connect.name":"com.baydynamics.avro.Application"
}

所以,我没有看到任何东西表明上面的某个字段是记录的键。
然后我进入Kafka流,我试着把数据带进一个kstream…然后它爆炸了。。。

final KStream<Long, Application> applicationStream = builder.stream(Serdes.Long(), applicationSerde, VULNERABILITY_TOPIC);

因为我知道后台存储的数据是sql server中的bigint,在java中Map为long,所以我将kstream的键类型设为long,然后使用serdes.long()反序列化器作为kstream生成器的参数。
调试时,我看到原始记录的长度为7,这就是它抛出错误的原因。显然avro序列化的方式压缩得更好?我不知道。不管怎样,问题是我甚至不知道它认为它实际上在用什么键?!所以谁知道呢-也许我对long的假设是不正确的,因为它实际上没有使用applicationid作为键?为什么我会认为是?!
任何帮助都将不胜感激。我知道上面有很多信息,但简而言之。。
使用jdbc-kafka-connect将数据推入主题
数据正在进入主题-我可以通过控制台看到它
试图把数据推到一个流中,这样我就可以用数据做一些很棒的事情,但它在尝试填充流时失败了,因为serdes与avro记录不兼容
更新1:根据下面randall的建议,我尝试了smt(单消息转换),现在我有了每个记录的一个键,这是朝着正确方向迈出的一大步,但由于某些原因,强制转换为long(int64)似乎没有任何实际效果。我已经用smt的连接器配置截图,结果记录(现在有一个键!)我在Kafka流中也看到了同样的错误:

sauutmhj

sauutmhj1#

汇合的jdbc源连接器不生成带有键的记录。已记录添加此支持的功能请求。
同时,可以使用单个消息转换从值中提取一些字段来创建键。内置的 ValueToKey transform就是这么做的。这篇博文有一个smt的例子。

相关问题