PutHiveStreaming processor

ryhaxcpt posted on 2021-06-27 in Hive

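I am debugging a Hive processor modelled on the official PutHiveStreaming processor, except that it writes to Hive 2.x instead of 3.x. The flow runs on a NiFi 1.7.1 cluster. The data is still written to Hive even though the following exception is thrown.
The exception is: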

java.lang.NullPointerException: null
    at org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook.getFilteredObjects(AuthorizationMetaStoreFilterHook.java:77)
    at org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook.filterDatabases(AuthorizationMetaStoreFilterHook.java:54)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabases(HiveMetaStoreClient.java:1147)
    at org.apache.hive.hcatalog.common.HiveClientCache$CacheableHiveMetaStoreClient.isOpen(HiveClientCache.java:471)
    at sun.reflect.GeneratedMethodAccessor1641.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:169)
    at com.sun.proxy.$Proxy308.isOpen(Unknown Source)
    at org.apache.hive.hcatalog.common.HiveClientCache.get(HiveClientCache.java:270)
    at org.apache.hive.hcatalog.common.HCatUtil.getHiveMetastoreClient(HCatUtil.java:558)
    at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:95)
    at org.apache.hive.hcatalog.streaming.StrictJsonWriter.<init>(StrictJsonWriter.java:82)
    at org.apache.hive.hcatalog.streaming.StrictJsonWriter.<init>(StrictJsonWriter.java:60)
    at org.apache.nifi.util.hive.HiveWriter.lambda$getRecordWriter$0(HiveWriter.java:91)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.nifi.util.hive.HiveWriter.getRecordWriter(HiveWriter.java:91)
    at org.apache.nifi.util.hive.HiveWriter.<init>(HiveWriter.java:75)
    at org.apache.nifi.util.hive.HiveUtils.makeHiveWriter(HiveUtils.java:46)
    at org.apache.nifi.processors.hive.PutHive2Streaming.makeHiveWriter(PutHive2Streaming.java:1152)
    at org.apache.nifi.processors.hive.PutHive2Streaming.getOrCreateWriter(PutHive2Streaming.java:1065)
    at org.apache.nifi.processors.hive.PutHive2Streaming.access$1000(PutHive2Streaming.java:114)
    at org.apache.nifi.processors.hive.PutHive2Streaming$1.lambda$process$2(PutHive2Streaming.java:858)
    at org.apache.nifi.processor.util.pattern.ExceptionHandler.execute(ExceptionHandler.java:127)
    at org.apache.nifi.processors.hive.PutHive2Streaming$1.process(PutHive2Streaming.java:855)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2211)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2179)
    at org.apache.nifi.processors.hive.PutHive2Streaming.onTrigger(PutHive2Streaming.java:808)
    at org.apache.nifi.processors.hive.PutHive2Streaming.lambda$onTrigger$4(PutHive2Streaming.java:672)
    at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
    at org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
    at org.apache.nifi.processors.hive.PutHive2Streaming.onTrigger(PutHive2Streaming.java:672)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

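I would also like to reproduce the error. Would a test built with TestRunners.newTestRunner(processor); be able to catch it? I used the Hive 3.x test case as a reference: https://github.com/apache/nifi/blob/ea9b0db2f620526c8dd0db595cf8b44c3ef835be/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/testputhivestreaming.java
Another approach is to run Hive 2.x and NiFi containers locally. But then I would have to build the NAR with mvn, copy it into the container with docker cp, and attach a remote JVM debugger from IntelliJ, as described in this article: https://community.hortonworks.com/articles/106931/nifi-debugging-tutorial.html
Has anyone done something similar? Or is there a simpler way to debug a custom processor?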

Answer 1 (8ftvxx2r)

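The NPE no longer appears after setting hcatalog.hive.client.cache.disabled to true.
Kafka Connect also recommends this setting.
From the Kafka Connect documentation: https://docs.confluent.io/3.0.0/connect/connect-hdfs/docs/hdfs_connector.html
"As connector tasks are long running, the connections to the Hive metastore are kept open until the tasks are stopped. In the default Hive configuration, reconnecting to the Hive metastore creates a new connection. When the number of tasks is large, the retries can cause the number of open connections to exceed the maximum number of connections allowed by the operating system. Thus it is recommended to set hcatalog.hive.client.cache.disabled to true in hive.xml."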
When Max Concurrent Tasks on PutHiveStreaming is set to a value greater than 1, NiFi forces this property to true.
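NiFi's documentation also addresses this issue:
NiFi PutHiveStreaming has a connection pool and is therefore multithreaded; setting hcatalog.hive.client.cache.disabled to true allows each connection to set up its own session without relying on the cache.
Ref: https://community.hortonworks.com/content/supportkb/196628/hive-client-puthivestreaming-fails-against-partiti.html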
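A minimal sketch of the rule described above: disable the HCatalog client cache whenever more than one task can open metastore connections. To keep it runnable with the JDK alone, the Hive settings are modelled as a plain `Map<String, String>` rather than Hadoop's `Configuration`/`HiveConf`; the class and method names (`HiveClientCachePolicy`, `apply`) are hypothetical. Only the property name comes from the answer. With a real Hadoop `Configuration`, the equivalent call would be `conf.setBoolean("hcatalog.hive.client.cache.disabled", true)`.

```java
import java.util.HashMap;
import java.util.Map;

final class HiveClientCachePolicy {
    // Property name taken from the answer above.
    static final String CLIENT_CACHE_DISABLED = "hcatalog.hive.client.cache.disabled";

    /**
     * Hypothetical helper: returns a copy of the given Hive settings in which the
     * HCatalog client cache is disabled whenever more than one concurrent task
     * may open metastore connections. With a single task the user's value is kept.
     * The input map is never modified.
     */
    static Map<String, String> apply(Map<String, String> hiveSettings, int maxConcurrentTasks) {
        Map<String, String> result = new HashMap<>(hiveSettings);
        if (maxConcurrentTasks > 1) {
            // Each task then gets its own metastore client instead of a shared cached one.
            result.put(CLIENT_CACHE_DISABLED, "true");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put(CLIENT_CACHE_DISABLED, "false");
        System.out.println(apply(settings, 1).get(CLIENT_CACHE_DISABLED)); // false
        System.out.println(apply(settings, 4).get(CLIENT_CACHE_DISABLED)); // true
    }
}
```

Returning a copy rather than mutating the caller's map keeps the user's original settings intact, which makes it easy to log what was overridden.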

Answer 2 (vzgqcmou)

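This error is a red herring: something on the Hive side has trouble obtaining its own IP address or hostname, so it emits this error periodically. However, I don't think it causes any real problems; as you said, the data gets written to Hive.
For completeness, in Apache NiFi, PutHiveStreaming is built against Hive 1.2.x, not Hive 2.x. There is currently no dedicated Hive 2.x processor, and we never determined whether the Hive 1.2.x processors are compatible with Hive 2.x.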
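For debugging, if you can run Hive in a container and expose the metastore port (I believe 9083 is the default), you should be able to write an integration test with TestRunners that runs NiFi locally from your IDE. This is how the other integration tests against external systems, such as MongoDB or Elasticsearch, are done.
The Hive test suite has a MiniHS2 class for integration testing, but it is not in a published artifact, so unfortunately we have to run such tests against a real Hive instance.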
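Since the integration-test approach above depends on a real Hive instance, it helps to check that the containerised metastore is actually reachable before running the test. The following is a minimal JDK-only sketch; the class name `MetastoreProbe` is hypothetical, and it only verifies that something accepts TCP connections on the port (9083 by default, per the answer). It does not speak the metastore's Thrift protocol.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class MetastoreProbe {
    // Default Hive metastore port, as mentioned in the answer.
    static final int DEFAULT_METASTORE_PORT = 9083;

    /**
     * Returns true if a TCP connection to host:port succeeds within timeoutMillis.
     * Connection refused, timeouts and unknown hosts all surface as IOException
     * and are reported as "not reachable".
     */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        boolean up = isReachable(host, DEFAULT_METASTORE_PORT, 2000);
        System.out.println("metastore at " + host + ":" + DEFAULT_METASTORE_PORT
                + (up ? " is reachable" : " is NOT reachable; skip the integration test"));
    }
}
```

In a JUnit 4 test, a guard such as `Assume.assumeTrue(MetastoreProbe.isReachable("localhost", 9083, 2000))` before calling `TestRunners.newTestRunner(...)` would skip the test instead of failing it when the container is down. The processor's metastore URI would then point at `thrift://localhost:9083`.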
