本文整理了Java中org.apache.flink.configuration.Configuration.setInteger()
方法的一些代码示例,展示了Configuration.setInteger()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Configuration.setInteger()
方法的具体详情如下:
包路径:org.apache.flink.configuration.Configuration
类名称:Configuration
方法名:setInteger
[英]Adds the given key/value pair to the configuration object.
[中]将给定的键/值对添加到配置对象。
代码示例来源:origin: apache/flink
private Configuration applyYarnProperties(Configuration configuration) throws FlinkException {
final Configuration effectiveConfiguration = new Configuration(configuration);
// configure the default parallelism from YARN
String propParallelism = yarnPropertiesFile.getProperty(YARN_PROPERTIES_PARALLELISM);
if (propParallelism != null) { // maybe the property is not set
try {
int parallelism = Integer.parseInt(propParallelism);
effectiveConfiguration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
logAndSysout("YARN properties set default parallelism to " + parallelism);
}
catch (NumberFormatException e) {
throw new FlinkException("Error while parsing the YARN properties: " +
"Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.", e);
}
}
// handle the YARN client's dynamic properties
String dynamicPropertiesEncoded = yarnPropertiesFile.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) {
effectiveConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
}
return effectiveConfiguration;
}
代码示例来源:origin: apache/flink
private Configuration createConfiguration() {
Configuration newConfiguration = new Configuration();
newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());
newConfiguration.addAll(baseConfiguration);
return newConfiguration;
}
代码示例来源:origin: apache/flink
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
return config;
}
代码示例来源:origin: apache/flink
private static Configuration getConfig() {
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1);
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
config.setString(
QueryableStateOptions.PROXY_PORT_RANGE,
QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS));
config.setString(
QueryableStateOptions.SERVER_PORT_RANGE,
QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS));
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
return config;
}
代码示例来源:origin: apache/flink
private static Configuration getConfig() {
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1);
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
config.setString(
QueryableStateOptions.PROXY_PORT_RANGE,
QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_PORT_COUNT));
config.setString(
QueryableStateOptions.SERVER_PORT_RANGE,
QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_PORT_COUNT));
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
return config;
}
}
代码示例来源:origin: apache/flink
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 ms");
return config;
}
}
代码示例来源:origin: apache/flink
private Configuration getConfiguration() throws Exception {
// Flink configuration
final Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
UUID id = UUID.randomUUID();
final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile();
final File savepointDir = TEMP_FOLDER.newFolder("savepoints_" + id).getAbsoluteFile();
if (!checkpointDir.exists() || !savepointDir.exists()) {
throw new Exception("Test setup failed: failed to create (temporary) directories.");
}
LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
LOG.info("Created savepoint directory: " + savepointDir + ".");
config.setString(CheckpointingOptions.STATE_BACKEND, "memory");
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
return config;
}
代码示例来源:origin: apache/flink
@Before
public void setup() {
testingFatalErrorHandler = new TestingFatalErrorHandler();
flinkConfig = new Configuration();
flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);
File root = folder.getRoot();
File home = new File(root, "home");
boolean created = home.mkdir();
assertTrue(created);
env = new HashMap<>();
env.put(ENV_APP_ID, "foo");
env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
env.put(ENV_CLIENT_SHIP_FILES, "");
env.put(ENV_FLINK_CLASSPATH, "");
env.put(ENV_HADOOP_USER_NAME, "foo");
env.put(FLINK_JAR_PATH, root.toURI().toString());
}
代码示例来源:origin: apache/flink
private Configuration getFileBasedCheckpointsConfig(final String savepointDir) {
final Configuration config = new Configuration();
config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
return config;
}
代码示例来源:origin: apache/flink
private void createBroadcastVariable(PythonOperationInfo info) {
UdfOperator<?> op1 = (UdfOperator<?>) sets.getDataSet(info.parentID);
DataSet<?> op2 = sets.getDataSet(info.otherID);
op1.withBroadcastSet(op2, info.name);
Configuration c = op1.getParameters();
if (c == null) {
c = new Configuration();
}
int count = c.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
c.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, count + 1);
c.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + count, info.name);
op1.withParameters(c);
}
代码示例来源:origin: apache/flink
@Nonnull
private YarnClusterDescriptor setupYarnClusterDescriptor() {
final Configuration flinkConfiguration = new Configuration();
flinkConfiguration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "10");
flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, storageDir);
flinkConfiguration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
flinkConfiguration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 1000);
flinkConfiguration.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
flinkConfiguration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
final int minMemory = 100;
flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, minMemory);
return createYarnClusterDescriptor(flinkConfiguration);
}
代码示例来源:origin: apache/flink
private static Configuration getConfig() throws Exception {
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
config.setString(
QueryableStateOptions.PROXY_PORT_RANGE,
QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS));
config.setString(
QueryableStateOptions.SERVER_PORT_RANGE,
QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS));
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
return config;
}
代码示例来源:origin: apache/flink
@Test
public void testUnresolvableHostname1() throws UnknownHostException, ConfigurationException {
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
config.setInteger(JobManagerOptions.PORT, 17234);
StandaloneUtils.createLeaderRetrievalService(
config,
false,
JobMaster.JOB_MANAGER_NAME);
}
代码示例来源:origin: apache/flink
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(ConfigConstants.RESTART_STRATEGY, "failure-rate");
config.setInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, "1 second");
config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, "100 ms");
return config;
}
}
代码示例来源:origin: apache/flink
private RestClusterClient<StandaloneClusterId> createRestClusterClient(final int port) throws Exception {
final Configuration clientConfig = new Configuration(restConfig);
clientConfig.setInteger(RestOptions.PORT, port);
return new RestClusterClient<>(
clientConfig,
createRestClient(),
StandaloneClusterId.getInstance(),
(attempt) -> 0,
null);
}
代码示例来源:origin: apache/flink
private static Configuration getConfig() throws Exception {
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
config.setString(
QueryableStateOptions.PROXY_PORT_RANGE,
QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS));
config.setString(
QueryableStateOptions.SERVER_PORT_RANGE,
QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS));
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
return config;
}
}
代码示例来源:origin: apache/flink
@Before
public void setUp() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 1);
config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 2);
config.setLong(RestOptions.RETRY_DELAY, 3);
restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(config);
}
代码示例来源:origin: apache/flink
/**
* Tests the specifying heap memory with old config key for job manager and task manager.
*/
@Test
public void testHeapMemoryPropertyWithOldConfigKey() throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
configuration,
tmp.getRoot().getAbsolutePath(),
"y",
"yarn");
final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
assertThat(clusterSpecification.getMasterMemoryMB(), is(2048));
assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(4096));
}
代码示例来源:origin: apache/flink
@Test
public void testUnresolvableHostname2() throws Exception {
try {
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
config.setInteger(JobManagerOptions.PORT, 17234);
StandaloneUtils.createLeaderRetrievalService(
config,
true,
JobMaster.JOB_MANAGER_NAME);
fail("This should fail with an UnknownHostException");
}
catch (UnknownHostException e) {
// that is what we want!
}
}
代码示例来源:origin: apache/flink
private static Configuration getConfiguration() {
verifyJvmOptions();
Configuration config = new Configuration();
config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
return config;
}
内容来源于网络,如有侵权,请联系作者删除!