I am trying to create 10-second fixed windows with Apache Beam 2.23, using Kafka as the data source. It looks like every record fires on its own: even when I try to set an AfterProcessingTime trigger of 15, and as soon as I add a GroupByKey, the pipeline throws the following error: KeyError: 0 [while running '[17]: FixedWindow']
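For reference, by "setting the trigger to 15" I mean the usual trigger argument of WindowInto in the Python SDK; a minimal, self-contained sketch of what I was trying (not my real pipeline, which reads from Kafka, and with a bounded Create source just so the example runs on its own):

# Minimal sketch of attaching a processing-time trigger in the Beam Python SDK.
# The 10-second window and the 15-second delay mirror the values mentioned above;
# the bounded Create source is only there to keep the example self-contained.
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([('1001', 10), ('1002', 10)])  # (key, value) pairs
        | beam.WindowInto(
            window.FixedWindows(10),
            trigger=AfterProcessingTime(15),
            accumulation_mode=AccumulationMode.DISCARDING)
        | beam.GroupByKey()
        | beam.Map(print))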
Data simulation:
from kafka import KafkaProducer
import time

# Send one sensor reading every 2 seconds, cycling id_val through 1001-1003.
producer = KafkaProducer()
id_val = 1001
while(1):
    message = {}
    message['id_val'] = str(id_val)
    message['sensor_1'] = 10
    if (id_val < 1003):
        id_val = id_val + 1
    else:
        id_val = 1001
    time.sleep(2)
    print(time.time())
    producer.send('test', str(message).encode())
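(The payload sent above is just the repr of a Python dict; the sketch below, not part of the producer, shows how ast.literal_eval, which the Beam snippet uses later, turns it back into a dict.)

# Illustration of the message format produced above and how ast.literal_eval
# (used in the Beam snippet below) parses the payload back into a dict.
import ast

payload = str({'id_val': '1001', 'sensor_1': 10}).encode()
record = ast.literal_eval(payload.decode())
print(record['id_val'], record['sensor_1'])  # prints: 1001 10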
Beam snippet:
class AddTimestampFn(beam.DoFn):
    def process(self, element):
        timestamp = int(time.time())
        yield beam.window.TimestampedValue(element, timestamp)

pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)
with beam.Pipeline() as p:
    lines = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
    groups = (
        lines
        | 'ParseEventFn' >> beam.Map(lambda x: (ast.literal_eval(x[1])))
        | 'Add timestamp' >> beam.ParDo(AddTimestampFn())
        | 'After timestamp add ' >> beam.ParDo(PrintFn("timestamp add"))
        | 'FixedWindow' >> beam.WindowInto(
            beam.window.FixedWindows(10*1), allowed_lateness=30)
        | 'Group ' >> beam.GroupByKey()
        | 'After group' >> beam.ParDo(PrintFn("after group")))
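One thing I also wondered about, as a guess rather than something I have confirmed: GroupByKey in the Python SDK works on (key, value) 2-tuples, while ParseEventFn above emits plain dicts. A variant I am considering, reusing lines, AddTimestampFn and PrintFn from the snippet above and assuming id_val is the grouping key I actually want:

# A sketch of keying each parsed record on id_val so that GroupByKey receives
# (key, value) 2-tuples.  Reuses names from the snippet above; id_val as the
# grouping key is an assumption.
keyed = (
    lines
    | 'ParseEventFn' >> beam.Map(lambda x: ast.literal_eval(x[1]))
    | 'Add timestamp' >> beam.ParDo(AddTimestampFn())
    | 'Key by id_val' >> beam.Map(lambda e: (e['id_val'], e))
    | 'FixedWindow' >> beam.WindowInto(
        beam.window.FixedWindows(10), allowed_lateness=30)
    | 'Group' >> beam.GroupByKey()
    | 'After group' >> beam.ParDo(PrintFn("after group")))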
What am I doing wrong? I have only just started using Beam, so it may be something very silly.