org.apache.ignite.Ignite.dataStreamer()方法的使用及代码示例

x33g5p2x  于2022-01-21 转载在 其他  
字(8.1k)|赞(0)|评价(0)|浏览(193)

本文整理了Java中org.apache.ignite.Ignite.dataStreamer()方法的一些代码示例,展示了Ignite.dataStreamer()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Ignite.dataStreamer()方法的具体详情如下:
包路径:org.apache.ignite.Ignite
类名称:Ignite
方法名:dataStreamer

Ignite.dataStreamer介绍

[英]Gets a new instance of data streamer associated with given cache name. Data streamer is responsible for loading external data into in-memory data grid. For more information refer to IgniteDataStreamer documentation.
[中]获取与给定缓存名称关联的数据流的新实例。数据拖缆负责将外部数据加载到内存数据网格中。有关更多信息,请参阅IgniteDataStreamer文档。

代码示例

代码示例来源:origin: apache/ignite

/** {@inheritDoc} */
@Override public <K, V> IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
  checkIgnite();
  return g.dataStreamer(cacheName);
}

代码示例来源:origin: apache/ignite

@Override public Object call() throws Exception {
    boolean failed = false;
    try {
      client.dataStreamer(DEFAULT_CACHE_NAME);
    }
    catch (IgniteClientDisconnectedException e) {
      failed = true;
      checkAndWait(e);
    }
    assertTrue(failed);
    return client.dataStreamer(DEFAULT_CACHE_NAME);
  }
},

代码示例来源:origin: apache/ignite

@Override public void run() {
    try {
      client.dataStreamer(CACHE_NAME);
      fail();
    }
    catch (IgniteClientDisconnectedException e) {
      assertNotNull(e.reconnectFuture());
    }
  }
});

代码示例来源:origin: apache/ignite

/**
 * Stops streamer.
 *
 * @throws IgniteException If failed.
 */
public void stop() throws IgniteException {
  if (stopped)
    return;
  stopped = true;
  getIgnite().<K, V>dataStreamer(cacheName).close(true);
  getIgnite().close();
}

代码示例来源:origin: apache/ignite

/**
   * @param ignite Ignite.
   * @param cacheName Cache name.
   */
  private void populateData(Ignite ignite, String cacheName) {
    try (IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(cacheName)) {
      for (int i = 0; i < 1000; i++)
        streamer.addData(i, i);

      streamer.flush();
    }
  }
}

代码示例来源:origin: apache/ignite

@Override public void apply(Ignite grid) {
    try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(DEFAULT_CACHE_NAME)) {
      dataStreamer.allowOverwrite(allowOverwrite);
      for (int i = 0; i < KEYS_CNT; i++)
        dataStreamer.addData(i, Integer.toString(i));
    }
    log.info("Data loaded.");
  }
};

代码示例来源:origin: apache/ignite

/**
 * @param ignite Ignite.
 * @param name Cache name.
 * @param from Start from key.
 * @param iter Iteration.
 */
protected void generateData(Ignite ignite, String name, int from, int iter) {
  try (IgniteDataStreamer<Integer, Integer> dataStreamer = ignite.dataStreamer(name)) {
    dataStreamer.allowOverwrite(true);
    for (int i = from; i < from + TEST_SIZE; i++) {
      if ((i + 1) % (TEST_SIZE / 10) == 0)
        log.info("Prepared " + (i + 1) * 100 / (TEST_SIZE) + "% entries. [count=" + TEST_SIZE +
          ", iteration=" + iter + ", cache=" + name + "]");
      dataStreamer.addData(i, i + name.hashCode() + iter);
    }
  }
}

代码示例来源:origin: apache/ignite

/**
 * @param ignite Node.
 * @param cacheName Cache name.
 */
private void insertData(Ignite ignite, String cacheName) {
  try (IgniteDataStreamer streamer = ignite.dataStreamer(cacheName)) {
    for (int i = 0; i < KEYS; i++)
      streamer.addData(new TestKey(i), new TestValue(i));
  }
}

代码示例来源:origin: apache/ignite

/**
 * Load data into Ignite.
 *
 * @param ignite Ignite.
 * @param cache Cache.
 */
private static void loadData(Ignite ignite, IgniteCache<Integer, Person> cache) throws Exception {
  try (IgniteDataStreamer<Object, Object> str = ignite.dataStreamer(cache.getName())) {
    for (int id = 0; id < ENTRIES_CNT; id++)
      str.addData(id, new Person(id, "John" + id, "Doe"));
  }
}

代码示例来源:origin: apache/ignite

/**
 * @param ignite Ignite.
 */
private void loadCaches(Ignite ignite) {
  for (int i = 0; i < CACHES; i++) {
    try (IgniteDataStreamer<Object, Object> s = ignite.dataStreamer(cacheName(i))) {
      s.allowOverwrite(true);
      for (int j = 0; j < NUM_OF_KEYS; j++)
        s.addData(j, "cache: " + i + " data: " + j);
      s.flush();
    }
  }
}

代码示例来源:origin: apache/ignite

/**
 * Populates cache with data streamer.
 *
 * @param g Grid.
 */
private static void realTimePopulate(final Ignite g) {
  try (IgniteDataStreamer<Integer, Long> ldr = g.dataStreamer(DEFAULT_CACHE_NAME)) {
    // Sets max values to 1 so cache metrics have correct values.
    ldr.perNodeParallelOperations(1);
    // Count closure which increments a count on remote node.
    ldr.receiver(new IncrementingUpdater());
    for (int i = 0; i < CNT; i++)
      ldr.addData(i % (CNT / 2), 1L);
  }
}

