org.apache.edgent.topology.Topology.source()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(2.4k)|赞(0)|评价(0)|浏览(134)

本文整理了Java中org.apache.edgent.topology.Topology.source()方法的一些代码示例,展示了Topology.source()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Topology.source()方法的具体详情如下:
包路径:org.apache.edgent.topology.Topology
类名称:Topology
方法名:source

Topology.source介绍

[英]Declare a new source stream that iterates over the return of Iterable get() from data. Once all the tuples from data.get() have been submitted on the stream, no more tuples are submitted.
The returned stream will be endless if the iterator returned from the Iterable never completes.

If data implements AutoCloseable, its close()method will be called when the topology's execution is terminated.
[中]声明一个新的源流,该源流迭代从数据返回Iterable get()。一旦从数据中提取所有元组。get()已在流上提交,不再提交元组。
如果从Iterable返回的迭代器从未完成,则返回的流将是无止境的。
如果数据实现了AutoCloseable,则在拓扑执行终止时将调用其close()方法。

代码示例

代码示例来源:origin: apache/incubator-edgent

private <T> TStream<T> integers(Topology t, @SuppressWarnings("unchecked") T... values) {
  return t.source(() -> Arrays.asList(values));
}

代码示例来源:origin: apache/incubator-edgent

return te.topology().source(() -> new DirectoryWatcher(directory, comparator));

代码示例来源:origin: apache/incubator-edgent

@Test
public void testJoinLastWithKeyer() throws Exception{
  Topology t = newTopology();
  
  List<Integer> ints = new ArrayList<>();
  for(int i = 0; i < 100; i++){
    ints.add(i);
  }
  
  TStream<Integer> intStream = t.collection(ints);
  
  // Wait until the window is populated, and then submit tuples
  TStream<Integer> lookupIntStream = t.source(() -> {
    try {
      Thread.sleep(500);
    } catch (Exception e) {
      e.printStackTrace();
    }
    return ints;
  });
  
  TStream<String> joinsHappened = lookupIntStream.joinLast(tuple -> tuple, intStream, tuple -> tuple, (a, b) -> {
    assertTrue(a.equals(b));
    return "0";
  });
  Condition<Long> tc = t.getTester().tupleCount(joinsHappened, 100);
  complete(t, tc);      
}

代码示例来源:origin: apache/incubator-edgent

TStream<Integer> lookupIntStream = t.source(() -> {
  try {
    Thread.sleep(500);

代码示例来源:origin: apache/incubator-edgent

@Test
public void testCountBasedBatch() throws Exception{
  Topology top = newTopology();
  List<Integer> intList = new ArrayList<>();
  for(int i = 0; i < 1000;i++)
    intList.add(i);
  TStream<Integer> ints = top.source(() -> intList);
  
  TWindow<Integer, Integer> window = ints.last(100, tuple -> 0);
  TStream<Integer> sizes = window.batch((tuples, key) -> {
    return tuples.size();
  });
  Condition<List<Integer> > contents = top.getTester().streamContents(sizes,
      100,100,100,100,100,100,100,100,100,100);
  complete(top, contents);
  assertTrue(contents.valid());
}

相关文章