pyspark 如何使用Spark找到10亿条记录的最近邻居?

iszxjhcz  于 2023-04-29  发布在  Spark
关注(0)|答案(4)|浏览(157)

假设10亿条记录包含以下信息:

ID  x1  x2  x3  ... x100
    1   0.1  0.12  1.3  ... -2.00
    2   -1   1.2    2   ... 3
    ...

对于上面的每个ID,我想找到前10个最接近的ID,基于它们的向量(x1,x2,...,x100)。
最好的计算方法是什么?

ffx8fchx

ffx8fchx1#

碰巧,我有一个解决方案,包括将sklearn和Spark结合起来:https://adventuresindatascience.wordpress.com/2016/04/02/integrating-spark-with-scikit-learn-visualizing-eigenvectors-and-fun/
它的要点是:

  • 集中使用sklearn的k-NN fit()方法
  • 但随后使用sklearn的k-NN kneighbors()方法进行分布式
ar7v8xwq

ar7v8xwq2#

对所有记录进行暴力比较是一场失败的战斗。我的建议是使用一个现成的k-Nearest Neighbor算法实现,比如scikit-learn提供的算法,然后广播索引和距离的结果数组,并进一步执行。
这种情况下的步骤是:
1-按照Bryce的建议对要素进行矢量化,并让矢量化方法返回一个浮点数列表(或numpy数组),其中包含与要素一样多的元素
2-将scikit-learn神经网络与您的数据相匹配:

nbrs = NearestNeighbors(n_neighbors=10, algorithm='auto').fit(vectorized_data)

3-在向量化数据上运行经过训练的算法(在您的情况下,训练和查询数据是相同的)

distances, indices = nbrs.kneighbors(qpa)

步骤2和步骤3将在pyspark节点上运行,在本例中是不可并行的。您需要在此节点上有足够的内存。在我的情况下,1。500万张唱片和4个专题片,花了一两秒钟。
在我们为spark实现NN之前,我想我们必须坚持这些解决方案。如果您更喜欢尝试新的东西,那么选择http://spark-packages.org/package/saurfang/spark-knn

x6492ojm

x6492ojm3#

你没有提供很多细节,但我对这个问题采取的一般方法是:
1.将记录转换为一个数据结构,例如具有(ID,x1..x100)作为标签和特征
1.Map每个记录并将该记录与所有其他记录进行比较(这里有很多优化空间)
1.创建一些中断逻辑,以便一旦开始比较ID = 5和ID = 1,就会中断计算,因为已经比较了ID = 1和ID = 5
1.一些减少步骤以获得类似{id_pair: [1,5], distance: 123}的数据结构
1.另一个Map步骤是查找每条记录的10个最近邻居
你已经确定了pyspark,我通常使用scala来完成这类工作,但每一步的伪代码可能如下所示:

# 1. vectorize the features
def vectorize_raw_data(record)
    arr_of_features = record[1..99]
    LabeledPoint( record[0] , arr_of_features)

# 2,3 + 4 map over each record for comparison
broadcast_var = [] 
def calc_distance(record, comparison)
    # here you want to keep a broadcast variable with a list or dictionary of
    # already compared IDs and break if the key pair already exists
    # then, calc the euclidean distance by mapping over the features of
    # the record and subtracting the values then squaring the result, keeping 
    # a running sum of those squares and square rooting that sum
    return {"id_pair" : [1,5], "distance" : 123}    

for record in allRecords:
  for comparison in allRecords:
    broadcast_var.append( calc_distance(record, comparison) )

# 5. map for 10 closest neighbors

def closest_neighbors(record, n=10)
     broadcast_var.filter(x => x.id_pair.include?(record.id) ).takeOrdered(n, distance)

伪代码很糟糕,但我认为它传达了意图。当你将所有记录与所有其他记录进行比较时,这里将有很多 Shuffle 和排序。恕我直言,你想把密钥对/距离存储在一个中心位置(就像一个广播变量,虽然这很危险,但它会被更新),以减少你执行的总欧氏距离计算。

ifsvaxew

ifsvaxew4#

@xenocyon的博客遗漏了很多关于格式和用法的信息,为了更好地理解,我在下面写了一个片段。

df = df.withColumn('vector_list', F.array('x1', 'x2', 'x3', ... , 'x100'))
vectors_collected = df.select(df['x1'],df['x2'], ... , df['x100']).rdd.map(list).collect()
knn = NearestNeighbors(n_neighbors=5).fit(vectors_collected)
broadcast_knn = spark.sparkContext.broadcast(knn)
knn_results = df.select(df['vector_list']).rdd.map(lambda x: broadcast_knn.value.kneighbors(x))

下面的代码用于复制类似于sklearn的距离和索引。

numpy_knn_results = np.array(knn_results.collect())
# k is 5, hence reshape with 5 each row.
distance = numpy_knn_results.reshape(numpy_knn_results.shape[0]*2, 5)[0::2]
indices = numpy_knn_results.reshape(numpy_knn_results.shape[0]*2, 5)[1::2]

相关问题