Usage and code examples of the org.apache.edgent.topology.Topology.poll() method

Reposted by x33g5p2x on 2022-01-30 in Other

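This article collects code examples of the Java method org.apache.edgent.topology.Topology.poll() and shows how Topology.poll() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Topology.poll() method are as follows:
Package path: org.apache.edgent.topology.Topology
Class name: Topology
Method name: poll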

Introduction to Topology.poll

Declare a new source stream that calls data.get() periodically. Each non-null value returned will appear on the returned stream. Thus each call to data.get() will result in zero tuples or one tuple on the stream.
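The zero-or-one-tuple rule can be illustrated without Edgent. The following is a minimal plain-JDK sketch, not Edgent's implementation: the class PollStepSketch and its methods pollOnce and simulate are hypothetical names introduced here, and the periodic timer is replaced by a simple loop so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical stand-in for one polling step; not part of Edgent. */
class PollStepSketch {

    /**
     * Calls data.get() once and forwards the value only if it is non-null.
     * Returns the number of tuples emitted for this call (0 or 1).
     */
    public static <T> int pollOnce(Supplier<T> data, Consumer<T> downstream) {
        T value = data.get();
        if (value == null) {
            return 0; // null means "nothing to report this period"
        }
        downstream.accept(value);
        return 1;
    }

    /** Simulates the given number of poll periods without real timers. */
    public static <T> List<T> simulate(Supplier<T> data, int polls) {
        List<T> stream = new ArrayList<>();
        for (int i = 0; i < polls; i++) {
            pollOnce(data, stream::add);
        }
        return stream;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Odd-numbered calls return the call number, even-numbered calls return null.
        Supplier<Integer> data = () -> (++calls[0] % 2 == 1) ? calls[0] : null;
        System.out.println(simulate(data, 6)); // prints [1, 3, 5]
    }
}
```

Six calls to the supplier produce only three tuples, because the three null results are dropped rather than forwarded.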

If data implements AutoCloseable, its close() method will be called when the topology's execution is terminated.
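A data function that owns a resource can therefore implement both Supplier and AutoCloseable. The sketch below uses only the JDK and is an illustration under assumptions: SensorReader and closeIfCloseable are hypothetical names, and the helper merely mirrors the documented rule rather than reproducing Edgent's shutdown code.

```java
import java.util.function.Supplier;

/** Hypothetical helper; Edgent's own shutdown logic is not shown in this article. */
class CloseOnShutdownSketch {

    /** A data function that also owns a resource, for example a sensor handle. */
    public static class SensorReader implements Supplier<Integer>, AutoCloseable {
        private boolean closed;
        private int next;

        @Override
        public Integer get() {
            // After close() there is nothing more to report.
            return closed ? null : next++;
        }

        @Override
        public void close() {
            closed = true;
        }

        public boolean isClosed() {
            return closed;
        }
    }

    /**
     * Closes the data function only if it is AutoCloseable.
     * Returns true if close() was called.
     */
    public static boolean closeIfCloseable(Supplier<?> data) {
        if (data instanceof AutoCloseable) {
            try {
                ((AutoCloseable) data).close();
            } catch (Exception e) {
                throw new IllegalStateException("close failed", e);
            }
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SensorReader reader = new SensorReader();
        Supplier<Integer> plainLambda = () -> 1;
        System.out.println(closeIfCloseable(reader));      // prints true
        System.out.println(reader.isClosed());             // prints true
        System.out.println(closeIfCloseable(plainLambda)); // prints false
    }
}
```

A plain lambda such as () -> new Date() is not AutoCloseable, so nothing is closed for it.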

The poll rate may be changed while the topology is running via a runtime org.apache.edgent.execution.mbeans.PeriodMXBean. To use this mechanism, the caller must provide an alias for the stream when building the topology. The PeriodMXBean is registered with the ControlService with type TStream#TYPE and the stream's alias, e.g.:

Topology t = ...
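Because the original example is truncated, the Edgent calls for looking up the PeriodMXBean are not reproduced here. As an analogy only, the following plain-JDK sketch shows one way a polling period can be changed while a task is running: cancel the current fixed-rate schedule and start a new one. AdjustablePoller is a hypothetical class and is not related to Edgent's PeriodMXBean implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical class, not part of Edgent: a periodic task with an adjustable period. */
class AdjustablePoller implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Runnable task;
    private ScheduledFuture<?> handle;
    private long period;
    private TimeUnit unit;

    public AdjustablePoller(Runnable task, long period, TimeUnit unit) {
        this.task = task;
        setPeriod(period, unit);
    }

    /** Cancels the current schedule without interrupting a running poll, then reschedules. */
    public synchronized void setPeriod(long period, TimeUnit unit) {
        if (period <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        if (handle != null) {
            handle.cancel(false);
        }
        this.period = period;
        this.unit = unit;
        handle = scheduler.scheduleAtFixedRate(task, period, period, unit);
    }

    public synchronized long getPeriod() {
        return period;
    }

    public synchronized TimeUnit getUnit() {
        return unit;
    }

    /** Stops the scheduler thread so the JVM can exit. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        try (AdjustablePoller poller =
                new AdjustablePoller(() -> System.out.print("."), 1, TimeUnit.HOURS)) {
            poller.setPeriod(10, TimeUnit.MILLISECONDS); // speed up at runtime
            Thread.sleep(100);
        }
        System.out.println();
    }
}
```

In Edgent the equivalent handle is obtained from the ControlService using the stream's alias, which is why the alias must be set when the topology is built.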

Code examples

Code example source: apache/incubator-edgent

return topology.poll(commandReaderList(cmd), period, units);

Code example source: org.apache.edgent/edgent-connectors-iot

TStream<Date> hb = iotDevice.topology().poll(
  () -> new Date(),
  period, unit).tag("heartbeat");

Code example source: apache/incubator-edgent

/**
 * Fails every 1.5 seconds (10 tuples * 150 millis)
 */
static void registerMonitoredApplicationTwo(DirectSubmitter<Topology, Job> submitter, AtomicInteger topoBuiltCnt, AtomicInteger injectedErrorCnt) {
  ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
  appService.registerTopology(MONITORED_APP_NAME_2, (topology, config) -> {
    topoBuiltCnt.incrementAndGet();

    Random r = new Random();
    TStream<Double> d = topology.poll(() -> r.nextGaussian(), 150, TimeUnit.MILLISECONDS);

    final AtomicInteger count = new AtomicInteger(0);
    d = d.filter(tuple -> {
      int tupleCount = count.incrementAndGet();
      if (tupleCount == 10) {
        injectedErrorCnt.incrementAndGet();
        throw new IllegalStateException(MONITORED_APP_NAME_2 + " Injected error " + injectedErrorCnt.get());
      }
      return true;
    });

    d.sink(tuple -> System.out.print("#"));
  });
}

Code example source: apache/incubator-edgent

/**
 * Fails every 2 seconds (20 tuples * 100 millis)
 */
static void registerMonitoredApplicationOne(DirectSubmitter<Topology, Job> submitter, AtomicInteger topoBuiltCnt, AtomicInteger injectedErrorCnt) {
  ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
  appService.registerTopology(MONITORED_APP_NAME_1, (topology, config) -> {
    topoBuiltCnt.incrementAndGet();

    Random r = new Random();
    TStream<Double> d = topology.poll(() -> r.nextGaussian(), 100, TimeUnit.MILLISECONDS);

    final AtomicInteger count = new AtomicInteger(0);
    d = d.filter(tuple -> {
      int tupleCount = count.incrementAndGet();
      if (tupleCount == 20) {
        injectedErrorCnt.incrementAndGet();
        throw new IllegalStateException(MONITORED_APP_NAME_1 + " Injected error " + injectedErrorCnt.get());
      }
      return true;
    });

    d.sink(tuple -> System.out.print("."));
  });
}

Code example source: apache/incubator-edgent

@Test
public void jobPeriodicSource() throws Exception {
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  @SuppressWarnings("unused")
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 100, TimeUnit.MILLISECONDS);
  Future<Job> fj = ((DirectProvider)getTopologyProvider()).submit(t);
  Job job = fj.get();
  assertEquals(Job.State.RUNNING, job.getCurrentState());
  Thread.sleep(TimeUnit.SECONDS.toMillis(2));
  assertTrue(n.get() > 0); // At least one tuple was processed
  job.stateChange(Job.Action.CLOSE);
  assertEquals(Job.State.CLOSED, job.getCurrentState());
}

Code example source: apache/incubator-edgent

@Test
public void automaticMetricCleanup1() throws Exception {
  // Declare topology with custom metric oplet
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
  ints.pipe(new TestOplet<Integer>());
  // Submit job
  Future<? extends Job> fj = getSubmitter().submit(t);
  Job job = fj.get();
  Thread.sleep(TimeUnit.MILLISECONDS.toMillis(50));
  // At least one tuple was processed
  int tupleCount = n.get(); 
  assertTrue("Expected more tuples than "+ tupleCount, tupleCount > 0);
  // Each test oplet registers two metrics 
  Map<String, Metric> all = metricRegistry.getMetrics();
  assertEquals(2, all.size());
  
  // After close all metrics have been unregistered 
  job.stateChange(Job.Action.CLOSE);
  assertEquals(0, all.size());
}

Code example source: apache/incubator-edgent

Topology t = newTopology();
AtomicLong n = new AtomicLong(0);
TStream<Long> s = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);

Code example source: apache/incubator-edgent

try {
  Topology t = newTopology();
  TStream<String> s = t.poll(() -> (new Date()).toString(),
      1, TimeUnit.HOURS)
      .alias("myAlias");
  // ... (the rest of this example is truncated in the source)

Code example source: apache/incubator-edgent

@Test
public void automaticMetricCleanup2() throws Exception {
  // Declare topology with custom metric oplet
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
  TStream<Integer> ints2 = ints.pipe(new TestOplet<Integer>());
  ints2.pipe(new TestOplet<Integer>());
  // Submit job
  Future<? extends Job> fj = getSubmitter().submit(t);
  Job job = fj.get();
  Thread.sleep(TimeUnit.MILLISECONDS.toMillis(50));
  // Each test oplet registers two metrics 
  Map<String, Metric> all = metricRegistry.getMetrics();
  assertEquals(4, all.size());
  
  // After close all metrics have been unregistered 
  job.stateChange(Job.Action.CLOSE);
  assertEquals(0, all.size());
}

Code example source: apache/incubator-edgent

TStream<Double> gaussian = t.poll(() -> r1.nextGaussian(), 100, TimeUnit.MILLISECONDS);
gaussian = Metrics.counter(gaussian);
TStream<MyClass1> mc1 = t.poll(
    () -> new MyClass1(Double.toString(r2.nextGaussian()),
        Double.toString(r2.nextGaussian()), r1.nextGaussian()
        // ... (truncated in the source)
TStream<MyClass2> mc2 = t.poll(
    () -> new MyClass2(
        new MyClass1(
        // ... (truncated in the source)
TStream<MyClass2> mc4 = t.poll(
    () -> new MyClass2(
        new MyClass1(
        // ... (truncated in the source)
TStream<MyClass2> mc5 = t2.poll(
    () -> new MyClass2(
        new MyClass1(
        // ... (truncated in the source)

Code example source: apache/incubator-edgent

TStream<Long> times = t.poll(() -> {   
  return System.currentTimeMillis();
}, 1, TimeUnit.MILLISECONDS);

Code example source: apache/incubator-edgent

@Test
public void testDeadtime() throws Exception {
  Topology topology = newTopology("testDeadtime");
  
  int maxTupleCnt = 10;
  AtomicInteger cnt = new AtomicInteger();
  
  TStream<Integer> values = topology.poll(() -> {
    int curCnt = cnt.incrementAndGet();
    if (curCnt > maxTupleCnt)
      return null;
    return curCnt;
    }, 100, TimeUnit.MILLISECONDS);
  
  // use a deadtime value that causes filtering of every other tuple
  TStream<Integer> filtered = Filters.deadtime(values, 150, TimeUnit.MILLISECONDS);
  
  Condition<Long> count = topology.getTester().tupleCount(filtered, maxTupleCnt/2);
  Condition<List<Integer>> contents = topology.getTester().streamContents(filtered, 1, 3, 5, 7, 9 );
  complete(topology, count);
  
  assertTrue(contents.getResult().toString(), contents.valid());
}
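The expected contents 1, 3, 5, 7, 9 in the test above follow from the timing: tuples arrive every 100 ms and, assuming each accepted tuple starts a 150 ms window during which later tuples are dropped, every other tuple passes. The sketch below checks this arithmetic with simulated arrival times in plain JDK code. DeadtimeSketch and accepted are hypothetical names, and this is not how Filters.deadtime is implemented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation; not Edgent's Filters.deadtime implementation. */
class DeadtimeSketch {

    /**
     * Returns the 1-based numbers of the tuples that pass when tuples arrive
     * every periodMs and each accepted tuple starts a deadtime of deadtimeMs.
     */
    public static List<Integer> accepted(int tupleCount, long periodMs, long deadtimeMs) {
        List<Integer> passed = new ArrayList<>();
        long deadtimeEnds = Long.MIN_VALUE; // no deadtime is active at the start
        for (int i = 1; i <= tupleCount; i++) {
            long arrival = (i - 1) * periodMs; // simulated arrival time in ms
            if (arrival >= deadtimeEnds) {
                passed.add(i);
                deadtimeEnds = arrival + deadtimeMs;
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        System.out.println(accepted(10, 100, 150)); // prints [1, 3, 5, 7, 9]
    }
}
```

With a deadtime shorter than the poll period, for example accepted(10, 100, 50), every tuple would pass.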

Code example source: apache/incubator-edgent

@Test(expected = TimeoutException.class)
public void jobTimesOut() throws Exception {
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  @SuppressWarnings("unused")
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 100, TimeUnit.MILLISECONDS);
  Future<Job> fj = ((DirectProvider)getTopologyProvider()).submit(t);
  Job job = fj.get();
  assertEquals(Job.State.RUNNING, job.getCurrentState());
  try {
    job.complete(700, TimeUnit.MILLISECONDS);
  } finally {
    assertTrue(n.get() > 0); // At least one tuple was processed
    assertEquals(Job.State.RUNNING, job.getCurrentState());
    assertEquals(Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());
  }
}

Code example source: apache/incubator-edgent

@Test
public void testTimeBasedBatch() throws Exception {
  // Timing variances on shared machines can cause this test to fail
  assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
 
  Topology top = newTopology();
  TStream<Integer> ints = top.poll(() -> {
    return 1;
  }, 10, TimeUnit.MILLISECONDS);
  
  TWindow<Integer, Integer> window = ints.last(1000, TimeUnit.MILLISECONDS, tuple -> 0);
  TStream<Integer> sizes = window.batch((tuples, key) -> {
    return tuples.size();
  });
  Condition<List<Integer> > contents = top.getTester().streamContents(sizes,
    100, 100, 100, 100, 100, 100, 100, 100, 100, 100);
  complete(top, contents);
  System.out.println(contents.getResult());
  for(Integer size : contents.getResult()){
    assertTrue("size="+size, size >= 90 && size <= 110);
  }
}

Code example source: apache/incubator-edgent

@Test(expected = ExecutionException.class)
public void jobPeriodicSourceError() throws Exception {
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 100, TimeUnit.MILLISECONDS);
  ints.pipe(new FailedOplet<Integer>(5, 0));
  
  Future<Job> fj = ((DirectProvider)getTopologyProvider()).submit(t);
  Job job = fj.get();
  assertEquals(Job.State.RUNNING, job.getCurrentState());
  try {
    job.complete(10, TimeUnit.SECONDS); 
  } finally {
    // RUNNING even though execution error 
    assertEquals(Job.State.RUNNING, job.getCurrentState());
    assertEquals(Job.Health.UNHEALTHY, job.getHealth());
    assertEquals("java.lang.RuntimeException: Expected Test Exception", job.getLastError());
  }
}

Code example source: apache/incubator-edgent

AtomicInteger cnt = new AtomicInteger();
TStream<Integer> values = topology.poll(() -> {
  int curCnt = cnt.incrementAndGet();
  if (curCnt > maxTupleCnt)
  // ... (truncated in the source)

Code example source: apache/incubator-edgent

AtomicInteger n = new AtomicInteger(0);
@SuppressWarnings("unused")
TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 500, TimeUnit.MILLISECONDS);

Code example source: apache/incubator-edgent

TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS);
