本文整理了Java中org.apache.ignite.Ignite.dataStreamer()
方法的一些代码示例,展示了Ignite.dataStreamer()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Ignite.dataStreamer()
方法的具体详情如下:
包路径:org.apache.ignite.Ignite
类名称:Ignite
方法名:dataStreamer
[英]Gets a new instance of data streamer associated with given cache name. Data streamer is responsible for loading external data into in-memory data grid. For more information refer to IgniteDataStreamer documentation.
[中]获取与给定缓存名称关联的数据流的新实例。数据拖缆负责将外部数据加载到内存数据网格中。有关更多信息,请参阅IgniteDataStreamer文档。
代码示例来源:origin: apache/ignite
/** {@inheritDoc} */
@Override public <K, V> IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
checkIgnite();
return g.dataStreamer(cacheName);
}
代码示例来源:origin: apache/ignite
@Override public Object call() throws Exception {
boolean failed = false;
try {
client.dataStreamer(DEFAULT_CACHE_NAME);
}
catch (IgniteClientDisconnectedException e) {
failed = true;
checkAndWait(e);
}
assertTrue(failed);
return client.dataStreamer(DEFAULT_CACHE_NAME);
}
},
代码示例来源:origin: apache/ignite
@Override public void run() {
try {
client.dataStreamer(CACHE_NAME);
fail();
}
catch (IgniteClientDisconnectedException e) {
assertNotNull(e.reconnectFuture());
}
}
});
代码示例来源:origin: apache/ignite
/**
* Stops streamer.
*
* @throws IgniteException If failed.
*/
public void stop() throws IgniteException {
if (stopped)
return;
stopped = true;
getIgnite().<K, V>dataStreamer(cacheName).close(true);
getIgnite().close();
}
代码示例来源:origin: apache/ignite
/**
* @param ignite Ignite.
* @param cacheName Cache name.
*/
private void populateData(Ignite ignite, String cacheName) {
try (IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(cacheName)) {
for (int i = 0; i < 1000; i++)
streamer.addData(i, i);
streamer.flush();
}
}
}
代码示例来源:origin: apache/ignite
@Override public void apply(Ignite grid) {
try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(DEFAULT_CACHE_NAME)) {
dataStreamer.allowOverwrite(allowOverwrite);
for (int i = 0; i < KEYS_CNT; i++)
dataStreamer.addData(i, Integer.toString(i));
}
log.info("Data loaded.");
}
};
代码示例来源:origin: apache/ignite
/**
* @param ignite Ignite.
* @param name Cache name.
* @param from Start from key.
* @param iter Iteration.
*/
protected void generateData(Ignite ignite, String name, int from, int iter) {
try (IgniteDataStreamer<Integer, Integer> dataStreamer = ignite.dataStreamer(name)) {
dataStreamer.allowOverwrite(true);
for (int i = from; i < from + TEST_SIZE; i++) {
if ((i + 1) % (TEST_SIZE / 10) == 0)
log.info("Prepared " + (i + 1) * 100 / (TEST_SIZE) + "% entries. [count=" + TEST_SIZE +
", iteration=" + iter + ", cache=" + name + "]");
dataStreamer.addData(i, i + name.hashCode() + iter);
}
}
}
代码示例来源:origin: apache/ignite
/**
* @param ignite Node.
* @param cacheName Cache name.
*/
private void insertData(Ignite ignite, String cacheName) {
try (IgniteDataStreamer streamer = ignite.dataStreamer(cacheName)) {
for (int i = 0; i < KEYS; i++)
streamer.addData(new TestKey(i), new TestValue(i));
}
}
代码示例来源:origin: apache/ignite
/**
* Load data into Ignite.
*
* @param ignite Ignite.
* @param cache Cache.
*/
private static void loadData(Ignite ignite, IgniteCache<Integer, Person> cache) throws Exception {
try (IgniteDataStreamer<Object, Object> str = ignite.dataStreamer(cache.getName())) {
for (int id = 0; id < ENTRIES_CNT; id++)
str.addData(id, new Person(id, "John" + id, "Doe"));
}
}
代码示例来源:origin: apache/ignite
/**
* @param ignite Ignite.
*/
private void loadCaches(Ignite ignite) {
for (int i = 0; i < CACHES; i++) {
try (IgniteDataStreamer<Object, Object> s = ignite.dataStreamer(cacheName(i))) {
s.allowOverwrite(true);
for (int j = 0; j < NUM_OF_KEYS; j++)
s.addData(j, "cache: " + i + " data: " + j);
s.flush();
}
}
}
代码示例来源:origin: apache/ignite
/**
* Populates cache with data streamer.
*
* @param g Grid.
*/
private static void realTimePopulate(final Ignite g) {
try (IgniteDataStreamer<Integer, Long> ldr = g.dataStreamer(DEFAULT_CACHE_NAME)) {
// Sets max values to 1 so cache metrics have correct values.
ldr.perNodeParallelOperations(1);
// Count closure which increments a count on remote node.
ldr.receiver(new IncrementingUpdater());
for (int i = 0; i < CNT; i++)
ldr.addData(i % (CNT / 2), 1L);
}
}
代码示例来源:origin: apache/ignite
/**
* fill data by default
*/
private void fillTestData(Ignite ig) {
try (IgniteDataStreamer<Object, Object> s = ig.dataStreamer(TEST_CACHE_NAME)) {
for (int i = 0; i < 500; i++) {
BinaryObject bo = ig.binary().builder("TestIndexObject")
.setField("a", i, Object.class)
.setField("b", String.valueOf(i), Object.class)
.build();
s.addData(i, bo);
}
}
}
代码示例来源:origin: apache/ignite
/**
* Creates streamer for global cache.
*
* @param ignite instance of {@code Ignite}.
* @param cache instance of global cache.
* @return instance of {@code IgniteDataStreamer}.
*/
private static IgniteDataStreamer<String, String> createGlobalStreamer(Ignite ignite,
IgniteCache<String, String> cache) {
IgniteDataStreamer<String, String> streamer = ignite.dataStreamer(cache.getName());
streamer.allowOverwrite(true);
streamer.skipStore(true);
streamer.keepBinary(false);
return streamer;
}
}
代码示例来源:origin: apache/ignite
/**
* Creates default cache and preload some data entries.
*
* @param ignite Ignite.
* @param countEntries Count of entries.
*/
private void createCacheAndPreload(Ignite ignite, int countEntries) {
ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
.setAffinity(new RendezvousAffinityFunction(false, 32))
.setBackups(1));
try (IgniteDataStreamer streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
for (int i = 0; i < countEntries; i++)
streamer.addData(i, i);
}
}
代码示例来源:origin: apache/ignite
/**
* @throws Exception If failed.
*/
@Test
public void testStreamer1() throws Exception {
cacheC = new IgniteClosure<String, CacheConfiguration[]>() {
@Override public CacheConfiguration[] apply(String s) {
return null;
}
};
startServer(0, 1);
cacheC = null;
cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
startServer(1, 2);
IgniteDataStreamer<Object, Object> streamer = ignite(0).dataStreamer(CACHE_NAME1);
streamer.addData(1, 1);
streamer.flush();
}
代码示例来源:origin: apache/ignite
/**
* @throws Exception If failed.
*/
@Test
public void testDataStreamerWaitsUntilDynamicCacheStartIsFinished() throws Exception {
final Ignite ignite0 = startGrids(2);
final Ignite ignite1 = grid(1);
final String cacheName = "testCache";
IgniteCache<Integer, Integer> cache = ignite0.getOrCreateCache(
new CacheConfiguration<Integer, Integer>().setName(cacheName));
try (IgniteDataStreamer<Integer, Integer> ldr = ignite1.dataStreamer(cacheName)) {
ldr.addData(0, 0);
}
assertEquals(Integer.valueOf(0), cache.get(0));
}
代码示例来源:origin: apache/ignite
/**
* Create and add test data via Streamer API.
*
* @param grid to get streamer.
* @return test object (it is key and val).
*/
private TestObj streamData(final Ignite grid) {
final IgniteDataStreamer<TestObj, TestObj> streamer = grid.dataStreamer(CACHE_NAME);
TestObj entity = null;
for (int i = 0; i < 1; i++) {
entity = new TestObj(i);
streamer.addData(entity, entity);
}
streamer.flush();
streamer.close();
streamer.future().get();
return entity;
}
代码示例来源:origin: apache/ignite
/**
* Put key range.
*
* @param node Node.
* @param from From key.
* @param to To key.
*/
protected static void put(Ignite node, int from, int to) {
try (IgniteDataStreamer streamer = node.dataStreamer(CACHE_NAME)) {
streamer.allowOverwrite(true);
streamer.keepBinary(true);
for (int i = from; i < to; i++) {
BinaryObject key = key(node, i);
BinaryObject val = value(node, i);
streamer.addData(key, val);
}
streamer.flush();
}
}
代码示例来源:origin: apache/ignite
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
cleanPersistenceDir();
Ignite ig = startGrid();
ig.cluster().active(true);
try (IgniteDataStreamer<Integer, byte[]> st = ig.dataStreamer(DEFAULT_CACHE_NAME)){
st.allowOverwrite(true);
byte[] payload = new byte[1024];
// Generate WAL segment files.
for (int i = 0; i < 100 * 1024; i++)
st.addData(i, payload);
}
}
代码示例来源:origin: apache/ignite
/** */
private void populateCache(Ignite client) {
for (int i = 0; i < NUM_CACHES; i++) {
CacheConfiguration cfg = new CacheConfiguration();
cfg.setName(NAME_PREFIX + i).setAtomicityMode(CacheAtomicityMode.ATOMIC)
.setBackups(1).setStatisticsEnabled(true).setManagementEnabled(true);
client.getOrCreateCache(cfg);
IgniteDataStreamer<Object, Object> streamer = client.dataStreamer(NAME_PREFIX + i);
for (int j = 0; j < NUM_ENTRIES_PER_CACHE; j++) {
String bo = i + "|" + j + "|WHATEVER";
streamer.addData(j, bo);
}
streamer.close();
log.info("Streamer closed");
}
}
内容来源于网络,如有侵权,请联系作者删除!