本文整理了Java中org.apache.flume.Context.<init>()方法的一些代码示例,展示了Context.<init>()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Context.<init>()方法的具体详情如下:
包路径:org.apache.flume.Context
类名称:Context
方法名:<init>
方法说明:暂无
代码示例来源:origin: apache/nifi
/**
 * Parses a Flume configuration supplied as properties text and returns the
 * entries selected by {@code prefix}.
 *
 * @param flumeConfig configuration text in {@link Properties} format; when
 *     {@code null} nothing is loaded and the property set stays empty
 * @param prefix prefix passed to {@code Context.getSubProperties}
 * @return a new {@code Context} wrapping the sub-properties for {@code prefix}
 * @throws RuntimeException wrapping the {@link IOException} raised while
 *     loading the text
 */
protected static Context getFlumeContext(String flumeConfig, String prefix) {
  final Properties loaded = new Properties();
  if (flumeConfig != null) {
    try {
      loaded.load(new StringReader(flumeConfig));
    } catch (IOException ex) {
      throw new RuntimeException(ex);
    }
  }

  // Copy every string-keyed property into a plain String-to-String map.
  final Map<String, String> settings = Maps.newHashMap();
  for (String key : loaded.stringPropertyNames()) {
    settings.put(key, loaded.getProperty(key));
  }

  final Context everything = new Context(settings);
  return new Context(everything.getSubProperties(prefix));
}
代码示例来源:origin: apache/flume
Context context = new Context();
for (String key : keys) {
if (key.startsWith(CONF_MONITOR_PREFIX)) {
代码示例来源:origin: apache/flume
/**
 * Stores one configuration entry in the context of the component it names.
 *
 * <p>The key is parsed with {@code parseConfigKey}. When parsing yields a
 * result, the value is put under the parsed config key in that component's
 * {@code Context}; the context is created and registered in
 * {@code contextMap} the first time the component name is seen.
 *
 * @param key full configuration key
 * @param value value to store
 * @param configPrefix prefix handed to {@code parseConfigKey}
 * @param contextMap per-component contexts, keyed by trimmed component name
 * @return {@code true} if the entry was stored, {@code false} if
 *     {@code parseConfigKey} returned {@code null}
 */
private boolean addComponentConfig(
    String key, String value, String configPrefix, Map<String, Context> contextMap
) {
  ComponentNameAndConfigKey nameAndKey = parseConfigKey(key, configPrefix);
  if (nameAndKey == null) {
    return false;
  }

  String componentName = nameAndKey.getComponentName().trim();
  LOGGER.info("Processing:{}", componentName);

  Context componentContext = contextMap.get(componentName);
  if (componentContext == null) {
    LOGGER.debug("Created context for {}: {}", componentName, nameAndKey.getConfigKey());
    componentContext = new Context();
    contextMap.put(componentName, componentContext);
  }
  componentContext.put(nameAndKey.getConfigKey(), value);
  return true;
}
代码示例来源:origin: apache/flume
/**
 * Delegates to the superclass, then reads the serializer type (default
 * {@code "TEXT"}), the {@code hdfs.useRawLocalFileSystem} flag (default
 * {@code false}) and the serializer sub-context, and logs the two scalar
 * settings.
 *
 * @param context configuration to read from
 */
@Override
public void configure(Context context) {
  super.configure(context);

  serializerType = context.getString("serializer", "TEXT");
  useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem", false);
  serializerContext = new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));

  final String summary = "Serializer = " + serializerType
      + ", UseRawLocalFileSystem = " + useRawLocalFileSystem;
  logger.info(summary);
}
代码示例来源:origin: apache/flume
/**
 * Populates {@code serializerList} from the configuration.
 *
 * <p>The value stored under {@code SERIALIZERS} is split on whitespace into
 * serializer names. For each name, the settings under
 * {@code SERIALIZERS.<name>.} provide a {@code type} (default
 * {@code "DEFAULT"}) and a mandatory {@code name}. The type
 * {@code "DEFAULT"} maps to {@code defaultSerializer}; any other type is
 * resolved through {@code getCustomSerializer}.
 *
 * @param context configuration to read from
 */
