本文整理了Java中org.apache.flink.configuration.Configuration.setLong()方法的一些代码示例,展示了Configuration.setLong()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Configuration.setLong()方法的具体详情如下:
包路径:org.apache.flink.configuration.Configuration
类名称:Configuration
方法名:setLong
[英]Adds the given key/value pair to the configuration object.
[中]将给定的键/值对添加到配置对象。
代码示例来源:origin: apache/flink
/**
 * Stores a long value in the backing configuration, using this object's prefix
 * prepended to {@code key} as the effective key.
 *
 * @param key   the un-prefixed key
 * @param value the value to store
 */
@Override
public void setLong(String key, long value) {
    final String prefixedKey = this.prefix + key;
    this.backingConfig.setLong(prefixedKey, value);
}
代码示例来源:origin: apache/flink
/**
 * Writes {@code timeout} into the {@code config} field under the
 * {@code BUFFER_TIMEOUT} key.
 *
 * @param timeout the buffer timeout to record (unit is not visible from this snippet)
 */
public void setBufferTimeout(long timeout) {
    this.config.setLong(BUFFER_TIMEOUT, timeout);
}
代码示例来源:origin: apache/flink
/**
 * Writes {@code time} into the {@code config} field under the
 * {@code ITERATON_WAIT} key.
 *
 * @param time the iteration wait time to record (unit is not visible from this snippet)
 */
public void setIterationWaitTime(long time) {
    this.config.setLong(ITERATON_WAIT, time);
}
代码示例来源:origin: apache/flink
/**
 * Stores a long value in the backing configuration. The effective key is this
 * object's prefix followed by the string key of the given option.
 *
 * @param key   the option whose {@code key()} is prefixed and used as the entry name
 * @param value the value to store
 */
@Override
public void setLong(ConfigOption<Long> key, long value) {
    final String prefixedKey = this.prefix + key.key();
    backingConfig.setLong(prefixedKey, value);
}
代码示例来源:origin: apache/flink
/**
 * Creates a binder, deriving the plan temp directory and the operator
 * configuration from the given global configuration.
 *
 * <p>When no plan temp directory is configured, a unique directory name under
 * the JVM's {@code java.io.tmpdir} is used instead. The operator configuration
 * receives the Python binary path, the data temp directory (only if one is
 * configured) and the mmap file size.
 *
 * @param globalConfig configuration the Python options are read from
 */
public PythonPlanBinder(Configuration globalConfig) {
    final String planDirOverride = globalConfig.getString(PythonOptions.PLAN_TMP_DIR);
    if (planDirOverride == null) {
        // No explicit setting: fall back to a randomly named directory in the system temp dir.
        tmpPlanFilesDir = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
    } else {
        tmpPlanFilesDir = planDirOverride;
    }

    operatorConfig = new Configuration();

    final String pythonBinary = globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH);
    operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);

    final String dataDirOverride = globalConfig.getString(PythonOptions.DATA_TMP_DIR);
    if (dataDirOverride != null) {
        operatorConfig.setString(PythonOptions.DATA_TMP_DIR, dataDirOverride);
    }

    final long mmapFileSize = globalConfig.getLong(PythonOptions.MMAP_FILE_SIZE);
    operatorConfig.setLong(PythonOptions.MMAP_FILE_SIZE, mmapFileSize);
}
代码示例来源:origin: apache/flink
/**
 * Builds the {@code RestClusterClientConfiguration} used by the tests from a
 * configuration carrying distinct values for the await-leader timeout (1),
 * the maximum retry attempts (2) and the retry delay (3).
 */
@Before
public void setUp() throws Exception {
    final long awaitLeaderTimeout = 1;
    final int maxRetryAttempts = 2;
    final long retryDelay = 3;

    final Configuration restSettings = new Configuration();
    restSettings.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, awaitLeaderTimeout);
    restSettings.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, maxRetryAttempts);
    restSettings.setLong(RestOptions.RETRY_DELAY, retryDelay);

    restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(restSettings);
}
代码示例来源:origin: apache/flink
/**
 * Creates a fresh configuration with the Akka ask timeout taken from
 * {@code TestingUtils} and the heartbeat interval set to {@code HEARTBEAT_INTERVAL}.
 *
 * @return the newly populated configuration
 */
