org.apache.flume.Context.getSubProperties()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(15.4k)|赞(0)|评价(0)|浏览(261)

本文整理了Java中org.apache.flume.Context.getSubProperties()方法的一些代码示例,展示了Context.getSubProperties()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Context.getSubProperties()方法的具体详情如下:
包路径:org.apache.flume.Context
类名称:Context
方法名:getSubProperties

Context.getSubProperties介绍

[英]Get properties which start with a prefix. When a property is returned, the prefix is removed the from name. For example, if this method is called with a parameter "hdfs." and the context contains: { hdfs.key = value, otherKey = otherValue } this method will return a map containing: { key = value}Note: The prefix must end with a period character. If not this method will raise an IllegalArgumentException.
[中]获取以前缀开头的属性。返回属性时,前缀将从名称中删除。例如,如果使用参数“hdfs”调用此方法并且上下文包含:{ hdfs.key = value, otherKey = otherValue }此方法将返回一个包含:{ key = value}的映射注意:前缀必须以句点字符结尾。否则,此方法将引发IllegalArgumentException。

代码示例

代码示例来源:origin: apache/flume

private Table<String, String, String> getTable(Context context, String prefix) {
 Table<String, String, String> table = HashBasedTable.create();
 for (Entry<String, String> e : context.getSubProperties(prefix).entrySet()) {
  String[] parts = e.getKey().split("\\.", 2);
  table.put(parts[0], parts[1], e.getValue());
 }
 return table;
}

代码示例来源:origin: apache/flume

private void setProducerProps(Context ctx, String bootStrapServers) {
 producerProps.clear();
 producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
 producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
 producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
 // Defaults overridden based on config
 producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
 producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
 KafkaSSLUtil.addGlobalSSLParameters(producerProps);
}

代码示例来源:origin: apache/flume

private void setProducerProps(Context context, String bootStrapServers) {
 kafkaProps.clear();
 kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
 //Defaults overridden based on config
 kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
 kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
 kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
 kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
 KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
}

代码示例来源:origin: apache/flume

private void setConsumerProps(Context ctx, String bootStrapServers) {
 consumerProps.clear();
 consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
 consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER);
 consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET);
 // Defaults overridden based on config
 consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));
 // These always take precedence over config
 consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
 consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 KafkaSSLUtil.addGlobalSSLParameters(consumerProps);
}

代码示例来源:origin: apache/flume

private void setConsumerProps(Context ctx) {
 kafkaProps.clear();
 kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
         KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
 kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
         KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER);
 //Defaults overridden based on config
 kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX));
 //These always take precedence over config
 kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
 if (groupId != null) {
  kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 }
 kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
 KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
}

代码示例来源:origin: apache/flume

private void initializeSystemProperties(Context context) {
 Map<String, String> sysProps = new HashMap<String, String>();
 Map<String, String> sysPropsOld = context.getSubProperties(
   ConfigurationConstants.OLD_CONFIG_JDBC_SYSPROP_PREFIX);
 if (sysPropsOld.size() > 0) {
  LOGGER.warn("Long form configuration prefix \""
    + ConfigurationConstants.OLD_CONFIG_JDBC_SYSPROP_PREFIX
    + "\" is deprecated. Please use the short form prefix \""
    + ConfigurationConstants.CONFIG_JDBC_SYSPROP_PREFIX
    + "\" instead.");
  sysProps.putAll(sysPropsOld);
 }
 Map<String, String> sysPropsNew = context.getSubProperties(
   ConfigurationConstants.CONFIG_JDBC_SYSPROP_PREFIX);
 // Override the deprecated values with the non-deprecated
 if (sysPropsNew.size() > 0) {
  sysProps.putAll(sysPropsNew);
 }
 for (String key: sysProps.keySet()) {
  String value = sysProps.get(key);
  if (key != null && value != null) {
   System.setProperty(key, value);
  }
 }
}

代码示例来源:origin: apache/flume

/**
 * Utility method that will set properties on a Java bean (<code>Object configurable</code>)
 * based on the provided <code>Context</code>.
 * N.B. This method will take the Flume Context and look for sub-properties named after the
 * class name of the <code>configurable</code> object.
 * If there is a type issue, or an access problem
 * then a <code>ConfigurationException</code> will be thrown.
 *
 * @param configurable Any properties must be modifiable via setter methods.
 * @param context
 * @throws ConfigurationException
 */
public static void setConfigurationFields(Object configurable, Context context)
           throws ConfigurationException {
 Class<?> clazz = configurable.getClass();
 Map<String, String> properties = context.getSubProperties(clazz.getSimpleName() + ".");
 setConfigurationFields(configurable, properties);
}

代码示例来源:origin: apache/flume