private void configureSerializers(Context context) {
  String configuredNames = context.getString(SERIALIZERS);
  Preconditions.checkArgument(!StringUtils.isEmpty(configuredNames),
      "Must supply at least one name and serializer");

  String[] names = configuredNames.split("\\s+");
  Context allSerializers = new Context(context.getSubProperties(SERIALIZERS + "."));
  serializerList = Lists.newArrayListWithCapacity(names.length);

  for (String serializerName : names) {
    Context perSerializer =
        new Context(allSerializers.getSubProperties(serializerName + "."));
    String type = perSerializer.getString("type", "DEFAULT");
    String name = perSerializer.getString("name");
    Preconditions.checkArgument(!StringUtils.isEmpty(name),
        "Supplied name cannot be empty.");

    if (!"DEFAULT".equals(type)) {
      serializerList.add(
          new NameAndSerializer(name, getCustomSerializer(type, perSerializer)));
    } else {
      serializerList.add(new NameAndSerializer(name, defaultSerializer));
    }
  }
}
代码示例来源:origin: apache/flume
/**
 * Applies the superclass configuration and then picks up the serializer
 * type (default {@code "TEXT"}), the {@code hdfs.useRawLocalFileSystem}
 * flag (default {@code false}) and the serializer sub-context; the two
 * scalar settings are written to the log.
 *
 * @param context configuration to read from
 */
@Override
public void configure(Context context) {
  super.configure(context);

  serializerType = context.getString("serializer", "TEXT");
  useRawLocalFileSystem =
      context.getBoolean("hdfs.useRawLocalFileSystem", false);
  serializerContext =
      new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));

  StringBuilder message = new StringBuilder("Serializer = ");
  message.append(serializerType);
  message.append(", UseRawLocalFileSystem = ");
  message.append(useRawLocalFileSystem);
  logger.info(message.toString());
}
代码示例来源:origin: apache/flume
/**
 * Creates and configures a channel selector.
 *
 * <p>The selector is obtained from {@code getSelectorForType} using the
 * value stored under {@code BasicConfigurationConstants.CONFIG_TYPE}, given
 * the channels, and then configured with a {@code Context} holding every
 * entry of {@code config}.
 *
 * @param channels channels handed to the selector
 * @param config selector settings, including the selector type
 * @return the configured selector
 */
