本文整理了Java中org.apache.spark.broadcast.Broadcast.value()
方法的一些代码示例,展示了Broadcast.value()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Broadcast.value()
方法的具体详情如下:
包路径:org.apache.spark.broadcast.Broadcast
类名称:Broadcast
方法名:value
暂无
代码示例来源:origin: OryxProject/oryx
@Override
public Iterator<Tuple2<Integer,Integer>> call(
    Tuple2<Integer,Iterable<Integer>> userIDsAndItemIDs) {
  // For one user, draw roughly as many random "negative" examples (items
  // the user did NOT interact with) as the user has positive items.
  Integer user = userIDsAndItemIDs._1();
  Collection<Integer> positives = Sets.newHashSet(userIDsAndItemIDs._2());
  int targetCount = positives.size();
  List<Integer> candidates = allItemIDsBC.value();
  int candidateCount = candidates.size();
  Collection<Tuple2<Integer,Integer>> negatives = new ArrayList<>(targetCount);
  // Cap attempts at candidateCount so users who interacted with nearly
  // every item cannot spin forever looking for a non-positive item.
  int attempts = 0;
  while (attempts < candidateCount && negatives.size() < targetCount) {
    Integer candidate = candidates.get(random.nextInt(candidateCount));
    if (!positives.contains(candidate)) {
      negatives.add(new Tuple2<>(user, candidate));
    }
    attempts++;
  }
  return negatives.iterator();
}
});
代码示例来源:origin: databricks/learning-spark
public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
  // Re-key a (callSign, count) pair to (country, count) by looking the
  // sign up against the broadcast prefix table.
  String sign = callSignCount._1();
  String country = lookupCountry(sign, signPrefixes.value());
  // FIX: use the diamond operator — the original `new Tuple2(...)` was a
  // raw type, which defeats generic type checking and emits an
  // unchecked-conversion warning.
  return new Tuple2<>(country, callSignCount._2());
}}).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
代码示例来源:origin: OryxProject/oryx
private static void saveFeaturesRDD(JavaPairRDD<Integer,float[]> features,
                                    Path path,
                                    Broadcast<? extends Map<Integer,String>> bIndexToID) {
  // Writes each (index, featureVector) pair as one JSON line of
  // [originalID, vector], gzip-compressed, under `path`. The broadcast
  // map translates the numeric index back to the original string ID.
  log.info("Saving features RDD to {}", path);
  features
      .map(indexAndVector -> TextUtils.joinJSON(Arrays.asList(
          bIndexToID.value().get(indexAndVector._1()),
          indexAndVector._2())))
      .saveAsTextFile(path.toString(), GzipCodec.class);
}
代码示例来源:origin: OryxProject/oryx
return new Tuple2<>(
Long.valueOf(tokens[3]),
new Rating(bUserIDToIndex.value().get(tokens[0]),
bItemIDToIndex.value().get(tokens[1]),
代码示例来源:origin: OryxProject/oryx
private static RDD<Tuple2<Object,double[]>> readAndConvertFeatureRDD(
    JavaPairRDD<String,float[]> javaRDD,
    Broadcast<? extends Map<String,Integer>> bIdToIndex) {
  // Re-keys each row by its numeric index (via the broadcast ID->index
  // map) and widens the float[] feature vector to double[].
  RDD<Tuple2<Integer,double[]>> scalaRDD = javaRDD.mapToPair(t ->
      new Tuple2<>(bIdToIndex.value().get(t._1()), t._2())
  ).mapValues(floats -> {
    double[] widened = new double[floats.length];
    for (int i = 0; i < widened.length; i++) {
      widened[i] = floats[i];
    }
    return widened;
  }
  ).rdd();
  // Mirror the persistence level established by the ALS training methods.
  scalaRDD.persist(StorageLevel.MEMORY_AND_DISK());
  // The key's static type changes from Integer to Object only; erasure
  // makes the double cast safe at runtime.
  @SuppressWarnings("unchecked")
  RDD<Tuple2<Object,double[]>> objKeyRDD = (RDD<Tuple2<Object,double[]>>) (RDD<?>) scalaRDD;
  return objKeyRDD;
}
代码示例来源:origin: mahmoudparsian/data-algorithms-book
@Override
public Integer call(Iterable<String> values) {
  // Builds the per-group patient map, then counts how many patients pass
  // the broadcast filter (a filter type plus a numeric threshold).
  Map<String, PairOfDoubleInteger> patients = buildPatientsMap(values);
  // Casts retained from the original: the broadcast variables are
  // presumably declared with raw/Object element types — TODO confirm.
  String filterType = (String) broadcastVarFilterType.value();
  Double threshold = (Double) broadcastVarFilterValueThreshold.value();
  return getNumberOfPatientsPassedTheTest(patients, filterType, threshold);
}
});
代码示例来源:origin: OryxProject/oryx
AppPMMLUtils.addExtension(pmml, "epsilon", epsilon);
addIDsExtension(pmml, "XIDs", userFeaturesRDD, bUserIndexToID.value());
addIDsExtension(pmml, "YIDs", itemFeaturesRDD, bItemIndexToID.value());
return pmml;
代码示例来源:origin: mahmoudparsian/data-algorithms-book
@Override
public Boolean call(String record) {
  // Keeps only CSV records whose second field equals the broadcast
  // reference value.
  String ref = REF.value();
  String[] tokens = record.split(",");
  // FIX: return the comparison directly instead of the original
  // `if (cond) return true; else return false;` anti-idiom (which also
  // carried a "retrun" typo in its comments).
  return ref.equals(tokens[1]);
}
});
代码示例来源:origin: mahmoudparsian/data-algorithms-book
@Override
public java.util.Iterator<scala.Tuple2<String, scala.Tuple2<String, Integer>>> call(String line) throws Exception {
  // For each whitespace-separated token, emit (token, (neighbor, 1)) for
  // every other token within +/- window positions of it.
  List<Tuple2<String, Tuple2<String, Integer>>> pairs = new ArrayList<>();
  String[] tokens = line.split("\\s");
  // FIX: hoist the broadcast lookup — the window size is loop-invariant,
  // but the original re-read brodcastWindow.value() up to four times per
  // token inside the loop.
  int window = brodcastWindow.value();
  for (int i = 0; i < tokens.length; i++) {
    // Clamp the window to the token array bounds (same semantics as the
    // original ternaries, written with Math.max/Math.min).
    int start = Math.max(0, i - window);
    int end = Math.min(tokens.length - 1, i + window);
    for (int j = start; j <= end; j++) {
      if (j != i) {  // a token is not its own neighbor
        pairs.add(new Tuple2<>(tokens[i], new Tuple2<>(tokens[j], 1)));
      }
    }
  }
  return pairs.iterator();
}
}
代码示例来源:origin: mahmoudparsian/data-algorithms-book
@Override
public Iterator<Tuple2<String,Integer>> call(String sequence) {
  // Emits every k-mer (substring of length K) of the sequence, each
  // paired with a count of 1.
  int K = broadcastK.value();
  List<Tuple2<String,Integer>> kmers = new ArrayList<>();
  // Last valid start index is sequence.length() - K (inclusive).
  int lastStart = sequence.length() - K;
  for (int start = 0; start <= lastStart; start++) {
    kmers.add(new Tuple2<>(sequence.substring(start, start + K), 1));
  }
  return kmers.iterator();
}
});
代码示例来源:origin: mahmoudparsian/data-algorithms-book
@Override
public Iterator<Row> call(String line) throws Exception {
  // For each whitespace-separated token, emit Row(token, neighbor, 1)
  // for every other token within +/- window positions of it.
  List<Row> rows = new ArrayList<>();
  String[] tokens = line.split("\\s");
  // FIX: hoist the broadcast lookup — the window size is loop-invariant,
  // but the original re-read brodcastWindow.value() up to four times per
  // token inside the loop.
  int window = brodcastWindow.value();
  for (int i = 0; i < tokens.length; i++) {
    // Clamp the window to the token array bounds (same semantics as the
    // original ternaries, written with Math.max/Math.min).
    int start = Math.max(0, i - window);
    int end = Math.min(tokens.length - 1, i + window);
    for (int j = start; j <= end; j++) {
      if (j != i) {  // a token is not its own neighbor
        rows.add(RowFactory.create(tokens[i], tokens[j], 1));
      }
    }
  }
  return rows.iterator();
}
});
代码示例来源:origin: mahmoudparsian/data-algorithms-book
@Override
public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
  // Partition-local top-N: a TreeMap keyed by count keeps only the N
  // largest counts. NOTE: equal counts share a key, so a later entry
  // with the same count overwrites the earlier one.
  final int N = topN.value();
  SortedMap<Integer, String> best = new TreeMap<>();
  while (iter.hasNext()) {
    Tuple2<String,Integer> entry = iter.next();
    best.put(entry._2, entry._1);
    if (best.size() > N) {
      // Evict the smallest count so only the N largest remain.
      best.remove(best.firstKey());
    }
  }
  // One map per partition; the driver merges them afterwards.
  return Collections.singletonList(best).iterator();
}
});
代码示例来源:origin: mahmoudparsian/data-algorithms-book
@Override
public Tuple2<String, String> call(String record) {
  // Record layout: "<geneID>,<referenceType>;<patientID>,<geneValue>".
  // Emits the record keyed by its gene/reference part when the reference
  // type matches the broadcast one; otherwise emits the shared null
  // tuple, which a later filter step drops.
  String[] tokens = StringUtils.split(record, ";");
  String geneIDAndReferenceType = tokens[0];
  String patientIDAndGeneValue = tokens[1];
  // arr[0] = geneID, arr[1] = referenceType
  String[] arr = StringUtils.split(geneIDAndReferenceType, ",");
  String referenceType = (String) broadcastVarReferenceType.value();
  if (!arr[1].equals(referenceType)) {
    // Wrong reference type: nothing should be counted for this record.
    return TUPLE_2_NULL;
  }
  return new Tuple2<String, String>(geneIDAndReferenceType, patientIDAndGeneValue);
}
});
代码示例来源:origin: mahmoudparsian/data-algorithms-book
@Override
public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
  // Partition-local top-N k-mers by frequency. NOTE: frequencies are the
  // map keys, so two k-mers with the same frequency collide and only the
  // last one seen survives.
  int N = broadcastN.value();
  SortedMap<Integer, String> topN = new TreeMap<>();
  while (iter.hasNext()) {
    Tuple2<String,Integer> kmerAndFrequency = iter.next();
    topN.put(kmerAndFrequency._2, kmerAndFrequency._1);
    if (topN.size() > N) {
      // Drop the lowest frequency so only the N highest remain.
      topN.remove(topN.firstKey());
    }
  }
  // Debug output retained from the original implementation.
  System.out.println("topN="+topN);
  return Collections.singletonList(topN).iterator();
}
});
代码示例来源:origin: mahmoudparsian/data-algorithms-book
@Override
public Tuple2<String,Tuple2<Double,String>> call(Tuple2<String,String> cartRecord) {
  // One element of the R x S cartesian product:
  //   R record: "<recordID>;<r.1,...,r.d>"
  //   S record: "<recordID>;<classificationID>;<s.1,...,s.d>"
  // Emits (r.recordID, (distance(r, s), s.classificationID)).
  String[] rTokens = cartRecord._1.split(";");
  String[] sTokens = cartRecord._2.split(";");
  // Dimensionality d comes from the broadcast variable.
  Integer d = broadcastD.value();
  double distance = Util.calculateDistance(rTokens[1], sTokens[2], d);
  Tuple2<Double,String> distanceAndClass =
      new Tuple2<Double,String>(distance, sTokens[1]);
  return new Tuple2<String,Tuple2<Double,String>>(rTokens[0], distanceAndClass);
}
});
代码示例来源:origin: mahmoudparsian/data-algorithms-book
Map<Tuple2<String,String>, Double> CLASSIFIER = broadcastClassifier.value();
List<String> CLASSES = broadcastClasses.value();
代码示例来源:origin: mahmoudparsian/data-algorithms-book
System.out.println("genes: filterValueThreshold="+filterValueThreshold);
String referenceType = (String) broadcastVarReferenceType.value();
String filterType = (String) broadcastVarFilterType.value();
Double filterValueThreshold = (Double) broadcastVarFilterValueThreshold.value();
代码示例来源:origin: mahmoudparsian/data-algorithms-book
@Override
public Double call(Iterable<String> biosets) {
  // Splits the broadcast bioset->time table into two groups — times for
  // biosets that contain this gene vs. those that do not — then runs a
  // t-test between the groups.
  Set<String> geneBiosets = new HashSet<String>();
  biosets.forEach(geneBiosets::add);
  // The shared map must be iterated in full to bucket every bioset.
  Map<String, Double> timetable = broadcastTimeTable.value();
  // ttest(exist, notexist) needs both lists.
  List<Double> exist = new ArrayList<Double>();
  List<Double> notexist = new ArrayList<Double>();
  for (Map.Entry<String, Double> entry : timetable.entrySet()) {
    List<Double> bucket =
        geneBiosets.contains(entry.getKey()) ? exist : notexist;
    bucket.add(entry.getValue());
  }
  return MathUtil.ttest(exist, notexist);
}
});
代码示例来源:origin: apache/tinkerpop
@Override
public <R> R get(final String key) throws IllegalArgumentException {
  // Resolves a memory value by key. While the job is executing, only
  // broadcast keys are readable and values come from the broadcast map;
  // outside execution they come from the accumulated spark memory.
  if (!this.memoryComputeKeys.containsKey(key)) {
    throw Memory.Exceptions.memoryDoesNotExist(key);
  }
  if (this.inExecute && !this.memoryComputeKeys.get(key).isBroadcast()) {
    throw Memory.Exceptions.memoryDoesNotExist(key);
  }
  final ObjectWritable<R> value = (ObjectWritable<R>) (this.inExecute
      ? this.broadcast.value().get(key)
      : this.sparkMemory.get(key).value());
  // An absent or empty wrapper is treated the same as an unknown key.
  if (null == value || value.isEmpty()) {
    throw Memory.Exceptions.memoryDoesNotExist(key);
  }
  return value.get();
}
代码示例来源:origin: mahmoudparsian/data-algorithms-book
@Override
public String call(Iterable<Tuple2<Double,String>> neighbors) {
  // kNN vote: keep the k nearest (distance, classificationID) neighbors,
  // tally their classifications, and return the majority class.
  Integer k = broadcastK.value();
  SortedMap<Double, String> nearestK = Util.findNearestK(neighbors, k);
  Map<String, Integer> votes = Util.buildClassificationCount(nearestK);
  return Util.classifyByMajority(votes);
}
});
内容来源于网络,如有侵权,请联系作者删除!