/**
  * Utility method that will set properties on a Java bean (<code>Object configurable</code>)
  * based on the provided <code>Context</code>.
  * N.B. This method will take the Flume Context and look for sub-properties named after the
  * <code>subPropertiesPrefix</code> String.
  * If there is a type issue, or an access problem
  * then a <code>ConfigurationException</code> will be thrown.
  *
  * @param configurable Object: Any properties must be modifiable via setter methods.
  * @param context org.apache.flume.Context;
  * @param subPropertiesPrefix String
  * @throws ConfigurationException
  */
 public static void setConfigurationFields(Object configurable, Context context,
   String subPropertiesPrefix) throws ConfigurationException {
  Map<String, String> properties = context.getSubProperties(subPropertiesPrefix);
  setConfigurationFields(configurable, properties);
 }
}

代码示例来源:origin: apache/flume

/**
 * Reads a set of override values from the context configuration and stores
 * the results in the Map provided.
 *
 * @param propertyName  the prefix of the config property names
 * @param context       the context to use to read config properties
 * @param override      the override Map to store results in
 */
private void parseConfigOverrides(final String propertyName,
                 final Context context,
                 final Map<String, Boolean> override) {
 Map<String, String> config = context.getSubProperties(
   propertyName + ".");
 if (config != null) {
  for (Map.Entry<String, String> value : config.entrySet()) {
   LOG.info(String.format("Read %s value for status code %s as %s",
     propertyName, value.getKey(), value.getValue()));
   if (override.containsKey(value.getKey())) {
    LOG.warn(String.format("Ignoring duplicate config value for %s.%s",
      propertyName, value.getKey()));
   } else {
    override.put(value.getKey(), Boolean.valueOf(value.getValue()));
   }
  }
 }
}

代码示例来源:origin: apache/nifi

protected static Context getFlumeContext(String flumeConfig, String prefix) {
  Properties flumeProperties = new Properties();
  if (flumeConfig != null) {
    try {
      flumeProperties.load(new StringReader(flumeConfig));
    } catch (IOException ex) {
      throw new RuntimeException(ex);
    }
  }
  Map<String, String> parameters = Maps.newHashMap();
  for (String property : flumeProperties.stringPropertyNames()) {
    parameters.put(property, flumeProperties.getProperty(property));
  }
  return new Context(new Context(parameters).getSubProperties(prefix));
}

代码示例来源:origin: apache/flume

private void configureSerializers(Context context) {
 String serializerListStr = context.getString(SERIALIZERS);
 Preconditions.checkArgument(!StringUtils.isEmpty(serializerListStr),
   "Must supply at least one name and serializer");
 String[] serializerNames = serializerListStr.split("\\s+");
 Context serializerContexts =
   new Context(context.getSubProperties(SERIALIZERS + "."));
 serializerList = Lists.newArrayListWithCapacity(serializerNames.length);
 for (String serializerName : serializerNames) {
  Context serializerContext = new Context(
    serializerContexts.getSubProperties(serializerName + "."));
  String type = serializerContext.getString("type", "DEFAULT");
  String name = serializerContext.getString("name");
  Preconditions.checkArgument(!StringUtils.isEmpty(name),
    "Supplied name cannot be empty.");
  if ("DEFAULT".equals(type)) {
   serializerList.add(new NameAndSerializer(name, defaultSerializer));
  } else {
   serializerList.add(new NameAndSerializer(name, getCustomSerializer(
     type, serializerContext)));
  }
 }
}

代码示例来源:origin: apache/flume

@Override
public void configure(Context context) {
 super.configure(context);
 serializerType = context.getString("serializer", "TEXT");
 useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
   false);
 serializerContext =
   new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
 logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
   + useRawLocalFileSystem);
}

代码示例来源:origin: apache/flume

@Override
public void configure(Context context) {
 super.configure(context);
 serializerType = context.getString("serializer", "TEXT");
 useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
   false);
 serializerContext = new Context(
   context.getSubProperties(EventSerializer.CTX_PREFIX));
 logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
   + useRawLocalFileSystem);
}

代码示例来源:origin: apache/flume

@Override
public void configure(Context context) {
 String pathManagerType = context.getString("sink.pathManager", "DEFAULT");
 String directory = context.getString("sink.directory");
 String rollInterval = context.getString("sink.rollInterval");
 serializerType = context.getString("sink.serializer", "TEXT");
 serializerContext =
   new Context(context.getSubProperties("sink." +
     EventSerializer.CTX_PREFIX));
 Context pathManagerContext =
      new Context(context.getSubProperties("sink." +
          PathManager.CTX_PREFIX));
 pathController = PathManagerFactory.getInstance(pathManagerType, pathManagerContext);
 Preconditions.checkArgument(directory != null, "Directory may not be null");
 Preconditions.checkNotNull(serializerType, "Serializer type is undefined");
 if (rollInterval == null) {
  this.rollInterval = defaultRollInterval;
 } else {
  this.rollInterval = Long.parseLong(rollInterval);
 }
 batchSize = context.getInteger("sink.batchSize", defaultBatchSize);
 this.directory = new File(directory);
 if (sinkCounter == null) {
  sinkCounter = new SinkCounter(getName());
 }
}

