Usage of the org.apache.flink.api.java.operators.DataSink.name() method, with code examples


This article collects Java code examples for the org.apache.flink.api.java.operators.DataSink.name() method and shows how DataSink.name() is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, Maven, and similar platforms, extracted from selected projects, and should serve as useful references. Details of the DataSink.name() method:
Package path: org.apache.flink.api.java.operators.DataSink
Class name: DataSink
Method name: name

About DataSink.name

DataSink.name(String) assigns a custom name to the sink operation. The name appears in the execution plan, in log messages, and in the web UI, and the method returns the DataSink itself so calls can be chained.
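
A minimal usage sketch (the generated data set, output path, and sink name below are illustrative assumptions, not taken from the examples that follow):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DataSinkNameExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // A small example data set; the elements themselves are arbitrary.
    DataSet<Long> numbers = env.generateSequence(1, 100);

    // writeAsText(...) returns a DataSink; name(...) assigns a custom operator
    // name that shows up in the execution plan, logs, and the web UI.
    numbers
      .writeAsText("/tmp/numbers")   // hypothetical output path
      .name("Numbers Text Sink");

    env.execute("DataSink.name() example");
  }
}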

Code examples

Code example source: apache/flink

@Override
public void emitDataSet(DataSet<Row> dataSet) {
  dataSet
    .output(new Utils.CollectHelper<>(accumulatorName, serializer))
    .name("SQL Client Batch Collect Sink");
}

Code example source: apache/flink

@Override
public Count<T> run(DataSet<T> input)
    throws Exception {
  super.run(input);
  countHelper = new CountHelper<>();
  input
    .output(countHelper)
      .name("Count");
  return this;
}

Code example source: apache/flink

@Override
public ChecksumHashCode<T> run(DataSet<T> input)
    throws Exception {
  super.run(input);
  checksumHashCodeHelper = new ChecksumHashCodeHelper<>();
  input
    .output(checksumHashCodeHelper)
      .name("ChecksumHashCode");
  return this;
}

Code example source: apache/flink

/**
 * Convenience method to get the count (number of elements) of a DataSet
 * as well as the checksum (sum over element hashes).
 *
 * @return A ChecksumHashCode that represents the count and checksum of elements in the data set.
 * @deprecated replaced with {@code org.apache.flink.graph.asm.dataset.ChecksumHashCode} in Gelly
 */
@Deprecated
public static <T> Utils.ChecksumHashCode checksumHashCode(DataSet<T> input) throws Exception {
  final String id = new AbstractID().toString();
  input.output(new Utils.ChecksumHashCodeHelper<T>(id)).name("ChecksumHashCode");
  JobExecutionResult res = input.getExecutionEnvironment().execute();
  return res.<Utils.ChecksumHashCode> getAccumulatorResult(id);
}

Code example source: apache/flink

@Override
public void write(String executionName, PrintStream out, DataSet<T> data) throws Exception {
  if (Tuple.class.isAssignableFrom(data.getType().getTypeClass())) {
    data
      .writeAsCsv(filename.getValue(), lineDelimiter.getValue(), fieldDelimiter.getValue())
        .name("CSV: " + filename.getValue());
  } else {
    // line and field delimiters are ineffective when writing custom POJO result types
    data
      .writeAsText(filename.getValue())
        .name("CSV: " + filename.getValue());
  }

  data.getExecutionEnvironment().execute();
}

Code example source: apache/flink

@Override
public Collect<T> run(DataSet<T> input)
    throws Exception {
  super.run(input);
  serializer = input.getType().createSerializer(env.getConfig());
  collectHelper = new CollectHelper<>(serializer);
  input
    .output(collectHelper)
      .name("Collect");
  return this;
}

Code example source: apache/flink

private void createTextSink(PythonOperationInfo info) {
  DataSet<byte[]> parent = sets.getDataSet(info.parentID);
  parent.map(new StringDeserializerMap()).setParallelism(info.parallelism)
    .writeAsText(info.path, info.writeMode).setParallelism(info.parallelism).name("TextSink");
}

Code example source: apache/flink

@Override
public AverageClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
    throws Exception {
  super.run(input);
  DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
    .run(new LocalClusteringCoefficient<K, VV, EV>()
      .setParallelism(parallelism));
  averageClusteringCoefficientHelper = new AverageClusteringCoefficientHelper<>();
  localClusteringCoefficient
    .output(averageClusteringCoefficientHelper)
      .name("Average clustering coefficient");
  return this;
}

Code example source: apache/flink

@Override
public TriadicCensus<K, VV, EV> run(Graph<K, VV, EV> input)
    throws Exception {
  super.run(input);
  triangleListingHelper = new TriangleListingHelper<>();
  input
    .run(new TriangleListing<K, VV, EV>()
      .setParallelism(parallelism))
    .output(triangleListingHelper)
      .name("Triangle counts");
  vertexDegreesHelper = new VertexDegreesHelper<>();
  input
    .run(new VertexDegrees<K, VV, EV>()
      .setParallelism(parallelism))
    .output(vertexDegreesHelper)
      .name("Edge and triplet counts");
  return this;
}

Code example source: apache/flink

/**
 * Convenience method to get the count (number of elements) of a DataSet.
 *
 * @return A long integer that represents the number of elements in the data set.
 */
public long count() throws Exception {
  final String id = new AbstractID().toString();
  output(new Utils.CountHelper<T>(id)).name("count()");
  JobExecutionResult res = getExecutionEnvironment().execute();
  return res.<Long> getAccumulatorResult(id);
}

Code example source: apache/flink

private void createCsvSink(PythonOperationInfo info) {
  DataSet<byte[]> parent = sets.getDataSet(info.parentID);
  parent.map(new StringTupleDeserializerMap()).setParallelism(info.parallelism).name("CsvSinkPreStep")
      .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(info.parallelism).name("CsvSink");
}

Code example source: apache/flink

/**
 * Convenience method to get the elements of a DataSet as a List.
 * As DataSet can contain a lot of data, this method should be used with caution.
 *
 * @return A List containing the elements of the DataSet
 */
public List<T> collect() throws Exception {
  final String id = new AbstractID().toString();
  final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());
  this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
  JobExecutionResult res = getExecutionEnvironment().execute();
  ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
  if (accResult != null) {
    try {
      return SerializedListAccumulator.deserializeList(accResult, serializer);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("Cannot find type class of collected data type.", e);
    } catch (IOException e) {
      throw new RuntimeException("Serialization error while deserializing collected data", e);
    }
  } else {
    throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
  }
}
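
The count() and collect() convenience methods above are normally invoked directly on a DataSet; a short usage sketch (the environment setup and data set are assumptions for illustration):

import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CountCollectUsage {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Long> data = env.generateSequence(1, 10);

    // Each call triggers its own job execution; the sinks created internally
    // are named "count()" and "collect()" as shown in the methods above.
    long n = data.count();
    List<Long> values = data.collect();

    System.out.println(n + " elements: " + values);
  }
}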

Code example source: apache/flink

@Override
public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
    throws Exception {
  super.run(input);
  DataSet<Vertex<K, Degrees>> vertexDegree = input
    .run(new VertexDegrees<K, VV, EV>()
      .setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
      .setParallelism(parallelism));
  vertexMetricsHelper = new VertexMetricsHelper<>();
  vertexDegree
    .output(vertexMetricsHelper)
      .name("Vertex metrics");
  return this;
}

Code example source: apache/flink

@Override
public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
    throws Exception {
  super.run(input);
  DataSet<Vertex<K, LongValue>> vertexDegree = input
    .run(new VertexDegree<K, VV, EV>()
      .setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
      .setReduceOnTargetId(reduceOnTargetId)
      .setParallelism(parallelism));
  vertexMetricsHelper = new VertexMetricsHelper<>();
  vertexDegree
    .output(vertexMetricsHelper)
      .name("Vertex metrics");
  return this;
}

Code example source: apache/flink

@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
    throws Exception {
  super.run(input);
  // s, t, (d(s), d(t))
  DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> edgeDegreePair = input
    .run(new EdgeDegreePair<K, VV, EV>()
      .setReduceOnTargetId(reduceOnTargetId)
      .setParallelism(parallelism));
  // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
  DataSet<Tuple3<K, LongValue, LongValue>> edgeStats = edgeDegreePair
    .map(new EdgeStats<>())
      .setParallelism(parallelism)
      .name("Edge stats")
    .groupBy(0)
    .reduce(new SumEdgeStats<>())
    .setCombineHint(CombineHint.HASH)
      .setParallelism(parallelism)
      .name("Sum edge stats");
  edgeMetricsHelper = new EdgeMetricsHelper<>();
  edgeStats
    .output(edgeMetricsHelper)
      .setParallelism(parallelism)
      .name("Edge metrics");
  return this;
}

Code example source: apache/flink

public static void connectedComponentsWithCoGroup(String[] args) throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(Integer.parseInt(args[0]));
  DataSet<Tuple1<Long>> initialVertices = env.readCsvFile(args[1]).types(Long.class).name(VERTEX_SOURCE);
  DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(args[2]).types(Long.class, Long.class).name(EDGES_SOURCE);
  DataSet<Tuple2<Long, Long>> verticesWithId = initialVertices.flatMap(new DummyMapFunction());
  DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
      verticesWithId.iterateDelta(verticesWithId, Integer.parseInt(args[4]), 0).name(ITERATION_NAME);
  DataSet<Tuple2<Long, Long>> joinWithNeighbors = iteration.getWorkset().join(edges)
      .where(0).equalTo(0)
      .with(new DummyJoinFunction()).name(JOIN_NEIGHBORS_MATCH);
  DataSet<Tuple2<Long, Long>> minAndUpdate = joinWithNeighbors.coGroup(iteration.getSolutionSet())
      .where(0).equalTo(0)
      .with(new DummyCoGroupFunction()).name(MIN_ID_AND_UPDATE);
  iteration.closeWith(minAndUpdate, minAndUpdate).writeAsCsv(args[3]).name(SINK);
  env.execute();
}

Code example source: apache/flink

@Test
public void testReduce() {
  // construct the plan
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(DEFAULT_PARALLELISM);
  DataSet<Long> set1 = env.generateSequence(0, 1);

  set1.reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
      .output(new DiscardingOutputFormat<Long>()).name("Sink");

  Plan plan = env.createProgramPlan();

  try {
    OptimizedPlan oPlan = compileNoStats(plan);
    JobGraphGenerator jobGen = new JobGraphGenerator();
    jobGen.compileJobGraph(oPlan);
  } catch (CompilerException ce) {
    ce.printStackTrace();
    fail("The pact compiler is unable to compile this plan correctly");
  }
}

Code example source: apache/flink

public static void tcph3(String[] args) throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(Integer.parseInt(args[0]));
  //order id, order status, order data, order prio, ship prio
  DataSet<Tuple5<Long, String, String, String, Integer>> orders =
      env.readCsvFile(args[1])
      .fieldDelimiter("|").lineDelimiter("\n")
      .includeFields("101011001").types(Long.class, String.class, String.class, String.class, Integer.class)
      .name(ORDERS);
  //order id, extended price
  DataSet<Tuple2<Long, Double>> lineItems =
      env.readCsvFile(args[2])
      .fieldDelimiter("|").lineDelimiter("\n")
      .includeFields("100001").types(Long.class, Double.class)
      .name(LINEITEM);
  DataSet<Tuple2<Long, Integer>> filterO = orders.flatMap(new FilterO()).name(MAPPER_NAME);
  DataSet<Tuple3<Long, Integer, Double>> joinLiO = filterO.join(lineItems).where(0).equalTo(0).with(new JoinLiO()).name(JOIN_NAME);
  DataSet<Tuple3<Long, Integer, Double>> aggLiO = joinLiO.groupBy(0, 1).reduceGroup(new AggLiO()).name(REDUCE_NAME);
  aggLiO.writeAsCsv(args[3], "\n", "|").name(SINK);
  env.execute();
}

Code example source: apache/flink

/**
   * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce ->
   * |--------------------------/                  /
   * |--------------------------------------------/
   * 
   * First cross has SameKeyFirst output contract
   */
@Test
public void testTicket158() {
  // construct the plan
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(DEFAULT_PARALLELISM);
  DataSet<Long> set1 = env.generateSequence(0, 1);

  set1.map(new IdentityMapper<Long>()).name("Map1")
      .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
      .cross(set1).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
      .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2")
      .cross(set1).with(new IdentityCrosser<Long>()).name("Cross2")
      .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3")
      .output(new DiscardingOutputFormat<Long>()).name("Sink");

  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileNoStats(plan);

  JobGraphGenerator jobGen = new JobGraphGenerator();
  jobGen.compileJobGraph(oPlan);
}
