
x33g5p2x  于2022-01-30 转载在 其他  



[英]Declare a new source stream that calls data.get() periodically. Each non-null value returned will appear on the returned stream. Thus each call to {@code data.get()} will result in zero tuples or one tuple on the stream.

If data implements AutoCloseable, its close() method will be called when the topology's execution is terminated.

The poll rate may be changed while the topology is running via a runtime org.apache.edgent.execution.mbeans.PeriodMXBean. In order to use this mechanism the caller must provide an alias for the stream when building the topology. The PeriodMXBean is registered with the ControlService with type TStream#TYPE and the stream's alias. e.g.,

Topology t = ...

[中]声明一个定期调用 data.get() 的新源流。返回的每个非空值都将出现在返回的流中。因此,每次调用 {@code data.get()} 都会在流中产生零个或一个元组。

Topology t = ...


代码示例来源:origin: apache/incubator-edgent

// Hand the result of commandReaderList(cmd) to poll() with the given period and
// time units, and return the stream that poll() produces.
return topology.poll(commandReaderList(cmd), period, units);

代码示例来源:origin: org.apache.edgent/edgent-connectors-iot

// Heartbeat source: on each poll (every `period` `unit`) the lambda creates a
// fresh Date; the resulting stream is tagged "heartbeat".
TStream<Date> hb = iotDevice.topology().poll(
  () -> new Date(),
  period, unit).tag("heartbeat");

代码示例来源:origin: apache/incubator-edgent

// Heartbeat source: on each poll (every `period` `unit`) the lambda creates a
// fresh Date; the resulting stream is tagged "heartbeat".
TStream<Date> hb = iotDevice.topology().poll(
  () -> new Date(),
  period, unit).tag("heartbeat");

代码示例来源:origin: apache/incubator-edgent

 * Fails every 1.5 seconds (10 tuples * 150 millis)
