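This article collects code examples for the Java method org.apache.flume.Context.getSubProperties() and shows how Context.getSubProperties() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they serve well as reference material. The details of Context.getSubProperties() are as follows: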
Fully qualified class name: org.apache.flume.Context
Class name: Context
Method name: getSubProperties
Description: Gets the properties whose names start with a given prefix. In the returned map, the prefix is removed from each name. For example, if this method is called with the parameter "hdfs." and the context contains { hdfs.key = value, otherKey = otherValue }, it returns a map containing { key = value }.
Note: The prefix must end with a period character. If it does not, this method throws an IllegalArgumentException.
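The contract described above can be illustrated without Flume on the classpath. The following is a minimal sketch that mimics the documented behavior using only java.util; the class SubPropertiesSketch and its subProperties helper are hypothetical stand-ins, not Flume's implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in that mimics the documented behavior of
// Context.getSubProperties(); it is not Flume's own code.
class SubPropertiesSketch {

  static Map<String, String> subProperties(Map<String, String> all, String prefix) {
    // Documented contract: the prefix must end with a period
    if (!prefix.endsWith(".")) {
      throw new IllegalArgumentException(
          "The given prefix does not end with a period (" + prefix + ")");
    }
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : all.entrySet()) {
      if (entry.getKey().startsWith(prefix)) {
        // The prefix is stripped from the name in the returned map
        result.put(entry.getKey().substring(prefix.length()), entry.getValue());
      }
    }
    return Collections.unmodifiableMap(result);
  }

  public static void main(String[] args) {
    Map<String, String> all = new HashMap<>();
    all.put("hdfs.key", "value");
    all.put("otherKey", "otherValue");
    System.out.println(subProperties(all, "hdfs.")); // prints {key=value}
  }
}
```

Calling the helper with "hdfs" (no trailing period) throws IllegalArgumentException, which matches the note in the description.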
Code example source: apache/flume
private Table<String, String, String> getTable(Context context, String prefix) {
  Table<String, String, String> table = HashBasedTable.create();
  for (Entry<String, String> e : context.getSubProperties(prefix).entrySet()) {
    // Split "row.column" at the first period only: row key, then column key
    String[] parts = e.getKey().split("\\.", 2);
    table.put(parts[0], parts[1], e.getValue());
  }
  return table;
}
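The row/column split used above can be tried out with plain collections. This is a sketch only: it swaps Guava's Table for a nested Map, and the class NestedTableSketch and method toTable are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: groups "row.column" keys into row -> (column -> value)
// with nested maps instead of Guava's Table.
class NestedTableSketch {

  static Map<String, Map<String, String>> toTable(Map<String, String> subProps) {
    Map<String, Map<String, String>> table = new HashMap<>();
    for (Map.Entry<String, String> e : subProps.entrySet()) {
      // Limit 2 keeps any further periods inside the column key
      String[] parts = e.getKey().split("\\.", 2);
      if (parts.length < 2) {
        // A key without a period has no column part. This sketch skips it;
        // indexing parts[1] in that case would throw ArrayIndexOutOfBoundsException.
        continue;
      }
      table.computeIfAbsent(parts[0], k -> new HashMap<>()).put(parts[1], e.getValue());
    }
    return table;
  }

  public static void main(String[] args) {
    Map<String, String> subProps = new HashMap<>();
    subProps.put("a.x", "1");
    subProps.put("a.y.z", "2");
    subProps.put("b.x", "3");
    System.out.println(toTable(subProps));
  }
}
```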
Code example source: apache/flume
private void setProducerProps(Context ctx, String bootStrapServers) {
  producerProps.clear();
  producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
  // Defaults overridden based on config
  producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
  KafkaSSLUtil.addGlobalSSLParameters(producerProps);
}
Code example source: apache/flume
private void setProducerProps(Context context, String bootStrapServers) {
  kafkaProps.clear();
  kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
  kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
  kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
  // Defaults overridden based on config
  kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
  kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
  KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
}
Code example source: apache/flume
private void setConsumerProps(Context ctx, String bootStrapServers) {
  consumerProps.clear();
  consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
  consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET);
  // Defaults overridden based on config
  consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));
  // These always take precedence over config
  consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
  consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  KafkaSSLUtil.addGlobalSSLParameters(consumerProps);
}
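The Kafka snippets above rely on put order to establish precedence: defaults first, then the user's sub-properties, then values the component insists on. Below is a minimal sketch of that layering with java.util.Properties; the class LayeredPropsSketch, the build method, and the chosen default values are assumptions made for illustration, and the map of overrides stands in for what getSubProperties would return.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of three-layer precedence via put order.
class LayeredPropsSketch {

  static Properties build(Map<String, String> userOverrides, String bootstrapServers) {
    Properties props = new Properties();
    // 1. Defaults (illustrative values)
    props.put("auto.offset.reset", "latest");
    props.put("enable.auto.commit", "true");
    // 2. User-supplied values replace the defaults
    props.putAll(userOverrides);
    // 3. Forced values are written last, so they win over both earlier layers
    props.put("bootstrap.servers", bootstrapServers);
    props.put("enable.auto.commit", "false");
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> overrides = new HashMap<>();
    overrides.put("auto.offset.reset", "earliest");
    overrides.put("enable.auto.commit", "true"); // overwritten by layer 3
    Properties props = build(overrides, "broker:9092");
    System.out.println(props.getProperty("auto.offset.reset"));
    System.out.println(props.getProperty("enable.auto.commit"));
  }
}
```

A user can therefore tune anything under the prefix except the keys written in the last layer.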
Code example source: apache/flume
private void setConsumerProps(Context ctx) {
  kafkaProps.clear();
  kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
  kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER);
  // Defaults overridden based on config
  kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX));
  // These always take precedence over config
  kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  if (groupId != null) {
    kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  }
  kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
      KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
  KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
}
Code example source: apache/flume
private void initializeSystemProperties(Context context) {
  Map<String, String> sysProps = new HashMap<String, String>();
  Map<String, String> sysPropsOld = context.getSubProperties(
      ConfigurationConstants.OLD_CONFIG_JDBC_SYSPROP_PREFIX);
  if (sysPropsOld.size() > 0) {
    LOGGER.warn("Long form configuration prefix \""
        + ConfigurationConstants.OLD_CONFIG_JDBC_SYSPROP_PREFIX
        + "\" is deprecated. Please use the short form prefix \""
        + ConfigurationConstants.CONFIG_JDBC_SYSPROP_PREFIX
        + "\" instead.");
    sysProps.putAll(sysPropsOld);
  }
  Map<String, String> sysPropsNew = context.getSubProperties(
      ConfigurationConstants.CONFIG_JDBC_SYSPROP_PREFIX);
  // Override the deprecated values with the non-deprecated
  if (sysPropsNew.size() > 0) {
    sysProps.putAll(sysPropsNew);
  }
  for (String key : sysProps.keySet()) {
    String value = sysProps.get(key);
    if (key != null && value != null) {
      System.setProperty(key, value);
    }
  }
}
Code example source: apache/flume
/**
 * Utility method that will set properties on a Java bean (<code>Object configurable</code>)
 * based on the provided <code>Context</code>.
 * N.B. This method will take the Flume Context and look for sub-properties named after the
 * class name of the <code>configurable</code> object.
 * If there is a type issue, or an access problem
 * then a <code>ConfigurationException</code> will be thrown.
 *
 * @param configurable Any properties must be modifiable via setter methods.
 * @param context the Flume Context to read the sub-properties from
 * @throws ConfigurationException if there is a type issue or an access problem
 */
public static void setConfigurationFields(Object configurable, Context context)
    throws ConfigurationException {
  Class<?> clazz = configurable.getClass();
  Map<String, String> properties = context.getSubProperties(clazz.getSimpleName() + ".");
  setConfigurationFields(configurable, properties);
}
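The Javadoc above describes setting bean properties through setter methods from a map of sub-properties. As a rough sketch of that idea, assuming String-typed setters only, the following uses plain reflection; SetterInjectionSketch, setFields, and the Demo bean are hypothetical, and the real Flume utility may convert types and report errors differently.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: maps each key "foo" to a call of setFoo(String).
class SetterInjectionSketch {

  // Small demo bean, introduced only for this sketch
  public static class Demo {
    private String host;
    public void setHost(String host) { this.host = host; }
    public String getHost() { return host; }
  }

  static void setFields(Object bean, Map<String, String> props) {
    for (Map.Entry<String, String> e : props.entrySet()) {
      String name = e.getKey();
      if (name.isEmpty()) {
        continue; // nothing to derive a setter name from
      }
      String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
      try {
        // Assumption: only public setters taking a single String are supported
        Method method = bean.getClass().getMethod(setter, String.class);
        method.invoke(bean, e.getValue());
      } catch (ReflectiveOperationException ex) {
        throw new IllegalStateException("Cannot set property " + name, ex);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("host", "localhost");
    Demo demo = new Demo();
    setFields(demo, props);
    System.out.println(demo.getHost());
  }
}
```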
Code example source: apache/flume
/**
 * Utility method that will set properties on a Java bean (<code>Object configurable</code>)
 * based on the provided <code>Context</code>.
 * N.B. This method will take the Flume Context and look for sub-properties named after the
 * <code>subPropertiesPrefix</code> String.
 * If there is a type issue, or an access problem
 * then a <code>ConfigurationException</code> will be thrown.
 *
 * @param configurable Object: Any properties must be modifiable via setter methods.
 * @param context org.apache.flume.Context;
 * @param subPropertiesPrefix String
 * @throws ConfigurationException if there is a type issue or an access problem
 */
public static void setConfigurationFields(Object configurable, Context context,
    String subPropertiesPrefix) throws ConfigurationException {
  Map<String, String> properties = context.getSubProperties(subPropertiesPrefix);
  setConfigurationFields(configurable, properties);
}
Code example source: apache/flume
/**
 * Reads a set of override values from the context configuration and stores
 * the results in the Map provided.
 *
 * @param propertyName the prefix of the config property names
 * @param context the context to use to read config properties
 * @param override the override Map to store results in
 */
private void parseConfigOverrides(final String propertyName,
                                  final Context context,
                                  final Map<String, Boolean> override) {
  Map<String, String> config = context.getSubProperties(
      propertyName + ".");
  if (config != null) {
    for (Map.Entry<String, String> value : config.entrySet()) {
      LOG.info(String.format("Read %s value for status code %s as %s",
          propertyName, value.getKey(), value.getValue()));
      if (override.containsKey(value.getKey())) {
        LOG.warn(String.format("Ignoring duplicate config value for %s.%s",
            propertyName, value.getKey()));
      } else {
        override.put(value.getKey(), Boolean.valueOf(value.getValue()));
      }
    }
  }
}
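Two details of the method above are easy to miss: an existing entry in the override map is never replaced, and Boolean.valueOf yields false for any text other than "true" (compared case-insensitively). The sketch below shows both with plain maps; OverrideParsingSketch and parseOverrides are hypothetical names, and the input map stands in for the result of getSubProperties.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of first-value-wins parsing into Boolean flags.
class OverrideParsingSketch {

  static void parseOverrides(Map<String, String> config, Map<String, Boolean> override) {
    for (Map.Entry<String, String> entry : config.entrySet()) {
      // putIfAbsent keeps an earlier value, like the containsKey check above
      override.putIfAbsent(entry.getKey(), Boolean.valueOf(entry.getValue()));
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("200", "true");
    config.put("404", "TRUE");
    config.put("500", "yes"); // not "true", so it parses as false

    Map<String, Boolean> override = new HashMap<>();
    override.put("200", Boolean.FALSE); // already set; will not be replaced

    parseOverrides(config, override);
    System.out.println(override);
  }
}
```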
Code example source: apache/nifi
protected static Context getFlumeContext(String flumeConfig, String prefix) {
  Properties flumeProperties = new Properties();
  if (flumeConfig != null) {
    try {
      flumeProperties.load(new StringReader(flumeConfig));
    } catch (IOException ex) {
      throw new RuntimeException(ex);
    }
  }
  Map<String, String> parameters = Maps.newHashMap();
  for (String property : flumeProperties.stringPropertyNames()) {
    parameters.put(property, flumeProperties.getProperty(property));
  }
  // Wrap all parameters in a Context, then keep only those under the prefix
  return new Context(new Context(parameters).getSubProperties(prefix));
}
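The same flow, from properties text to a prefix-scoped map, can be sketched with the JDK alone. PropertiesTextSketch and its parse method are hypothetical, the prefix filter is a stand-in for Context.getSubProperties, and the property names in main are made up for the demonstration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration: parse properties text, keep keys under a prefix.
class PropertiesTextSketch {

  static Map<String, String> parse(String text, String prefix) {
    Properties props = new Properties();
    if (text != null) {
      try {
        props.load(new StringReader(text));
      } catch (IOException ex) {
        throw new UncheckedIOException(ex);
      }
    }
    Map<String, String> result = new HashMap<>();
    for (String name : props.stringPropertyNames()) {
      if (name.startsWith(prefix)) {
        // Strip the prefix, as getSubProperties is documented to do
        result.put(name.substring(prefix.length()), props.getProperty(name));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    String text = "tier1.sources.src-1.type = spooldir\n"
        + "tier1.sources.src-1.spoolDir = /tmp/in\n"
        + "other.key = x\n";
    System.out.println(parse(text, "tier1.sources.src-1."));
  }
}
```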
Code example source: apache/flume
private void configureSerializers(Context context) {
  String serializerListStr = context.getString(SERIALIZERS);
  Preconditions.checkArgument(!StringUtils.isEmpty(serializerListStr),
      "Must supply at least one name and serializer");
  String[] serializerNames = serializerListStr.split("\\s+");
  // All properties under "<SERIALIZERS>."
  Context serializerContexts =
      new Context(context.getSubProperties(SERIALIZERS + "."));
  serializerList = Lists.newArrayListWithCapacity(serializerNames.length);
  for (String serializerName : serializerNames) {
    // Narrow further to the properties of this one serializer
    Context serializerContext = new Context(
        serializerContexts.getSubProperties(serializerName + "."));
    String type = serializerContext.getString("type", "DEFAULT");
    String name = serializerContext.getString("name");
    Preconditions.checkArgument(!StringUtils.isEmpty(name),
        "Supplied name cannot be empty.");
    if ("DEFAULT".equals(type)) {
      serializerList.add(new NameAndSerializer(name, defaultSerializer));
    } else {
      serializerList.add(new NameAndSerializer(name, getCustomSerializer(
          type, serializerContext)));
    }
  }
}
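The method above narrows the configuration twice: first to everything under the list key, then to each named entry. Here is a minimal sketch of that two-step narrowing with plain maps; NestedPrefixSketch, sub, and perName are hypothetical helpers, and sub only approximates Context.getSubProperties.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of per-component scoping by nested prefixes.
class NestedPrefixSketch {

  // Stand-in for Context.getSubProperties: filter by prefix and strip it
  static Map<String, String> sub(Map<String, String> all, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> e : all.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        result.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return result;
  }

  static Map<String, Map<String, String>> perName(Map<String, String> all, String listKey) {
    String list = all.get(listKey);
    if (list == null || list.trim().isEmpty()) {
      throw new IllegalArgumentException("Must supply at least one name");
    }
    // First narrowing: everything under "<listKey>."
    Map<String, String> scoped = sub(all, listKey + ".");
    Map<String, Map<String, String>> result = new LinkedHashMap<>();
    for (String name : list.trim().split("\\s+")) {
      // Second narrowing: everything under "<name>." within that scope
      result.put(name, sub(scoped, name + "."));
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> all = new HashMap<>();
    all.put("serializers", "s1 s2");
    all.put("serializers.s1.type", "DEFAULT");
    all.put("serializers.s1.name", "a");
    all.put("serializers.s2.name", "b");
    System.out.println(perName(all, "serializers"));
  }
}
```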
Code example source: apache/flume
@Override
public void configure(Context context) {
  super.configure(context);
  serializerType = context.getString("serializer", "TEXT");
  useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
      false);
  serializerContext =
      new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
  logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
      + useRawLocalFileSystem);
}
Code example source: apache/flume
@Override
public void configure(Context context) {
  super.configure(context);
  serializerType = context.getString("serializer", "TEXT");
  useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
      false);
  serializerContext = new Context(
      context.getSubProperties(EventSerializer.CTX_PREFIX));
  logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
      + useRawLocalFileSystem);
}
Code example source: apache/flume
@Override
public void configure(Context context) {
  String pathManagerType = context.getString("sink.pathManager", "DEFAULT");
  String directory = context.getString("sink.directory");
  String rollInterval = context.getString("sink.rollInterval");
  serializerType = context.getString("sink.serializer", "TEXT");
  serializerContext =
      new Context(context.getSubProperties("sink." +
          EventSerializer.CTX_PREFIX));
  Context pathManagerContext =
      new Context(context.getSubProperties("sink." +
          PathManager.CTX_PREFIX));
  pathController = PathManagerFactory.getInstance(pathManagerType, pathManagerContext);
  Preconditions.checkArgument(directory != null, "Directory may not be null");
  Preconditions.checkNotNull(serializerType, "Serializer type is undefined");
  if (rollInterval == null) {
    this.rollInterval = defaultRollInterval;
  } else {
    this.rollInterval = Long.parseLong(rollInterval);
  }
  batchSize = context.getInteger("sink.batchSize", defaultBatchSize);
  this.directory = new File(directory);
  if (sinkCounter == null) {
    sinkCounter = new SinkCounter(getName());
  }
}
Code example source: apache/flume
@Override
public void configure(Context context) {
  Preconditions.checkState(getSinks().size() > 1,
      "The LoadBalancingSinkProcessor cannot be used for a single sink. "
      + "Please configure more than one sinks and try again.");
  String selectorTypeName = context.getString(CONFIG_SELECTOR,
      SELECTOR_NAME_ROUND_ROBIN);
  Boolean shouldBackOff = context.getBoolean(CONFIG_BACKOFF, false);
  selector = null;
  if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
    selector = new RoundRobinSinkSelector(shouldBackOff);
  } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
    selector = new RandomOrderSinkSelector(shouldBackOff);
  } else {
    // Any other value is treated as the class name of a custom selector
    try {
      @SuppressWarnings("unchecked")
      Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
          Class.forName(selectorTypeName);
      selector = klass.newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to instantiate sink selector: "
          + selectorTypeName, ex);
    }
  }
  selector.setSinks(getSinks());
  selector.configure(
      new Context(context.getSubProperties(CONFIG_SELECTOR_PREFIX)));
  LOGGER.debug("Sink selector: " + selector + " initialized");
}
Code example source: apache/flume
@Override
public void configure(Context context) {
  super.configure(context);
  // use binary writable serialize by default
  writeFormat = context.getString("hdfs.writeFormat",
      SequenceFileSerializerType.Writable.name());
  useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
      false);
  serializerContext = new Context(
      context.getSubProperties(SequenceFileSerializerFactory.CTX_PREFIX));
  serializer = SequenceFileSerializerFactory
      .getSerializer(writeFormat, serializerContext);
  logger.info("writeFormat = " + writeFormat + ", UseRawLocalFileSystem = "
      + useRawLocalFileSystem);
}
Code example source: apache/flume
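// Excerpt: the start of this statement is cut off in the source. The first line
// below is a reconstruction that assumes Typesafe Config's ConfigFactory.parseMap.
Config override = ConfigFactory.parseMap(
    context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
morphline = new Compiler().compile(
    new File(morphlineFile), morphlineId, morphlineContext, finalChild, override);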
Code example source: apache/flume
@Override
public void configure(Context context) throws ConfigurationException {
  super.configure(context);
  sinks = Arrays.asList(context.getString(
      BasicConfigurationConstants.CONFIG_SINKS).split("\\s+"));
  Map<String, String> params = context.getSubProperties(
      BasicConfigurationConstants.CONFIG_SINK_PROCESSOR_PREFIX);
  processorContext = new Context();
  processorContext.putAll(params);
  SinkProcessorType spType = getKnownSinkProcessor(processorContext.getString(
      BasicConfigurationConstants.CONFIG_TYPE));
  if (spType != null) {
    processorConf =
        (SinkProcessorConfiguration) ComponentConfigurationFactory.create(
            this.getComponentName() + "-processor",
            spType.toString(),
            ComponentType.SINK_PROCESSOR);
    if (processorConf != null) {
      processorConf.setSinks(new HashSet<String>(sinks));
      processorConf.configure(processorContext);
    }
  }
  setConfigured();
}
Code example source: apache/flume
@Override
public void configure(Context context) {
  Configurables.ensureRequiredNonNull(
      context, SyslogSourceConfigurationConstants.CONFIG_PORT);
  port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);
  host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
  formaterProp = context.getSubProperties(
      SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
  keepFields = SyslogUtils.chooseFieldsToKeep(
      context.getString(
          SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
          SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
  clientIPHeader =
      context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_IP_HEADER);
  clientHostnameHeader =
      context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_HOSTNAME_HEADER);
  if (sourceCounter == null) {
    sourceCounter = new SourceCounter(getName());
  }
}
Code example source: apache/flume
@Override
public void configure(Context context) {
  configureSsl(context);
  Configurables.ensureRequiredNonNull(context,
      SyslogSourceConfigurationConstants.CONFIG_PORT);
  port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);
  host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
  eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);
  formaterProp = context.getSubProperties(
      SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
  keepFields = SyslogUtils.chooseFieldsToKeep(
      context.getString(
          SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
          SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
  clientIPHeader =
      context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_IP_HEADER);
  clientHostnameHeader =
      context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_HOSTNAME_HEADER);
  if (sourceCounter == null) {
    sourceCounter = new SourceCounter(getName());
  }
}