hdfs文件观察程序服务

ykejflvf  于 2021-05-31  发布在  Hadoop
关注(0)|答案(2)|浏览(322)

我正在使用hdfs文件观察服务加载配置文件,一旦它在我的flink流作业中被更改。
watcher服务的源代码:hdfs file watcher
我在这里面临的问题是,watcher服务对整个hdfs中的更改做出React,而不仅仅是我传递的目录。
我的代码:

public static void main( String[] args ) throws IOException, InterruptedException, MissingEventsException
  {
    HdfsAdmin admin = new HdfsAdmin( URI.create("hdfs://stage.my-org.in:8020/tmp/anurag/"), new Configuration() );
    DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
    while( true ) {
      EventBatch events = eventStream.take();
      for( Event event : events.getEvents() ) {
        switch( event.getEventType() ) {
          case CREATE:
            System.out.print( "event type = " + event.getEventType() );
            CreateEvent createEvent = (CreateEvent) event;
            System.out.print( "  path = " + createEvent.getPath() + "\n");
            break;
          default:
            break;
        }
      }
    }
  }

程序输出:

event type = CREATE  path = /tmp/anurag/newFile.txt
event type = CREATE  path = /tmp/newFile2.txt

请帮助我解决这个问题,这样我就可以在特定的目录中观看作为uri传递的文件
谢谢期待
注意:如果您尝试运行此程序,请以hdfs用户身份运行,否则您将得到org.apache.hadoop.security.accesscontrolexception

uqxowvwt

uqxowvwt1#

现在,我使用hadoopapi每30秒获取一次文件,读取它的修改时间,如果它大于再次加载文件。

xriantvc

xriantvc2#

inotifyeventstream只不过是解析到对象中的hdfs事件日志,它会将hdfs中的所有事件发送给您,无论您在构造函数中设置的是哪个目录,这就是您需要使用超组成员运行该代码的原因之一。
解决方案是在事件发生时对其进行过滤,只从您想要的目录中获取那些事件。比如:

EventBatch events = eventStream.take();
ArrayList<CreateEvent> filteredEvents = new ArrayList();
for( Event event : events.getEvents() ) {
  switch( event.getEventType() ) {
    case CREATE:
      System.out.print( "event type = " + event.getEventType() );
      CreateEvent createEvent = (CreateEvent) event;
      if (createEvent.getPath() == '/your/desired/path') {
        System.out.print( "  path = " + createEvent.getPath() + "\n");
        filteredEvents.add(createEvent);
      }           
      break;
    default:
      break;
  }
}

相关问题