PyFlink DataStream API: getting the rowtime (Kafka message timestamp)

yx2lnoni posted on 2022-12-11 in Apache

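Is it possible to get the rowtime of a Kafka message in the DataStream API of Flink/PyFlink?
I subscribe to a Kafka topic with PyFlink and need access to the metadata (rowtime) of the messages I receive: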

# PyFlink 1.14-style imports; the Kafka connector jar must be on the classpath.
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# 1. describe the JSON records on the topic
types = Types.ROW_NAMED(['name', 'platform', 'year', 'global_sales',
                         'time_send', 'append_log_time', 'time_in_sps', 'write_time'],
                        [Types.STRING(), Types.STRING(), Types.INT(), Types.DOUBLE(),
                         Types.STRING(), Types.LONG(), Types.STRING(), Types.STRING()])

# 2. create source DataStream
deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=types).build()

kafka_props = {'bootstrap.servers': 'kafka:9092', 'group.id': 'test_group'}
kafka_source = FlinkKafkaConsumer(
    topics='test',
    deserialization_schema=deserialization_schema,
    properties=kafka_props)

ds = env.add_source(kafka_source)
# MyMapFunction is defined further below; it emits the same row type
ds = ds.map(MyMapFunction(), output_type=types)

I want to read the AppendLogTime that the Kafka broker sends to Flink in the message metadata. With the Table API, I can get it through the rowtime attribute:

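from pyflink.table import StreamTableEnvironment
from pyflink.table.expressions import col

t_env = StreamTableEnvironment.create(env)
tbl = t_env.from_data_stream(ds, col('name'), col('platform'),
                             col('year'), col('global_sales'), col('time_send'),
                             col('append_log_time').rowtime, col('time_in_sps'),
                             col('write_time'))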

With .rowtime, I get the AppendLogTime of the Kafka message inside Flink's Table API.
But if I map the DataStream:

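from datetime import datetime

from pyflink.common import Row
from pyflink.datastream.functions import MapFunction, RuntimeContext

class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        pass

    def map(self, value):
        # accessing .rowtime on a DataStream row field is what raises the AttributeError
        return Row(value[0], value[1], value[2], value[3], value[4], value[5].rowtime,
                   str(datetime.timestamp(datetime.now()) * 1000), value[7])

ds.map(MyMapFunction())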

and try to access the rowtime attribute, I get AttributeError: rowtime.
I also tried plain rowtime, rowtime.rowtime and value[x].rowtime.
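Independently of the rowtime error, time_in_sps in the map function above is built as str(datetime.timestamp(datetime.now()) * 1000), which yields a float string with a fractional part rather than whole epoch milliseconds, the unit Flink uses for record timestamps. A minimal sketch of an integer alternative, using a hypothetical helper name now_millis and only the Python standard library:

```python
import time


def now_millis() -> int:
    """Return the current wall-clock time as whole milliseconds since the Unix epoch.

    Hypothetical helper for illustration; not part of PyFlink.
    """
    # time.time_ns() is an integer, so there is no float rounding;
    # integer division drops the sub-millisecond digits.
    return time.time_ns() // 1_000_000


# Inside map() one could then use str(now_millis()) for time_in_sps.
print(str(now_millis()))
```

Using integer nanoseconds keeps the value directly comparable with a Kafka timestamp, which is also an integer count of milliseconds.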
Is it possible to get the rowtime of a Kafka message in the DataStream API of Flink/PyFlink?

juud5qan


I was able to get the Kafka message timestamp with a process function. Inside it, the timestamp is available through ctx.timestamp():

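from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedProcessFunction


class MyProcessFunction(KeyedProcessFunction):

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # ctx.timestamp() is the timestamp the Kafka source attached to this record
        result = "Current key: {}, name: {}, platform : {}, year : {}, global_sales : {}, time_send: {}, timestamp: {}, time_in_sps: {}, write_time: {}".format(
            ctx.get_current_key(), str(value[0]), str(value[1]), str(value[2]), str(value[3]), str(value[4]), str(ctx.timestamp()), str(value[6]), str(value[7]))
        yield result

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        yield "On timer timestamp: " + str(timestamp)


# a KeyedProcessFunction runs on a keyed stream; keying by name is an arbitrary choice
ds.key_by(lambda value: value[0]).process(MyProcessFunction(), output_type=Types.STRING())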
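ctx.timestamp() returns the record timestamp as epoch milliseconds, or None if the record carries none. With FlinkKafkaConsumer this is the Kafka record timestamp, which is the broker's append time when the topic uses message.timestamp.type=LogAppendTime; the Table API's .rowtime column appears to be filled from the same record timestamp. A minimal sketch of turning that value into something readable and measuring the delay to processing time, with hypothetical helper names epoch_millis_to_iso and lag_millis and only the standard library:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_iso(ts_millis: Optional[int]) -> Optional[str]:
    """Convert an epoch-millisecond timestamp (e.g. ctx.timestamp()) to ISO-8601 UTC."""
    if ts_millis is None:
        # The record has no timestamp attached.
        return None
    # Integer timedelta arithmetic avoids float rounding of the milliseconds.
    return (_EPOCH + timedelta(milliseconds=ts_millis)).isoformat(timespec="milliseconds")


def lag_millis(ts_millis: Optional[int], now_millis: int) -> Optional[int]:
    """Milliseconds between the record timestamp and a processing-time reading."""
    if ts_millis is None:
        return None
    return now_millis - ts_millis


print(epoch_millis_to_iso(1670745600123))         # 2022-12-11T08:00:00.123+00:00
print(lag_millis(1670745600123, 1670745600500))   # 377
```

Inside process_element one could then write epoch_millis_to_iso(ctx.timestamp()) instead of str(ctx.timestamp()).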
