HDFS 文件观察服务

HDFS File Watcher Service

我正在使用 hdfs 文件观察器服务在我的 flink 流作业中更改配置文件后立即加载它。

观察者服务来源:

我在这里面临的问题是观察者服务正在对整个 hdfs 的变化做出反应,而不仅仅是我正在传递的目录。

我的代码:

public static void main( String[] args ) throws IOException, InterruptedException, MissingEventsException
  {
    HdfsAdmin admin = new HdfsAdmin( URI.create("hdfs://stage.my-org.in:8020/tmp/anurag/"), new Configuration() );
    DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
    while( true ) {
      EventBatch events = eventStream.take();
      for( Event event : events.getEvents() ) {
        switch( event.getEventType() ) {
          case CREATE:
            System.out.print( "event type = " + event.getEventType() );
            CreateEvent createEvent = (CreateEvent) event;
            System.out.print( "  path = " + createEvent.getPath() + "\n");
            break;
          default:
            break;
        }
      }
    }
  }

程序输出:

event type = CREATE  path = /tmp/anurag/newFile.txt
event type = CREATE  path = /tmp/newFile2.txt

请帮我解决这个问题,这样我就可以观看作为 URI 传递的特定目录中的文件

感谢期待

注意:如果你尝试运行这个程序,请运行作为hdfs用户,否则你会得到org.apache.hadoop.security.AccessControlException

现在,我使用 Hadoop API 每 30 秒获取一次文件,读取它的修改时间,如果它大于重新加载文件。

InotifyEventStream 无非就是将HDFS 事件日志解析成一个对象,无论你在构造函数中设置哪个目录,它都会将HDFS 中的所有事件发送给你,这也是你需要[=15] 的原因之一=] 与超级组成员的代码。

解决方案是在事件到来时对其进行过滤,只从您想要的目录中获取它们。类似于:

EventBatch events = eventStream.take();
ArrayList<CreateEvent> filteredEvents = new ArrayList();
for( Event event : events.getEvents() ) {
  switch( event.getEventType() ) {
    case CREATE:
      System.out.print( "event type = " + event.getEventType() );
      CreateEvent createEvent = (CreateEvent) event;
      if (createEvent.getPath() == '/your/desired/path') {
        System.out.print( "  path = " + createEvent.getPath() + "\n");
        filteredEvents.add(createEvent);
      }           
      break;
    default:
      break;
  }
}