如何通过pyspark将稀疏矩阵保存到配置单元表中

yqlxgs2m  于 2021-06-24  发布在  Hive
关注(0)|答案(1)|浏览(456)

我有一个rdd,每行包含三个数据类型。例如

from pyspark.sql import SparkSession
from scipy.sparse import csc_matrix
import numpy as np
from pyspark.sql.types import StructType,StructField,FloatType,IntegerType,ArrayType

# create sparse matrix

row = np.array([0, 2, 2, 0, 1, 2])
col = np.array([0, 0, 1, 2, 2, 2])
data = np.array([1, 2, 3, 4, 5, 6])
sp_mat = csc_matrix((data, (row, col)), shape=(3, 3))

# create rdd

sqlContext = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()
sp_data = [(0,12.1,sp_mat),(1,21.32,sp_mat),(2,21.2,sp_mat)]
spare_rdd = sqlContext.sparkContext.parallelize(sp_data)
print(spare_rdd.take(3)) 

# print

[(0, 12.1, <3x3 sparse matrix of type '<type 'numpy.int64'>' with 6 stored elements in Compressed Sparse Column format>)
,(1, 21.32, <3x3 sparse matrix of type '<type 'numpy.int64'>' with 6 stored elements in Compressed Sparse Column format>)
,(2, 21.2, <3x3 sparse matrix of type '<type 'numpy.int64'>' with 6 stored elements in Compressed Sparse Column format>)]

前两种数据类型是int和float。这三种数据类型是scipy稀疏矩阵。我想把这个rdd数据写入配置单元表。但我不知道保存稀疏矩阵的形式或字段。
所以我的问题如下:
如何为scipy稀疏矩阵创建配置单元表?

CREATE EXTERNAL TABLE spare_table(
  id int,
  value float,
  ...  <---- One or more field or struct for scipy sparse matrix 
)
stored as orc tblproperties ("orc.compress"="SNAPPY");

如何通过pyspark将scipy稀疏矩阵保存到上表中?如果我把rdd转换成Dataframe df = sqlContext.createDataFrame(spare_rdd, schema=['id', 'value', 'scipy']) 将显示错误:
typeerror:不支持类型:<class'scipy.sparse.csc.csc\u matrix'>
不存储scipy类型的解决方案也是可以接受的。唯一的要求是该解决方案能够支持稀疏矩阵的写和读。任何帮助都将不胜感激。

ztigrdn8

ztigrdn81#

我终于找到了解决办法。我可以储存 indices , indptr , data 以及 shape 将scipy稀疏矩阵转化为hive表来实现。当我再次从配置单元表中读取时,我可以重新创建一个基于它们的稀疏矩阵。首先,我应该创建一个配置单元表,如下所示:

CREATE EXTERNAL TABLE spare_table(
  id int,
  value float,
  indices array<int>,
  indptr array<int>,
  data array<int>,
  shape array<int>
)
stored as orc tblproperties ("orc.compress"="SNAPPY");

然后通过将scipy稀疏矩阵分解为 indices , indptr , data 以及 shape .

grid_img_df = spare_rdd.map(lambda x: [
    x[0]
    ,x[1]
    ,x[2].indices.tolist()
    ,x[2].indptr.tolist()
    ,x[2].data.tolist()
    ,[int(shape) for shape in x[2].shape]])

df = sqlContext.createDataFrame(grid_img_df, schema=['id','value','indices','indptr','data','shape'])
df.show()
+---+-----+------------------+------------+------------------+------+
| id|value|           indices|      indptr|              data| shape|
+---+-----+------------------+------------+------------------+------+
|  0| 12.1|[0, 2, 2, 0, 1, 2]|[0, 2, 3, 6]|[1, 2, 3, 4, 5, 6]|[3, 3]|
|  1|21.32|[0, 2, 2, 0, 1, 2]|[0, 2, 3, 6]|[1, 2, 3, 4, 5, 6]|[3, 3]|
|  2| 21.2|[0, 2, 2, 0, 1, 2]|[0, 2, 3, 6]|[1, 2, 3, 4, 5, 6]|[3, 3]|
+---+-----+------------------+------------+------------------+------+

最后,我把它写在Hive里。

df.registerTempTable("df_table")
sqlContext.sql(""" INSERT overwrite TABLE spare_table 
                    select id
                            ,value
                            ,indices
                            ,indptr
                            ,data
                            ,shape
                            from df_table""")

从配置单元表读取数据并将其转换为scipy稀疏矩阵的代码如下:

rdd = df.rdd.map(lambda row:(row.id,row.value,csc_matrix((row.data,row.indices,row.indptr),shape=row.shape)))

相关问题