我们正在测试分布式模式下的kafka连接,以将主题记录从kafka拉到hdfs。我们有两个箱子。Kafka和zookeeper守护进程运行的一个。我们在这个盒子里保存了一个Kafka连接的示例。我们还有一个框,其中有hdfs namenode。我们在这里保留了另一个Kafka连接的示例。
我们开始Kafka,Zookeeper和Kafka连接在第一个盒子。我们在第二个盒子里也开始了Kafka连接。现在根据confluent文档,我们必须使用RESTAPI启动hdfs连接器(或任何其他连接器)。所以,在这两个框中启动kafka connect之后,我们尝试通过restapi启动连接器。我们在下面试过了command:-
curl -X POST -H "HTTP/1.1 Host: ip-10-16-34-57.ec2.internal:9092 Content-Type: application/json Accept: application/json" --data '{"name": "hdfs-sink", "config": {"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "format.class":"com.qubole.streamx.SourceFormat", "tasks.max":"1", "hdfs.url":"hdfs://ip-10-16-37-124:9000", "topics":"Prd_IN_TripAnalysis,Prd_IN_Alerts,Prd_IN_GeneralEvents", "partitioner.class":"io.confluent.connect.hdfs.partitioner.DailyPartitioner", "locale":"", "timezone":"Asia/Calcutta" }}' http://ip-10-16-34-57.ec2.internal:8083/connectors
我们一按这里的回车键,就会得到以下响应:
<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>
<title>Error 415 </title>
</head>
<body>
<h2>HTTP ERROR: 415</h2>
<p>Problem accessing /connectors. Reason:
<pre> Unsupported Media Type</pre></p>
<hr /><i><small>Powered by Jetty://</small></i>
</body>
</html>
etc/kafka/中的connect-distributed.properties文件位于下面两个kafka connect节点中。我们还创建了上述三个主题(连接偏移量、连接配置、连接状态)
bootstrap.servers=ip-10-16-34-57.ec2.internal:9092
group.id=connect-cluster
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
enable.auto.commit=true
auto.commit.interval.ms=1000
offset.flush.interval.ms=1000
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
rest.port=8083
config.storage.topic=connect-configs
status.storage.topic=connect-status
offset.flush.interval.ms=10000
这里有什么问题?我们缺少一些东西来启动kafka connect在分布式模式下使用hdfs连接器。独立模式下的Kafka连接工作正常。
1条答案
按热度按时间vqlkdk9b1#
要上载连接器,这是put命令,而不是post:http://docs.confluent.io/3.1.1/connect/restapi.html#put--连接器-(字符串名称)-配置
另一方面,我认为您的curl命令可能是错误的:
每个头需要一个-h开关,将所有头放在一个-h参数中不是它的工作方式(我认为)。
我不认为端口是主机头的一部分。