This article collects a number of Java code examples for the org.apache.flink.api.java.operators.DataSink.name() method and shows how DataSink.name() is used in practice. The examples are extracted from selected projects published on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of the DataSink.name() method:

Package path: org.apache.flink.api.java.operators.DataSink
Class name: DataSink
Method name: name
Method description: none available
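
Before the collected examples, here is a minimal, self-contained sketch (not taken from the projects below) of the typical call pattern: output(...) or one of the writeAs*() methods returns a DataSink, and name(...) attaches a label to that sink, which then appears in the execution plan, logs, and web UI. The class name, sink name, and generated data are made up for illustration.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class DataSinkNameExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Some data to write out; generateSequence produces the numbers 1..100.
		DataSet<Long> numbers = env.generateSequence(1, 100);

		// output(...) returns a DataSink; name(...) labels that sink.
		numbers
			.output(new DiscardingOutputFormat<Long>())
			.name("Discarding sink (example)");

		env.execute("DataSink.name() example");
	}
}

The same pattern applies to writeAsText() and writeAsCsv(), as the examples below show.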
Code example source: apache/flink

@Override
public void emitDataSet(DataSet<Row> dataSet) {
	dataSet
		.output(new Utils.CollectHelper<>(accumulatorName, serializer))
		.name("SQL Client Batch Collect Sink");
}

Code example source: apache/flink

@Override
public Count<T> run(DataSet<T> input)
		throws Exception {
	super.run(input);
	countHelper = new CountHelper<>();
	input
		.output(countHelper)
		.name("Count");
	return this;
}

Code example source: apache/flink

@Override
public ChecksumHashCode<T> run(DataSet<T> input)
		throws Exception {
	super.run(input);
	checksumHashCodeHelper = new ChecksumHashCodeHelper<>();
	input
		.output(checksumHashCodeHelper)
		.name("ChecksumHashCode");
	return this;
}

Code example source: apache/flink

/**
 * Convenience method to get the count (number of elements) of a DataSet
 * as well as the checksum (sum over element hashes).
 *
 * @return A ChecksumHashCode that represents the count and checksum of elements in the data set.
 * @deprecated replaced with {@code org.apache.flink.graph.asm.dataset.ChecksumHashCode} in Gelly
 */
@Deprecated
public static <T> Utils.ChecksumHashCode checksumHashCode(DataSet<T> input) throws Exception {
	final String id = new AbstractID().toString();
	input.output(new Utils.ChecksumHashCodeHelper<T>(id)).name("ChecksumHashCode");

	JobExecutionResult res = input.getExecutionEnvironment().execute();
	return res.<Utils.ChecksumHashCode> getAccumulatorResult(id);
}

Code example source: apache/flink

@Override
public void write(String executionName, PrintStream out, DataSet<T> data) throws Exception {
	if (Tuple.class.isAssignableFrom(data.getType().getTypeClass())) {
		data
			.writeAsCsv(filename.getValue(), lineDelimiter.getValue(), fieldDelimiter.getValue())
			.name("CSV: " + filename.getValue());
	} else {
		// line and field delimiters are ineffective when writing custom POJOs result types
		data
			.writeAsText(filename.getValue())
			.name("CSV: " + filename.getValue());
	}

	data.getExecutionEnvironment().execute();
}

Code example source: apache/flink

@Override
public Collect<T> run(DataSet<T> input)
		throws Exception {
	super.run(input);
	serializer = input.getType().createSerializer(env.getConfig());
	collectHelper = new CollectHelper<>(serializer);
	input
		.output(collectHelper)
		.name("Collect");
	return this;
}

Code example source: apache/flink

private void createTextSink(PythonOperationInfo info) {
	DataSet<byte[]> parent = sets.getDataSet(info.parentID);
	parent.map(new StringDeserializerMap()).setParallelism(info.parallelism)
		.writeAsText(info.path, info.writeMode).setParallelism(info.parallelism).name("TextSink");
}

Code example source: apache/flink

@Override
public AverageClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
		throws Exception {
	super.run(input);
	DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
		.run(new LocalClusteringCoefficient<K, VV, EV>()
			.setParallelism(parallelism));
	averageClusteringCoefficientHelper = new AverageClusteringCoefficientHelper<>();
	localClusteringCoefficient
		.output(averageClusteringCoefficientHelper)
		.name("Average clustering coefficient");
	return this;
}

Code example source: apache/flink

@Override
public TriadicCensus<K, VV, EV> run(Graph<K, VV, EV> input)
		throws Exception {
	super.run(input);
	triangleListingHelper = new TriangleListingHelper<>();
	input
		.run(new TriangleListing<K, VV, EV>()
			.setParallelism(parallelism))
		.output(triangleListingHelper)
		.name("Triangle counts");

	vertexDegreesHelper = new VertexDegreesHelper<>();
	input
		.run(new VertexDegrees<K, VV, EV>()
			.setParallelism(parallelism))
		.output(vertexDegreesHelper)
		.name("Edge and triplet counts");
	return this;
}

Code example source: apache/flink

/**
 * Convenience method to get the count (number of elements) of a DataSet.
 *
 * @return A long integer that represents the number of elements in the data set.
 */
public long count() throws Exception {
	final String id = new AbstractID().toString();
	output(new Utils.CountHelper<T>(id)).name("count()");

	JobExecutionResult res = getExecutionEnvironment().execute();
	return res.<Long> getAccumulatorResult(id);
}

Code example source: apache/flink

private void createCsvSink(PythonOperationInfo info) {
	DataSet<byte[]> parent = sets.getDataSet(info.parentID);
	parent.map(new StringTupleDeserializerMap()).setParallelism(info.parallelism).name("CsvSinkPreStep")
		.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(info.parallelism).name("CsvSink");
}

Code example source: apache/flink

/**
 * Convenience method to get the elements of a DataSet as a List.
 * As DataSet can contain a lot of data, this method should be used with caution.
 *
 * @return A List containing the elements of the DataSet
 */
public List<T> collect() throws Exception {
	final String id = new AbstractID().toString();
	final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());

	this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
	JobExecutionResult res = getExecutionEnvironment().execute();

	ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
	if (accResult != null) {
		try {
			return SerializedListAccumulator.deserializeList(accResult, serializer);
		} catch (ClassNotFoundException e) {
			throw new RuntimeException("Cannot find type class of collected data type.", e);
		} catch (IOException e) {
			throw new RuntimeException("Serialization error while deserializing collected data", e);
		}
	} else {
		throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
	}
}

Code example source: apache/flink

@Override
public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
		throws Exception {
	super.run(input);
	DataSet<Vertex<K, Degrees>> vertexDegree = input
		.run(new VertexDegrees<K, VV, EV>()
			.setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
			.setParallelism(parallelism));
	vertexMetricsHelper = new VertexMetricsHelper<>();
	vertexDegree
		.output(vertexMetricsHelper)
		.name("Vertex metrics");
	return this;
}

Code example source: apache/flink

@Override
public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
		throws Exception {
	super.run(input);
	DataSet<Vertex<K, LongValue>> vertexDegree = input
		.run(new VertexDegree<K, VV, EV>()
			.setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
			.setReduceOnTargetId(reduceOnTargetId)
			.setParallelism(parallelism));
	vertexMetricsHelper = new VertexMetricsHelper<>();
	vertexDegree
		.output(vertexMetricsHelper)
		.name("Vertex metrics");
	return this;
}

Code example source: apache/flink

@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
		throws Exception {
	super.run(input);

	// s, t, (d(s), d(t))
	DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> edgeDegreePair = input
		.run(new EdgeDegreePair<K, VV, EV>()
			.setReduceOnTargetId(reduceOnTargetId)
			.setParallelism(parallelism));

	// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
	DataSet<Tuple3<K, LongValue, LongValue>> edgeStats = edgeDegreePair
		.map(new EdgeStats<>())
		.setParallelism(parallelism)
		.name("Edge stats")
		.groupBy(0)
		.reduce(new SumEdgeStats<>())
		.setCombineHint(CombineHint.HASH)
		.setParallelism(parallelism)
		.name("Sum edge stats");

	edgeMetricsHelper = new EdgeMetricsHelper<>();
	edgeStats
		.output(edgeMetricsHelper)
		.setParallelism(parallelism)
		.name("Edge metrics");
	return this;
}

Code example source: apache/flink

public static void connectedComponentsWithCoGroup(String[] args) throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(Integer.parseInt(args[0]));

	DataSet<Tuple1<Long>> initialVertices = env.readCsvFile(args[1]).types(Long.class).name(VERTEX_SOURCE);
	DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(args[2]).types(Long.class, Long.class).name(EDGES_SOURCE);
	DataSet<Tuple2<Long, Long>> verticesWithId = initialVertices.flatMap(new DummyMapFunction());

	DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
		verticesWithId.iterateDelta(verticesWithId, Integer.parseInt(args[4]), 0).name(ITERATION_NAME);

	DataSet<Tuple2<Long, Long>> joinWithNeighbors = iteration.getWorkset().join(edges)
		.where(0).equalTo(0)
		.with(new DummyJoinFunction()).name(JOIN_NEIGHBORS_MATCH);

	DataSet<Tuple2<Long, Long>> minAndUpdate = joinWithNeighbors.coGroup(iteration.getSolutionSet())
		.where(0).equalTo(0)
		.with(new DummyCoGroupFunction()).name(MIN_ID_AND_UPDATE);

	iteration.closeWith(minAndUpdate, minAndUpdate).writeAsCsv(args[3]).name(SINK);

	env.execute();
}

Code example source: apache/flink

@Test
public void testReduce() {
	// construct the plan
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(DEFAULT_PARALLELISM);
	DataSet<Long> set1 = env.generateSequence(0, 1);

	set1.reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
		.output(new DiscardingOutputFormat<Long>()).name("Sink");

	Plan plan = env.createProgramPlan();

	try {
		OptimizedPlan oPlan = compileNoStats(plan);
		JobGraphGenerator jobGen = new JobGraphGenerator();
		jobGen.compileJobGraph(oPlan);
	} catch (CompilerException ce) {
		ce.printStackTrace();
		fail("The pact compiler is unable to compile this plan correctly");
	}
}

Code example source: apache/flink

public static void tcph3(String[] args) throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(Integer.parseInt(args[0]));

	// order id, order status, order date, order prio, ship prio
	DataSet<Tuple5<Long, String, String, String, Integer>> orders =
		env.readCsvFile(args[1])
			.fieldDelimiter("|").lineDelimiter("\n")
			.includeFields("101011001").types(Long.class, String.class, String.class, String.class, Integer.class)
			.name(ORDERS);

	// order id, extended price
	DataSet<Tuple2<Long, Double>> lineItems =
		env.readCsvFile(args[2])
			.fieldDelimiter("|").lineDelimiter("\n")
			.includeFields("100001").types(Long.class, Double.class)
			.name(LINEITEM);

	DataSet<Tuple2<Long, Integer>> filterO = orders.flatMap(new FilterO()).name(MAPPER_NAME);
	DataSet<Tuple3<Long, Integer, Double>> joinLiO = filterO.join(lineItems).where(0).equalTo(0).with(new JoinLiO()).name(JOIN_NAME);
	DataSet<Tuple3<Long, Integer, Double>> aggLiO = joinLiO.groupBy(0, 1).reduceGroup(new AggLiO()).name(REDUCE_NAME);

	aggLiO.writeAsCsv(args[3], "\n", "|").name(SINK);

	env.execute();
}

Code example source: apache/flink

/**
 * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce ->
 *            |--------------------------/                  /
 *            |--------------------------------------------/
 *
 * First cross has SameKeyFirst output contract
 */
@Test
public void testTicket158() {
	// construct the plan
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(DEFAULT_PARALLELISM);
	DataSet<Long> set1 = env.generateSequence(0, 1);

	set1.map(new IdentityMapper<Long>()).name("Map1")
		.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
		.cross(set1).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
		.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2")
		.cross(set1).with(new IdentityCrosser<Long>()).name("Cross2")
		.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3")
		.output(new DiscardingOutputFormat<Long>()).name("Sink");

	Plan plan = env.createProgramPlan();
	OptimizedPlan oPlan = compileNoStats(plan);

	JobGraphGenerator jobGen = new JobGraphGenerator();
	jobGen.compileJobGraph(oPlan);
}