HDFS 文件观察器
HDFS file watcher
我可以在 HDFS 上安装 file watcher 吗?
场景:
文件登陆 HDFS continuously.I 想要在文件数量达到阈值(可以是文件数量或文件大小)后启动 Spark 作业。
是否可以在 HDFS 上实现文件观察器来实现这一点。如果是,那么任何人都可以建议这样做的方法吗?有哪些不同的选择? Zookeeper 或 Oozie 能做到吗?
任何帮助都将是 appreciated.Thanks。
Hadoop 2.6 引入了 DFSInotifyEventInputStream
,您可以为此使用它。您可以从 HdfsAdmin
获取它的实例,然后只需调用 .take()
或 .poll()
即可获取所有事件。事件类型包括 delete、append 和 create,应该涵盖您要查找的内容。
这是一个基本示例。确保你 运行 它是 hdfs
用户,因为管理界面需要 HDFS root。
public static void main( String[] args ) throws IOException, InterruptedException, MissingEventsException
{
HdfsAdmin admin = new HdfsAdmin( URI.create( args[0] ), new Configuration() );
DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
while( true ) {
EventBatch events = eventStream.take();
for( Event event : events.getEvents() ) {
System.out.println( "event type = " + event.getEventType() );
switch( event.getEventType() ) {
case CREATE:
CreateEvent createEvent = (CreateEvent) event;
System.out.println( " path = " + createEvent.getPath() );
break;
default:
break;
}
}
}
}
这里有一篇博客 post 更详细地介绍了它:
http://johnjianfang.blogspot.com/2015/03/hdfs-6634-inotify-in-hdfs.html?m=1
Oozie 协调器可以做到这一点。可以根据数据可用性触发 Oozie 协调器操作。编写一个数据触发的协调器。协调器动作是根据完成标志触发的。 done-flag 只是一个空文件。因此,当达到阈值时,将一个空文件写入目录。
旧线程...以防万一,如果有人想在 Scala
中执行此操作
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.client.HdfsAdmin
import org.apache.hadoop.hdfs.inotify.Event.{AppendEvent, CreateEvent, RenameEvent}
object HDFSTest extends App {
val admin = new HdfsAdmin( URI.create( "hdfs://namenode:port" ), new Configuration() )
val eventStream = admin.getInotifyEventStream()
while( true ) {
val events = eventStream.poll(2l, java.util.concurrent.TimeUnit.SECONDS)
events.getEvents.toList.foreach { event ⇒
println(s"event type = ${event.getEventType}")
event match {
case create: CreateEvent ⇒
println("CREATE: " + create.getPath)
case rename: RenameEvent ⇒
println("RENAME: " + rename.getSrcPath + " => " + rename.getDstPath)
case append: AppendEvent ⇒
println("APPEND: " + append.getPath)
case other ⇒
println("other: " + other)
}
}
}
}
以防万一,如果有人想使用模拟用户...设置环境变量:HADOOP_USER_NAME=user-name
我可以在 HDFS 上安装 file watcher 吗?
场景: 文件登陆 HDFS continuously.I 想要在文件数量达到阈值(可以是文件数量或文件大小)后启动 Spark 作业。
是否可以在 HDFS 上实现文件观察器来实现这一点。如果是,那么任何人都可以建议这样做的方法吗?有哪些不同的选择? Zookeeper 或 Oozie 能做到吗?
任何帮助都将是 appreciated.Thanks。
Hadoop 2.6 引入了 DFSInotifyEventInputStream
,您可以为此使用它。您可以从 HdfsAdmin
获取它的实例,然后只需调用 .take()
或 .poll()
即可获取所有事件。事件类型包括 delete、append 和 create,应该涵盖您要查找的内容。
这是一个基本示例。确保你 运行 它是 hdfs
用户,因为管理界面需要 HDFS root。
public static void main( String[] args ) throws IOException, InterruptedException, MissingEventsException
{
HdfsAdmin admin = new HdfsAdmin( URI.create( args[0] ), new Configuration() );
DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
while( true ) {
EventBatch events = eventStream.take();
for( Event event : events.getEvents() ) {
System.out.println( "event type = " + event.getEventType() );
switch( event.getEventType() ) {
case CREATE:
CreateEvent createEvent = (CreateEvent) event;
System.out.println( " path = " + createEvent.getPath() );
break;
default:
break;
}
}
}
}
这里有一篇博客 post 更详细地介绍了它:
http://johnjianfang.blogspot.com/2015/03/hdfs-6634-inotify-in-hdfs.html?m=1
Oozie 协调器可以做到这一点。可以根据数据可用性触发 Oozie 协调器操作。编写一个数据触发的协调器。协调器动作是根据完成标志触发的。 done-flag 只是一个空文件。因此,当达到阈值时,将一个空文件写入目录。
旧线程...以防万一,如果有人想在 Scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.client.HdfsAdmin
import org.apache.hadoop.hdfs.inotify.Event.{AppendEvent, CreateEvent, RenameEvent}
object HDFSTest extends App {
val admin = new HdfsAdmin( URI.create( "hdfs://namenode:port" ), new Configuration() )
val eventStream = admin.getInotifyEventStream()
while( true ) {
val events = eventStream.poll(2l, java.util.concurrent.TimeUnit.SECONDS)
events.getEvents.toList.foreach { event ⇒
println(s"event type = ${event.getEventType}")
event match {
case create: CreateEvent ⇒
println("CREATE: " + create.getPath)
case rename: RenameEvent ⇒
println("RENAME: " + rename.getSrcPath + " => " + rename.getDstPath)
case append: AppendEvent ⇒
println("APPEND: " + append.getPath)
case other ⇒
println("other: " + other)
}
}
}
}
以防万一,如果有人想使用模拟用户...设置环境变量:HADOOP_USER_NAME=user-name