org.apache.flink.configuration.Configuration.addAll()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(8.9k)|赞(0)|评价(0)|浏览(218)

本文整理了Java中org.apache.flink.configuration.Configuration.addAll()方法的一些代码示例,展示了Configuration.addAll()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Configuration.addAll()方法的具体详情如下:
包路径:org.apache.flink.configuration.Configuration
类名称:Configuration
方法名:addAll

Configuration.addAll介绍

[英]Adds all entries from the given configuration into this configuration. The keys are prepended with the given prefix.
[中]将给定配置中的所有条目添加到此配置中。这些键会加上给定的前缀。

代码示例

代码示例来源:origin: apache/flink

/**
 * Adds all entries of {@code other} into the backing configuration, with
 * this view's own prefix composed in front of the caller-supplied prefix.
 */
@Override
public void addAll(Configuration other, String prefix) {
  final String combinedPrefix = this.prefix + prefix;
  this.backingConfig.addAll(other, combinedPrefix);
}

代码示例来源:origin: apache/flink

/**
 * Merges the default configuration with the given one ({@code config} is
 * applied last) and writes the result, one {@code key: value} line per
 * entry, to {@code flink-conf.yaml} in the configuration directory.
 *
 * @param config additional configuration entries to write
 * @throws IOException if the file cannot be written
 */
public void appendConfiguration(Configuration config) throws IOException {
  final Configuration combined = new Configuration();
  combined.addAll(defaultConfig);
  combined.addAll(config);

  final List<String> lines = combined.toMap().entrySet().stream()
    .map(entry -> String.join(": ", entry.getKey(), entry.getValue()))
    .collect(Collectors.toList());

  Files.write(conf.resolve("flink-conf.yaml"), lines);
}

代码示例来源:origin: apache/flink

/**
 * Getter which returns a copy of the associated configuration.
 *
 * @return Copy of the associated configuration
 */
/**
 * Returns a defensive copy of the associated configuration so callers
 * cannot mutate the internal state.
 *
 * @return copy of the associated configuration
 */
public Configuration getConfiguration() {
  final Configuration snapshot = new Configuration();
  snapshot.addAll(configuration);
  return snapshot;
}

代码示例来源:origin: apache/flink

/**
 * Specify a custom {@code Configuration} that will be used when creating
 * the {@link FileSystem} for writing.
 */
/**
 * Specify a custom {@code Configuration} that will be used when creating
 * the {@link FileSystem} for writing. The configuration is copied, so later
 * changes to {@code config} do not affect this sink.
 */
public RollingSink<T> setFSConfig(Configuration config) {
  final Configuration copy = new Configuration();
  copy.addAll(config);
  this.fsConfig = copy;
  return this;
}

代码示例来源:origin: apache/flink

/**
 * Returns a copy of this configuration: a fresh instance holding the same
 * entries as this one.
 */
@Override
public Configuration clone() {
  final Configuration copy = new Configuration();
  copy.addAll(this);
  return copy;
}

代码示例来源:origin: apache/flink

/**
 * Specify a custom {@code Configuration} that will be used when creating
 * the {@link FileSystem} for writing.
 */
/**
 * Specify a custom {@code Configuration} that will be used when creating
 * the {@link FileSystem} for writing. The configuration is copied, so later
 * changes to {@code config} do not affect this sink.
 */
public BucketingSink<T> setFSConfig(Configuration config) {
  final Configuration copy = new Configuration();
  copy.addAll(config);
  this.fsConfig = copy;
  return this;
}

代码示例来源:origin: apache/flink

configuration.addAll(dynamicProperties);

代码示例来源:origin: apache/flink

/**
 * Generates a container specification that conveys the artifacts and
 * variables needed to launch a TaskManager, propagating the AM's dynamic
 * configuration into it.
 *
 * @param configuration     the base cluster configuration
 * @param dynamicProperties dynamic configuration entries to forward to the TM
 * @return the populated container specification
 * @throws Exception if applying the overlays fails
 */
public static ContainerSpecification createContainerSpec(Configuration configuration, Configuration dynamicProperties)
  throws Exception {
  final ContainerSpecification spec = new ContainerSpecification();

  // Forward the AM dynamic configuration so the TM sees the same overrides.
  spec.getDynamicConfiguration().addAll(dynamicProperties);

  applyOverlays(configuration, spec);
  return spec;
}

代码示例来源:origin: apache/flink

/**
 * Translates this data source into its common-API operator representation.
 * The operator name defaults to the source location plus the input-format
 * class name, capped at 150 characters.
 */
protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
  String operatorName = (this.name != null)
    ? this.name
    : "at " + dataSourceLocationName + " (" + inputFormat.getClass().getName() + ")";
  if (operatorName.length() > 150) {
    operatorName = operatorName.substring(0, 150);
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  GenericDataSourceBase<OUT, ?> source =
    new GenericDataSourceBase(this.inputFormat, new OperatorInformation<OUT>(getType()), operatorName);
  source.setParallelism(parallelism);

  if (this.parameters != null) {
    source.getParameters().addAll(this.parameters);
  }
  if (this.splitDataProperties != null) {
    source.setSplitDataProperties(this.splitDataProperties);
  }
  return source;
}

代码示例来源:origin: apache/flink

/**
 * Sets the head task and its configuration. If a head configuration was
 * already present, its entries are merged into the incoming
 * {@code headConfig} before it replaces the old one, so earlier settings
 * are preserved.
 *
 * @param headTask   the job vertex acting as the head task
 * @param headConfig the task configuration for the head task
 */
public void setHeadTask(JobVertex headTask, TaskConfig headConfig) {
  this.headTask = headTask;
  this.headFinalResultConfig = new TaskConfig(new Configuration());
  
  // check if we already had a configuration, for example if the solution set was 
  // configured earlier; carry those entries over into the incoming config
  // BEFORE replacing the field below — the order matters here
  if (this.headConfig != null) {
    headConfig.getConfiguration().addAll(this.headConfig.getConfiguration());
  }
  
  this.headConfig = headConfig;
}

代码示例来源:origin: apache/flink

/**
 * Builds the configuration for the mini cluster: slot count and
 * default-file-overwrite settings first, then every entry from the base
 * configuration on top.
 */
private Configuration createConfiguration() {
  final Configuration result = new Configuration();
  result.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
  result.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());
  result.addAll(baseConfiguration);
  return result;
}

代码示例来源:origin: apache/flink

/**
 * Translates a single-input Java-API operator into its common-API
 * counterpart: recursively translates its input, lets the operator build
 * its data-flow form, and — when the operator carries a UDF — forwards the
 * UDF's configuration parameters and semantic properties.
 */
private <I, O> org.apache.flink.api.common.operators.Operator<O> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {
  // Re-view the wildcard-typed argument with concrete type variables;
  // the cast is unchecked, suppressed as in the surrounding translator code.
  @SuppressWarnings("unchecked")
  SingleInputOperator<I, O, ?> typedOp = (SingleInputOperator<I, O, ?>) op;
  @SuppressWarnings("unchecked")
  DataSet<I> typedInput = (DataSet<I>) op.getInput();
  // Translate the upstream data set first so it can be wired as input.
  Operator<I> input = translate(typedInput);
  org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);
  if (op instanceof UdfOperator<?>) {
    @SuppressWarnings("unchecked")
    SingleInputUdfOperator<I, O, ?> udfOp = (SingleInputUdfOperator<I, O, ?>) op;
    // set configuration parameters
    Configuration opParams = udfOp.getParameters();
    if (opParams != null) {
      dataFlowOp.getParameters().addAll(opParams);
    }
    if (dataFlowOp instanceof org.apache.flink.api.common.operators.SingleInputOperator) {
      org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?> unaryOp =
          (org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?>) dataFlowOp;
      // set the semantic properties
      unaryOp.setSemanticProperties(udfOp.getSemanticProperties());
    }
  }
  return dataFlowOp;
}

代码示例来源:origin: apache/flink

configuration.addAll(jobGraph.getJobConfiguration());
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
configuration.addAll(this.configuration);

代码示例来源:origin: apache/flink

/**
 * Translates a two-input Java-API operator into its common-API
 * counterpart: recursively translates both inputs, lets the operator build
 * its data-flow form, and — when the operator carries a UDF — forwards the
 * UDF's configuration parameters and semantic properties.
 */
