# 1. vectorize the features
def vectorize_raw_data(record)
arr_of_features = record[1..99]
LabeledPoint( record[0] , arr_of_features)
# 2,3 + 4 map over each record for comparison
broadcast_var = []
def calc_distance(record, comparison)
# here you want to keep a broadcast variable with a list or dictionary of
# already compared IDs and break if the key pair already exists
# then, calc the euclidean distance by mapping over the features of
# the record and subtracting the values then squaring the result, keeping
# a running sum of those squares and square rooting that sum
return {"id_pair" : [1,5], "distance" : 123}
for record in allRecords:
for comparison in allRecords:
broadcast_var.append( calc_distance(record, comparison) )
# 5. map for 10 closest neighbors
def closest_neighbors(record, n=10)
broadcast_var.filter(x => x.id_pair.include?(record.id) ).takeOrdered(n, distance)
numpy_knn_results = np.array(knn_results.collect())
# k is 5, hence reshape with 5 each row.
distance = numpy_knn_results.reshape(numpy_knn_results.shape[0]*2, 5)[0::2]
indices = numpy_knn_results.reshape(numpy_knn_results.shape[0]*2, 5)[1::2]
4条答案
按热度按时间ffx8fchx1#
碰巧,我有一个解决方案,包括将sklearn和Spark结合起来:https://adventuresindatascience.wordpress.com/2016/04/02/integrating-spark-with-scikit-learn-visualizing-eigenvectors-and-fun/
它的要点是:
ar7v8xwq2#
对所有记录进行暴力比较是一场失败的战斗。我的建议是使用一个现成的k-Nearest Neighbor算法实现,比如
scikit-learn
提供的算法,然后广播索引和距离的结果数组,并进一步执行。这种情况下的步骤是:
1-按照Bryce的建议对要素进行矢量化,并让矢量化方法返回一个浮点数列表(或numpy数组),其中包含与要素一样多的元素
2-将scikit-learn神经网络与您的数据相匹配:
3-在向量化数据上运行经过训练的算法(在您的情况下,训练和查询数据是相同的)
步骤2和步骤3将在pyspark节点上运行,在本例中是不可并行的。您需要在此节点上有足够的内存。在我的情况下,1。500万张唱片和4个专题片,花了一两秒钟。
在我们为spark实现NN之前,我想我们必须坚持这些解决方案。如果您更喜欢尝试新的东西,那么选择http://spark-packages.org/package/saurfang/spark-knn
x6492ojm3#
你没有提供很多细节,但我对这个问题采取的一般方法是:
1.将记录转换为一个数据结构,例如具有(ID,x1..x100)作为标签和特征
1.Map每个记录并将该记录与所有其他记录进行比较(这里有很多优化空间)
1.创建一些中断逻辑,以便一旦开始比较ID = 5和ID = 1,就会中断计算,因为已经比较了ID = 1和ID = 5
1.一些减少步骤以获得类似
{id_pair: [1,5], distance: 123}
的数据结构1.另一个Map步骤是查找每条记录的10个最近邻居
你已经确定了pyspark,我通常使用scala来完成这类工作,但每一步的伪代码可能如下所示:
伪代码很糟糕,但我认为它传达了意图。当你将所有记录与所有其他记录进行比较时,这里将有很多 Shuffle 和排序。恕我直言,你想把密钥对/距离存储在一个中心位置(就像一个广播变量,虽然这很危险,但它会被更新),以减少你执行的总欧氏距离计算。
ifsvaxew4#
@xenocyon的博客遗漏了很多关于格式和用法的信息,为了更好地理解,我在下面写了一个片段。
下面的代码用于复制类似于sklearn的距离和索引。