是否可以在Flink/Pyflink的DataStream API中获取Kafka消息的行时间?
我用pyflink订阅了一个Kafka主题,需要访问我收到的消息的元数据(rowtime):
types = Types.ROW_NAMED(['name', 'platform', 'year', 'global_sales',
'time_send', 'append_log_time', 'time_in_sps', 'write_time'],
[Types.STRING(), Types.STRING(), Types.INT(), Types.DOUBLE(),
Types.STRING(), Types.LONG(), Types.STRING(), Types.STRING()])
# 2. create source DataStream
deserialization_schema = JsonRowDeserializationSchema.builder() \
.type_info(type_info=types).build()
kafka_source = FlinkKafkaConsumer(
topics='test',
deserialization_schema=deserialization_schema,
properties={'bootstrap.servers': 'kafka:9092'})
kafka_props = {'bootstrap.servers': 'kafka:9092', 'group.id': 'test_group'}
ds = env.add_source(kafka_source)
ds = ds.map(MyMapFunction(), output_type= Types.ROW_NAMED(['name', 'platform', 'year', 'global_sales', 'time_send', 'append_log_time', 'time_in_sps', 'write_time'],
[Types.STRING(), Types.STRING(), Types.INT(), Types.DOUBLE(), Types.STRING(), Types.LONG(), Types.STRING(), Types.STRING()]))
我希望能够读取Kafka代理在元数据中发送给Flink的AppendLogTime。使用TableAPI,我能够获得rowtime属性:
tbl = t_env.from_data_stream(ds, col('name'), col('platform'),
col('year'), col('global_sales'), col('time_send'),
col('append_log_time').rowtime, col('time_in_sps'),
col('write_time'))
使用.rowtime,我可以从Kafka消息中获取Flink的TableAPI内的AppendLogTime。
如果Map数据流:
ds.map(MyMapFunction())
class MyMapFunction(MapFunction):
def open(self, runtime_context: RuntimeContext):
pass
def map(self, value):
return Row(value[0], value[1], value[2], value[3], value[4], value[5].rowtime, str(datetime.timestamp(datetime.now()) * 1000) , value[7])
并尝试存取rowtime属性,我会取得AttributeError: rowtime
我甚至试过只玩rowtime;行时间.行时间与值[x].行时间.
是否可以在Flink/Pyflink的DataStream API中获取Kafka消息的行时间?
1条答案
按热度按时间juud5qan1#
我可以使用Processfunction获取Kafka消息的时间戳,并且在该函数中,我可以使用以下命令访问时间戳:上下文.时间戳():