flink:无法将流放入csv

hgc7kmma  于 2021-07-15  发布在  Flink
关注(0)|答案(1)|浏览(325)

我正在尝试使用pyflink将一个流放入csv格式的文件系统,但是它不起作用。


# stream_to_csv.py

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
    )
""")

table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
""")

table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()

要运行脚本:

$ python stream_to_csv.py

我希望记录转到/tmp/output文件夹,但是这不会发生。

$ ~ ls /tmp/output
(nothing shown here)

我错过什么了吗?

vecaoik1

vecaoik11#

我厚颜无耻地抄了点福的回信http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/flink-not-able-to-sink-a-stream-into-csv-td43105.html.
您需要为文件系统设置滚动策略。有关详细信息,请参阅滚动策略部分[1]。
实际上有输出,您可以执行命令 ls -la /tmp/output/ ,然后您将看到几个名为“.part ”的文件。
为了你的工作,你需要设定 execution.checkpointing.interval 在配置和 sink.rolling-policy.rollover-interval 在filesystem connector的属性中。
作业将如下所示:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")

table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
    )
""")

table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output',
        'sink.rolling-policy.rollover-interval' = '10s'
    )
""")

table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#rolling-政策

相关问题