本文整理了Java中com.hazelcast.core.IMap.submitToKey()
方法的一些代码示例,展示了IMap.submitToKey()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。IMap.submitToKey()
方法的具体详情如下:
包路径:com.hazelcast.core.IMap
类名称:IMap
方法名:submitToKey
[英]Applies the user defined EntryProcessor to the entry mapped by the key. Returns immediately with a ICompletableFuture representing that task.
EntryProcessor is not cancellable, so calling ICompletableFuture.cancel() method won't cancel the operation of EntryProcessor.
The EntryProcessor may implement the Offloadable and ReadOnly interfaces.
If the EntryProcessor implements the Offloadable interface the processing will be offloaded to the given ExecutorService allowing unblocking the partition-thread, which means that other partition-operations may proceed. The key will be locked for the time-span of the processing in order to not generate a write-conflict. In this case the threading looks as follows:
partition-thread (fetch & lock)
execution-thread (process)
partition-thread (set & unlock, or just unlock if no changes)
If the EntryProcessor implements the Offloadable and ReadOnly interfaces the processing will be offloaded to the given ExecutorService allowing unblocking the partition-thread. Since the EntryProcessor is not supposed to do any changes to the Entry the key will NOT be locked for the time-span of the processing. In this case the threading looks as follows:
partition-thread (fetch & lock)
execution-thread (process)
In this case the EntryProcessor.getBackupProcessor() has to return null; otherwise an IllegalArgumentException exception is thrown.
If the EntryProcessor implements only ReadOnly without implementing Offloadable the processing unit will not be offloaded, however, the EntryProcessor will not wait for the lock to be acquired, since the EP will not do any modifications.
If the EntryProcessor implements ReadOnly and modifies the entry it is processing a UnsupportedOperationException will be thrown.
Using offloading is useful if the EntryProcessor encompasses heavy logic that may stall the partition-thread.
Offloading will not be applied to backup partitions. It is possible to initialize the EntryBackupProcessor with some input provided by the EntryProcessor in the EntryProcessor.getBackupProcessor() method. The input allows providing context to the EntryBackupProcessor - for example the "delta" so that the EntryBackupProcessor does not have to calculate the "delta" but it may just apply it.
See #executeOnKey(Object,EntryProcessor) for sync version of this method.
Interactions with the map store
If value with key is not found in memory MapLoader#load(Object) is invoked to load the value from the map store backing the map.
If the entryProcessor updates the entry and write-through persistence mode is configured, before the value is stored in memory, MapStore#store(Object,Object) is called to write the value into the map store.
If the entryProcessor updates the entry's value to null value and write-through persistence mode is configured, before the value is removed from the memory, MapStore#delete(Object) is called to delete the value from the map store.
Any exception thrown by the map store fail the operation and are propagated to the provided callback via ExecutionCallback#onFailure(Throwable).
If write-behind persistence mode is configured with write-coalescing turned off, com.hazelcast.map.ReachedMaxSizeException may be thrown if the write-behind queue has reached its per-node maximum capacity.
[中]将用户定义的EntryProcessor应用于键映射的条目。立即返回,其中ICompletableFuture表示该任务。
EntryProcessor不可取消,因此调用ICompletableFuture。cancel()方法不会取消EntryProcessor的操作。
EntryProcessor可以实现可卸载和只读接口。
如果EntryProcessor实现了可卸载接口,那么处理将被卸载到给定的ExecutorService,从而允许取消阻止分区线程,这意味着其他分区操作可能会继续。密钥将在处理的时间范围内被锁定,以避免产生写入冲突。在这种情况下,线程看起来如下所示:
1.分区线程(获取和锁定)
1.执行线程(进程)
1.分区线程(设置并解锁,如果没有更改,则仅解锁)
如果EntryProcessor实现可卸载和只读接口,则处理将卸载到给定的ExecutorService,从而允许解除分区线程阻塞。由于EntryProcessor不应该对条目进行任何更改,因此在处理的时间跨度内不会锁定密钥。在这种情况下,线程看起来如下所示:
1.分区线程(获取和锁定)
1.执行线程(进程)
在本例中,是EntryProcessor。getBackupProcessor()必须返回null;否则将引发IllegalArgumentException异常。
如果EntryProcessor仅实现只读而未实现可卸载,则不会卸载处理单元,但是,EntryProcessor不会等待获取锁,因为EP不会进行任何修改。
如果EntryProcessor实现只读并修改它正在处理的条目,则将抛出UnsupportedOperationException。
如果EntryProcessor包含可能导致分区线程暂停的大量逻辑,则使用卸载是有用的。
卸载不会应用于备份分区。可以使用EntryProcessor中EntryProcessor提供的一些输入来初始化EntryBackupProcessor。getBackupProcessor()方法。输入允许向EntryBackupProcessor提供上下文-例如“增量”,这样EntryBackupProcessor不必计算“增量”,但可以应用它。
有关此方法的同步版本,请参阅#executeOnKey(对象,EntryProcessor)。
与地图商店的交互
如果在内存中找不到带键的值,则会调用MapLoader#load(对象)从支持映射的映射存储中加载值。
如果entryProcessor更新条目并配置了直写持久化模式,则在将值存储在内存中之前,将调用MapStore#store(对象,对象)将值写入映射存储。
如果entryProcessor将条目的值更新为null值,并配置了直写持久化模式,则在从内存中删除该值之前,将调用MapStore#delete(对象)从映射存储中删除该值。
映射存储引发的任何异常都会导致操作失败,并通过ExecutionCallback#onFailure(Throwable)传播到提供的回调。
如果将写后持久化模式配置为关闭写合并,则com。黑泽尔卡斯特。地图如果写后队列已达到其每个节点的最大容量,则可能引发ReacheMaxSizeException。
代码示例来源:origin: hazelcast/hazelcast-jet
@Override
public void submitToKey(K key, EntryProcessor entryProcessor, ExecutionCallback callback) {
map.submitToKey(key, entryProcessor, callback);
}
代码示例来源:origin: hazelcast/hazelcast-jet
@Override
public ICompletableFuture submitToKey(K key, EntryProcessor entryProcessor) {
return map.submitToKey(key, entryProcessor);
}
代码示例来源:origin: vladimir-bukhtoyarov/bucket4j
private <T extends Serializable> CompletableFuture<CommandResult<T>> invokeAsync(K key, JCacheEntryProcessor<K, T> entryProcessor) {
CompletableFuture<CommandResult<T>> future = new CompletableFuture<>();
cache.submitToKey(key, adoptEntryProcessor(entryProcessor), new ExecutionCallback() {
@Override
public void onResponse(Object response) {
future.complete((CommandResult<T>) response);
}
@Override
public void onFailure(Throwable t) {
future.completeExceptionally(t);
}
});
return future;
}
代码示例来源:origin: com.github.vladimir-bukhtoyarov/bucket4j-hazelcast
private <T extends Serializable> CompletableFuture<CommandResult<T>> invokeAsync(K key, JCacheEntryProcessor<K, T> entryProcessor) {
CompletableFuture<CommandResult<T>> future = new CompletableFuture<>();
cache.submitToKey(key, adoptEntryProcessor(entryProcessor), new ExecutionCallback() {
@Override
public void onResponse(Object response) {
future.complete((CommandResult<T>) response);
}
@Override
public void onFailure(Throwable t) {
future.completeExceptionally(t);
}
});
return future;
}
代码示例来源:origin: hazelcast/hazelcast-jet
@Override
protected boolean tryProcess(int ordinal, @Nonnull Object object) {
checkError();
if (!tryIncrement(numConcurrentOps, 1, MAX_PARALLEL_ASYNC_OPS)) {
return false;
}
try {
@SuppressWarnings("unchecked")
T item = (T) object;
EntryProcessor<K, V> entryProcessor = toEntryProcessorFn.apply(item);
K key = toKeyFn.apply(item);
map.submitToKey(key, entryProcessor, callback);
return true;
} catch (HazelcastInstanceNotActiveException e) {
throw handleInstanceNotActive(e, isLocal);
}
}
代码示例来源:origin: dsukhoroslov/bagri
QueryResultProcessor xqp = new QueryResultProcessor(params, ctx.getDocKeys(), resList);
xrCache.submitToKey(qrKey, xqp);
updateStats(query, 1, resList.size());
代码示例来源:origin: dsukhoroslov/bagri
public void addIndex(long docId, int pathId, String path, Object value) throws BagriException {
// shouldn't we index NULL values too? create special NULL class for this..
if (value != null) {
Collection<Index> indices = getPathIndices(pathId, path);
if (indices.isEmpty()) {
return;
}
IndexKey xid = factory.newIndexKey(pathId, value);
for (Index idx: indices) {
//indexPath(idx, docId, pathId, value);
if (idx.isEnabled()) {
long txId = idx.isUnique() ? txMgr.getCurrentTxId() : TX_NO;
ValueIndexator indexator = new ValueIndexator(docId, txId);
if (indexAsynch) {
idxCache.submitToKey(xid, indexator);
} else {
// this does not work in transaction!
idxCache.executeOnKey(xid, indexator);
}
logger.trace("addIndex; index submit for key {}", xid);
}
}
}
}
内容来源于网络,如有侵权,请联系作者删除!