How do I configure the HdfsSinkConnector with Kafka Connect?

Asked by xxb16uws on 2021-05-27 in Hadoop

I'm trying to set up an HDFS sink connector. This is my worker.properties configuration:

bootstrap.servers=kafkacluster01.corp:9092
group.id=nycd-og-kafkacluster

config.storage.topic=hive_conn_conf
offset.storage.topic=hive_conn_offs
status.storage.topic=hive_conn_stat

key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://my-schemaregistry.co:8081

schema.registry.url=http://my-schemaregistry.co:8081

hive.integration=true
hive.metastore.uris=dev-hive-metastore
schema.compatibility=BACKWARD

value.converter.schemas.enable=true
logs.dir = /logs
topics.dir = /topics

plugin.path=/usr/share/java
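
As an aside: hive.integration, hive.metastore.uris, schema.compatibility, logs.dir and topics.dir are connector-level options of the HDFS sink, so in distributed mode they would normally travel in the connector's JSON config rather than in worker.properties. A sketch of what that could look like (the 8083 REST port, the 8020 NameNode port, and the thrift://...:9083 metastore URI are assumed defaults, not a verified fix):

# Sketch only: hostnames and ports below are assumptions, adjust to your cluster
curl -X PUT localhost:8083/connectors/hdfs-hive_sink_con_dom16/config \
  -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics": "dom_topic",
    "hdfs.url": "hdfs://hadoop-sql-dev:8020",
    "flush.size": "3",
    "hive.integration": "true",
    "hive.metastore.uris": "thrift://dev-hive-metastore:9083",
    "schema.compatibility": "BACKWARD",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://my-schemaregistry.co:8081"
}'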

This is the POST request I use to create the connector:

curl -X POST localhost:9092/connectors -H "Content-Type: application/json" -d '{
  "name":"hdfs-hive_sink_con_dom16",
  "config":{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics": "dom_topic",
    "hdfs.url": "hdfs://hadoop-sql-dev:10000",
    "flush.size": "3",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://my-schemaregistry.co:8081"
    }
}'
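
One thing worth double-checking here: 9092 is the broker port from bootstrap.servers above, whereas the Connect REST API listens on rest.port, which defaults to 8083. A quick sanity check against the worker (assuming that default) would be:

# List connectors and inspect task status (8083 is Connect's default rest.port)
curl -s localhost:8083/connectors
curl -s localhost:8083/connectors/hdfs-hive_sink_con_dom16/status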

The topic dom_topic already exists (it is Avro), but I get the following error from my worker:

INFO Couldn't start HdfsSinkConnector: (io.confluent.connect.hdfs.HdfsSinkTask:72)
org.apache.kafka.connect.errors.ConnectException: java.io.IOException: 
Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: 
Protocol message end-group tag did not match expected tag.; 
Host Details : local host is: "319dc5d70884/172.17.0.2"; destination host is: "hadoop-sql-dev":10000;
        at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:202)
        at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:64)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:207)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
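
That InvalidProtocolBufferException typically means the client spoke the Hadoop IPC protocol to a port that doesn't serve it; 10000 is HiveServer2's default JDBC port, not the NameNode RPC port. One way to sanity-check the URL from inside the container (a sketch assuming the hdfs CLI is on the PATH and the NameNode listens on the common default 8020):

# Try the NameNode RPC port instead of HiveServer2's 10000
# (8020 is only the usual default; verify against your cluster)
hdfs dfs -ls hdfs://hadoop-sql-dev:8020/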

I took the hdfs.url from Hive: jdbc:hive2://hadoop-sql-dev:10000. If I change the port to 9092, I get this instead:

INFO Retrying connect to server: hadoop-sql-dev/xxx.xx.x.xx:9092. Already tried 0 time(s); maxRetries=45 (org.apache.hadoop.ipc.Client:837)
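
(That retry loop makes sense: 9092 is the Kafka broker port from bootstrap.servers, so the Hadoop IPC client will never get an answer it understands.) If the Hadoop client config is reachable, the authoritative value for hdfs.url can be read directly from it:

# Print the default filesystem URI the cluster actually advertises
hdfs getconf -confKey fs.defaultFS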

I'm running all of this in Docker, and my Dockerfile is very simple:


# FROM coinsmith/cp-kafka-connect-hdfs

FROM confluentinc/cp-kafka-connect:5.3.1

COPY confluentinc-kafka-connect-hdfs-5.3.1 /usr/share/java/kafka-connect-hdfs
COPY worker.properties worker.properties

# start

ENTRYPOINT ["connect-distributed", "worker.properties"]
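
For completeness, this is roughly how the image gets built and run (a sketch with a hypothetical tag; the port mapping assumes Connect's default REST port 8083):

# Hypothetical build/run; adjust the tag and network settings as needed
docker build -t my-connect-hdfs .
docker run --rm -p 8083:8083 my-connect-hdfs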

Any help would be greatly appreciated.

No answers yet.