public static ChannelSelector create(List<Channel> channels,
    Map<String, String> config) {
  String selectorType = config.get(BasicConfigurationConstants.CONFIG_TYPE);
  ChannelSelector selector = getSelectorForType(selectorType);
  selector.setChannels(channels);

  Context selectorContext = new Context();
  selectorContext.putAll(config);
  Configurables.configure(selector, selectorContext);

  return selector;
}
代码示例来源:origin: apache/flume
Context context = new Context();
代码示例来源:origin: apache/flume
context.getSubProperties(
HTTPSourceConfigurationConstants.CONFIG_HANDLER_PREFIX);
handler.configure(new Context(subProps));
} catch (ClassNotFoundException ex) {
LOG.error("Error while configuring HTTPSource. Exception follows.", ex);
代码示例来源:origin: apache/flume
/**
 * Reads the {@code sink.*} settings and initialises the sink's fields.
 *
 * <p>Settings read: {@code sink.pathManager} (default {@code "DEFAULT"}),
 * {@code sink.directory} (required), {@code sink.rollInterval} (falls back
 * to {@code defaultRollInterval} when absent), {@code sink.serializer}
 * (default {@code "TEXT"}) and {@code sink.batchSize} (default
 * {@code defaultBatchSize}). Sub-contexts for the serializer and the path
 * manager are extracted from their respective prefixes. The sink counter is
 * created only if it does not exist yet.
 *
 * @param context configuration to read from
 */
@Override
public void configure(Context context) {
  String pathManagerType = context.getString("sink.pathManager", "DEFAULT");
  String directoryName = context.getString("sink.directory");
  String rollIntervalText = context.getString("sink.rollInterval");

  serializerType = context.getString("sink.serializer", "TEXT");
  serializerContext =
      new Context(context.getSubProperties("sink." + EventSerializer.CTX_PREFIX));

  Context pathManagerContext =
      new Context(context.getSubProperties("sink." + PathManager.CTX_PREFIX));
  pathController = PathManagerFactory.getInstance(pathManagerType, pathManagerContext);

  Preconditions.checkArgument(directoryName != null, "Directory may not be null");
  Preconditions.checkNotNull(serializerType, "Serializer type is undefined");

  if (rollIntervalText != null) {
    this.rollInterval = Long.parseLong(rollIntervalText);
  } else {
    this.rollInterval = defaultRollInterval;
  }

  batchSize = context.getInteger("sink.batchSize", defaultBatchSize);

  this.directory = new File(directoryName);

  if (sinkCounter == null) {
    sinkCounter = new SinkCounter(getName());
  }
}
代码示例来源:origin: apache/flume
/**
 * Chooses and configures the sink selector for this processor.
 *
 * <p>Requires more than one sink. The selector type is read from
 * {@code CONFIG_SELECTOR} (default {@code SELECTOR_NAME_ROUND_ROBIN}) and
 * matched case-insensitively against the round-robin and random names; any
 * other value is treated as the fully qualified name of a
 * {@code SinkSelector} class that is loaded and instantiated through its
 * no-argument constructor. The selector then receives the sinks and the
 * settings found under {@code CONFIG_SELECTOR_PREFIX}.
 *
 * @param context configuration to read from
 * @throws FlumeException if a custom selector class cannot be loaded, is not
 *     a {@code SinkSelector}, or cannot be instantiated
 */
@Override
public void configure(Context context) {
  Preconditions.checkState(getSinks().size() > 1,
      "The LoadBalancingSinkProcessor cannot be used for a single sink. "
      + "Please configure more than one sinks and try again.");

  String selectorTypeName = context.getString(CONFIG_SELECTOR,
      SELECTOR_NAME_ROUND_ROBIN);

  Boolean shouldBackOff = context.getBoolean(CONFIG_BACKOFF, false);

  selector = null;

  if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
    selector = new RoundRobinSinkSelector(shouldBackOff);
  } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
    selector = new RandomOrderSinkSelector(shouldBackOff);
  } else {
    try {
      // asSubclass verifies the type before anything is instantiated, so no
      // unchecked cast is needed.
      Class<? extends SinkSelector> klass =
          Class.forName(selectorTypeName).asSubclass(SinkSelector.class);
      // Class#newInstance is deprecated; go through the no-arg constructor.
      selector = klass.getDeclaredConstructor().newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to instantiate sink selector: "
          + selectorTypeName, ex);
    }
  }

  selector.setSinks(getSinks());
  selector.configure(
      new Context(context.getSubProperties(CONFIG_SELECTOR_PREFIX)));

  LOGGER.debug("Sink selector: " + selector + " initialized");
}
代码示例来源:origin: apache/flume
new Context(context.getSubProperties("interceptors."));
Context interceptorContext = new Context(
interceptorContexts.getSubProperties(interceptorName + "."));
String type = interceptorContext.getString("type");
代码示例来源:origin: apache/flume
/**
 * Delegates to the superclass, then reads the write format (defaulting to
 * {@code SequenceFileSerializerType.Writable}), the
 * {@code hdfs.useRawLocalFileSystem} flag (default {@code false}) and the
 * serializer sub-context, obtains the serializer from
 * {@code SequenceFileSerializerFactory}, and logs the two scalar settings.
 *
 * @param context configuration to read from
 */
@Override
public void configure(Context context) {
  super.configure(context);

  // Binary Writable serialization is the default write format.
  final String defaultWriteFormat = SequenceFileSerializerType.Writable.name();
  writeFormat = context.getString("hdfs.writeFormat", defaultWriteFormat);
  useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem", false);

  serializerContext =
      new Context(context.getSubProperties(SequenceFileSerializerFactory.CTX_PREFIX));
  serializer = SequenceFileSerializerFactory.getSerializer(writeFormat, serializerContext);

  final String summary = "writeFormat = " + writeFormat
      + ", UseRawLocalFileSystem = " + useRawLocalFileSystem;
  logger.info(summary);
}
代码示例来源:origin: apache/flume
/**
 * Builds a fresh context holding the Kafka server URL reported by
 * {@code testUtil} under {@code BOOTSTRAP_SERVERS_CONFIG} and a
 * {@code BATCH_SIZE} of {@code "1"}.
 *
 * @return the prepared context
 */