static void registerMonitoredApplicationTwo(DirectSubmitter<Topology, Job> submitter, AtomicInteger topoBuiltCnt, AtomicInteger injectedErrorCnt) {
  ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
  appService.registerTopology(MONITORED_APP_NAME_2, (topology, config) -> {
      Random r = new Random();
      TStream<Double> d  = topology.poll(() -> r.nextGaussian(), 150, TimeUnit.MILLISECONDS);
      final AtomicInteger count = new AtomicInteger(0);
      d = d.filter(tuple -> {
        int tupleCount = count.incrementAndGet();
        if (tupleCount == 10) {
          throw new IllegalStateException(MONITORED_APP_NAME_2 + " Injected error " + injectedErrorCnt.get());
        return true; 
      d.sink(tuple -> System.out.print("#"));

代码示例来源:origin: apache/incubator-edgent

 * Fails every 2 seconds (20 tuples * 100 millis)
static void registerMonitoredApplicationOne(DirectSubmitter<Topology, Job> submitter, AtomicInteger topoBuiltCnt, AtomicInteger injectedErrorCnt) {
  ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
  appService.registerTopology(MONITORED_APP_NAME_1, (topology, config) -> {
      Random r = new Random();
      TStream<Double> d  = topology.poll(() -> r.nextGaussian(), 100, TimeUnit.MILLISECONDS);
      final AtomicInteger count = new AtomicInteger(0);
      d = d.filter(tuple -> {
        int tupleCount = count.incrementAndGet();
        if (tupleCount == 20) {
          throw new IllegalStateException(MONITORED_APP_NAME_1 + " Injected error " + injectedErrorCnt.get());
        return true; 
      d.sink(tuple -> System.out.print("."));

代码示例来源:origin: apache/incubator-edgent

public void jobPeriodicSource() throws Exception {
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 100, TimeUnit.MILLISECONDS);
  Future<Job> fj = ((DirectProvider)getTopologyProvider()).submit(t);
  Job job = fj.get();
  assertEquals(Job.State.RUNNING, job.getCurrentState());
  assertTrue(n.get() > 0); // At least one tuple was processed
  assertEquals(Job.State.CLOSED, job.getCurrentState());

代码示例来源:origin: apache/incubator-edgent

public void automaticMetricCleanup1() throws Exception {
  // Declare topology with custom metric oplet
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
  ints.pipe(new TestOplet<Integer>());
  // Submit job
  Future<? extends Job> fj = getSubmitter().submit(t);
  Job job = fj.get();
  // At least one tuple was processed
  int tupleCount = n.get(); 
  assertTrue("Expected more tuples than "+ tupleCount, tupleCount > 0);
  // Each test oplet registers two metrics 
  Map<String, Metric> all = metricRegistry.getMetrics();
  assertEquals(2, all.size());
  // After close all metrics have been unregistered 
  assertEquals(0, all.size());

代码示例来源:origin: apache/incubator-edgent

Topology t = newTopology();
// Counter captured by the polling lambda; starts at 0.
AtomicLong n = new AtomicLong(0);
// Every 10 milliseconds the lambda increments the counter and supplies the new value.
TStream<Long> s = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);

代码示例来源:origin: apache/incubator-edgent

try {
  Topology t = newTopology();
  TStream<String> s = t.poll(() -> (new Date()).toString(),
      1, TimeUnit.HOURS)

代码示例来源:origin: apache/incubator-edgent

public void automaticMetricCleanup2() throws Exception {
  // Declare topology with custom metric oplet
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
  TStream<Integer> ints2 = ints.pipe(new TestOplet<Integer>());
  ints2.pipe(new TestOplet<Integer>());
  // Submit job
  Future<? extends Job> fj = getSubmitter().submit(t);
  Job job = fj.get();
  // Each test oplet registers two metrics 
  Map<String, Metric> all = metricRegistry.getMetrics();
  assertEquals(4, all.size());
  // After close all metrics have been unregistered 
  assertEquals(0, all.size());

代码示例来源:origin: apache/incubator-edgent

TStream<Double> gaussian = t.poll(() -> r1.nextGaussian(), 100, TimeUnit.MILLISECONDS);
gaussian = Metrics.counter(gaussian);
TStream<MyClass1> mc1 = t.poll(
    () -> new MyClass1(Double.toString(r2.nextGaussian()), 					                                           
TStream<MyClass2> mc2 = t.poll(
    () -> new MyClass2(
        new MyClass1(
TStream<MyClass2> mc4 = t.poll(
    () -> new MyClass2(
        new MyClass1(
TStream<MyClass2> mc5 = t2.poll(
    () -> new MyClass2(
        new MyClass1(

代码示例来源:origin: apache/incubator-edgent

// Source stream of wall-clock timestamps: the lambda is polled every
// 1 millisecond and supplies System.currentTimeMillis().
TStream<Long> times = t.poll(() -> {   
  return System.currentTimeMillis();
}, 1, TimeUnit.MILLISECONDS);

代码示例来源:origin: apache/incubator-edgent

public void testDeadtime() throws Exception {
  Topology topology = newTopology("testDeadtime");
  int maxTupleCnt = 10;
  AtomicInteger cnt = new AtomicInteger();
  TStream<Integer> values = topology.poll(() -> {
    int curCnt = cnt.incrementAndGet();
    if (curCnt > maxTupleCnt)
      return null;
    return curCnt;
    }, 100, TimeUnit.MILLISECONDS);
  // use a deadtime value that causes filtering of every other tuple
  TStream<Integer> filtered = Filters.deadtime(values, 150, TimeUnit.MILLISECONDS);
  Condition<Long> count = topology.getTester().tupleCount(filtered, maxTupleCnt/2);
  Condition<List<Integer>> contents = topology.getTester().streamContents(filtered, 1, 3, 5, 7, 9 );
  complete(topology, count);
  assertTrue(contents.getResult().toString(), contents.valid());

代码示例来源:origin: apache/incubator-edgent

@Test(expected = TimeoutException.class)
public void jobTimesOut() throws Exception {
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 100, TimeUnit.MILLISECONDS);
  Future<Job> fj = ((DirectProvider)getTopologyProvider()).submit(t);
  Job job = fj.get();
  assertEquals(Job.State.RUNNING, job.getCurrentState());
  try {
    job.complete(700, TimeUnit.MILLISECONDS);
  } finally {
    assertTrue(n.get() > 0); // At least one tuple was processed
    assertEquals(Job.State.RUNNING, job.getCurrentState());
    assertEquals(Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());

代码示例来源:origin: apache/incubator-edgent

public void testTimeBasedBatch() throws Exception{
 // Timing variances on shared machines can cause this test to fail
  Topology top = newTopology();
  TStream<Integer> ints = top.poll(() -> {
    return 1;
  }, 10, TimeUnit.MILLISECONDS);
  TWindow<Integer, Integer> window = ints.last(1000, TimeUnit.MILLISECONDS, tuple -> 0);
  TStream<Integer> sizes = window.batch((tuples, key) -> {
    return tuples.size();
  Condition<List<Integer> > contents = top.getTester().streamContents(sizes,
    100, 100, 100, 100, 100, 100, 100, 100, 100, 100);
  complete(top, contents);
  for(Integer size : contents.getResult()){
    assertTrue("size="+size, size >= 90 && size <= 110);

代码示例来源:origin: apache/incubator-edgent

@Test(expected = ExecutionException.class)
public void jobPeriodicSourceError() throws Exception {
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 100, TimeUnit.MILLISECONDS);
  ints.pipe(new FailedOplet<Integer>(5, 0));
  Future<Job> fj = ((DirectProvider)getTopologyProvider()).submit(t);
  Job job = fj.get();
  assertEquals(Job.State.RUNNING, job.getCurrentState());
  try {
    job.complete(10, TimeUnit.SECONDS); 
  } finally {
    // RUNNING even though execution error 
    assertEquals(Job.State.RUNNING, job.getCurrentState());
    assertEquals(Job.Health.UNHEALTHY, job.getHealth());
    assertEquals("java.lang.RuntimeException: Expected Test Exception", job.getLastError());

代码示例来源:origin: apache/incubator-edgent

AtomicInteger cnt = new AtomicInteger();
TStream<Integer> values = topology.poll(() -> {
  int curCnt = cnt.incrementAndGet();
  if (curCnt > maxTupleCnt)

代码示例来源:origin: apache/incubator-edgent

// Counter captured by the polling lambda; starts at 0.
AtomicInteger n = new AtomicInteger(0);
// Every 500 milliseconds the lambda increments the counter and supplies the new value.
TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 500, TimeUnit.MILLISECONDS);

代码示例来源:origin: apache/incubator-edgent

// Every 10 milliseconds the lambda constructs a new TimeAndId and supplies it to the stream.
TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS);
