本文整理了Java中kafka.consumer.Consumer.createJavaConsumerConnector()
方法的一些代码示例,展示了Consumer.createJavaConsumerConnector()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Consumer.createJavaConsumerConnector()
方法的具体详情如下:
包路径:kafka.consumer.Consumer
类名称:Consumer
方法名:createJavaConsumerConnector
暂无
代码示例来源:origin: apache/incubator-gobblin
protected ConsumerConnector createConsumerConnector() {
return Consumer.createJavaConsumerConnector(this.consumerConfig);
}
代码示例来源:origin: apache/incubator-gobblin
public SimpleKafkaConsumer(Properties props, KafkaCheckpoint checkpoint)
{
Config config = ConfigFactory.parseProperties(props);
topic = config.getString("topic");
String zkConnect = config.getString("zookeeper.connect");
schemaRegistry = KafkaSchemaRegistryFactory.getSchemaRegistry(props);
deserializer = new LiAvroDeserializer(schemaRegistry);
/** TODO: Make Confluent schema registry integration configurable
* HashMap<String, String> avroSerDeConfig = new HashMap<>();
* avroSerDeConfig.put("schema.registry.url", "http://localhost:8081");
* deserializer = new io.confluent.kafka.serializers.KafkaAvroDeserializer();
* deserializer.configure(avroSerDeConfig, false);
*
**/
Properties consumeProps = new Properties();
consumeProps.put("zookeeper.connect", zkConnect);
consumeProps.put("group.id", "gobblin-tool-" + System.nanoTime());
consumeProps.put("zookeeper.session.timeout.ms", "10000");
consumeProps.put("zookeeper.sync.time.ms", "10000");
consumeProps.put("auto.commit.interval.ms", "10000");
consumeProps.put("auto.offset.reset", "smallest");
consumeProps.put("auto.commit.enable", "false");
//consumeProps.put("consumer.timeout.ms", "10000");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(ImmutableMap.of(topic, 1));
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this.topic);
stream = streams.get(0);
iterator = stream.iterator();
}
代码示例来源:origin: pinterest/secor
@Override
public void init(SecorConfig config) throws UnknownHostException {
this.mConfig = config;
mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig());
if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) {
throw new RuntimeException("Topic filter and blacklist cannot be both specified.");
}
TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty() ? new Blacklist(mConfig.getKafkaTopicBlacklist()) :
new Whitelist(mConfig.getKafkaTopicFilter());
LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter);
List<KafkaStream<byte[], byte[]>> streams =
mConsumerConnector.createMessageStreamsByFilter(topicFilter);
KafkaStream<byte[], byte[]> stream = streams.get(0);
mIterator = stream.iterator();
mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}
代码示例来源:origin: Graylog2/graylog2-server
cc = Consumer.createJavaConsumerConnector(consumerConfig);
代码示例来源:origin: apache/incubator-gobblin
KafkaConsumerSuite(String zkConnectString, String topic)
{
_topic = topic;
Properties consumeProps = new Properties();
consumeProps.put("zookeeper.connect", zkConnectString);
consumeProps.put("group.id", _topic+"-"+System.nanoTime());
consumeProps.put("zookeeper.session.timeout.ms", "10000");
consumeProps.put("zookeeper.sync.time.ms", "10000");
consumeProps.put("auto.commit.interval.ms", "10000");
consumeProps.put("_consumer.timeout.ms", "10000");
_consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
_consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
_stream = streams.get(0);
_iterator = _stream.iterator();
}
代码示例来源:origin: apache/incubator-gobblin
KafkaConsumerSuite(String zkConnectString, String topic)
{
_topic = topic;
Properties consumeProps = new Properties();
consumeProps.put("zookeeper.connect", zkConnectString);
consumeProps.put("group.id", _topic+"-"+System.nanoTime());
consumeProps.put("zookeeper.session.timeout.ms", "10000");
consumeProps.put("zookeeper.sync.time.ms", "10000");
consumeProps.put("auto.commit.interval.ms", "10000");
consumeProps.put("_consumer.timeout.ms", "10000");
_consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
_consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
_stream = streams.get(0);
_iterator = _stream.iterator();
}
代码示例来源:origin: apache/incubator-pinot
kafka.consumer.Consumer.createJavaConsumerConnector(kafkaHighLevelStreamConfig.getKafkaConsumerConfig());
代码示例来源:origin: apache/incubator-druid
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
代码示例来源:origin: apache/incubator-gobblin
public KafkaTestBase(String topic) throws InterruptedException, RuntimeException {
startServer();
this.topic = topic;
AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());
List<KafkaServer> servers = new ArrayList<>();
servers.add(kafkaServer);
TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);
Properties consumeProps = new Properties();
consumeProps.put("zookeeper.connect", zkConnect);
consumeProps.put("group.id", "testConsumer");
consumeProps.put("zookeeper.session.timeout.ms", "10000");
consumeProps.put("zookeeper.sync.time.ms", "10000");
consumeProps.put("auto.commit.interval.ms", "10000");
consumeProps.put("consumer.timeout.ms", "10000");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(this.topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this.topic);
stream = streams.get(0);
iterator = stream.iterator();
}
代码示例来源:origin: apache/nifi
consumer = Consumer.createJavaConsumerConnector(consumerConfig);
代码示例来源:origin: uber/chaperone
private void init() {
// register kafka offset lag metrics, one Gauge is for per consumer level granularity
MetricRegistry registry = Metrics.getRegistry();
try {
fetchedMsgCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-msgFetchRate");
failedToIngestCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-failedToIngest");
kafkaOffsetLagGauge =
registry.register("kafkaIngesterConsumer." + this.getName() + "-kafkaOffsetLag", new JmxAttributeGauge(
new ObjectName(maxLagMetricName), "Value"));
} catch (MalformedObjectNameException | IllegalArgumentException e) {
logger.error("Register failure for metrics of KafkaIngesterConsumer", e);
}
TopicFilter topicFilter = new Whitelist(AuditConfig.AUDIT_TOPIC_NAME);
logger.info("{}: Topic filter is {}", getName(), AuditConfig.AUDIT_TOPIC_NAME);
this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
KafkaStream<byte[], byte[]> stream = consumer.createMessageStreamsByFilter(topicFilter, 1).get(0);
iterator = stream.iterator();
logger.info("KafkaIngesterConsumer thread {} is initialized successfully", getName());
if (AuditConfig.INGESTER_ENABLE_DEDUP) {
deduplicator =
new Deduplicator(threadId, AuditConfig.INGESTER_REDIS_HOST, AuditConfig.INGESTER_REDIS_PORT,
AuditConfig.INGESTER_REDIS_KEY_TTL_SEC, AuditConfig.INGESTER_DUP_HOST_PREFIX,
AuditConfig.INGESTER_HOSTS_WITH_DUP);
deduplicator.open();
} else {
deduplicator = null;
}
}
代码示例来源:origin: io.zipkin.zipkin2/zipkin-collector-kafka08
ZookeeperConsumerConnector get() {
if (connector == null) {
synchronized (this) {
if (connector == null) {
connector = (ZookeeperConsumerConnector) createJavaConsumerConnector(config);
}
}
}
return connector;
}
代码示例来源:origin: apache/incubator-edgent
private synchronized ConsumerConnector client() {
if (consumer == null)
consumer = Consumer.createJavaConsumerConnector(
createConsumerConfig());
return consumer;
}
代码示例来源:origin: ningg/flume-ng-extends-source
public static ConsumerConnector getConsumer(Properties kafkaProps) {
ConsumerConfig consumerConfig =
new ConsumerConfig(kafkaProps);
ConsumerConnector consumer =
Consumer.createJavaConsumerConnector(consumerConfig);
return consumer;
}
代码示例来源:origin: vakinge/jeesuite-libs
@SuppressWarnings("unchecked")
public OldApiTopicConsumer(ConsumerContext context) {
super(context);
try {
Class<?> deserializerClass = Class.forName(context.getProperties().getProperty("value.deserializer"));
deserializer = (Deserializer<Object>) deserializerClass.newInstance();
} catch (Exception e) {}
this.connector = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(context.getProperties()));
}
代码示例来源:origin: com.netflix.suro/suro-kafka
@Override
public void start() throws Exception {
connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
final Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(ImmutableMap.of(topic, 1));
final List<KafkaStream<byte[], byte[]>> streamList = streams.get(topic);
if (streamList == null || streamList.size() != 1) {
throw new RuntimeException(topic + " is not valid");
}
stream = streamList.get(0).iterator();
startTakingTraffic();
}
代码示例来源:origin: locationtech/geowave
private ConsumerConnector buildKafkaConsumer() {
final Properties kafkaProperties = kafkaOptions.getProperties();
final ConsumerConnector consumer =
Consumer.createJavaConsumerConnector(new ConsumerConfig(kafkaProperties));
return consumer;
}
代码示例来源:origin: com.github.hackerwin7/jlib-utils
/**
* constructor with kafka conf
* @param conf
*/
public KafkaHighConsumer(KafkaConf conf) {
ConsumerConfig config = new ConsumerConfig(conf.getProps());
consumer = Consumer.createJavaConsumerConnector(config);
topic = conf.getProp(KafkaConf.HIGH_TOPIC);
}
代码示例来源:origin: apache/apex-malhar
public KafkaTestConsumer(String topic, String zkaddress)
{
this.zkaddress = zkaddress;
this.topic = topic;
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
}
代码示例来源:origin: mariamhakobyan/elasticsearch-river-kafka
public KafkaConsumer(final RiverConfig riverConfig) {
consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(riverConfig));
final Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(riverConfig.getTopic(), AMOUNT_OF_THREADS_PER_CONSUMER);
final Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
consumerConnector.createMessageStreams(topicCountMap);
streams = consumerStreams.get(riverConfig.getTopic());
logger.debug("Index: {}: Started kafka consumer for topic: {} with {} partitions in it.",
riverConfig.getIndexName(), riverConfig.getTopic(), streams.size());
}
内容来源于网络,如有侵权,请联系作者删除!