org.apache.flink.configuration.Configuration.setInteger()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(13.9k)|赞(0)|评价(0)|浏览(238)

本文整理了Java中org.apache.flink.configuration.Configuration.setInteger()方法的一些代码示例,展示了Configuration.setInteger()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Configuration.setInteger()方法的具体详情如下:
包路径:org.apache.flink.configuration.Configuration
类名称:Configuration
方法名:setInteger

Configuration.setInteger介绍

[英]Adds the given key/value pair to the configuration object.
[中]将给定的键/值对添加到配置对象。

代码示例

代码示例来源:origin: apache/flink

/**
 * Returns a copy of the given configuration with the settings from the YARN properties file applied.
 *
 * <p>If the properties file contains {@code YARN_PROPERTIES_PARALLELISM}, it is parsed as an integer
 * and stored under {@code CoreOptions.DEFAULT_PARALLELISM}. Afterwards every dynamic property decoded
 * from {@code YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING} is written into the copy as a string entry.
 *
 * @param configuration the configuration to copy; it is not modified by this method
 * @return the copy with the YARN properties applied
 * @throws FlinkException if the parallelism property is present but is not an integer
 */
private Configuration applyYarnProperties(Configuration configuration) throws FlinkException {
  final Configuration result = new Configuration(configuration);

  // Default parallelism recorded in the YARN properties file; null when the property is not set.
  final String parallelismValue = yarnPropertiesFile.getProperty(YARN_PROPERTIES_PARALLELISM);
  if (parallelismValue != null) {
    try {
      final int parsedParallelism = Integer.parseInt(parallelismValue);
      result.setInteger(CoreOptions.DEFAULT_PARALLELISM, parsedParallelism);
      logAndSysout("YARN properties set default parallelism to " + parsedParallelism);
    } catch (NumberFormatException e) {
      throw new FlinkException("Error while parsing the YARN properties: " +
        "Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.", e);
    }
  }

  // Dynamic properties passed along by the YARN client.
  final String encodedOverrides = yarnPropertiesFile.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
  final Map<String, String> overrides = getDynamicProperties(encodedOverrides);
  overrides.forEach((key, value) -> result.setString(key, value));

  return result;
}

代码示例来源:origin: apache/flink

/**
 * Creates a fresh configuration holding the task slot count and the default file-overwrite flag,
 * then adds all entries of {@code baseConfiguration} to it.
 *
 * @return the newly assembled configuration
 */
private Configuration createConfiguration() {
  final Configuration assembled = new Configuration();

  assembled.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
  assembled.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());

  // Base configuration is added last.
  // NOTE(review): presumably its entries take precedence over the two set above - confirm.
  assembled.addAll(baseConfiguration);

  return assembled;
}

代码示例来源:origin: apache/flink

/**
 * Builds a configuration that sets {@code TaskManagerOptions.MANAGED_MEMORY_SIZE} to {@code "80m"}
 * and {@code TaskManagerOptions.NETWORK_NUM_BUFFERS} to {@code 800}.
 *
 * @return the new configuration
 */
private static Configuration getConfiguration() {
  final Configuration configuration = new Configuration();

  configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
  configuration.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);

  return configuration;
}

代码示例来源:origin: apache/flink

/**
 * Builds the configuration used with the queryable state proxy server enabled.
 *
 * <p>It sets the task manager count and slots per task manager, uses one network thread each for
 * the queryable state client, proxy and server, derives both port ranges from their start port
 * plus {@code NUM_TMS}, and disables web submission.
 *
 * @return the new configuration
 */
private static Configuration getConfig() {
  final Configuration configuration = new Configuration();

  configuration.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
  configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
  configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
  configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);

  // One network thread for each queryable state component.
  configuration.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
  configuration.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1);
  configuration.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);

  // Port ranges are written as "<start>-<start + NUM_TMS>".
  final String proxyPortRange =
    QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS);
  configuration.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRange);

  final String serverPortRange =
    QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS);
  configuration.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRange);

  configuration.setBoolean(WebOptions.SUBMIT_ENABLE, false);
  return configuration;
}

代码示例来源:origin: apache/flink

/**
 * Builds the configuration used with the queryable state proxy server enabled.
 *
 * <p>It sets the task manager count and slots per task manager, uses one network thread each for
 * the queryable state client, proxy and server, derives both port ranges from their start port
 * plus {@code NUM_PORT_COUNT}, and disables web submission.
 *
 * @return the new configuration
 */
private static Configuration getConfig() {
    final Configuration configuration = new Configuration();

    configuration.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
    configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);

    // One network thread for each queryable state component.
    configuration.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
    configuration.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1);
    configuration.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);

    // Port ranges are written as "<start>-<start + NUM_PORT_COUNT>".
    final String proxyPortRange =
      QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_PORT_COUNT);
    configuration.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRange);

    final String serverPortRange =
      QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_PORT_COUNT);
    configuration.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRange);

    configuration.setBoolean(WebOptions.SUBMIT_ENABLE, false);
    return configuration;
  }
}

代码示例来源:origin: apache/flink

/**
 * Builds a configuration selecting the {@code "fixed-delay"} restart strategy with a single
 * restart attempt and a delay of {@code "100 ms"}.
 *
 * @return the new configuration
 */
private static Configuration getConfiguration() {
    final Configuration restartConfig = new Configuration();

    restartConfig.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
    restartConfig.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
    restartConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 ms");

    return restartConfig;
  }
}

代码示例来源:origin: apache/flink

/**
 * Builds a configuration with one local task manager offering {@code DEFAULT_PARALLELISM} slots,
 * the {@code "memory"} state backend, and freshly created checkpoint and savepoint directories
 * under {@code TEMP_FOLDER}.
 *
 * @return the new configuration
 * @throws Exception if either temporary directory does not exist after creation, or if creating
 *     the folders fails
 */
private Configuration getConfiguration() throws Exception {
  final Configuration configuration = new Configuration();
  configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
  configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM);

  // A random suffix keeps the folder names distinct between invocations.
  final UUID suffix = UUID.randomUUID();
  final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + suffix).getAbsoluteFile();
  final File savepointDir = TEMP_FOLDER.newFolder("savepoints_" + suffix).getAbsoluteFile();

  final boolean bothDirectoriesExist = checkpointDir.exists() && savepointDir.exists();
  if (!bothDirectoriesExist) {
    throw new Exception("Test setup failed: failed to create (temporary) directories.");
  }

  LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
  LOG.info("Created savepoint directory: " + savepointDir + ".");

  final String checkpointUri = checkpointDir.toURI().toString();
  final String savepointUri = savepointDir.toURI().toString();

  configuration.setString(CheckpointingOptions.STATE_BACKEND, "memory");
  configuration.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointUri);
  configuration.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
  configuration.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointUri);
  return configuration;
}

代码示例来源:origin: apache/flink

/**
 * Prepares the per-test fixtures: a fatal error handler, a Flink configuration with
 * {@code ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN} set to 100, a {@code home}
 * directory below the temporary folder root, and the environment variable map.
 */
@Before
public void setup() {
  testingFatalErrorHandler = new TestingFatalErrorHandler();

  flinkConfig = new Configuration();
  flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);

  final File root = folder.getRoot();
  final File homeDir = new File(root, "home");
  // The directory must have been newly created by this call.
  assertTrue(homeDir.mkdir());

  env = new HashMap<>();
  env.put(ENV_APP_ID, "foo");
  env.put(ENV_CLIENT_HOME_DIR, homeDir.getAbsolutePath());
  env.put(ENV_CLIENT_SHIP_FILES, "");
  env.put(ENV_FLINK_CLASSPATH, "");
  env.put(ENV_HADOOP_USER_NAME, "foo");
  env.put(FLINK_JAR_PATH, root.toURI().toString());
}

代码示例来源:origin: apache/flink

/**
 * Builds a configuration using the {@code "filesystem"} state backend, with checkpoints stored
 * under the {@code checkpointDir} field and savepoints under the given directory.
 *
 * @param savepointDir value stored under {@code CheckpointingOptions.SAVEPOINT_DIRECTORY}
 * @return the new configuration
 */
private Configuration getFileBasedCheckpointsConfig(final String savepointDir) {
  final Configuration checkpointConfig = new Configuration();
  final String checkpointUri = checkpointDir.toURI().toString();

  checkpointConfig.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
  checkpointConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointUri);
  checkpointConfig.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
  checkpointConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);

  return checkpointConfig;
}

代码示例来源:origin: apache/flink

/**
 * Attaches the data set identified by {@code info.otherID} as a broadcast set named
 * {@code info.name} to the operator identified by {@code info.parentID}, and records the
 * broadcast variable in the operator's parameters.
 *
 * <p>The parameters keep a counter under {@code PLANBINDER_CONFIG_BCVAR_COUNT}; the name is stored
 * under {@code PLANBINDER_CONFIG_BCVAR_NAME_PREFIX} followed by the counter value before it is
 * incremented.
 *
 * @param info operation description supplying the parent ID, the other ID and the variable name
 */
