Flink 表API同时连接两个Kafka流

w1jd8yoj  于 2023-11-15  发布在  Apache
关注(0)|答案(1)|浏览(168)

我有一个Kafka制作者,它从两个大文件中读取数据,并以相同结构的JSON格式发送它们:
第一个月
生成器将每个文件分成小块,并从每个块创建JSON格式,最后在for循环中发送它们。
发送这两个文件的过程通过多线程同时进行。
我想从这些流(s1.row_id == s2.row_id)中进行join,并在我的制作者发送Kafka的数据时进行一些流处理。因为制作者从多个源生成了大量数据,我迫不及待地要使用所有这些数据,而且必须同时完成。
我不确定Table API是否是一种好方法,但这是我目前为止的pyflink代码:

from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import  EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.table_environment import StreamTableEnvironment

KAFKA_SERVERS = 'localhost:9092'

def log_processing():
  env = StreamExecutionEnvironment.get_execution_environment()
  env.add_jars("file:///flink_jar/kafka-clients-3.3.2.jar")
  env.add_jars("file:///flink_jar/flink-connector-kafka-1.16.1.jar")
  env.add_jars("file:///flink_jar/flink-sql-connector-kafka-1.16.1.jar")

  settings = EnvironmentSettings.new_instance() \
    .in_streaming_mode() \
    .build()

  t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)

  t1 = f"""
         CREATE TEMPORARY TABLE table1(
            row_id INT,
            row_data STRING
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'datatopic',
          'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
          'properties.group.id' = 'MY_GRP',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
        """

 t2 = f"""
        CREATE TEMPORARY TABLE table2(
            row_id INT,
            row_data STRING
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'datatopic',
          'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
          'properties.group.id' = 'MY_GRP',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
        """
 p1 = t_env.execute_sql(t1)
 p2 = t_env.execute_sql(t2)

字符集
//请告诉我下一步该怎么做:
//问题:
// 1)我是否需要单独使用消费者类中的数据,然后将它们插入到这些表中,或者数据将从我们在这里实现的内容中使用(因为我传递了连接器、主题、bootstartap.servers等的名称...)?
// 2)如果是这样:
2.1)我如何在Python中从这些流中进行连接?
2.2)我的Producer将发送数千条消息,如何防止以前的数据?我希望确保不进行重复查询。
// 3)如果没有,该怎么办?
非常感谢

bqf10yzr

bqf10yzr1#

// 1)我是否需要单独使用消费者类中的数据,然后将它们插入到这些表中,或者数据将从我们在这里实现的内容中使用(因为我传递了连接器、主题、bootstartap.servers等的名称...)?
后一种,数据将被我们实现的“Kafka”表连接器消耗。并且您需要定义一个Sink表作为插入的目标,Sink表可以是一个Kafka连接器表,其中包含您想要输出的主题。
2.1)我如何在Python中从这些流中进行连接?
您可以编写SQL来连接table 1和table 2,然后在Python中插入到接收表中
2.2)我的Producer将发送数千条消息,如何防止以前的数据?我希望确保不进行重复查询。
你可以在“join”或“insert”之前过滤这些消息,在你的情况下,“WHERE”子句就足够了

相关问题