Samza 无法发送消息。例外

Samza Failed to send message. Exception

我在 aws emr 实例上使用 samza,我总是遇到如下异常,有人可以帮助我吗?:

org.apache.samza.SamzaException: 发送消息失败。例外: java.lang.IllegalStateException: 生产者关闭后无法发送。 在 org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$发送$5.apply(KafkaSystemProducer.scala:120) 在 org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$发送$5.apply(KafkaSystemProducer.scala:111) 在 org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81) 在 org.apache.samza.system.kafka.KafkaSystemProducer. 发送(KafkaSystemProducer.scala:86) 在 org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87) 在 org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) 在 org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) 在 org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) 在 org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:166) 在 org.apache.samza.storage.kv.NullSafeKeyValueStore.flush(NullSafeKeyValueStore.scala:69) 在 org.apache.samza.storage.kv.KeyValueStorageEngine.flush(KeyValueStorageEngine.scala:113) 在 org.apache.samza.storage.kv.KeyValueStorageEngine.close(KeyValueStorageEngine.scala:125) 在 org.apache.samza.storage.kv.KeyValueStorageEngine.stop(KeyValueStorageEngine.scala:119) 在 org.apache.samza.storage.TaskStorageManager$$anonfun$stop$2.apply(TaskStorageManager.scala:147) 在 org.apache.samza.storage.TaskStorageManager$$anonfun$stop$2.apply(TaskStorageManager.scala:147) 在 scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) 在 org.apache.samza.storage.TaskStorageManager.stop(TaskStorageManager.scala:147) 在 org.apache.samza.container.TaskInstance.shutdownStores(TaskInstance.scala:185) 在 org.apache.samza.container.SamzaContainer$$anonfun$shutdownStores$2.apply(SamzaContainer.scala:650) 在 org.apache.samza.container.SamzaContainer$$anonfun$shutdownStores$2.apply(SamzaContainer.scala:650) 在 scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) 在 org.apache.samza.container.SamzaContainer.shutdownStores(SamzaContainer.scala:650) 在 org.apache.samza.container.SamzaContainer.运行(SamzaContainer.scala:560) 在 org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93) 在 org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67) 在 org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

我在错误的文件中错过了真正的异常。