在pysparkDataframe中拆分输入日志文件

mm9b1k5b  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(399)

我有一个日志文件,我需要分裂使用pysparkDataframe

20/06/25 12:19:33 INFO datasources.FileScanRDD: Reading File path: hdfs://bpaiddev/dev/data/warehouse/clean/falcon/ukc/masked_data/parquet/FRAUD_CUSTOMER_INFORMATION/rcd_crt_dttm_yyyymmdd=20200523/part-0042-ed52abc2w.c000.snapp.parquet, range:0-27899, partition values :[20200523]
20/06/25 12:19:34 INFO executor.EXECUTOR: Finished task 18.0 in stage 0.0 (TID 18),18994 bytes result sent to driver

从日志示例中,您可以看到第一行与第二行相比有更多的细节。我想要 Timestamp, Status ,Message,Range,Value 第一行的列,第二行我只能 Timestamp,Status,Message 柱。
如何将regex函数应用于此类数据?请帮我解决这个问题。谢谢!
预期产量:

+-----------------+------+--------------------+--------------+--------------------+
    |         time_val|status|         log_message|         range|               value|
    +-----------------+------+--------------------+--------------+--------------------+
    |20/06/25 12:19:33|  INFO|datasources.FileS...| range:0-27899| partition values...|
    |20/06/25 12:19:34|  INFO|executor.EXECUTORd..|              |                    |
    +-----------------+------+--------------------+--------------+--------------------+
tyky79it

tyky79it1#

您可以首先用 Timestamp 、“状态”和所有剩余的 String .

input_df=spark.createDataFrame(sc.textFile("log_lines.log").map(lambda x : tuple([x[0:17], x[18:22], x[23:]])), ["time_val","status","message"])

+-----------------+------+--------------------+
|         time_val|status|             message|
+-----------------+------+--------------------+
|20/06/25 12:19:33|  INFO|datasources.FileS...|
|20/06/25 12:19:34|  INFO|executor.EXECUTOR...|
+-----------------+------+--------------------+

现在,你先用 Message,Range,Value 如下所示,

input_df.filter(F.col("message").startswith("datasources.FileScanRDD")).withColumn("log_message", F.split(F.col("message"), ",")[0]).withColumn("range", F.split(F.col("message"), ",")[1]).withColumn("value", F.split(F.col("message"), ",")[2])..drop("message").drop("message").show()

+-----------------+------+--------------------+--------------+--------------------+
|         time_val|status|         log_message|         range|               value|
+-----------------+------+--------------------+--------------+--------------------+
|20/06/25 12:19:33|  INFO|datasources.FileS...| range:0-27899| partition values...|
+-----------------+------+--------------------+--------------+--------------------+

然后你可以处理另一行刚刚有消息,

input_df.filter(~(F.col("message").startswith("executor"))).show()

+-----------------+------+--------------------+
|         time_val|status|             message|
+-----------------+------+--------------------+
|20/06/25 12:19:33|  INFO|datasources.FileS...|
+-----------------+------+--------------------+

相关问题