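I am using the sample code below to load some test data from Confluent Kafka into HBase, but for some reason I do not see the data in HBase. I have tried everything I can think of, with no luck. Can anyone help?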
How to reproduce the problem:
Download the sample code:
https://github.com/mravi/kafka-connect-hbase
Compile the sample code:
mvn -DskipTests=true package
Modify hbase-sink.properties on the Connect distributed server:
vim ./etc/kafka-connect-hbase/hbase-sink.properties
name=kafka-cdc-hbase
connector.class=io.svectors.hbase.sink.HBaseSinkConnector
tasks.max=1
topics=test
zookeeper.quorum=wdsjnl012.test.com:2181
event.parser.class=io.svectors.hbase.parser.AvroEventParser
hbase.test.rowkey.columns=id
hbase.test.rowkey.delimiter=,
hbase.test.family=name
Start Connect in distributed mode with the following command:
./bin/confluent start connect
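One thing that may be worth ruling out at this step: a Kafka Connect worker running in distributed mode does not pick up connector definitions from a properties file on its own; connectors are created by sending a JSON body of the form {"name": ..., "config": {...}} to the worker's REST API. The following is only a minimal sketch of how the properties above could be turned into that JSON body. The helper name properties_to_connector_payload is hypothetical, and whether the connector was in fact never registered in this setup is an assumption, not something the logs confirm.

```python
import json

# The connector settings from hbase-sink.properties, copied as-is.
HBASE_SINK_PROPERTIES = """\
name=kafka-cdc-hbase
connector.class=io.svectors.hbase.sink.HBaseSinkConnector
tasks.max=1
topics=test
zookeeper.quorum=wdsjnl012.test.com:2181
event.parser.class=io.svectors.hbase.parser.AvroEventParser
hbase.test.rowkey.columns=id
hbase.test.rowkey.delimiter=,
hbase.test.family=name
"""


def properties_to_connector_payload(properties_text):
    """Build the {"name", "config"} body used by POST /connectors.

    Hypothetical helper: handles plain key=value lines only
    (no line continuations or escapes).
    """
    config = {}
    for raw_line in properties_text.splitlines():
        line = raw_line.strip()
        # Skip blank lines and comments.
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values may contain '='.
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    if "name" not in config:
        raise ValueError("properties must define a connector name")
    name = config.pop("name")
    return {"name": name, "config": config}


if __name__ == "__main__":
    payload = properties_to_connector_payload(HBASE_SINK_PROPERTIES)
    print(json.dumps(payload, indent=2))
```

The printed JSON could then be sent with Content-Type: application/json to the /connectors endpoint of the Connect worker (port 8083 unless rest.port was changed), and a GET on /connectors/kafka-cdc-hbase/status afterwards would show whether the connector and its task are running or have failed.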
Create the HBase table:
hbase(main):007:0> create 'test', 'name'
hbase(main):007:0> describe 'test'
Table test is ENABLED
test
COLUMN FAMILIES DESCRIPTION
{NAME => 'name', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
Produce data:
./bin/kafka-avro-console-producer --broker-list wdsjnl010:9092 --topic test --property value.schema='{"type":"record","name":"record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}'
{"id": 1, "name": "foo"}
By the way, I double-checked the log file to see whether the HBase connector was loaded, and I believe the HBase Connect plugin was loaded:
[2018-05-18 19:48:32,714] INFO Loading plugin from: /usr/local/confluent/share/java/kafka-connect-hbase (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
[2018-05-18 19:48:35,447] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/local/confluent/share/java/kafka-connect-hbase/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
[2018-05-18 19:48:35,447] INFO Added plugin 'io.svectors.hbase.sink.HBaseSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:146)
[2018-05-18 19:48:35,447] INFO Loading plugin from: /usr/local/confluent/share/java/confluent-common (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
[2018-05-18 19:48:35,516] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/local/confluent/share/java/confluent-common/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
[2018-05-18 19:48:35,518] INFO Loading plugin from: /usr/local/confluent/share/java/kafka-connect-s3 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
[2018-05-18 19:48:38,396] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/local/confluent/share/java/kafka-connect-s3/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
[2018-05-18 19:48:38,396] INFO Added plugin 'io.confluent.connect.s3.S3SinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:146)
[2018-05-18 19:48:40,673] INFO Registered loader: sun.misc.Launcher$AppClassLoader@764c12b6 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
[2018-05-18 19:48:40,675] INFO Added aliases 'ElasticsearchSinkConnector' and 'ElasticsearchSink' to plugin 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,675] INFO Added aliases 'HdfsSinkConnector' and 'HdfsSink' to plugin 'io.confluent.connect.hdfs.HdfsSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,676] INFO Added aliases 'JdbcSinkConnector' and 'JdbcSink' to plugin 'io.confluent.connect.jdbc.JdbcSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,676] INFO Added aliases 'JdbcSourceConnector' and 'JdbcSource' to plugin 'io.confluent.connect.jdbc.JdbcSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,676] INFO Added aliases 'S3SinkConnector' and 'S3Sink' to plugin 'io.confluent.connect.s3.S3SinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,676] INFO Added aliases 'HBaseSinkConnector' and 'HBaseSink' to plugin 'io.svectors.hbase.sink.HBaseSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,676] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,677] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,677] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,677] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,677] INFO Added aliases 'MockSourceConnector' and 'MockSource' to plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,677] INFO Added aliases 'VerifiableSinkConnector' and 'VerifiableSink' to plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,677] INFO Added aliases 'VerifiableSourceConnector' and 'VerifiableSource' to plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,678] INFO Added aliases 'AvroConverter' and 'Avro' to plugin 'io.confluent.connect.avro.AvroConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,678] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,678] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,678] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,678] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,678] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,678] INFO Added alias 'SimpleHeaderConverter' to plugin 'org.apache.kafka.connect.storage.SimpleHeaderConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:348)
[2018-05-18 19:48:40,678] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:351)
[2018-05-18 19:48:40,680] INFO Added alias 'RegexRouter' to plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:348)
[2018-05-18 19:48:40,680] INFO Added alias 'TimestampRouter' to plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:348)
[2018-05-18 19:48:40,680] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:348)
[2018-05-18 19:48:40,700] INFO DistributedConfig values: