Akka-Stream implementation slower than single threaded implementation

UPDATE 2015-10-30


Based on Roland Kuhn's answer:

Akka Streams is using asynchronous message passing between Actors to implement stream processing stages. Passing data across an asynchronous boundary has an overhead that you are seeing here: your computation seems to take only about 160ns (derived from the single-threaded measurement) while the streaming solution takes roughly 1µs per element, which is dominated by the message passing.

Another misconception is that saying “stream” implies parallelism: in your code all computation runs sequentially in a single Actor (the map stage), so no benefit can be expected over the primitive single-threaded solution.

In order to benefit from the parallelism afforded by Akka Streams you need to have multiple processing stages that each perform tasks of >1µs per element, see also the docs.

I made some changes. My code now looks like this:

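import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._ // Source, Flow, Sink, FlowGraph, Balance, Merge (Akka Streams 1.0)
import org.joda.time.{DateTime, Duration} // assumed: Joda-Time

object MultiThread {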
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  //RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)

  val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)

  val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder =>
    import FlowGraph.Implicits._

    val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4))
    val mergeEvents = builder.add(Merge[Int](4))

    dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0)
    dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1)
    dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2)
    dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3)

    (dispatchTuple.in, mergeEvents.out)
  }

  val sink = Sink.foreach[Int] { _ =>
    counter += 1
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
      DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if (counter == SharedFunctions.maxEventCount) endAkka()
  }

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)
    actorSystem.shutdown
    actorSystem.awaitTermination
    System.exit(-1)
  }

  def main(args: Array[String]) {
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink)
   // in.via(eventChef).runWith(sink)
  }

}

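I'm not sure whether I got this completely wrong, but my Akka Streams implementation is still much slower (now even slower than before). However, I noticed that if I increase the workload, for example by doing some divisions, the Akka Streams implementation becomes faster. So if I understand this correctly (otherwise please correct me), the overhead in my example seems to be too large. Does that mean you only benefit from Akka Streams if the code has to do heavy work?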




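I'm fairly new to both Scala and Akka Streams. I wrote a small test project that creates events until a counter reaches a specific number. For each event, the factorial of one of the event's fields is computed. I implemented this twice, once with Akka Streams and once without (single-threaded), and compared the run times.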

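What I didn't expect: when I create a single event, both programs have almost the same run time. But if I create 70,000,000 events, the implementation without Akka Streams is much faster. Here are my results (based on 24 measurements):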



So my question is: what is going on here? Why is my Akka Streams implementation slower?

Here is my code:

Implementation with Akka

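import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._ // Source, Flow, Sink (Akka Streams 1.0)
import org.joda.time.{DateTime, Duration} // assumed: Joda-Time

object MultiThread {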
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  //RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val sink = Sink.foreach[Int] { _ =>
    counter += 1
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
      DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if (counter == SharedFunctions.maxEventCount) endAkka()
  }

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)
    actorSystem.shutdown
    actorSystem.awaitTermination
    System.exit(-1)
  }

  def main(args: Array[String]) {
    import scala.concurrent.ExecutionContext.Implicits.global
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink).onComplete(_ => endAkka())
  }

}

Implementation without Akka

import org.joda.time.{DateTime, Duration} // assumed: Joda-Time

object SingleThread {

  def main(args: Array[String]) {
    println("SingleThread started at: " + SharedFunctions.startTime)
    println("0%")
    val i = createEvent(0)
    val duration = new Duration(SharedFunctions.startTime, DateTime.now());
    println("Time: " + duration.getMillis + " || Data: " + i)
  }

  def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = {
    if (count == SharedFunctions.maxEventCount) count
    else {
      val e = SharedFunctions.transform((randDate, name, age, myFloat))
      SharedFunctions.transform2(e)
      val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count,
        DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
      createEventWorker(p, count + 1, 1254785478l, "name", 48, 23.09f)
    }
  }

  def createEvent(count: Int): Int = {
    createEventWorker(0, count, 1254785478l, "name", 48, 23.09f)
  }
}

SharedFunctions

import org.joda.time.DateTime // assumed: Joda-Time

object SharedFunctions {
  val maxEventCount = 70000000
  val startTime = DateTime.now

  def transform(t : (Long, String, Int, Float)) : Event = new Event(t._1 ,t._2,t._3,t._4)
  def transform2(e : Event) : Int = factorial(e.getAgeYrs)

  def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100) / totalValue)
  def printProgress(oldProgress : Int, fileSize: Long, currentSize: Int, t: Long) = {
    val cProgress = calculatePercentage(fileSize, currentSize)
    if (oldProgress != cProgress) println(s"$oldProgress% | $t ms")
    cProgress
  }

  private def factorialWorker(n1: Int, n2: Int): Int = {
    if (n1 == 0) n2
    else factorialWorker(n1 -1, n2*n1)
  }
  def factorial (n : Int): Int = {
    factorialWorker(n, 1)
  }
}

Implementation of Event

/**
 * Autogenerated by Avro
 * 
 * DO NOT EDIT DIRECTLY
 */

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public long timestampMS;
  @Deprecated public CharSequence name;
  @Deprecated public int ageYrs;
  @Deprecated public float sizeCm;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>. 
   */
  public Event() {}

  /**
   * All-args constructor.
   */
  public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) {
    this.timestampMS = timestampMS;
    this.name = name;
    this.ageYrs = ageYrs;
    this.sizeCm = sizeCm;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call. 
  public Object get(int field$) {
    switch (field$) {
    case 0: return timestampMS;
    case 1: return name;
    case 2: return ageYrs;
    case 3: return sizeCm;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
  // Used by DatumReader.  Applications should not call. 
  @SuppressWarnings(value="unchecked")
  public void put(int field$, Object value$) {
    switch (field$) {
    case 0: timestampMS = (Long)value$; break;
    case 1: name = (CharSequence)value$; break;
    case 2: ageYrs = (Integer)value$; break;
    case 3: sizeCm = (Float)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'timestampMS' field.
   */
  public Long getTimestampMS() {
    return timestampMS;
  }

  /**
   * Sets the value of the 'timestampMS' field.
   * @param value the value to set.
   */
  public void setTimestampMS(Long value) {
    this.timestampMS = value;
  }

  /**
   * Gets the value of the 'name' field.
   */
  public CharSequence getName() {
    return name;
  }

  /**
   * Sets the value of the 'name' field.
   * @param value the value to set.
   */
  public void setName(CharSequence value) {
    this.name = value;
  }

  /**
   * Gets the value of the 'ageYrs' field.
   */
  public Integer getAgeYrs() {
    return ageYrs;
  }

  /**
   * Sets the value of the 'ageYrs' field.
   * @param value the value to set.
   */
  public void setAgeYrs(Integer value) {
    this.ageYrs = value;
  }

  /**
   * Gets the value of the 'sizeCm' field.
   */
  public Float getSizeCm() {
    return sizeCm;
  }

  /**
   * Sets the value of the 'sizeCm' field.
   * @param value the value to set.
   */
  public void setSizeCm(Float value) {
    this.sizeCm = value;
  }

  /** Creates a new Event RecordBuilder */
  public static Event.Builder newBuilder() {
    return new Event.Builder();
  }

  /** Creates a new Event RecordBuilder by copying an existing Builder */
  public static Event.Builder newBuilder(Event.Builder other) {
    return new Event.Builder(other);
  }

  /** Creates a new Event RecordBuilder by copying an existing Event instance */
  public static Event.Builder newBuilder(Event other) {
    return new Event.Builder(other);
  }

  /**
   * RecordBuilder for Event instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event>
    implements org.apache.avro.data.RecordBuilder<Event> {

    private long timestampMS;
    private CharSequence name;
    private int ageYrs;
    private float sizeCm;

    /** Creates a new Builder */
    private Builder() {
      super(Event.SCHEMA$);
    }

    /** Creates a Builder by copying an existing Builder */
    private Builder(Event.Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.timestampMS)) {
        this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.name)) {
        this.name = data().deepCopy(fields()[1].schema(), other.name);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.ageYrs)) {
        this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.sizeCm)) {
        this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
        fieldSetFlags()[3] = true;
      }
    }

    /** Creates a Builder by copying an existing Event instance */
    private Builder(Event other) {
            super(Event.SCHEMA$);
      if (isValidValue(fields()[0], other.timestampMS)) {
        this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.name)) {
        this.name = data().deepCopy(fields()[1].schema(), other.name);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.ageYrs)) {
        this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.sizeCm)) {
        this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
        fieldSetFlags()[3] = true;
      }
    }

    /** Gets the value of the 'timestampMS' field */
    public Long getTimestampMS() {
      return timestampMS;
    }

    /** Sets the value of the 'timestampMS' field */
    public Event.Builder setTimestampMS(long value) {
      validate(fields()[0], value);
      this.timestampMS = value;
      fieldSetFlags()[0] = true;
      return this; 
    }

    /** Checks whether the 'timestampMS' field has been set */
    public boolean hasTimestampMS() {
      return fieldSetFlags()[0];
    }

    /** Clears the value of the 'timestampMS' field */
    public Event.Builder clearTimestampMS() {
      fieldSetFlags()[0] = false;
      return this;
    }

    /** Gets the value of the 'name' field */
    public CharSequence getName() {
      return name;
    }

    /** Sets the value of the 'name' field */
    public Event.Builder setName(CharSequence value) {
      validate(fields()[1], value);
      this.name = value;
      fieldSetFlags()[1] = true;
      return this; 
    }

    /** Checks whether the 'name' field has been set */
    public boolean hasName() {
      return fieldSetFlags()[1];
    }

    /** Clears the value of the 'name' field */
    public Event.Builder clearName() {
      name = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    /** Gets the value of the 'ageYrs' field */
    public Integer getAgeYrs() {
      return ageYrs;
    }

    /** Sets the value of the 'ageYrs' field */
    public Event.Builder setAgeYrs(int value) {
      validate(fields()[2], value);
      this.ageYrs = value;
      fieldSetFlags()[2] = true;
      return this; 
    }

    /** Checks whether the 'ageYrs' field has been set */
    public boolean hasAgeYrs() {
      return fieldSetFlags()[2];
    }

    /** Clears the value of the 'ageYrs' field */
    public Event.Builder clearAgeYrs() {
      fieldSetFlags()[2] = false;
      return this;
    }

    /** Gets the value of the 'sizeCm' field */
    public Float getSizeCm() {
      return sizeCm;
    }

    /** Sets the value of the 'sizeCm' field */
    public Event.Builder setSizeCm(float value) {
      validate(fields()[3], value);
      this.sizeCm = value;
      fieldSetFlags()[3] = true;
      return this; 
    }

    /** Checks whether the 'sizeCm' field has been set */
    public boolean hasSizeCm() {
      return fieldSetFlags()[3];
    }

    /** Clears the value of the 'sizeCm' field */
    public Event.Builder clearSizeCm() {
      fieldSetFlags()[3] = false;
      return this;
    }

    @Override
    public Event build() {
      try {
        Event record = new Event();
        record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]);
        record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]);
        record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]);
        record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }
}

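Akka Streams is using asynchronous message passing between Actors to implement stream processing stages. Passing data across an asynchronous boundary has an overhead that you are seeing here: your computation seems to take only about 160ns (derived from the single-threaded measurement) while the streaming solution takes roughly 1µs per element, which is dominated by the message passing.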
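Taking the two per-element estimates above at face value and multiplying them by the 70,000,000 events from the question gives a rough order of magnitude. This is back-of-the-envelope arithmetic, not a measurement: it ignores JIT warm-up, GC and the progress printing.

```latex
\begin{aligned}
T_{\text{single}} &\approx N \cdot t_{\text{work}} = 7\times10^{7} \cdot 160\,\text{ns} \approx 11.2\,\text{s}\\
T_{\text{stream}} &\approx N \cdot t_{\text{elem}} = 7\times10^{7} \cdot 1\,\mu\text{s} = 70\,\text{s}
\end{aligned}
```

Under these assumptions the streamed version would be roughly six times slower, and almost all of the difference is boundary overhead rather than factorial work.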

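Another misconception is that saying "stream" implies parallelism: in your code all computation runs sequentially in a single Actor (the map stage), so no benefit can be expected over the primitive single-threaded solution.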

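In order to benefit from the parallelism afforded by Akka Streams you need to have multiple processing stages that each perform tasks of >1µs per element, see also the docs.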
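To illustrate why per-element handoffs dominate when the work is tiny, here is a plain-JVM-threads sketch. It does not use Akka and is not how Akka is implemented; `BatchedHandoff` and its parameters are hypothetical. Elements move from the calling thread to one consumer thread through a bounded `ArrayBlockingQueue`, either one element per handoff or in batches, so the fixed cost of each cross-thread handoff is shared by many elements.

```scala
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch (plain JVM threads, not Akka): moves elements 1..n
// from the calling thread to one consumer thread through a bounded queue.
object BatchedHandoff {

  /** Returns (sum of work(x) over all x, number of queue handoffs). */
  def run(n: Int, batchSize: Int, work: Int => Long): (Long, Int) = {
    require(batchSize > 0, "batchSize must be positive")
    val queue  = new ArrayBlockingQueue[Vector[Int]](16) // bounded buffer between threads
    val result = new AtomicLong(0L)

    val consumer = new Thread(() => {
      var sum   = 0L
      var batch = queue.take()
      // An empty Vector is the end-of-stream marker; grouped never yields empty chunks.
      while (batch.nonEmpty) {
        batch.foreach(x => sum += work(x))
        batch = queue.take()
      }
      result.set(sum)
    })
    consumer.start()

    var handoffs = 0
    (1 to n).grouped(batchSize).foreach { chunk =>
      queue.put(chunk.toVector) // one cross-thread handoff per batch
      handoffs += 1
    }
    queue.put(Vector.empty) // signal completion
    consumer.join()         // join() makes the consumer's result visible here
    (result.get, handoffs)
  }

  def main(args: Array[String]): Unit = {
    val work = (x: Int) => x.toLong * x
    for (b <- Seq(1, 1000)) {
      val t0 = System.nanoTime()
      val (sum, handoffs) = run(1000000, b, work)
      val ms = (System.nanoTime() - t0) / 1e6
      println(f"batch=$b%5d handoffs=$handoffs%8d sum=$sum ms=$ms%.1f")
    }
  }
}
```

With `batchSize = 1` every element pays for a handoff, similar to the per-element message passing described above; with larger batches that cost is amortized. In Akka Streams the analogous move would be grouping elements (for example with `grouped`) before an asynchronous boundary, though the actual timings will depend on the machine and Akka version.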

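Besides fully agreeing with Roland's explanation, it should be understood that Akka Streams is not just a concurrent programming framework. Streams also provide back-pressure, which means the Source only generates events when there is demand to process them in the Sink. This communication of demand adds some overhead at every processing step.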
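A deliberately simplified, single-threaded model of demand-driven flow may make the point concrete. The class and method names are hypothetical, and this is not how Akka implements the Reactive Streams protocol: the consumer asks for at most `batch` elements at a time, and the source generates only what was requested. Each `request` call stands in for the demand signal that, in a real stream, has to travel upstream across stage boundaries.

```scala
// Hypothetical, minimal model of demand-driven (back-pressured) flow.
// Not Akka's implementation: it only shows that the producer emits
// elements solely in response to explicit demand signals.
final class DemandDrivenSource[A](next: () => A) {
  var emitted = 0L        // elements actually generated
  var demandSignals = 0L  // number of request(n) calls received

  /** Emits exactly `n` elements, and only because they were requested. */
  def request(n: Int): Vector[A] = {
    demandSignals += 1
    val out = Vector.fill(n)(next())
    emitted += n
    out
  }
}

object BackpressureDemo {
  /** Consumes `total` elements, asking for at most `batch` at a time.
    * Returns (elements emitted, demand signals sent, sum of elements). */
  def consume(total: Int, batch: Int): (Long, Long, Long) = {
    var counter = 0
    val src = new DemandDrivenSource(() => { counter += 1; counter })
    var received = 0L
    var sum = 0L
    while (received < total) {
      val want = math.min(batch.toLong, total - received).toInt
      val chunk = src.request(want)
      sum += chunk.sum
      received += chunk.size
    }
    (src.emitted, src.demandSignals, sum)
  }

  def main(args: Array[String]): Unit = {
    val (emitted, signals, sum) = consume(total = 1000, batch = 16)
    println(s"emitted=$emitted signals=$signals sum=$sum") // emitted=1000 signals=63 sum=500500
  }
}
```

The source never runs ahead of demand, and the number of demand signals grows as the requested batch size shrinks; that bookkeeping is the kind of per-step cost a plain loop does not pay.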

So your single-threaded vs. multi-threaded comparison is not "apples-to-apples".

If you want raw multi-threaded execution performance, Futures/Actors are a better choice.
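A minimal sketch of the Futures route, assuming the workload can be split into independent chunks up front. The object and method names are hypothetical, and `factorial` here is a loop-based stand-in for `SharedFunctions.factorial` with the same `Int` overflow behavior. Each chunk runs in one `Future` on the global execution context, so there is one scheduling handoff per chunk rather than one per element. The per-chunk sums are returned and combined so the computed values are actually used.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical sketch: chunked parallelism with plain Futures.
object FuturesSketch {
  // Same multiplication order and Int wrap-around as the recursive version.
  def factorial(n: Int): Int = {
    var acc = 1
    var i = n
    while (i > 0) { acc *= i; i -= 1 }
    acc
  }

  /** Computes factorial(age) once per event on the calling thread. */
  def sequential(events: Int, age: Int): Long = {
    var sum = 0L
    var i = 0
    while (i < events) { sum += factorial(age); i += 1 }
    sum
  }

  /** Splits the events into `chunks` ranges and computes each in its own Future. */
  def parallel(events: Int, age: Int, chunks: Int): Long = {
    val chunkSize = (events + chunks - 1) / chunks
    val futures = (0 until chunks).map { c =>
      val from  = c * chunkSize
      val until = math.min(events, from + chunkSize)
      Future {
        var sum = 0L
        var i = from
        while (i < until) { sum += factorial(age); i += 1 }
        sum
      }
    }
    Await.result(Future.sequence(futures), Duration.Inf).sum
  }
}
```

For example, `FuturesSketch.parallel(70000000, 10, Runtime.getRuntime.availableProcessors)` splits the question's event count across one chunk per core. Note that with the question's age of 48 the `Int` factorial wraps to 0, so a smaller `age` is used here to keep the sums meaningful.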