代码示例来源:origin: apache/flume

@Override
public void configure(Context context) {
 Preconditions.checkState(getSinks().size() > 1,
   "The LoadBalancingSinkProcessor cannot be used for a single sink. "
   + "Please configure more than one sinks and try again.");
 String selectorTypeName = context.getString(CONFIG_SELECTOR,
   SELECTOR_NAME_ROUND_ROBIN);
 Boolean shouldBackOff = context.getBoolean(CONFIG_BACKOFF, false);
 selector = null;
 if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
  selector = new RoundRobinSinkSelector(shouldBackOff);
 } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
  selector = new RandomOrderSinkSelector(shouldBackOff);
 } else {
  try {
   @SuppressWarnings("unchecked")
   Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
     Class.forName(selectorTypeName);
   selector = klass.newInstance();
  } catch (Exception ex) {
   throw new FlumeException("Unable to instantiate sink selector: "
     + selectorTypeName, ex);
  }
 }
 selector.setSinks(getSinks());
 selector.configure(
   new Context(context.getSubProperties(CONFIG_SELECTOR_PREFIX)));
 LOGGER.debug("Sink selector: " + selector + " initialized");
}

代码示例来源:origin: apache/flume

@Override
public void configure(Context context) {
 super.configure(context);
 // use binary writable serialize by default
 writeFormat = context.getString("hdfs.writeFormat",
  SequenceFileSerializerType.Writable.name());
 useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
   false);
 serializerContext = new Context(
     context.getSubProperties(SequenceFileSerializerFactory.CTX_PREFIX));
 serializer = SequenceFileSerializerFactory
     .getSerializer(writeFormat, serializerContext);
 logger.info("writeFormat = " + writeFormat + ", UseRawLocalFileSystem = "
   + useRawLocalFileSystem);
}

代码示例来源:origin: apache/flume

context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
morphline = new Compiler().compile(
  new File(morphlineFile), morphlineId, morphlineContext, finalChild, override);

代码示例来源:origin: apache/flume

@Override
public void configure(Context context) throws ConfigurationException {
 super.configure(context);
 sinks = Arrays.asList(context.getString(
   BasicConfigurationConstants.CONFIG_SINKS).split("\\s+"));
 Map<String, String> params = context.getSubProperties(
   BasicConfigurationConstants.CONFIG_SINK_PROCESSOR_PREFIX);
 processorContext = new Context();
 processorContext.putAll(params);
 SinkProcessorType spType = getKnownSinkProcessor(processorContext.getString(
     BasicConfigurationConstants.CONFIG_TYPE));
 if (spType != null) {
  processorConf =
    (SinkProcessorConfiguration) ComponentConfigurationFactory.create(
      this.getComponentName() + "-processor",
      spType.toString(),
      ComponentType.SINK_PROCESSOR);
  if (processorConf != null) {
   processorConf.setSinks(new HashSet<String>(sinks));
   processorConf.configure(processorContext);
  }
 }
 setConfigured();
}

代码示例来源:origin: apache/flume

@Override
public void configure(Context context) {
 Configurables.ensureRequiredNonNull(
   context, SyslogSourceConfigurationConstants.CONFIG_PORT);
 port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);
 host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
 formaterProp = context.getSubProperties(
   SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
 keepFields = SyslogUtils.chooseFieldsToKeep(
   context.getString(
     SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
     SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
 clientIPHeader =
   context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_IP_HEADER);
 clientHostnameHeader =
   context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_HOSTNAME_HEADER);
 if (sourceCounter == null) {
  sourceCounter = new SourceCounter(getName());
 }
}

代码示例来源:origin: apache/flume

@Override
public void configure(Context context) {
 configureSsl(context);
 Configurables.ensureRequiredNonNull(context,
   SyslogSourceConfigurationConstants.CONFIG_PORT);
 port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);
 host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
 eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);
 formaterProp = context.getSubProperties(
   SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
 keepFields = SyslogUtils.chooseFieldsToKeep(
   context.getString(
     SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
     SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
 clientIPHeader =
   context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_IP_HEADER);
 clientHostnameHeader =
   context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_HOSTNAME_HEADER);
 if (sourceCounter == null) {
  sourceCounter = new SourceCounter(getName());
 }
}

相关文章