Spark Streaming - 传递参数的问题
Spark Streaming - Issue with Passing parameters
请看下面用scala写的spark streaming代码:
object HBase {
var hbaseTable = ""
val hConf = new HBaseConfiguration()
hConf.set("hbase.zookeeper.quorum", "zookeeperhost")
def init(input: (String)) {
hbaseTable = input
}
def display() {
print(hbaseTable)
}
def insertHbase(row: (String)) {
val hTable = new HTable(hConf,hbaseTable)
}
}
object mainHbase {
def main(args : Array[String]) {
if (args.length < 5) {
System.err.println("Usage: MetricAggregatorHBase <zkQuorum> <group> <topics> <numThreads> <hbaseTable>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads, hbaseTable) = args
HBase.init(hbaseTable)
HBase.display()
val sparkConf = new SparkConf().setAppName("mainHbase")
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val storeStg = lines.foreachRDD(rdd => rdd.foreach(HBase.insertHbase))
lines.print()
ssc.start()
}
}
我正在尝试通过调用 HBase.init
方法来初始化对象 HBase
中的参数 hbaseTable
。它正在正确设置参数。我通过在下一行中调用 HBase.display
方法来确认这一点。
但是调用foreachRDD
中的HBase.insertHbase
方法时,没有设置hbaseTable
的抛出错误。
更新异常:
java.lang.IllegalArgumentException: Table qualifier must not be empty
org.apache.hadoop.hbase.TableName.isLegalTableQualifierName(TableName.java:179)
org.apache.hadoop.hbase.TableName.isLegalTableQualifierName(TableName.java:149)
org.apache.hadoop.hbase.TableName.<init>(TableName.java:303)
org.apache.hadoop.hbase.TableName.createTableNameIfNecessary(TableName.java:339)
org.apache.hadoop.hbase.TableName.valueOf(TableName.java:426)
org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:156)
你能告诉我如何使这段代码工作吗?
"Where is this code running" - 这是我们需要提出的问题,以便了解正在发生的事情。
HBase
是一个 Scala 对象 - 根据定义,它是一个单例构造,在 JVM 中使用 'only once' 语义进行初始化。
在初始化点,HBase.init(hbaseTable)
在这个Spark应用程序的驱动程序中执行,在驱动程序的VM中用给定的值初始化这个对象。
但是当我们这样做时:rdd.foreach(HBase.insertHbase)
,闭包作为任务在每个托管给定 RDD 分区的执行程序上执行。那时,对象 HBase
在每个 VM 上为每个执行程序初始化。正如我们所见,此时此对象尚未进行任何初始化。
有两个选项:
我们可以向 HBase
对象添加一些检查 "isInitialized",并添加 -now 条件调用以在每次调用 foreach
时进行初始化。
另一种选择是使用
rdd.foreachPartitition{partition =>
HBase.initialize(...)
partition.foreach(elem => HBase.insert(elem))
}
此构造将按每个分区中的元素数量分摊任何初始化。也可以将它与初始化检查结合起来,以防止不必要的 bootstrap 工作。
请看下面用scala写的spark streaming代码:
object HBase {
var hbaseTable = ""
val hConf = new HBaseConfiguration()
hConf.set("hbase.zookeeper.quorum", "zookeeperhost")
def init(input: (String)) {
hbaseTable = input
}
def display() {
print(hbaseTable)
}
def insertHbase(row: (String)) {
val hTable = new HTable(hConf,hbaseTable)
}
}
object mainHbase {
def main(args : Array[String]) {
if (args.length < 5) {
System.err.println("Usage: MetricAggregatorHBase <zkQuorum> <group> <topics> <numThreads> <hbaseTable>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads, hbaseTable) = args
HBase.init(hbaseTable)
HBase.display()
val sparkConf = new SparkConf().setAppName("mainHbase")
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val storeStg = lines.foreachRDD(rdd => rdd.foreach(HBase.insertHbase))
lines.print()
ssc.start()
}
}
我正在尝试通过调用 HBase.init
方法来初始化对象 HBase
中的参数 hbaseTable
。它正在正确设置参数。我通过在下一行中调用 HBase.display
方法来确认这一点。
但是调用foreachRDD
中的HBase.insertHbase
方法时,没有设置hbaseTable
的抛出错误。
更新异常:
java.lang.IllegalArgumentException: Table qualifier must not be empty
org.apache.hadoop.hbase.TableName.isLegalTableQualifierName(TableName.java:179)
org.apache.hadoop.hbase.TableName.isLegalTableQualifierName(TableName.java:149)
org.apache.hadoop.hbase.TableName.<init>(TableName.java:303)
org.apache.hadoop.hbase.TableName.createTableNameIfNecessary(TableName.java:339)
org.apache.hadoop.hbase.TableName.valueOf(TableName.java:426)
org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:156)
你能告诉我如何使这段代码工作吗?
"Where is this code running" - 这是我们需要提出的问题,以便了解正在发生的事情。
HBase
是一个 Scala 对象 - 根据定义,它是一个单例构造,在 JVM 中使用 'only once' 语义进行初始化。
在初始化点,HBase.init(hbaseTable)
在这个Spark应用程序的驱动程序中执行,在驱动程序的VM中用给定的值初始化这个对象。
但是当我们这样做时:rdd.foreach(HBase.insertHbase)
,闭包作为任务在每个托管给定 RDD 分区的执行程序上执行。那时,对象 HBase
在每个 VM 上为每个执行程序初始化。正如我们所见,此时此对象尚未进行任何初始化。
有两个选项:
我们可以向 HBase
对象添加一些检查 "isInitialized",并添加 -now 条件调用以在每次调用 foreach
时进行初始化。
另一种选择是使用
rdd.foreachPartitition{partition =>
HBase.initialize(...)
partition.foreach(elem => HBase.insert(elem))
}
此构造将按每个分区中的元素数量分摊任何初始化。也可以将它与初始化检查结合起来,以防止不必要的 bootstrap 工作。