private static Configuration getConfiguration() {
    final Configuration testSettings = new Configuration();

    testSettings.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    testSettings.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);

    return testSettings;
}
代码示例来源:origin: apache/flink
/**
 * Writes the test tuples to temporary storage and remembers its location in
 * {@code this.tempFile}.
 *
 * <p>With a parallelism of 1 all tuples go into a single temporary file.
 * Otherwise the temporary file is replaced by a directory of the same name
 * containing one file per parallel instance (named {@code 1}, {@code 2}, ...),
 * and the tuples are distributed across those files according to
 * {@code getNumberOfTuplesPerFile}.
 *
 * <p>Each output format is closed in a {@code finally} block so that it is
 * released even when writing a record fails.
 *
 * @throws IOException if the temporary file cannot be created or writing fails
 */
@Before
public void writeTuples() throws IOException {
    this.tempFile = File.createTempFile("BinaryInputFormat", null);
    this.tempFile.deleteOnExit();
    Configuration configuration = new Configuration();
    configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
    if (this.parallelism == 1) {
        BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI().toString(),
            configuration);
        try {
            for (int index = 0; index < this.numberOfTuples; index++) {
                output.writeRecord(this.getRecord(index));
            }
        } finally {
            output.close();
        }
    } else {
        // Turn the temp file into a directory that holds one file per parallel instance.
        this.tempFile.delete();
        this.tempFile.mkdir();
        // Running index over all tuples; continues across files.
        int recordIndex = 0;
        for (int fileIndex = 0; fileIndex < this.parallelism; fileIndex++) {
            BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI() + "/" +
                (fileIndex + 1), configuration);
            try {
                for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
                    output.writeRecord(this.getRecord(recordIndex));
                }
            } finally {
                output.close();
            }
        }
    }
}
代码示例来源:origin: apache/flink
// Excerpt: populates a configuration for a ZooKeeper-based high-availability setup.
// JobManager address is "localhost"; the REST port is set to 0.
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(RestOptions.PORT, 0);
// Heartbeat interval 500 and timeout 10000 (presumably milliseconds; confirm against HeartbeatManagerOptions).
config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L);
config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
// HA mode "zookeeper", with the quorum taken from the zooKeeperResource connect string.
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
代码示例来源:origin: apache/flink
// Excerpt (enclosing method and closing braces are outside this snippet).
// Adds the port resources to the task info.
taskInfo.addAllResources(portResources);
// Pair every value produced by rangeValues(portResources) with the next key from tmPortKeys
// and store it as a long in dynamicProperties.
Iterator<String> portsToAssign = tmPortKeys.iterator();
rangeValues(portResources).forEach(port -> dynamicProperties.setLong(portsToAssign.next(), port));
// Keys left over after the loop mean fewer ports were provided than keys to fill.
if (portsToAssign.hasNext()) {
throw new IllegalArgumentException("insufficient # of ports assigned");
代码示例来源:origin: apache/flink
/**
 * Checks that a single file consisting of three blocks is divided into three
 * input splits, each exactly one block long.
 *
 * <p>The configuration passed to {@code configure} carries a different block
 * size ({@code blockSize + 10}) than the one set via {@code setBlockSize};
 * the assertions expect the splits to have the {@code setBlockSize} length.
 *
 * @throws IOException if the temporary file cannot be created or written
 */
@Test
public void testCreateInputSplitsWithOneFile() throws IOException {
    // create temporary file with 3 blocks
    final File tempFile = File.createTempFile("binary_input_format_test", "tmp");
    tempFile.deleteOnExit();
    final int blockInfoSize = new BlockInfo().getInfoSize();
    final int blockSize = blockInfoSize + 8;
    final int numBlocks = 3;
    // try-with-resources closes the stream even if a write fails.
    try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
        final int totalBytes = blockSize * numBlocks;
        for (int i = 0; i < totalBytes; i++) {
            // The content is irrelevant; every byte is written as 1.
            fileOutputStream.write(1);
        }
    }
    final Configuration config = new Configuration();
    config.setLong("input.block_size", blockSize + 10);
    final BinaryInputFormat<Record> inputFormat = new MyBinaryInputFormat();
    inputFormat.setFilePath(tempFile.toURI().toString());
    inputFormat.setBlockSize(blockSize);
    inputFormat.configure(config);
    FileInputSplit[] inputSplits = inputFormat.createInputSplits(numBlocks);
    Assert.assertEquals("Returns requested numbers of splits.", numBlocks, inputSplits.length);
    Assert.assertEquals("1. split has block size length.", blockSize, inputSplits[0].getLength());
    Assert.assertEquals("2. split has block size length.", blockSize, inputSplits[1].getLength());
    Assert.assertEquals("3. split has block size length.", blockSize, inputSplits[2].getLength());
}
代码示例来源:origin: apache/flink
/**
 * Creates the {@code RestClusterClient} under test. Retry attempts and retry
 * delay are set to 0 first; the cluster's client configuration is then merged
 * on top of those settings.
 */
