我有一个Kafkaavro主题和数据是在以下格式。
Sample data
{'model_id': 'tesla_model_s'} {'model_id': 'tesla_model_s', 'metric_name': 'tire_pressure', 'metric_value': '25', 'metric_datetime_utc': 1596662368}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'tire_pressure', 'metric_value': '36', 'metric_datetime_utc': 1596662369}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'pressure', 'metric_value': '17', 'metric_datetime_utc': 1596662369}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'temperature', 'metric_value': '24', 'metric_datetime_utc': 1596662370}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'tire_pressure', 'metric_value': '27', 'metric_datetime_utc': 1596662370}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'temperature', 'metric_value': '11', 'metric_datetime_utc': 1596662370}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'temperature', 'metric_value': '37', 'metric_datetime_utc': 1596662370}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'pressure', 'metric_value': '17', 'metric_datetime_utc': 1596662370}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'temperature', 'metric_value': '27', 'metric_datetime_utc': 1596662371}
压力是一个关键的衡量标准。当压力值为17时,应触发输出结果。
预期产量
|model_id|metric_name |metric_value|start_metric_name|start_metric_value|end_metric_name|end_metric_value|start_time|end_time |
|--------|-------------|------------|-----------------|------------------|---------------|----------------|----------|----------|
|BMW_x2 |temperature |24 |pressure |17 |pressure |17 |1596662369|1596662370|
|BMW_x2 |tire_pressure|27 |pressure |17 |pressure |17 |1596662369|1596662370|
|BMW_x2 |temperature |11 |pressure |17 |pressure |17 |1596662369|1596662370|
|BMW_x2 |temperature |37 |pressure |17 |pressure |17 |1596662369|1596662370|
如何使用pyspark流对数据进行聚合和平面Map。
暂无答案!
目前还没有任何答案,快来回答吧!