Based on the pyflink walkthrough, I'm now trying to get a simple nested row query working using apache-flink==1.14.4. I've created my table structure based on this solution: Get nested fields from Kafka message using Apache Flink SQL
A message looks like this:
{"signature": {"token": "abcd1234"}}
The relevant part of the code looks like this:
create_kafka_source_ddl = """
CREATE TABLE nested_msg (
`signature` ROW (
`token` STRING
)
) WITH (
'connector' = 'kafka',
'topic' = 'nested_msg',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'nested-msg',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
create_es_sink_ddl = """
CREATE TABLE es_sink (
token STRING
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://elasticsearch:9200',
'index' = 'nested_count_1',
'document-id.key-delimiter' = '$',
'sink.bulk-flush.max-size' = '42mb',
'sink.bulk-flush.max-actions' = '32',
'sink.bulk-flush.interval' = '1000',
'sink.bulk-flush.backoff.delay' = '1000',
'format' = 'json'
)
"""
t_env.execute_sql(create_kafka_source_ddl)
t_env.execute_sql(create_es_sink_ddl)
# How do I select the nested field here?
t_env.from_path("nested_msg").select(col("signature.token").alias("token")).select(
"token"
).execute_insert("es_sink")
I've tried numerous variations here without success. The exception is:
py4j.protocol.Py4JJavaError: An error occurred while calling o48.select.
: org.apache.flink.table.api.ValidationException: Cannot resolve field [signature.token], input field list:[signature].
How can I select a nested field like this in order to insert it into my sink?
1 Answer
You can change col("signature.token") to col("signature").get('token').
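Judging by the exception, col("signature.token") is resolved as a single field literally named `signature.token`, which the table does not have; its only top-level field is `signature`. Below is a minimal sketch of the corrected insert, assuming `t_env` is the TableEnvironment from the question with both DDL statements already executed. The function name `insert_nested_token` is hypothetical and only wraps the pipeline for illustration.

```python
def insert_nested_token(t_env):
    """Select the nested `token` field and write it to `es_sink`.

    Assumes `nested_msg` and `es_sink` were already created on `t_env`
    with the DDL from the question. The function name is hypothetical.
    """
    # Imported inside the function so the sketch can be loaded
    # without a Flink installation present.
    from pyflink.table.expressions import col

    # Reference the top-level ROW column first, then read the nested
    # field from it with get(), instead of using a dotted column name.
    token = col("signature").get("token").alias("token")

    # Return the result of the insert so the caller can inspect it.
    return t_env.from_path("nested_msg").select(token).execute_insert("es_sink")
```

The second `.select("token")` from the question is dropped here because the first select already produces a single column named `token`.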