@Before
public void setUp() throws Exception {
    final Configuration restClientSettings = new Configuration();
    restClientSettings.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
    restClientSettings.setLong(RestOptions.RETRY_DELAY, 0);
    // Merged last, exactly as before, so the cluster-provided entries are applied after the two above.
    restClientSettings.addAll(CLUSTER.getClientConfiguration());

    client = new RestClusterClient<>(restClientSettings, StandaloneClusterId.getInstance());
}
代码示例来源:origin: apache/flink
/**
 * Runs a job on a mini cluster built from the given configuration and asserts
 * that the job result carries no serialized throwable.
 *
 * <p>The configuration is modified in place: the REST port is set to 0 and the
 * slot idle timeout to 50. The cluster has four task managers with one slot
 * each, and the job graph is created with twice the slot idle timeout.
 *
 * @param configuration base configuration; mutated by this method
 */
private void executeSchedulingTest(Configuration configuration) throws Exception {
    final long slotIdleTimeout = 50L;
    final int parallelism = 4;

    configuration.setInteger(RestOptions.PORT, 0);
    configuration.setLong(JobManagerOptions.SLOT_IDLE_TIMEOUT, slotIdleTimeout);

    final MiniClusterConfiguration clusterSettings = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumTaskManagers(parallelism)
        .setNumSlotsPerTaskManager(1)
        .build();

    try (MiniCluster cluster = new MiniCluster(clusterSettings)) {
        cluster.start();

        final MiniClusterClient clusterClient = new MiniClusterClient(configuration, cluster);
        final JobGraph jobGraph = createJobGraph(2 * slotIdleTimeout, parallelism);

        // Block until the submission has succeeded.
        final JobSubmissionResult submission = clusterClient.submitJob(jobGraph).get();

        // Block until the job has finished and inspect its outcome.
        final JobResult jobResult = clusterClient.requestJobResult(submission.getJobID()).get();
        assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
    }
}
代码示例来源:origin: apache/flink
/**
 * Verifies that latency markers are emitted when the latency interval is set
 * only in the task manager configuration.
 */
@Test
public void testLatencyMarkEmissionEnabledViaFlinkConfig() throws Exception {
    final int expectedMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1;

    testLatencyMarkEmission(expectedMarkers, (operator, timeProvider) -> {
        final Configuration taskManagerSettings = new Configuration();
        taskManagerSettings.setLong(MetricOptions.LATENCY_INTERVAL, latencyMarkInterval);

        final TestingTaskManagerRuntimeInfo runtimeInfo = new TestingTaskManagerRuntimeInfo(taskManagerSettings);
        final Environment environment = MockEnvironment.builder()
            .setTaskManagerRuntimeInfo(runtimeInfo)
            .build();

        setupSourceOperator(operator, new ExecutionConfig(), environment, timeProvider);
    });
}
代码示例来源:origin: apache/flink
/**
 * Verifies that an interval set on the {@link ExecutionConfig} enables latency
 * markers even though the task manager configuration sets the interval to 0.
 */
