Reading data saved by spark-redis from Java

vsdwdz23 · published 2021-05-27 in Spark
Follow (0) | Answers (1) | Views (564)

I used spark-redis to save a Dataset to Redis, and then tried to read that data back with Spring Data Redis.
The object I saved to Redis:

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@RedisHash("collaborative_filtering")
public class RatingResult implements Serializable {
    private static final long serialVersionUID = 8755574422193819444L;

    @Id
    private String id;

    @Indexed
    private int user;

    @Indexed
    private String product;

    private double productN;
    private double rating;
    private float prediction;

    public static RatingResult convert(Row row) {
        int user = row.getAs("user");
        String product = row.getAs("product");
        double productN = row.getAs("productN");
        double rating = row.getAs("rating");
        float prediction = row.getAs("prediction");
        String id = user + product;

        return RatingResult.builder().id(id).user(user).product(product).productN(productN).rating(rating)
                .prediction(prediction).build();
    }

}

Saving the object with spark-redis:

JavaRDD<RatingResult> result = ...
...
sparkSession.createDataFrame(result, RatingResult.class).write().format("org.apache.spark.sql.redis")
            .option("table", "collaborative_filtering").mode(SaveMode.Overwrite).save();

The repository:

@Repository
public interface RatingResultRepository extends JpaRepository<RatingResult, String> {

}

I cannot read this data back, because spark-redis and Spring Data Redis store data in different structures. I verified this by inspecting the keys each library creates (with `redis-cli -p 6379 keys \*` and `redis-cli hgetall $key`): the values they store are different.
So, how can I read the saved data from Java, or with any Java library?


3zwjbxry1#

Here is what worked for me.
Writing the data with spark-redis:
I am using Scala here, but it is essentially the same in Java. The only thing I changed was adding a `.option("key.column", "id")` to specify the hash id.

val ratingResult = new RatingResult("1", 1, "product1", 2.0, 3.0, 4)

val result: JavaRDD[RatingResult] = spark.sparkContext.parallelize(Seq(ratingResult)).toJavaRDD()
spark
  .createDataFrame(result, classOf[RatingResult])
  .write
  .format("org.apache.spark.sql.redis")
  .option("key.column", "id")
  .option("table", "collaborative_filtering")
  .mode(SaveMode.Overwrite)
  .save()
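In Java the write call chain is identical to the asker's original snippet, with just the one extra `.option("key.column", "id")`. As a small runnable illustration of only the options involved (the `RedisWriteOptions` class is hypothetical, for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RedisWriteOptions {
    // The two options passed to the spark-redis writer: "table" is the
    // Redis key prefix, and "key.column" tells the connector to use the
    // "id" column as the key suffix instead of generating a random one.
    public static Map<String, String> writeOptions() {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("table", "collaborative_filtering");
        opts.put("key.column", "id");
        return opts;
    }

    public static void main(String[] args) {
        // With these options, a row whose id column is "1" ends up
        // under the Redis key "collaborative_filtering:1".
        writeOptions().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```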

On the Spring Data Redis side, I have the following:

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@RedisHash("collaborative_filtering")
public class RatingResult implements Serializable {
    private static final long serialVersionUID = 8755574422193819444L;

    @Id
    private String id;

    @Indexed
    private int user;

    @Indexed
    private String product;

    private double productN;
    private double rating;
    private float prediction;

    @Override
    public String toString() {
        return "RatingResult{" +
                "id='" + id + '\'' +
                ", user=" + user +
                ", product='" + product + '\'' +
                ", productN=" + productN +
                ", rating=" + rating +
                ", prediction=" + prediction +
                '}';
    }
}

I used CrudRepository instead of JpaRepository:

@Repository
public interface RatingResultRepository extends CrudRepository<RatingResult, String> {

}

The query:

RatingResult found = ratingResultRepository.findById("1").get();
System.out.println("found = " + found);

Output:

found = RatingResult{id='null', user=1, product='product1', productN=2.0, rating=3.0, prediction=4.0}

As you may have noticed, the `id` field is not populated, because spark-redis stores the id in the Redis key itself (the hash name) rather than as a hash attribute.
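Since the id only exists in the key name, one workaround is to recover it from the key after loading. A minimal sketch, assuming spark-redis's `<table>:<id>` key format (the helper name is hypothetical):

```java
public class KeyIdExtractor {
    // spark-redis stores each row under a key like "<table>:<id>";
    // the id lives only in the key name, not inside the hash, so we
    // recover it by stripping the table prefix from the key.
    public static String extractId(String key, String table) {
        String prefix = table + ":";
        if (!key.startsWith(prefix)) {
            throw new IllegalArgumentException("Unexpected key format: " + key);
        }
        return key.substring(prefix.length());
    }

    public static void main(String[] args) {
        // A key written with key.column=id and id "1" would yield "1".
        System.out.println(extractId("collaborative_filtering:1", "collaborative_filtering"));
    }
}
```

The extracted value can then be set on the loaded `RatingResult` via its setter.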
