我有一个Kafka制作者,它从两个大文件中读取数据,并以相同结构的JSON格式发送它们:
第一个月
生成器将每个文件分成小块,并从每个块创建JSON格式,最后在for循环中发送它们。
发送这两个文件的过程通过多线程同时进行。
我想从这些流(s1.row_id == s2.row_id)中进行join,并在我的制作者发送Kafka的数据时进行一些流处理。因为制作者从多个源生成了大量数据,我迫不及待地要使用所有这些数据,而且必须同时完成。
我不确定Table API是否是一种好方法,但这是我目前为止的pyflink代码:
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.table_environment import StreamTableEnvironment
KAFKA_SERVERS = 'localhost:9092'
def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///flink_jar/kafka-clients-3.3.2.jar")
env.add_jars("file:///flink_jar/flink-connector-kafka-1.16.1.jar")
env.add_jars("file:///flink_jar/flink-sql-connector-kafka-1.16.1.jar")
settings = EnvironmentSettings.new_instance() \
.in_streaming_mode() \
.build()
t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
t1 = f"""
CREATE TEMPORARY TABLE table1(
row_id INT,
row_data STRING
) WITH (
'connector' = 'kafka',
'topic' = 'datatopic',
'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
'properties.group.id' = 'MY_GRP',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
t2 = f"""
CREATE TEMPORARY TABLE table2(
row_id INT,
row_data STRING
) WITH (
'connector' = 'kafka',
'topic' = 'datatopic',
'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
'properties.group.id' = 'MY_GRP',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
p1 = t_env.execute_sql(t1)
p2 = t_env.execute_sql(t2)
字符集
//请告诉我下一步该怎么做:
//问题:
// 1)我是否需要单独使用消费者类中的数据,然后将它们插入到这些表中,或者数据将从我们在这里实现的内容中使用(因为我传递了连接器、主题、bootstartap.servers等的名称...)?
// 2)如果是这样:
2.1)我如何在Python中从这些流中进行连接?
2.2)我的Producer将发送数千条消息,如何防止以前的数据?我希望确保不进行重复查询。
// 3)如果没有,该怎么办?
非常感谢
1条答案
按热度按时间bqf10yzr1#
// 1)我是否需要单独使用消费者类中的数据,然后将它们插入到这些表中,或者数据将从我们在这里实现的内容中使用(因为我传递了连接器、主题、bootstartap.servers等的名称...)?
后一种,数据将被我们实现的“Kafka”表连接器消耗。并且您需要定义一个Sink表作为插入的目标,Sink表可以是一个Kafka连接器表,其中包含您想要输出的主题。
2.1)我如何在Python中从这些流中进行连接?
您可以编写SQL来连接table 1和table 2,然后在Python中插入到接收表中
2.2)我的Producer将发送数千条消息,如何防止以前的数据?我希望确保不进行重复查询。
你可以在“join”或“insert”之前过滤这些消息,在你的情况下,“WHERE”子句就足够了