我有一个系统,可以生成针对某个主题的消息:user_event
我从数据中创建了一个流:
CREATE STREAM UserEventStream (
type STRING,
bundle_version BIGINT,
app_version STRING,
os_name STRING,
device_id STRING,
session_id STRING,
device_type STRING,
device_name STRING,
device_brand STRING,
language STRING,
ip STRING,
user_agent STRING,
user_id BIGINT,
fcm_token STRING,
iframe BOOLEAN,
referrer STRING,
data MAP<STRING, STRING>,
time BIGINT
) WITH (
KAFKA_TOPIC='user_event',
VALUE_FORMAT='AVRO'
);
当我尝试一个简单的查询时,比如:SELECT * FROM UserEventStream;
它不返回任何东西,但主题每秒都会获得新数据。
消息格式正确。
它像一周前一样工作,它完美地阅读了主题。我不认为我做了任何与它或服务器。
Kafka、schema registry、ksqldb、kafka-ui示例运行在没有docker的虚拟机上。
以下是avro消息架构:
{
"type": "record",
"name": "UserEvent",
"namespace": "com.mycompany.kafka.records",
"fields": [
{
"name": "type",
"type": {
"type": "string",
"avro.java.string": "String"
}
},
{
"name": "bundle_version",
"type": [
"null",
"long"
]
},
{
"name": "app_version",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
]
},
{
"name": "os_name",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
]
},
{
"name": "device_id",
"type": {
"type": "string",
"avro.java.string": "String"
}
},
{
"name": "session_id",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
]
},
{
"name": "device_type",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
]
},
{
"name": "device_name",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
]
},
{
"name": "device_brand",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
]
},
{
"name": "language",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
]
},
{
"name": "ip",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
]
},
{
"name": "user_agent",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
]
},
{
"name": "user_id",
"type": [
"null",
"long"
]
},
{
"name": "fcm_token",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
]
},
{
"name": "iframe",
"type": "boolean"
},
{
"name": "referrer",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
]
},
{
"name": "data",
"type": {
"type": "map",
"values": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"avro.java.string": "String"
},
"default": {}
},
{
"name": "time",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}
1条答案
按热度按时间lsmepo6l1#
我找到解决办法了。
问题是流的列没有正确定义,它没有告诉我任何关于它的信息。
于是,我就像这样创造了一条溪流:
它从注册表中识别模式并创建正确的列。现在它完美地工作了。