如何在sparksqljava中把csv类型的字符串转换成dataframe?

jaql4c8m  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(488)

我用spark结构化流式api制作spark-java客户机代码。这些代码从kafka中提取csv类型的字符串

  1. SparkSession spark = SparkSession.builder().master("local[*]").appName("KafkaMongoStream").getOrCreate();
  2. Dataset<Row> df = spark.read().format("kafka").option("kafka.bootstrap.servers", "localhost:9092"))
  3. .option("subscribe", "topicForMongoDB")
  4. .option("startingOffsets", "earliest")
  5. .load()
  6. .selectExpr("CAST(value AS STRING)");
  7. df.show();

返回的结果是成功的。这些代码打印csv类型的字符串。

  1. +--------------------+
  2. | value|
  3. +--------------------+
  4. |realtime_start,re...|
  5. |2021-01-27,2021-0...|
  6. |2021-01-27,2021-0...|
  7. |2021-01-27,2021-0...|
  8. |2021-01-27,2021-0...|
  9. |2021-01-27,2021-0...|

然后我尝试将这些字符串转换为sparksql中的sparkDataframe。首先,下面的代码是javapojo类

  1. public class EntityMongoDB implements Serializable {
  2. private Date date;
  3. private float value;
  4. private String id;
  5. private String title;
  6. private String state;
  7. private String frequency_short;
  8. private String units_short;
  9. private String seasonal_adjustment_short;
  10. private static StructType structType = DataTypes.createStructType(new StructField[] {
  11. DataTypes.createStructField("date", DataTypes.DateType, false),
  12. DataTypes.createStructField("value", DataTypes.FloatType, false),
  13. DataTypes.createStructField("id", DataTypes.StringType, false),
  14. DataTypes.createStructField("title", DataTypes.StringType, false),
  15. DataTypes.createStructField("state", DataTypes.StringType, false),
  16. DataTypes.createStructField("frequency_short", DataTypes.StringType, false),
  17. DataTypes.createStructField("units_short", DataTypes.StringType, false),
  18. DataTypes.createStructField("seasonal_adjustment_short", DataTypes.StringType, false)
  19. });
  20. public static StructType getStructType() {
  21. return structType;
  22. }
  23. }

我编写代码将csv类型的字符串转换成Dataframe

  1. Dataset<Row> dfs = df.select(from_json(col("value"), EntityMongoDB.getStructType())
  2. .as("entityMongoDB"))
  3. .selectExpr("entityMongoDB.date", "entityMongoDB.value", "entityMongoDB.id",
  4. "entityMongoDB.title", "entityMongoDB.state", "entityMongoDB.frequency_short",
  5. "entityMongoDB.units_short", "entityMongoDB.seasonal_adjustment_short").toDF();
  6. dfs.show();
  7. dfs.printSchema();

打印的架构是正确的。

  1. |-- date: date (nullable = true)
  2. |-- value: float (nullable = true)
  3. |-- id: string (nullable = true)
  4. |-- title: string (nullable = true)
  5. |-- state: string (nullable = true)
  6. |-- frequency_short: string (nullable = true)
  7. |-- units_short: string (nullable = true)
  8. |-- seasonal_adjustment_short: string (nullable = true)

但是生成的列充满了空值

  1. +----+-----+----+-----+-----+---------------+-----------+-------------------------+
  2. |date|value| id|title|state|frequency_short|units_short|seasonal_adjustment_short|
  3. +----+-----+----+-----+-----+---------------+-----------+-------------------------+
  4. |null| null|null| null| null| null| null| null|
  5. |null| null|null| null| null| null| null| null|
  6. |null| null|null| null| null| null| null| null|
  7. |null| null|null| null| null| null| null| null|
  8. |null| null|null| null| null| null| null| null|

我认为dataframe的模式生成正确,但是提取数据部分有一些问题。任何答复都将感激不尽。致以最诚挚的问候

yjghlzjz

yjghlzjz1#

你手上的弦 value 列不是有效的json,所以 from_json 在这里不行。
对于spark 3+,可以使用 from_csv 正如@mck在评论中指出的:

  1. Dataset<Row> dfs = df.select(from_csv(col("value"), EntityMongoDB.getStructType())
  2. .as("entityMongoDB"))
  3. .selectExpr("entityMongoDB.*").toDF();

对于3之前的spark版本,您可以 split 然后,逗号表示的值从结果数组转换为多列:

  1. Dataset<Row> dfs = df.select(split(col("value"), ",").as("values"))
  2. .select(IntStream.range(0, 7).map(i -> col("values").getItem(i)).toArray())
  3. .toDF("date", "value", "id", "title", "state", "frequency_short", "units_short", "seasonal_adjustment_short");

另外,如果值中有列名,可以过滤掉该行。

相关问题