private Context prepareDefaultContext() {
  final Context defaults = new Context();
  final String kafkaServerUrl = testUtil.getKafkaServerUrl();
  defaults.put(BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
  defaults.put(BATCH_SIZE, "1");
  return defaults;
}
代码示例来源:origin: apache/flume
Context selectorContext = new Context();
selectorContext.putAll(selectorParams);
String config = null;
代码示例来源:origin: apache/flume
/**
 * Configures the sink list and, when the processor type is a known one,
 * the sink processor configuration.
 *
 * <p>The sink names are split on whitespace from
 * {@code BasicConfigurationConstants.CONFIG_SINKS}. The settings under
 * {@code CONFIG_SINK_PROCESSOR_PREFIX} are copied into
 * {@code processorContext}. If {@code getKnownSinkProcessor} recognises the
 * configured type, a processor configuration is created through
 * {@code ComponentConfigurationFactory}; when that yields a non-null
 * result it is given the sinks and configured. {@code setConfigured()} is
 * always called at the end.
 *
 * @param context configuration to read from
 * @throws ConfigurationException as declared by the overridden method
 */
@Override
public void configure(Context context) throws ConfigurationException {
  super.configure(context);

  String sinkNames = context.getString(BasicConfigurationConstants.CONFIG_SINKS);
  sinks = Arrays.asList(sinkNames.split("\\s+"));

  Map<String, String> processorParams = context.getSubProperties(
      BasicConfigurationConstants.CONFIG_SINK_PROCESSOR_PREFIX);
  processorContext = new Context();
  processorContext.putAll(processorParams);

  String processorTypeName =
      processorContext.getString(BasicConfigurationConstants.CONFIG_TYPE);
  SinkProcessorType spType = getKnownSinkProcessor(processorTypeName);

  if (spType != null) {
    String processorName = this.getComponentName() + "-processor";
    processorConf = (SinkProcessorConfiguration) ComponentConfigurationFactory.create(
        processorName, spType.toString(), ComponentType.SINK_PROCESSOR);
    if (processorConf != null) {
      processorConf.setSinks(new HashSet<String>(sinks));
      processorConf.configure(processorContext);
    }
  }

  setConfigured();
}
代码示例来源:origin: apache/flume
deserializerContext = new Context(context.getSubProperties(DESERIALIZER +
"."));
代码示例来源:origin: apache/flume
/**
 * Configures a {@code KafkaSink} with a mix of prefixed Kafka properties and
 * checks the producer properties the sink ends up with.
 *
 * <p>Assertions are written in (expected, actual) order so that a failure
 * message reports the two values under the right labels.
 */
@Test
public void testKafkaProperties() {
  KafkaSink kafkaSink = new KafkaSink();
  Context context = new Context();
  context.put(KAFKA_PREFIX + TOPIC_CONFIG, "");
  context.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "override.default.serializer");
  context.put("kafka.producer.fake.property", "kafka.property.value");
  context.put("kafka.bootstrap.servers", "localhost:9092,localhost:9092");
  context.put("brokerList", "real-broker-list");
  Configurables.configure(kafkaSink, context);

  Properties kafkaProps = kafkaSink.getKafkaProps();

  // The key serializer was not configured, so the default must be present.
  assertEquals(DEFAULT_KEY_SERIALIZER,
      kafkaProps.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
  // A producer-prefixed property overrides the default and loses its prefix.
  assertEquals("override.default.serializer",
      kafkaProps.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
  // Any kafka-producer property is passed through.
  assertEquals("kafka.property.value",
      kafkaProps.getProperty("fake.property"));
  // The documented bootstrap-servers property overrides defaults.
  assertEquals("localhost:9092,localhost:9092",
      kafkaProps.getProperty("bootstrap.servers"));
}
代码示例来源:origin: apache/flume
/**
 * Configures a {@code KafkaSink} using the old-style keys ({@code topic},
 * {@code OLD_BATCH_SIZE}, {@code BROKER_LIST_FLUME_KEY},
 * {@code REQUIRED_ACKS_FLUME_KEY}) and checks the topic, batch size and
 * producer properties the sink ends up with.
 *
 * <p>Assertions are written in (expected, actual) order so that a failure
 * message reports the two values under the right labels.
 */
@Test
public void testOldProperties() {
  KafkaSink kafkaSink = new KafkaSink();
  Context context = new Context();
  context.put("topic", "test-topic");
  context.put(OLD_BATCH_SIZE, "300");
  context.put(BROKER_LIST_FLUME_KEY, "localhost:9092,localhost:9092");
  context.put(REQUIRED_ACKS_FLUME_KEY, "all");
  Configurables.configure(kafkaSink, context);

  Properties kafkaProps = kafkaSink.getKafkaProps();

  assertEquals("test-topic", kafkaSink.getTopic());
  assertEquals(300, kafkaSink.getBatchSize());
  assertEquals("localhost:9092,localhost:9092",
      kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
  assertEquals("all", kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG));
}
代码示例来源:origin: kaaproject/kaa
/**
 * Builds the context used by this class: channel parameters, the HDFS
 * settings keyed by {@code ConfigurationConstants}, and the serializer
 * settings.
 *
 * @return the populated context
 * @throws IOException as declared; presumably from qualifying the HDFS root
 *     path — confirm against the {@code fileSystem} API
 */
private Context prepareContext() throws IOException {
  final Context ctx = new Context();

  // Channel parameters
  ctx.put("capacity", "100000000");
  ctx.put("transactionCapacity", "10000000");
  ctx.put("keep-alive", "1");
  ctx.put("port", "31333");
  ctx.put("bind", "localhost");

  // HDFS parameters
  final String hdfsRoot = fileSystem.makeQualified(new Path("/logs")).toString();
  ctx.put(ConfigurationConstants.CONFIG_ROOT_HDFS_PATH, hdfsRoot);
  ctx.put(ConfigurationConstants.CONFIG_HDFS_TXN_EVENT_MAX, "100000");
  ctx.put(ConfigurationConstants.CONFIG_HDFS_THREAD_POOL_SIZE, "20");
  ctx.put(ConfigurationConstants.CONFIG_HDFS_ROLL_TIMER_POOL_SIZE, "1");
  ctx.put(ConfigurationConstants.CONFIG_HDFS_MAX_OPEN_FILES, "5000");
  ctx.put(ConfigurationConstants.CONFIG_HDFS_CALL_TIMEOUT, "10000");
  // Roll interval, in milliseconds.
  ctx.put(ConfigurationConstants.CONFIG_HDFS_ROLL_INTERVAL, "86400000");
  // Roll size, in bytes (0 means don't roll by size).
  ctx.put(ConfigurationConstants.CONFIG_HDFS_ROLL_SIZE, "0");
  // Roll count, in records.
  ctx.put(ConfigurationConstants.CONFIG_HDFS_ROLL_COUNT, "5500000");
  // Number of records per flush.
  ctx.put(ConfigurationConstants.CONFIG_HDFS_BATCH_SIZE, "" + flushRecordsCount);
  // Default DFS block size, in bytes.
  ctx.put(ConfigurationConstants.CONFIG_HDFS_DEFAULT_BLOCK_SIZE, "" + blockSize);
  ctx.put(ConfigurationConstants.CONFIG_HDFS_FILE_PREFIX, "data");
  ctx.put(ConfigurationConstants.CONFIG_STATISTICS_INTERVAL, "10");

  // Serializer parameters
  ctx.put("serializer.compressionCodec", "null");
  ctx.put("serializer.avro.schema.source", "local");
  ctx.put("serializer.avro.schema.local.root", logSchemasRootDir.getAbsolutePath());

  return ctx;
}
内容来源于网络,如有侵权,请联系作者删除!