Kafka KSQLDB流不读取主题

gcuhipw9  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(106)

我有一个系统,可以生成针对某个主题的消息:user_event
我从数据中创建了一个流:

CREATE STREAM UserEventStream (
    type STRING,
    bundle_version BIGINT,
    app_version STRING,
    os_name STRING,
    device_id STRING,
    session_id STRING,
    device_type STRING,
    device_name STRING,
    device_brand STRING,
    language STRING,
    ip STRING,
    user_agent STRING,
    user_id BIGINT,
    fcm_token STRING,
    iframe BOOLEAN,
    referrer STRING,
    data MAP<STRING, STRING>,
    time BIGINT
) WITH (
    KAFKA_TOPIC='user_event',
    VALUE_FORMAT='AVRO'
);

当我尝试一个简单的查询时,比如:SELECT * FROM UserEventStream;它不返回任何东西,但主题每秒都会获得新数据。
消息格式正确。
它像一周前一样工作,它完美地阅读了主题。我不认为我做了任何与它或服务器。

Kafka、schema registry、ksqldb、kafka-ui示例运行在没有docker的虚拟机上。

以下是avro消息架构:

{
    "type": "record",
    "name": "UserEvent",
    "namespace": "com.mycompany.kafka.records",
    "fields": [
        {
            "name": "type",
            "type": {
                "type": "string",
                "avro.java.string": "String"
            }
        },
        {
            "name": "bundle_version",
            "type": [
                "null",
                "long"
            ]
        },
        {
            "name": "app_version",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ]
        },
        {
            "name": "os_name",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ]
        },
        {
            "name": "device_id",
            "type": {
                "type": "string",
                "avro.java.string": "String"
            }
        },
        {
            "name": "session_id",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ]
        },
        {
            "name": "device_type",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ]
        },
        {
            "name": "device_name",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ]
        },
        {
            "name": "device_brand",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ]
        },
        {
            "name": "language",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ]
        },
        {
            "name": "ip",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ]
        },
        {
            "name": "user_agent",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ]
        },
        {
            "name": "user_id",
            "type": [
                "null",
                "long"
            ]
        },
        {
            "name": "fcm_token",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ]
        },
        {
            "name": "iframe",
            "type": "boolean"
        },
        {
            "name": "referrer",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ]
        },
        {
            "name": "data",
            "type": {
                "type": "map",
                "values": [
                    "null",
                    {
                        "type": "string",
                        "avro.java.string": "String"
                    }
                ],
                "avro.java.string": "String"
            },
            "default": {}
        },
        {
            "name": "time",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        }
    ]
}
lsmepo6l

lsmepo6l1#

我找到解决办法了。
问题是流的列没有正确定义,它没有告诉我任何关于它的信息。
于是,我就像这样创造了一条溪流:

CREATE STREAM UserEventStream WITH (
    KAFKA_TOPIC='user_event',
    VALUE_FORMAT='AVRO'
);

它从注册表中识别模式并创建正确的列。现在它完美地工作了。

相关问题