Apache Beam Python GroupByKey with Kafka IO streaming data

Asked by q5iwbnjs on 2021-06-04 in Kafka

I am trying to create 10-second fixed windows with Apache Beam 2.23, using Kafka as the data source. Every record seems to fire on its own even when I set an AfterProcessingTime trigger of 15, and as soon as I add a GroupByKey the following error is thrown. Error: KeyError: 0 [while running '[17]: FixedWindow']
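For reference, here is a tiny self-contained batch sketch of how I understand fixed windows, an AfterProcessingTime trigger and GroupByKey fit together (illustrative data and settings only, not my actual pipeline; note that GroupByKey expects (key, value) pairs as input):

import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime
from apache_beam.transforms.window import FixedWindows, TimestampedValue

with beam.Pipeline() as p:  # DirectRunner on bounded data, just for illustration
  (p
   | beam.Create([('1001', 10), ('1002', 10), ('1001', 12)])  # already (key, value) pairs
   | 'Stamp' >> beam.Map(lambda kv: TimestampedValue(kv, 0))  # assign an event timestamp
   | 'FixedWindow' >> beam.WindowInto(
       FixedWindows(10),
       trigger=AfterProcessingTime(15),
       accumulation_mode=AccumulationMode.DISCARDING,
       allowed_lateness=30)
   | 'Group' >> beam.GroupByKey()  # e.g. ('1001', [10, 12]), ('1002', [10]) per window
   | beam.Map(print))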
Data simulation:

from kafka import KafkaProducer
import time

producer = KafkaProducer()  # defaults to localhost:9092

id_val = 1001
while True:
    # Build a small sensor reading, cycling id_val through 1001..1003.
    message = {}
    message['id_val'] = str(id_val)
    message['sensor_1'] = 10
    if id_val < 1003:
        id_val = id_val + 1
    else:
        id_val = 1001
    time.sleep(2)
    print(time.time())
    # Send the dict as its Python repr string to the 'test' topic.
    producer.send('test', str(message).encode())
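Each record is sent as the Python repr of the dict (e.g. "{'id_val': '1001', 'sensor_1': 10}"), which is why the Beam side below parses the message value with ast.literal_eval.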

Beam pipeline:

import ast
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from beam_nuggets.io import kafkaio  # assuming the beam-nuggets KafkaConsume source


class AddTimestampFn(beam.DoFn):
  def process(self, element):
    # Stamp each element with the current processing time.
    timestamp = int(time.time())
    yield beam.window.TimestampedValue(element, timestamp)


pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True

# kafka_config and PrintFn are defined elsewhere in my code (not shown here).
with beam.Pipeline(options=pipeline_options) as p:
  lines = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
  groups = (
      lines
      | 'ParseEventFn' >> beam.Map(lambda x: ast.literal_eval(x[1]))
      | 'Add timestamp' >> beam.ParDo(AddTimestampFn())
      | 'After timestamp add' >> beam.ParDo(PrintFn("timestamp add"))
      | 'FixedWindow' >> beam.WindowInto(
          beam.window.FixedWindows(10 * 1), allowed_lateness=30)
      | 'Group' >> beam.GroupByKey()
      | 'After group' >> beam.ParDo(PrintFn("after group")))
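PrintFn (used above for debug printing) is not shown; a hypothetical stand-in that just prints and passes elements through could look like this:

class PrintFn(beam.DoFn):
  """Hypothetical stand-in for the PrintFn used above (real definition not shown)."""
  def __init__(self, label):
    self.label = label

  def process(self, element):
    print(self.label, element)
    yield element  # pass the element through unchanged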

What am I doing wrong? I have just started using Beam, so it is probably something very silly.
