How to define a file filter for file name patterns in Apache Spark Streaming in Java?

I am using Apache Spark Streaming 1.2.0 and trying to define a file filter for file names when creating an InputDStream by invoking the fileStream method. My code works perfectly fine when I don't use a file filter, e.g. by invoking the other fileStream method (described here).
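Roughly, my working call without a filter looks like the sketch below (reconstructed with LongWritable/Text keys and values and TextInputFormat; jssc is my JavaStreamingContext). The implicit ClassTag parameters of the Scala method have to be passed explicitly from Java:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.InputDStream;
import scala.Tuple2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

// The underlying Scala StreamingContext behind the JavaStreamingContext.
StreamingContext ssc = jssc.ssc();

// Scala's implicit ClassTag parameters become explicit trailing arguments in Java.
ClassTag<LongWritable> keyTag = ClassTag$.MODULE$.apply(LongWritable.class);
ClassTag<Text> valueTag = ClassTag$.MODULE$.apply(Text.class);
ClassTag<TextInputFormat> formatTag = ClassTag$.MODULE$.apply(TextInputFormat.class);

// No-filter overload: every new file in inDirectory is processed.
InputDStream<Tuple2<LongWritable, Text>> lines =
    ssc.fileStream(inDirectory, keyTag, valueTag, formatTag);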

According to the documentation of the fileStream method, I can pass a

scala.Function1<org.apache.hadoop.fs.Path,Object> filter

But so far I have not been able to create the fileFilter. My initial attempts were:

1- Tried to implement it as:

Function1<Path, Object> fileFilter = new Function1<Path, Object>() {
    @Override
    public Object apply(Path v1) {
      return true;
    }

    @Override
    public <A> Function1<A, Object> compose(Function1<A, Path> g) {
      return Function1$class.compose(this, g);
    }

    @Override
    public <A> Function1<Path, A> andThen(Function1<Object, A> g) {
      return Function1$class.andThen(this, g);
    }
  };

But apparently my implementation of andThen is wrong, and I don't understand how I should implement it. The compiler complains that the anonymous function

is not abstract and does not override abstract method <A>andThen$mcVJ$sp(scala.Function1<scala.runtime.BoxedUnit,A>) in scala.Function1

2- Tried to implement it as:

Function1<Path, Object> fileFilter = new AbstractFunction1<Path, Object>() {
    @Override
    public Object apply(Path v1) {
      return true;
    }
  };

This one compiles, but when I run it I get an exception:

2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule
java.io.NotSerializableException: myModule
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject.apply$mcV$sp(DStreamGraph.scala:169)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:184)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:263)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$$anon$$anonfun$receive.applyOrElse(JobGenerator.scala:76)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$$anon.aroundReceive(JobGenerator.scala:74)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Any ideas on how to implement the fileFilter so that I can pass it to the fileStream method and have Spark Streaming process only the files whose names match the pattern I want?

I had to create a separate file called FileFilter.java:

import org.apache.hadoop.fs.Path;
import scala.runtime.AbstractFunction1;
import java.io.Serializable;

public class FileFilter extends AbstractFunction1<Path, Object> implements Serializable {

  // Accept only files whose names end in ".json". Being a standalone
  // Serializable top-level class (rather than an anonymous inner class),
  // it can be serialized when Spark checkpoints the DStream graph.
  @Override
  public Object apply(Path v1) {
    return v1.toString().endsWith(".json");
  }
}

Then I pass it to the fileStream method like this:

fileStream(inDirectory, new FileFilter(), false, ...)

And it works without any problems.
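For completeness: the ... above stands for the ClassTag arguments that the Scala method takes implicitly. A hypothetical full call, assuming the same LongWritable/Text/TextInputFormat types as the sketch in the question, would look something like:

InputDStream<Tuple2<LongWritable, Text>> lines =
    ssc.fileStream(inDirectory, new FileFilter(), false, keyTag, valueTag, formatTag);

As far as I can tell, the decisive difference from the failed second attempt is serialization: an anonymous inner class keeps a hidden reference to its enclosing instance (the non-serializable myModule in the stack trace), which breaks checkpointing of the DStream graph, whereas the standalone FileFilter implements Serializable and carries no such reference.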