Using NiFi in a secured cluster
I am trying to read data from NiFi in a secured cluster using Spark Streaming.
I authenticate with an SSLContext by setting it on the SiteToSiteClient, but SSLContext is not serializable.
My code looks like this:
import java.nio.charset.StandardCharsets

import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.spark.NiFiReceiver
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NifiSparkTest {

  def main(args: Array[String]): Unit = {
    // Load the client certificate; a .pfx file is a PKCS12 keystore
    val pKeyFile = new java.io.File("/path/to/file-cert.pfx")
    val pKeyPassword = "password"
    val keyStore = java.security.KeyStore.getInstance("PKCS12")
    val kmf = javax.net.ssl.KeyManagerFactory.getInstance(
      javax.net.ssl.KeyManagerFactory.getDefaultAlgorithm())
    val keyInput = new java.io.FileInputStream(pKeyFile)
    keyStore.load(keyInput, pKeyPassword.toCharArray())
    keyInput.close()
    kmf.init(keyStore, pKeyPassword.toCharArray())

    // Build the SSLContext used to authenticate against the secured NiFi
    val sslContext = javax.net.ssl.SSLContext.getInstance("SSL")
    sslContext.init(kmf.getKeyManagers(), null, new java.security.SecureRandom())

    // Site-to-site client config carrying the SSLContext (this is what fails to serialize)
    val conf = new SiteToSiteClient.Builder()
      .sslContext(sslContext)
      .url("https://urlOfNifi:9090/nifi/")
      .portName("Spark_Test")
      .buildConfig()

    val config = new SparkConf().setAppName("Nifi_Spark_Data")
    val sc = new SparkContext(config)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Receive NiFi data packets and print their content each batch
    val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY))
    val text = lines.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))
    text.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
What I want to do is get the streaming data from NiFi, but when I start my Spark Streaming application I get the following error:
Exception during serialization: java.io.NotSerializableException: javax.net.ssl.SSLContext
Serialization stack:
- object not serializable (class: javax.net.ssl.SSLContext, value: javax.net.ssl.SSLContext@2181e104)
- field (class: org.apache.nifi.remote.client.SiteToSiteClient$StandardSiteToSiteClientConfig, name: sslContext, type: class javax.net.ssl.SSLContext)
- object (class org.apache.nifi.remote.client.SiteToSiteClient$StandardSiteToSiteClientConfig, org.apache.nifi.remote.client.SiteToSiteClient$StandardSiteToSiteClientConfig@5a0d6057)
- field (class: org.apache.nifi.spark.NiFiReceiver, name: clientConfig, type: interface org.apache.nifi.remote.client.SiteToSiteClientConfig)
- object (class org.apache.nifi.spark.NiFiReceiver, org.apache.nifi.spark.NiFiReceiver@224fb09a)
- element of array (index: 0)
- array (class [Lorg.apache.spark.streaming.receiver.Receiver;, size 1)
- field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(org.apache.nifi.spark.NiFiReceiver@224fb09a))
- writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
- object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@87d)
- field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
- object (class org.apache.spark.scheduler.ResultTask, ResultTask(12, 0))
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
It seems that the SiteToSiteClientConfig itself is serializable, but the SSLContext inside it is not. In Spark Streaming, objects that will be used on other nodes must be serializable, but I can't find a way to make SSLContext serializable. Is there any way to run Spark Streaming so that it receives NiFi streaming data in a secured cluster?
Thanks in advance.
Instead of creating the SSLContext ahead of time, you should be able to call the following methods on SiteToSiteClient.Builder:
keystoreFilename(...)
keystorePass(...)
keystoreType(...)
truststoreFilename(...)
truststorePass(...)
truststoreType(...)
By doing this, the NiFiReceiver will create an SSLContext when it builds the SiteToSiteClient from the SiteToSiteClientConfig, which happens after serialization.
Note that this will likely require the keystore/truststore to be on all nodes where your Spark Streaming job runs, and in the same location on each node.
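For illustration, here is a minimal sketch of that configuration. The paths and passwords are placeholders, not values from your environment, and the keystoreType/truststoreType calls are assumed to take NiFi's KeystoreType enum (assumed to live in org.apache.nifi.security.util):

import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.security.util.KeystoreType // assumed location of the keystore-type enum

// Only file paths and passwords are stored in the config, so it serializes cleanly;
// the SSLContext is built on the worker after deserialization.
val conf = new SiteToSiteClient.Builder()
  .url("https://urlOfNifi:9090/nifi/")
  .portName("Spark_Test")
  .keystoreFilename("/same/path/on/every/node/keystore.jks")     // hypothetical path
  .keystorePass("keystorePassword")                              // hypothetical password
  .keystoreType(KeystoreType.JKS)
  .truststoreFilename("/same/path/on/every/node/truststore.jks") // hypothetical path
  .truststorePass("truststorePassword")                          // hypothetical password
  .truststoreType(KeystoreType.JKS)
  .buildConfig()

// The receiver is then created exactly as before:
val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY))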