private void createBroadcastVariable(PythonOperationInfo info) {
  final UdfOperator<?> target = (UdfOperator<?>) sets.getDataSet(info.parentID);
  final DataSet<?> broadcastSource = sets.getDataSet(info.otherID);
  target.withBroadcastSet(broadcastSource, info.name);

  // Start from a fresh configuration when the operator has no parameters yet.
  final Configuration existing = target.getParameters();
  final Configuration parameters = existing != null ? existing : new Configuration();

  final int index = parameters.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
  parameters.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, index + 1);
  parameters.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + index, info.name);

  target.withParameters(parameters);
}

代码示例来源:origin: apache/flink

/**
 * Creates a YARN cluster descriptor from a configuration that uses ZooKeeper high availability,
 * the {@code "fixed-delay"} restart strategy with {@link Integer#MAX_VALUE} attempts, and a
 * minimum containerized heap cutoff of 100.
 *
 * @return the descriptor returned by {@code createYarnClusterDescriptor}
 */
@Nonnull
private YarnClusterDescriptor setupYarnClusterDescriptor() {
  final Configuration configuration = new Configuration();

  configuration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "10");

  // High-availability settings.
  configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
  configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, storageDir);
  configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
  configuration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 1000);

  // Restart settings.
  configuration.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
  configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);

  final int heapCutoffMin = 100;
  configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, heapCutoffMin);

  return createYarnClusterDescriptor(configuration);
}

代码示例来源:origin: apache/flink

/**
 * Builds the configuration used with the queryable state proxy server enabled and ZooKeeper
 * high availability.
 *
 * <p>It sets the job manager count, task manager count and slots per task manager, uses two
 * network threads each for the queryable state client, proxy and server, derives both port ranges
 * from their start port plus {@code NUM_TMS}, and disables web submission. The HA storage path is
 * a new folder obtained from {@code temporaryFolder}.
 *
 * @return the new configuration
 * @throws Exception declared by the original signature; creating the temporary folder is the
 *     likely source (assumption)
 */
private static Configuration getConfig() throws Exception {
  final Configuration configuration = new Configuration();

  configuration.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
  configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
  configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
  configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
  configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);

  // Two network threads for each queryable state component.
  configuration.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
  configuration.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
  configuration.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);

  // Port ranges are written as "<start>-<start + NUM_TMS>".
  final String proxyPortRange =
    QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS);
  configuration.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRange);

  final String serverPortRange =
    QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS);
  configuration.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRange);

  configuration.setBoolean(WebOptions.SUBMIT_ENABLE, false);

  // High-availability settings.
  final String haStoragePath = temporaryFolder.newFolder().toString();
  configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStoragePath);
  configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
  configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

  return configuration;
}

代码示例来源:origin: apache/flink

/**
 * Creates a leader retrieval service for a job manager address that does not exist, passing
 * {@code false} as the second argument. The test contains no assertions, so it passes as long as
 * the call completes without throwing.
 */
@Test
public void testUnresolvableHostname1() throws UnknownHostException, ConfigurationException {
  final Configuration unresolvable = new Configuration();
  unresolvable.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
  unresolvable.setInteger(JobManagerOptions.PORT, 17234);

  StandaloneUtils.createLeaderRetrievalService(unresolvable, false, JobMaster.JOB_MANAGER_NAME);
}

代码示例来源:origin: apache/flink

/**
 * Builds a configuration selecting the {@code "failure-rate"} restart strategy with at most one
 * failure per interval, an interval of {@code "1 second"} and a delay of {@code "100 ms"}.
 *
 * @return the new configuration
 */
private static Configuration getConfiguration() {
    final Configuration restartConfig = new Configuration();

    restartConfig.setString(ConfigConstants.RESTART_STRATEGY, "failure-rate");
    restartConfig.setInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
    restartConfig.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, "1 second");
    restartConfig.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, "100 ms");

    return restartConfig;
  }
}

代码示例来源:origin: apache/flink

/**
 * Creates a REST cluster client whose configuration is a copy of {@code restConfig} with
 * {@code RestOptions.PORT} replaced by the given port.
 *
 * @param port the port stored under {@code RestOptions.PORT}
 * @return the new client
 * @throws Exception if constructing the client or its REST client fails
 */