代码示例来源:origin: apache/ignite

/**
 * fill data by default
 */
private void fillTestData(Ignite ig) {
  try (IgniteDataStreamer<Object, Object> s = ig.dataStreamer(TEST_CACHE_NAME)) {
    for (int i = 0; i < 500; i++) {
      BinaryObject bo = ig.binary().builder("TestIndexObject")
        .setField("a", i, Object.class)
        .setField("b", String.valueOf(i), Object.class)
        .build();
      s.addData(i, bo);
    }
  }
}

代码示例来源:origin: apache/ignite

/**
   * Creates streamer for global cache.
   *
   * @param ignite instance of {@code Ignite}.
   * @param cache instance of global cache.
   * @return instance of {@code IgniteDataStreamer}.
   */
  private static IgniteDataStreamer<String, String> createGlobalStreamer(Ignite ignite,
    IgniteCache<String, String> cache) {
    IgniteDataStreamer<String, String> streamer = ignite.dataStreamer(cache.getName());

    streamer.allowOverwrite(true);

    streamer.skipStore(true);

    streamer.keepBinary(false);

    return streamer;
  }
}

代码示例来源:origin: apache/ignite

/**
 * Creates default cache and preload some data entries.
 *
 * @param ignite Ignite.
 * @param countEntries Count of entries.
 */
private void createCacheAndPreload(Ignite ignite, int countEntries) {
  ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
    .setAffinity(new RendezvousAffinityFunction(false, 32))
    .setBackups(1));
  try (IgniteDataStreamer streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
    for (int i = 0; i < countEntries; i++)
      streamer.addData(i, i);
  }
}

代码示例来源:origin: apache/ignite

/**
 * @throws Exception If failed.
 */
@Test
public void testStreamer1() throws Exception {
  cacheC = new IgniteClosure<String, CacheConfiguration[]>() {
    @Override public CacheConfiguration[] apply(String s) {
      return null;
    }
  };
  startServer(0, 1);
  cacheC = null;
  cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
  startServer(1, 2);
  IgniteDataStreamer<Object, Object> streamer = ignite(0).dataStreamer(CACHE_NAME1);
  streamer.addData(1, 1);
  streamer.flush();
}

代码示例来源:origin: apache/ignite

/**
 * @throws Exception If failed.
 */
@Test
public void testDataStreamerWaitsUntilDynamicCacheStartIsFinished() throws Exception {
  final Ignite ignite0 = startGrids(2);
  final Ignite ignite1 = grid(1);
  final String cacheName = "testCache";
  IgniteCache<Integer, Integer> cache = ignite0.getOrCreateCache(
    new CacheConfiguration<Integer, Integer>().setName(cacheName));
  try (IgniteDataStreamer<Integer, Integer> ldr = ignite1.dataStreamer(cacheName)) {
    ldr.addData(0, 0);
  }
  assertEquals(Integer.valueOf(0), cache.get(0));
}

代码示例来源:origin: apache/ignite

/**
 * Create and add test data via Streamer API.
 *
 * @param grid to get streamer.
 * @return test object (it is key and val).
 */
private TestObj streamData(final Ignite grid) {
  final IgniteDataStreamer<TestObj, TestObj> streamer = grid.dataStreamer(CACHE_NAME);
  TestObj entity = null;
  for (int i = 0; i < 1; i++) {
    entity = new TestObj(i);
    streamer.addData(entity, entity);
  }
  streamer.flush();
  streamer.close();
  streamer.future().get();
  return entity;
}

代码示例来源:origin: apache/ignite

/**
 * Put key range.
 *
 * @param node Node.
 * @param from From key.
 * @param to To key.
 */
protected static void put(Ignite node, int from, int to) {
  try (IgniteDataStreamer streamer = node.dataStreamer(CACHE_NAME)) {
    streamer.allowOverwrite(true);
    streamer.keepBinary(true);
    for (int i = from; i < to; i++) {
      BinaryObject key = key(node, i);
      BinaryObject val = value(node, i);
      streamer.addData(key, val);
    }
    streamer.flush();
  }
}

代码示例来源:origin: apache/ignite

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
  super.beforeTest();
  cleanPersistenceDir();
  Ignite ig = startGrid();
  ig.cluster().active(true);
  try (IgniteDataStreamer<Integer, byte[]> st = ig.dataStreamer(DEFAULT_CACHE_NAME)){
    st.allowOverwrite(true);
    byte[] payload = new byte[1024];
    // Generate WAL segment files.
    for (int i = 0; i < 100 * 1024; i++)
      st.addData(i, payload);
  }
}

代码示例来源:origin: apache/ignite

/** */
private void populateCache(Ignite client) {
  for (int i = 0; i < NUM_CACHES; i++) {
    CacheConfiguration cfg = new CacheConfiguration();
    cfg.setName(NAME_PREFIX + i).setAtomicityMode(CacheAtomicityMode.ATOMIC)
        .setBackups(1).setStatisticsEnabled(true).setManagementEnabled(true);
    client.getOrCreateCache(cfg);
    IgniteDataStreamer<Object, Object> streamer = client.dataStreamer(NAME_PREFIX + i);
    for (int j = 0; j < NUM_ENTRIES_PER_CACHE; j++) {
      String bo = i + "|" + j + "|WHATEVER";
      streamer.addData(j, bo);
    }
    streamer.close();
    log.info("Streamer closed");
  }
}

相关文章