Usage and code examples of the com.hazelcast.core.IMap.executeOnKey() method

Reposted by x33g5p2x on 2022-01-20, filed under Other

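This article collects code examples for the Java method com.hazelcast.core.IMap.executeOnKey() and shows how IMap.executeOnKey() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the IMap.executeOnKey() method are as follows:
Package path: com.hazelcast.core.IMap
Class name: IMap
Method name: executeOnKey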

Introduction to IMap.executeOnKey

Applies the user-defined EntryProcessor to the entry mapped by the key. Returns the object which is the result of the EntryProcessor#process(Entry) method.
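
The contract described above (run a user-supplied function against the entry for one key, then hand back whatever that function returns) can be illustrated with a minimal plain-JDK sketch. It deliberately does not use Hazelcast: `ExecuteOnKeySketch` and its static `executeOnKey` helper are hypothetical names introduced here, and a local `ConcurrentHashMap` stands in for the distributed map.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Plain-JDK analogy only; none of these names belong to the Hazelcast API.
class ExecuteOnKeySketch {

    // Runs the processor atomically against the entry for `key` and returns
    // whatever the processor returns.
    static <K, V, R> R executeOnKey(ConcurrentHashMap<K, V> map, K key,
                                    Function<Map.Entry<K, V>, R> processor) {
        Object[] result = new Object[1];
        map.compute(key, (k, current) -> {
            // The processor sees a mutable entry; `current` is null if the key is absent.
            Map.Entry<K, V> entry = new AbstractMap.SimpleEntry<>(k, current);
            result[0] = processor.apply(entry);
            // The value left in the entry becomes the new mapping
            // (a null value removes the mapping).
            return entry.getValue();
        });
        @SuppressWarnings("unchecked")
        R typed = (R) result[0];
        return typed;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();
        hits.put("home", 41);
        Integer updated = executeOnKey(hits, "home", entry -> {
            entry.setValue(entry.getValue() + 1);
            return entry.getValue();
        });
        System.out.println(updated + " " + hits.get("home")); // prints "42 42"
    }
}
```

The point of the pattern is that the read, the modification, and the returned result all happen in one atomic step on the entry, rather than as a separate get followed by a put.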

The EntryProcessor may implement the Offloadable and ReadOnly interfaces.

If the EntryProcessor implements the Offloadable interface, the processing will be offloaded to the given ExecutorService, allowing unblocking of the partition-thread, which means that other partition-operations may proceed. The key will be locked for the time-span of the processing in order to not generate a write-conflict. In this case the threading looks as follows:

  1. partition-thread (fetch & lock)

  2. execution-thread (process)

  3. partition-thread (set & unlock, or just unlock if no changes)

If the EntryProcessor implements the Offloadable and ReadOnly interfaces, the processing will be offloaded to the given ExecutorService allowing unblocking the partition-thread. Since the EntryProcessor is not supposed to do any changes to the Entry, the key will NOT be locked for the time-span of the processing. In this case the threading looks as follows:

  1. partition-thread (fetch)

  2. execution-thread (process)

In this case, EntryProcessor.getBackupProcessor() has to return null; otherwise an IllegalArgumentException is thrown.
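
The three-phase flow for an offloaded, modifying processor can be pictured with a small plain-JDK model. This is only a sketch under stated assumptions: two single-thread executors named `partition-thread` and `execution-thread` stand in for Hazelcast's threads, `OffloadFlowSketch` and `runOffloaded` are hypothetical names, and the key lock is recorded in the trace rather than enforced.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

// Plain-JDK model of the hand-off between threads; not Hazelcast code.
class OffloadFlowSketch {

    // Returns a trace of which thread executed which phase.
    static List<String> runOffloaded(Map<String, Integer> store, String key,
                                     UnaryOperator<Integer> process) {
        List<String> trace = Collections.synchronizedList(new ArrayList<>());
        ExecutorService partition =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "partition-thread"));
        ExecutorService execution =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "execution-thread"));
        try {
            CompletableFuture
                    // Phase 1: the partition thread reads the value (and would lock the key).
                    .supplyAsync(() -> {
                        trace.add(Thread.currentThread().getName() + ": fetch & lock");
                        return store.get(key);
                    }, partition)
                    // Phase 2: the heavy work runs elsewhere, leaving the partition thread free.
                    .thenApplyAsync(value -> {
                        trace.add(Thread.currentThread().getName() + ": process");
                        return process.apply(value);
                    }, execution)
                    // Phase 3: back on the partition thread to write the result (and unlock).
                    .thenAcceptAsync(result -> {
                        trace.add(Thread.currentThread().getName() + ": set & unlock");
                        store.put(key, result);
                    }, partition)
                    .join();
        } finally {
            partition.shutdown();
            execution.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new ConcurrentHashMap<>();
        store.put("k", 1);
        runOffloaded(store, "k", v -> v + 1).forEach(System.out::println);
    }
}
```

In the read-only variant the third phase disappears, because there is nothing to write back and no lock to release.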

If the EntryProcessor implements only ReadOnly without implementing Offloadable, the processing will not be offloaded. However, the EntryProcessor will not wait for the lock to be acquired, since it does not modify the entry.

Using offloading is useful if the EntryProcessor encompasses heavy logic that may stall the partition-thread.

If the EntryProcessor implements ReadOnly and modifies the entry it is processing, an UnsupportedOperationException will be thrown.
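
A rough plain-JDK analogy for this behaviour is to hand the processor an immutable view of the entry, so that any attempted write fails immediately. The sketch below is not Hazelcast code: `ReadOnlyEntrySketch` and `executeReadOnly` are hypothetical names, and the JDK's `AbstractMap.SimpleImmutableEntry` (whose `setValue` throws `UnsupportedOperationException`) stands in for the read-only entry.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-JDK analogy for a read-only processor; not the Hazelcast implementation.
class ReadOnlyEntrySketch {

    // Passes an immutable snapshot of the entry to the processor.
    static <K, V, R> R executeReadOnly(Map<K, V> map, K key,
                                       Function<Map.Entry<K, V>, R> processor) {
        Map.Entry<K, V> view = new AbstractMap.SimpleImmutableEntry<>(key, map.get(key));
        return processor.apply(view);
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("u1", "alice");

        // Reading through the view works as usual.
        Integer length = executeReadOnly(users, "u1", e -> e.getValue().length());
        System.out.println(length);

        // Writing through the view is rejected and the map is left untouched.
        try {
            executeReadOnly(users, "u1", e -> e.setValue("bob"));
        } catch (UnsupportedOperationException expected) {
            System.out.println("write rejected, value is still " + users.get("u1"));
        }
    }
}
```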

Offloading will not be applied to backup partitions. It is possible to initialize the EntryBackupProcessor with some input provided by the EntryProcessor in the EntryProcessor.getBackupProcessor() method. The input allows providing context to the EntryBackupProcessor, for example the "delta", so that the EntryBackupProcessor does not have to calculate the "delta" but it may just apply it.
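
The "delta" idea can be sketched in plain Java: the primary-side step does the expensive calculation once and returns a lightweight backup step that only carries the precomputed result. Everything here is illustrative and assumed: `BackupDeltaSketch` and `processOnPrimary` are hypothetical names, two `HashMap` instances stand in for the primary and backup copies, and summing an array stands in for the costly computation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Plain-JDK illustration of passing a precomputed delta to the backup side.
class BackupDeltaSketch {

    // Runs against the primary copy: derives the delta once, applies it, and
    // returns a backup step that carries only that delta.
    static BiConsumer<Map<String, Integer>, String> processOnPrimary(
            Map<String, Integer> primary, String key, int[] samples) {
        int sum = 0;
        for (int sample : samples) {
            sum += sample; // stands in for an expensive computation
        }
        final int delta = sum;
        primary.merge(key, delta, Integer::sum);
        // The backup step recomputes nothing; it just applies the delta.
        return (replica, k) -> replica.merge(k, delta, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> primary = new HashMap<>();
        Map<String, Integer> backup = new HashMap<>();
        primary.put("counter", 10);
        backup.put("counter", 10);

        BiConsumer<Map<String, Integer>, String> backupStep =
                processOnPrimary(primary, "counter", new int[] {1, 2, 3});
        backupStep.accept(backup, "counter");

        System.out.println(primary.get("counter") + " " + backup.get("counter")); // prints "16 16"
    }
}
```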

See #submitToKey(Object,EntryProcessor) for an async version of this method.

Interactions with the map store

If the value for the key is not found in memory, MapLoader#load(Object) is invoked to load the value from the map store backing the map.

If the entryProcessor updates the entry and write-through persistence mode is configured, MapStore#store(Object,Object) is called to write the value into the map store before the value is stored in memory.

If the entryProcessor updates the entry's value to null and write-through persistence mode is configured, MapStore#delete(Object) is called to delete the value from the map store before the value is removed from memory.

Any exceptions thrown by the map store fail the operation and are propagated to the caller.
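
The write-through ordering described above (load on a miss, write to the store before memory, delete from the store before memory, and let store failures abort the operation) can be modelled with a short plain-JDK sketch. It makes several assumptions: `WriteThroughSketch` is a hypothetical class, a plain `Map` stands in for the external map store, and only the ordering is modelled, not Hazelcast's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Plain-JDK model of write-through ordering; not Hazelcast code.
class WriteThroughSketch {
    final Map<String, String> memory = new HashMap<>();
    final Map<String, String> store; // stands in for the external map store

    WriteThroughSketch(Map<String, String> store) {
        this.store = store;
    }

    String executeOnKey(String key, UnaryOperator<String> processor) {
        String current = memory.get(key);
        if (current == null) {
            current = store.get(key); // load on a miss
            if (current != null) {
                memory.put(key, current);
            }
        }
        String updated = processor.apply(current);
        if (updated == null) {
            if (current != null) {
                store.remove(key);  // delete from the store first...
                memory.remove(key); // ...then drop it from memory
            }
        } else if (!updated.equals(current)) {
            store.put(key, updated);  // write to the store first...
            memory.put(key, updated); // ...then update memory
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("a", "1");
        WriteThroughSketch map = new WriteThroughSketch(store);
        System.out.println(map.executeOnKey("a", v -> v + "!")); // prints "1!"
        map.executeOnKey("a", v -> null);
        System.out.println(store.containsKey("a")); // prints "false"
    }
}
```

Because the store is touched before memory, an exception thrown by the store leaves the in-memory state unchanged and simply propagates to the caller.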

If write-behind persistence mode is configured with write-coalescing turned off, com.hazelcast.map.ReachedMaxSizeException may be thrown if the write-behind queue has reached its per-node maximum capacity.

Code examples

Code example source: spring-projects/spring-session

@Test
public void saveUpdatedLastAccessedTimeFlushModeImmediate() {
  verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
      anyBoolean());
  this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
  HazelcastSession session = this.repository.createSession();
  session.setLastAccessedTime(Instant.now());
  verify(this.sessions, times(1)).set(eq(session.getId()),
      eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
  verify(this.sessions, times(1)).executeOnKey(eq(session.getId()),
      any(EntryProcessor.class));
  this.repository.save(session);
  verifyZeroInteractions(this.sessions);
}

Code example source: spring-projects/spring-session

@Test
public void saveUpdatedAttributeFlushModeImmediate() {
  verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
      anyBoolean());
  this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
  HazelcastSession session = this.repository.createSession();
  session.setAttribute("testName", "testValue");
  verify(this.sessions, times(1)).set(eq(session.getId()),
      eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
  verify(this.sessions, times(1)).executeOnKey(eq(session.getId()),
      any(EntryProcessor.class));
  this.repository.save(session);
  verifyZeroInteractions(this.sessions);
}

Code example source: spring-projects/spring-session

@Test
public void removeAttributeFlushModeImmediate() {
  verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
      anyBoolean());
  this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
  HazelcastSession session = this.repository.createSession();
  session.removeAttribute("testName");
  verify(this.sessions, times(1)).set(eq(session.getId()),
      eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
  verify(this.sessions, times(1)).executeOnKey(eq(session.getId()),
      any(EntryProcessor.class));
  this.repository.save(session);
  verifyZeroInteractions(this.sessions);
}

Code example source: spring-projects/spring-session

@Test
public void saveUpdatedMaxInactiveIntervalInSecondsFlushModeImmediate() {
  verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
      anyBoolean());
  this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
  HazelcastSession session = this.repository.createSession();
  String sessionId = session.getId();
  session.setMaxInactiveInterval(Duration.ofSeconds(1));
  verify(this.sessions, times(1)).set(eq(sessionId),
      eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
  verify(this.sessions).setTtl(eq(sessionId), anyLong(), any());
  verify(this.sessions, times(1)).executeOnKey(eq(sessionId),
      any(EntryProcessor.class));
  this.repository.save(session);
  verifyZeroInteractions(this.sessions);
}

Code example source: spring-projects/spring-session

@Override
public void save(HazelcastSession session) {
  if (session.isNew) {
    this.sessions.set(session.getId(), session.getDelegate(),
        session.getMaxInactiveInterval().getSeconds(), TimeUnit.SECONDS);
  }
  else if (session.sessionIdChanged) {
    this.sessions.delete(session.originalId);
    session.originalId = session.getId();
    this.sessions.set(session.getId(), session.getDelegate(),
        session.getMaxInactiveInterval().getSeconds(), TimeUnit.SECONDS);
  }
  else if (session.hasChanges()) {
    SessionUpdateEntryProcessor entryProcessor = new SessionUpdateEntryProcessor();
    if (session.lastAccessedTimeChanged) {
      entryProcessor.setLastAccessedTime(session.getLastAccessedTime());
    }
    if (session.maxInactiveIntervalChanged) {
      if (SUPPORTS_SET_TTL) {
        updateTtl(session);
      }
      entryProcessor.setMaxInactiveInterval(session.getMaxInactiveInterval());
    }
    if (!session.delta.isEmpty()) {
      entryProcessor.setDelta(session.delta);
    }
    this.sessions.executeOnKey(session.getId(), entryProcessor);
  }
  session.clearChangeFlags();
}

Code example source: vladimir-bukhtoyarov/bucket4j

@Override
public void createInitialState(K key, BucketConfiguration configuration) {
  JCacheEntryProcessor<K, Nothing> entryProcessor = JCacheEntryProcessor.initStateProcessor(configuration);
  cache.executeOnKey(key, adoptEntryProcessor(entryProcessor));
}

Code example source: vladimir-bukhtoyarov/bucket4j

@Override
public <T extends Serializable> CommandResult<T> execute(K key, GridCommand<T> command) {
  JCacheEntryProcessor<K, T> entryProcessor = JCacheEntryProcessor.executeProcessor(command);
  return (CommandResult<T>) cache.executeOnKey(key, adoptEntryProcessor(entryProcessor));
}

Code example source: com.atlassian.cache/atlassian-cache-hazelcast

@Override
public void reset()
{
  versionMap.executeOnKey(ReferenceKey.KEY, IncrementVersionEntryProcessor.getInstance());
  localReference.reset();
}

Code example source: com.atlassian.cache/atlassian-cache-hazelcast

private long getVersion()
{
  // try a standard get to give the near-cache a chance
  Long version = versionMap.get(ReferenceKey.KEY);
  if (version == null)
  {
    version = (Long) versionMap.executeOnKey(ReferenceKey.KEY, GetOrInitVersionEntryProcessor.getInstance());
  }
  return version;
}

Code example source: com.atlassian.cache/atlassian-cache-hazelcast

private Long getVersion(K key)
{
  // try a standard get first to give the near-cache a chance
  Long version = versionMap.get(key);
  if (version == null)
  {
    version = (Long) versionMap.executeOnKey(key, GetOrInitVersionEntryProcessor.getInstance());
  }
  return version;
}

Code example source: com.github.vladimir-bukhtoyarov/bucket4j-hazelcast

@Override
public <T extends Serializable> CommandResult<T> execute(K key, GridCommand<T> command) {
  JCacheEntryProcessor<K, T> entryProcessor = JCacheEntryProcessor.executeProcessor(command);
  return (CommandResult<T>) cache.executeOnKey(key, adoptEntryProcessor(entryProcessor));
}

Code example source: vladimir-bukhtoyarov/bucket4j

@Override
public <T extends Serializable> T createInitialStateAndExecute(K key, BucketConfiguration configuration, GridCommand<T> command) {
  JCacheEntryProcessor<K, T> entryProcessor = JCacheEntryProcessor.initStateAndExecuteProcessor(command, configuration);
  CommandResult<T> result = (CommandResult<T>) cache.executeOnKey(key, adoptEntryProcessor(entryProcessor));
  return result.getData();
}

Code example source: com.hazelcast.simulator/tests-common

@AfterRun
public void afterRun(ThreadState state) {
  // for each worker-thread we store the number of items it has produced for each estimator.
  for (int k = 0; k < estimatorCount; k++) {
    CardinalityEstimator estimator = estimators[k];
    // the number of unique items produced is equal to the iteration. If items 0,10,20 are produced,
    // then iteration is 3.
    long iteration = state.iterations[k];
    expectedCountMap.executeOnKey(estimator.getName(), new IncEntryProcessor(iteration));
  }
}

Code example source: com.github.vladimir-bukhtoyarov/bucket4j-hazelcast

@Override
public <T extends Serializable> T createInitialStateAndExecute(K key, BucketConfiguration configuration, GridCommand<T> command) {
  JCacheEntryProcessor<K, T> entryProcessor = JCacheEntryProcessor.initStateAndExecuteProcessor(command, configuration);
  CommandResult<T> result = (CommandResult<T>) cache.executeOnKey(key, adoptEntryProcessor(entryProcessor));
  return result.getData();
}

Code example source: com.hazelcast/hazelcast-all

@Override
public boolean update(final Object key, final Object newValue, final Object newVersion, final SoftLock lock) {
  if (lock instanceof MarkerWrapper) {
    final ExpiryMarker unwrappedMarker = ((MarkerWrapper) lock).getMarker();
    return (Boolean) map.executeOnKey(key, new UpdateEntryProcessor(unwrappedMarker, newValue, newVersion,
        nextMarkerId(), nextTimestamp(hazelcastInstance)));
  } else {
    return false;
  }
}

Code example source: com.hazelcast/hazelcast-all

@Override
public SoftLock tryLock(final Object key, final Object version) {
  long timeout = nextTimestamp(hazelcastInstance) + lockTimeout;
  final ExpiryMarker marker = (ExpiryMarker) map.executeOnKey(key,
      new LockEntryProcessor(nextMarkerId(), timeout, version));
  return new MarkerWrapper(marker);
}

Code example source: com.hazelcast/hazelcast-hibernate5

@Override
public void unlock(final Object key, final SoftLock lock) {
  if (lock instanceof MarkerWrapper) {
    final ExpiryMarker unwrappedMarker = ((MarkerWrapper) lock).getMarker();
    map.executeOnKey(key, new UnlockEntryProcessor(unwrappedMarker, nextMarkerId(),
        nextTimestamp(hazelcastInstance)));
  }
}

Code example source: com.hazelcast/hazelcast-hibernate4

public boolean update(final Object key, final Object newValue, final Object newVersion, final SoftLock lock) {
  if (lock instanceof MarkerWrapper) {
    final ExpiryMarker unwrappedMarker = ((MarkerWrapper) lock).getMarker();
    return (Boolean) map.executeOnKey(key, new UpdateEntryProcessor(unwrappedMarker, newValue, newVersion,
        nextMarkerId(), nextTimestamp(hazelcastInstance)));
  } else {
    return false;
  }
}

Code example source: com.hazelcast.simulator/tests-common

@TimeStep
public void timeStep(ThreadState state) {
  int key = keys[state.randomInt(keys.length)];
  long increment = state.randomInt(100);
  int delayMs = state.calculateDelay();
  map.executeOnKey(key, new IncrementEntryProcessor(increment, delayMs));
  state.localIncrementsAtKey[key] += increment;
}

Code example source: com.hazelcast/hazelcast-hibernate52

@Override
public void unlock(final Object key, final SoftLock lock) {
  if (lock instanceof MarkerWrapper) {
    final ExpiryMarker unwrappedMarker = ((MarkerWrapper) lock).getMarker();
    map.executeOnKey(key, new UnlockEntryProcessor(unwrappedMarker, nextMarkerId(),
        nextTimestamp(hazelcastInstance)));
  }
}
