如何在pyspark中实现以下逻辑

xfyts7mz  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(221)

我有一个Kafkaavro主题和数据是在以下格式。

Sample data

{'model_id': 'tesla_model_s'} {'model_id': 'tesla_model_s', 'metric_name': 'tire_pressure', 'metric_value': '25', 'metric_datetime_utc': 1596662368}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'tire_pressure', 'metric_value': '36', 'metric_datetime_utc': 1596662369}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'pressure', 'metric_value': '17', 'metric_datetime_utc': 1596662369}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'temperature', 'metric_value': '24', 'metric_datetime_utc': 1596662370}  
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'tire_pressure', 'metric_value': '27', 'metric_datetime_utc': 1596662370}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'temperature', 'metric_value': '11', 'metric_datetime_utc': 1596662370}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'temperature', 'metric_value': '37', 'metric_datetime_utc': 1596662370}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'pressure', 'metric_value': '17', 'metric_datetime_utc': 1596662370}
{'model_id': 'BMW_x2'} {'model_id': 'BMW_x2', 'metric_name': 'temperature', 'metric_value': '27', 'metric_datetime_utc': 1596662371}

压力是一个关键的衡量标准。当压力值为17时,应触发输出结果。
预期产量

|model_id|metric_name  |metric_value|start_metric_name|start_metric_value|end_metric_name|end_metric_value|start_time|end_time  |
|--------|-------------|------------|-----------------|------------------|---------------|----------------|----------|----------|
|BMW_x2  |temperature  |24          |pressure         |17                |pressure       |17              |1596662369|1596662370|
|BMW_x2  |tire_pressure|27          |pressure         |17                |pressure       |17              |1596662369|1596662370|
|BMW_x2  |temperature  |11          |pressure         |17                |pressure       |17              |1596662369|1596662370|
|BMW_x2  |temperature  |37          |pressure         |17                |pressure       |17              |1596662369|1596662370|

如何使用pyspark流对数据进行聚合和平面Map。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题