private <I1, I2, O> org.apache.flink.api.common.operators.Operator<O> translateTwoInputOperator(TwoInputOperator<?, ?, ?, ?> op) {
  // Re-view the wildcard-typed argument with concrete type variables;
  // the casts are unchecked, suppressed as in the surrounding translator code.
  @SuppressWarnings("unchecked")
  TwoInputOperator<I1, I2, O, ?> typedOp = (TwoInputOperator<I1, I2, O, ?>) op;
  @SuppressWarnings("unchecked")
  DataSet<I1> typedInput1 = (DataSet<I1>) op.getInput1();
  @SuppressWarnings("unchecked")
  DataSet<I2> typedInput2 = (DataSet<I2>) op.getInput2();
  // Translate both upstream data sets first so they can be wired as inputs.
  Operator<I1> input1 = translate(typedInput1);
  Operator<I2> input2 = translate(typedInput2);
  org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input1, input2);
  if (op instanceof UdfOperator<?>) {
    @SuppressWarnings("unchecked")
    TwoInputUdfOperator<I1, I2, O, ?> udfOp = (TwoInputUdfOperator<I1, I2, O, ?>) op;
    // set configuration parameters
    Configuration opParams = udfOp.getParameters();
    if (opParams != null) {
      dataFlowOp.getParameters().addAll(opParams);
    }
    if (dataFlowOp instanceof org.apache.flink.api.common.operators.DualInputOperator) {
      org.apache.flink.api.common.operators.DualInputOperator<?, ?,  O, ?> binaryOp =
          (org.apache.flink.api.common.operators.DualInputOperator<?, ?, O, ?>) dataFlowOp;
      // set the semantic properties
      binaryOp.setSemanticProperties(udfOp.getSemanticProperties());
    }
  }
  return dataFlowOp;
}

代码示例来源:origin: apache/flink

dynamicProperties.addAll(containerSpec.getDynamicConfiguration());

代码示例来源:origin: apache/flink

configuration.addAll(clientConfiguration);

代码示例来源:origin: apache/flink

/**
 * Translates this sink into its common-API representation: wires the input,
 * forwards configured parameters, chooses the parallelism, and applies any
 * requested local output ordering.
 */
protected GenericDataSinkBase<T> translateToDataFlow(Operator<T> input) {
  // Use the explicit name when present, otherwise derive one from the format.
  final String sinkName = (this.name != null) ? this.name : this.format.toString();

  final GenericDataSinkBase<T> translated = new GenericDataSinkBase<>(
    this.format, new UnaryOperatorInformation<>(this.type, new NothingTypeInfo()), sinkName);
  translated.setInput(input);

  if (this.parameters != null) {
    translated.getParameters().addAll(this.parameters);
  }

  if (this.parallelism > 0) {
    // An explicit parallelism was configured; use it.
    translated.setParallelism(this.parallelism);
  } else {
    // Otherwise inherit the input operator's parallelism to enable chaining.
    translated.setParallelism(input.getParallelism());
  }

  if (this.sortKeyPositions != null) {
    // Configure the requested local output ordering, one key at a time.
    final Ordering ordering = new Ordering();
    for (int idx = 0; idx < this.sortKeyPositions.length; idx++) {
      ordering.appendOrdering(this.sortKeyPositions[idx], null, this.sortOrders[idx]);
    }
    translated.setLocalOrder(ordering);
  }
  return translated;
}

代码示例来源:origin: apache/flink

/**
 * Creates the REST cluster client under test. Retries are disabled (zero
 * attempts, zero delay) so failures surface immediately; the cluster's
 * client configuration is layered on top of those settings.
 */
@Before
public void setUp() throws Exception {
  final Configuration config = new Configuration();
  config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
  config.setLong(RestOptions.RETRY_DELAY, 0);
  config.addAll(CLUSTER.getClientConfiguration());
  client = new RestClusterClient<>(config, StandaloneClusterId.getInstance());
}

代码示例来源:origin: apache/flink

config.addAll(jobGraph.getJobConfiguration());
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

代码示例来源:origin: uber/AthenaX

/**
 * Builds the job graph from the stream environment, starts a local mini
 * cluster configured for the job, and submits the job in detached mode.
 *
 * @param env     the local stream environment holding the program
 * @param conf    base configuration for the cluster
 * @param jobName name to assign to the job
 * @return the started mini cluster hosting the job
 * @throws Exception if cluster startup or job submission fails
 */
static LocalFlinkMiniCluster execute(LocalStreamEnvironment env,
                  Configuration conf, String jobName) throws Exception {
  final StreamGraph streamGraph = env.getStreamGraph();
  streamGraph.setJobName(jobName);
  final JobGraph jobGraph = streamGraph.getJobGraph();

  final Configuration clusterConf = new Configuration(conf);
  clusterConf.addAll(jobGraph.getJobConfiguration());
  clusterConf.setLong("taskmanager.memory.size", -1L);
  clusterConf.setInteger("taskmanager.numberOfTaskSlots", jobGraph.getMaximumParallelism());

  final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(clusterConf, true);
  cluster.start();
  cluster.submitJobDetached(jobGraph);
  return cluster;
 }
}

相关文章