Kafka connector to HDFS: java.io.FileNotFoundException: File does not exist

sg2wtvxw · posted 2021-05-31 in Hadoop

Everything is installed through Ambari (HDP). I fed a sample file into Kafka; the topic is testjson. The data was ingested from a CSV file by Filebeat, and Kafka accepted it successfully. Listing the topics:

/bin/kafka-topics.sh --list --zookeeper localhost:2181

Result:

test
test060920
test1
test12
testjson
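
(To double-check that records actually landed in the topic, a console consumer can read a few of them back — a sketch, with the broker address assumed to match the worker's bootstrap.servers below:)

# Sketch: read back a few records from the testjson topic
# (broker address is an assumption, taken from bootstrap.servers below)
/usr/hdp/3.1.4.0-315/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server x.x.x.x:6667 \
  --topic testjson \
  --from-beginning \
  --max-messages 5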

From Kafka, I want to sink testjson into HDFS.

quickstart-hdfs.properties

name=hdfs-sink
connector.class=io.confluent.connect.hdfs3.Hdfs3SinkConnector
tasks.max=1
topics=testjson
hdfs.url=hdfs://x.x.x.x:8020
flush.size=3
confluent.license=
confluent.topic.bootstrap.servers=x.x.x.x:6667
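
(For context on the paths that appear in the error below: the sink stages records in a temporary `+tmp` directory under `topics.dir` — which appears to default to `topics` under the connecting user's HDFS home — before committing final files. The staging area can be watched while the connector runs; a sketch:)

# Sketch: watch the sink's temporary staging area while the connector runs
# (the topics/+tmp layout is inferred from the error path reported below)
hdfs dfs -ls -R /user/root/topics/+tmp/testjson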

connect-standalone.properties

bootstrap.servers=x.x.x.x:6667
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
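
(Note that plugin.path is defined twice in this file; in a Java properties file the last definition wins, so only the second value is effective. Whether the worker actually discovered the HDFS3 connector can be confirmed through its REST endpoint — a sketch, assuming the default REST port 8083:)

# Sketch: list the connector plugins the running worker found on plugin.path
# (the worker's REST port is assumed to be the default 8083)
curl -s http://localhost:8083/connector-plugins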

Running it:

/usr/hdp/3.1.4.0-315/kafka/bin/connect-standalone.sh /etc/kafka/connect-standalone-json.properties /etc/kafka-connect-hdfs/quickstart-hdfs.properties

It fails with this error:

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/topics/+tmp/testjson/partition=0/26c82453-4980-40de-a9d4-276aa0f3899e_tmp.avro (inode 22757) [Lease.  Holder: DFSClient_NONMAPREDUCE_496631619_33, pending creates: 1]
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2815)
        at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:591)
        at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:171)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2694)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:875)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:561)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682)

        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1511)
        at org.apache.hadoop.ipc.Client.call(Client.java:1457)
        at org.apache.hadoop.ipc.Client.call(Client.java:1367)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
        at com.sun.proxy.$Proxy47.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:510)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy48.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1081)
        ... 3 more
[2020-06-23 10:35:30,240] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.AvroRuntimeException: already open
        at org.apache.avro.file.DataFileWriter.assertNotOpen(DataFileWriter.java:85)
        at org.apache.avro.file.DataFileWriter.setCodec(DataFileWriter.java:93)
        at io.confluent.connect.hdfs3.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:59)
        at io.confluent.connect.hdfs3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:675)
        at io.confluent.connect.hdfs3.TopicPartitionWriter.write(TopicPartitionWriter.java:374)
        at io.confluent.connect.hdfs3.DataWriter.write(DataWriter.java:359)
        at io.confluent.connect.hdfs3.Hdfs3SinkTask.put(Hdfs3SinkTask.java:108)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
        ... 10 more
[2020-06-23 10:35:30,243] ERROR WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

Question:

What is going on here? I noticed `Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/topics/+tmp/testjson/partition=0/26c82453-4980-40de-a9d4-276aa0f3899e_tmp.avro (inode 22757)`, but I don't understand what it implies. I assumed it refers to the target path in HDFS, so I already created a home directory for root with `sudo -u hdfs hadoop fs -mkdir /user/root` and gave root ownership with `sudo -u hdfs hadoop fs -chown root /user/root`.

## Edit

I checked `/user/root/topics/+tmp/testjson/partition=0`, and `26c82453-4980-40de-a9d4-276aa0f3899e_tmp.avro` is there. I don't know what to make of it: each rerun reports a different file as missing, and after I stop the connector and look, the file has been created and is sitting there.
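
(For completeness, ownership and permissions on the target tree can be re-checked from the HDFS superuser — a sketch:)

# Sketch: verify that root owns its home directory and the connector's tree
sudo -u hdfs hadoop fs -ls /user
hadoop fs -ls -R /user/root/topics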

## Edit 2

I checked:

root@ambari:/home/hadoop# hdfs dfs -ls /user/root/topics/+tmp/testjson
Found 1 items
drwxr-xr-x   - root hdfs          0 2020-06-24 06:18 /user/root/topics/+tmp/testjson/partition=0
root@ambari:/home/hadoop# hdfs dfs -ls /user/root/topics/+tmp/testjson/partition=0
Found 2 items
-rw-r--r--   3 root hdfs          0 2020-06-24 06:18 /user/root/topics/+tmp/testjson/partition=0/05cc9305-c370-44f8-8e9d-b311fb284e26_tmp.avro
-rw-r--r--   3 root hdfs          0 2020-06-24 03:26 /user/root/topics/+tmp/testjson/partition=0/26c82453-4980-40de-a9d4-276aa0f3899e_tmp.avro
A new `_tmp.avro` file is generated on every run of `/usr/hdp/3.1.4.0-315/kafka/bin/connect-standalone.sh connect-standalone-json.properties quickstart-hdfs.properties`. Suspecting a permissions problem, I set `dfs.permissions.enabled` to false (the default is true) and reran. Still no luck.
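
(Whether that change actually took effect in the running configuration can be confirmed — a sketch:)

# Sketch: read the effective value of the permissions switch from hdfs-site.xml
hdfs getconf -confKey dfs.permissions.enabled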

## Edit 3

New error log:

[2020-06-26 09:57:28,703] WARN The configuration 'config.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,704] WARN The configuration 'status.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,705] WARN The configuration 'config.storage.replication.factor' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,706] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,706] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,706] WARN The configuration 'offset.storage.file.filename' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,706] WARN The configuration 'status.storage.replication.factor' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,706] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,706] WARN The configuration 'offset.storage.replication.factor' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,706] WARN The configuration 'offset.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,706] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,707] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:287)
[2020-06-26 09:57:28,707] INFO Kafka version : 2.0.0.3.1.4.0-315 (org.apache.kafka.common.utils.AppInfoParser:109)
[2020-06-26 09:57:28,707] INFO Kafka commitId : 4243d589e2b33433 (org.apache.kafka.common.utils.AppInfoParser:110)
[2020-06-26 09:57:28,712] INFO Cluster ID: CQkRgktxRZmGv_C-Q87ViQ (org.apache.kafka.clients.Metadata:273)
[2020-06-26 09:57:28,733] INFO [Consumer clientId=consumer-3, groupId=connect-cluster] Discovered group coordinator 10.64.2.236:6667 (id: 2147482646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2020-06-26 09:57:28,742] INFO [Consumer clientId=consumer-3, groupId=connect-cluster] Resetting offset for partition connect-configs-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:583)
[2020-06-26 09:57:28,745] INFO Finished reading KafkaBasedLog for topic connect-configs (org.apache.kafka.connect.util.KafkaBasedLog:153)
[2020-06-26 09:57:28,745] INFO Started KafkaBasedLog for topic connect-configs (org.apache.kafka.connect.util.KafkaBasedLog:155)
[2020-06-26 09:57:28,745] INFO Started KafkaConfigBackingStore (org.apache.kafka.connect.storage.KafkaConfigBackingStore:253)
[2020-06-26 09:57:28,745] INFO Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:217)
[2020-06-26 09:57:28,753] INFO Cluster ID: CQkRgktxRZmGv_C-Q87ViQ (org.apache.kafka.clients.Metadata:273)
[2020-06-26 09:57:28,753] INFO [Worker clientId=connect-1, groupId=connect-cluster] Discovered group coordinator 10.64.2.236:6667 (id: 2147482646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2020-06-26 09:57:28,762] INFO [Worker clientId=connect-1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
Jun 26, 2020 9:57:29 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.RootResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored.
Jun 26, 2020 9:57:29 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will be ignored.
Jun 26, 2020 9:57:29 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be ignored.
Jun 26, 2020 9:57:29 AM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2020-06-26 09:57:29,315] INFO Started o.e.j.s.ServletContextHandler@3d6a6bee{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:855)
[2020-06-26 09:57:29,328] INFO Started http_8083@376a312c{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:292)
[2020-06-26 09:57:29,329] INFO Started @5042ms (org.eclipse.jetty.server.Server:410)
[2020-06-26 09:57:29,329] INFO Advertised URI: http://10.64.2.236:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:267)
[2020-06-26 09:57:29,329] INFO REST server listening at http://10.64.2.236:8083/, advertising URL http://10.64.2.236:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:217)
[2020-06-26 09:57:29,329] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2020-06-26 09:57:31,782] INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:473)
[2020-06-26 09:57:31,783] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-c3e95584-0853-44d1-b3a9-4c97593f4f2d', leaderUrl='http://10.64.2.236:8083/', offset=-1, connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1216)
[2020-06-26 09:57:31,784] INFO Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:858)
[2020-06-26 09:57:31,784] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)

As suggested, I tried `/usr/hdp/3.1.4.0-315/kafka/bin/connect-distributed.sh connect-distributed.properties quickstart-hdfs.properties`.

connect-distributed.properties

bootstrap.servers=10.64.2.236:6667
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
offset.storage.file.filename=/tmp/connect.offsets
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
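
(Note that the assignment line in the log above ends with `connectorIds=[]`, `taskIds=[]`: no connector was started. Unlike connect-standalone.sh, connect-distributed.sh takes only the worker properties file; connectors are submitted afterwards through the REST API. A sketch, with host and port taken from the "REST server listening" log line:)

# Sketch: submit the HDFS sink to the distributed worker over REST
# (host/port taken from the "REST server listening" line in the log above)
curl -s -X POST http://10.64.2.236:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "hdfs-sink",
    "config": {
      "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
      "tasks.max": "1",
      "topics": "testjson",
      "hdfs.url": "hdfs://x.x.x.x:8020",
      "flush.size": "3",
      "confluent.topic.bootstrap.servers": "x.x.x.x:6667"
    }
  }'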

Nothing is being generated under the tmp directory; `root@ambari:/test_stream# hdfs dfs -ls /user/root/topics/+tmp` shows nothing. Any suggestions or responses would be greatly appreciated. Thank you.
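
(A quick way to see whether the connector and its task exist at all, and in what state — a sketch against the same REST endpoint:)

# Sketch: list registered connectors and inspect the hdfs-sink task state
curl -s http://10.64.2.236:8083/connectors
curl -s http://10.64.2.236:8083/connectors/hdfs-sink/status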