private RestClusterClient<StandaloneClusterId> createRestClusterClient(final int port) throws Exception {
  final Configuration portSpecificConfig = new Configuration(restConfig);
  portSpecificConfig.setInteger(RestOptions.PORT, port);

  // The fourth argument always yields 0 regardless of the attempt number.
  return new RestClusterClient<>(
    portSpecificConfig,
    createRestClient(),
    StandaloneClusterId.getInstance(),
    (retryAttempt) -> 0,
    null);
}

代码示例来源:origin: apache/flink

/**
 * Builds the configuration used with the queryable state proxy server enabled and ZooKeeper
 * high availability.
 *
 * <p>It sets the job manager count, task manager count and slots per task manager, uses two
 * network threads each for the queryable state client, proxy and server, derives both port ranges
 * from their start port plus {@code NUM_TMS}, and disables web submission. The HA storage path is
 * a new folder obtained from {@code temporaryFolder}.
 *
 * @return the new configuration
 * @throws Exception declared by the original signature; creating the temporary folder is the
 *     likely source (assumption)
 */
private static Configuration getConfig() throws Exception {
    final Configuration configuration = new Configuration();

    configuration.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
    configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);

    // Two network threads for each queryable state component.
    configuration.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
    configuration.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
    configuration.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);

    // Port ranges are written as "<start>-<start + NUM_TMS>".
    final String proxyPortRange =
      QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS);
    configuration.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRange);

    final String serverPortRange =
      QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS);
    configuration.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRange);

    configuration.setBoolean(WebOptions.SUBMIT_ENABLE, false);

    // High-availability settings.
    final String haStoragePath = temporaryFolder.newFolder().toString();
    configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStoragePath);
    configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

    return configuration;
  }
}

代码示例来源:origin: apache/flink

/**
 * Initializes {@code restClusterClientConfiguration} from a configuration whose await-leader
 * timeout, maximum retry attempts and retry delay are set to the distinct values 1, 2 and 3.
 */
@Before
public void setUp() throws Exception {
  final Configuration restSettings = new Configuration();

  restSettings.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 1);
  restSettings.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 2);
  restSettings.setLong(RestOptions.RETRY_DELAY, 3);

  restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(restSettings);
}

代码示例来源:origin: apache/flink

/**
 * Tests that heap memory specified through the old config keys for the job manager and the task
 * manager ends up in the cluster specification (2048 MB master memory, 4096 MB task manager
 * memory).
 */
@Test
public void testHeapMemoryPropertyWithOldConfigKey() throws Exception {
  final Configuration legacyKeyConfig = new Configuration();
  legacyKeyConfig.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
  legacyKeyConfig.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);

  final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
    legacyKeyConfig,
    tmp.getRoot().getAbsolutePath(),
    "y",
    "yarn");

  // No command-line arguments are passed, so the values can only come from the configuration.
  final CommandLine parsedArgs = cli.parseCommandLineOptions(new String[0], false);
  final ClusterSpecification spec = cli.getClusterSpecification(parsedArgs);

  assertThat(spec.getMasterMemoryMB(), is(2048));
  assertThat(spec.getTaskManagerMemoryMB(), is(4096));
}

代码示例来源:origin: apache/flink

/**
 * Creates a leader retrieval service for a job manager address that does not exist, passing
 * {@code true} as the second argument, and expects an {@link UnknownHostException}.
 */
@Test
public void testUnresolvableHostname2() throws Exception {
  final Configuration unresolvable = new Configuration();
  unresolvable.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
  unresolvable.setInteger(JobManagerOptions.PORT, 17234);

  try {
    StandaloneUtils.createLeaderRetrievalService(unresolvable, true, JobMaster.JOB_MANAGER_NAME);
    fail("This should fail with an UnknownHostException");
  } catch (UnknownHostException expected) {
    // Expected outcome: the hostname cannot be resolved.
  }
}

代码示例来源:origin: apache/flink

/**
 * Verifies the JVM options via {@code verifyJvmOptions()} and then builds a configuration that
 * enables the default file-system overwrite flag, uses the default testing Akka ask timeout, a
 * memory segment size of {@code "4096"} and 2048 network buffers.
 *
 * @return the new configuration
 */
private static Configuration getConfiguration() {
  // Runs before the configuration is created.
  verifyJvmOptions();

  final Configuration configuration = new Configuration();
  configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
  configuration.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
  configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
  configuration.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
  return configuration;
}

相关文章