@Test
public void testLatencyMarkEmissionEnabledOverrideViaExecutionConfig() throws Exception {
    final int expectedMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1;

    testLatencyMarkEmission(expectedMarkers, (operator, timeProvider) -> {
        final ExecutionConfig jobLevelConfig = new ExecutionConfig();
        jobLevelConfig.setLatencyTrackingInterval(latencyMarkInterval);

        final Configuration taskManagerSettings = new Configuration();
        taskManagerSettings.setLong(MetricOptions.LATENCY_INTERVAL, 0L);

        final TestingTaskManagerRuntimeInfo runtimeInfo = new TestingTaskManagerRuntimeInfo(taskManagerSettings);
        final Environment environment = MockEnvironment.builder()
            .setTaskManagerRuntimeInfo(runtimeInfo)
            .build();

        setupSourceOperator(operator, jobLevelConfig, environment, timeProvider);
    });
}
代码示例来源:origin: apache/flink
/**
 * Verifies that an interval of 0 on the {@link ExecutionConfig} suppresses
 * latency markers even though the task manager configuration enables them.
 */
@Test
public void testLatencyMarkEmissionDisabledOverrideViaExecutionConfig() throws Exception {
    final int expectedMarkers = 0;

    testLatencyMarkEmission(expectedMarkers, (operator, timeProvider) -> {
        final Configuration taskManagerSettings = new Configuration();
        taskManagerSettings.setLong(MetricOptions.LATENCY_INTERVAL, latencyMarkInterval);

        final TestingTaskManagerRuntimeInfo runtimeInfo = new TestingTaskManagerRuntimeInfo(taskManagerSettings);
        final Environment environment = MockEnvironment.builder()
            .setTaskManagerRuntimeInfo(runtimeInfo)
            .build();

        final ExecutionConfig jobLevelConfig = new ExecutionConfig();
        jobLevelConfig.setLatencyTrackingInterval(0);

        setupSourceOperator(operator, jobLevelConfig, environment, timeProvider);
    });
}
代码示例来源:origin: apache/flink
// Excerpt: fills the configuration `orig` with one entry per value type
// (string, int, long, float, double).
orig.setString("mykey", "myvalue");
orig.setInteger("mynumber", 100);
orig.setLong("longvalue", 478236947162389746L);
orig.setFloat("PI", 3.1415926f);
orig.setDouble("E", Math.E);
代码示例来源:origin: apache/flink
// Excerpt: stores two long entries, a float and a double in `pc`.
pc.setLong("long", 15);
// TOO_LONG is defined outside this snippet; presumably a value exceeding the int range (confirm at its declaration).
pc.setLong("too_long", TOO_LONG);
pc.setFloat("float", 2.1456775f);
pc.setDouble("double", Math.PI);
代码示例来源:origin: uber/AthenaX
/**
 * Starts a local mini cluster and submits the environment's job to it in
 * detached mode.
 *
 * <p>The cluster configuration is a copy of {@code conf} with the job's own
 * configuration merged in, {@code taskmanager.memory.size} set to -1 and the
 * number of task slots set to the job's maximum parallelism.
 *
 * @param env     environment whose stream graph is executed
 * @param conf    base configuration; copied, not modified
 * @param jobName name assigned to the stream graph before it is translated
 * @return the started cluster that the job was submitted to
 */
static LocalFlinkMiniCluster execute(LocalStreamEnvironment env,
        Configuration conf, String jobName) throws Exception {
    final StreamGraph graph = env.getStreamGraph();
    graph.setJobName(jobName);
    final JobGraph job = graph.getJobGraph();

    final Configuration clusterSettings = new Configuration(conf);
    clusterSettings.addAll(job.getJobConfiguration());
    clusterSettings.setLong("taskmanager.memory.size", -1L);
    clusterSettings.setInteger("taskmanager.numberOfTaskSlots", job.getMaximumParallelism());

    final LocalFlinkMiniCluster miniCluster = new LocalFlinkMiniCluster(clusterSettings, true);
    miniCluster.start();
    miniCluster.submitJobDetached(job);
    return miniCluster;
}
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.10
/**
 * Produces a copy of {@code config} in which the managed memory size is set
 * to the value returned by {@code getOrCalculateManagedMemoryPerTaskManager()}.
 *
 * @return the new configuration; {@code config} itself is not written to here
 */
public Configuration generateConfiguration() {
    final Configuration copy = new Configuration(config);
    // set the memory
    copy.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, getOrCalculateManagedMemoryPerTaskManager());
    return copy;
}
内容来源于网络,如有侵权,请联系作者删除!