通过 Spark Streaming 从 HBase 读取数据
Reading data from HBase through Spark Streaming
所以我的项目流程是
Kafka -> Spark Streaming -> HBase
现在我想再次从 HBase 读取数据,它将遍历之前作业创建的 table 并进行一些聚合并将其以不同的列格式存储在另一个 table 中
Kafka -> Spark Streaming(2ms)->HBase->Spark Streaming (10ms)->HBase
现在我不知道如何使用Spark Streaming 从HBase 读取数据。我找到了一个 Cloudera 实验室项目,它是 SparkOnHbase(http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/) 库,但我不知道如何从 HBase 获取用于流处理的 inputDStream。
如果有任何可以帮助我做到这一点,请提供任何指针或库链接。
您可以使用 queueStream 从 RDD 队列创建 DStream:
StreamingContext
JavaSparkContext sc = new JavaSparkContext(conf);
org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
JavaHBaseContext jhbc = new JavaHBaseContext(sc, hconf);
Scan scan1 = new Scan();
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getBytes());
// Create RDD
rdd = jhbc.hbaseRDD(tableName, scan1, new Function<Tuple2<ImmutableBytesWritable, Result>, Tuple2<ImmutableBytesWritable, Result>>() {
@Override
public Tuple2<ImmutableBytesWritable, Result> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
return immutableBytesWritableResultTuple2;
}
});
// Create streaming context and queue
JavaSparkStreamingContext ssc = new JavaSparkStramingContext(sc);
Queue<JavaRDD<Tuple2<ImmutableBytesWritable, Result> >> queue =new Queue<JavaRDD<Tuple2<ImmutableBytesWritable, Result>>>( );
queue.enqueue(rdd);
JavaDStream<Tuple2<ImmutableBytesWritable, Result>> ssc.queueStream(queue);
PS:您可以只使用 Spark(无需流式传输)
Splice Machine(开源)有一个展示火花流的演示 运行。
http://community.splicemachine.com/category/tutorials/data-ingestion-streaming/
这是此用例的示例代码。
所以我的项目流程是 Kafka -> Spark Streaming -> HBase
现在我想再次从 HBase 读取数据,它将遍历之前作业创建的 table 并进行一些聚合并将其以不同的列格式存储在另一个 table 中
Kafka -> Spark Streaming(2ms)->HBase->Spark Streaming (10ms)->HBase
现在我不知道如何使用Spark Streaming 从HBase 读取数据。我找到了一个 Cloudera 实验室项目,它是 SparkOnHbase(http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/) 库,但我不知道如何从 HBase 获取用于流处理的 inputDStream。 如果有任何可以帮助我做到这一点,请提供任何指针或库链接。
您可以使用 queueStream 从 RDD 队列创建 DStream: StreamingContext
JavaSparkContext sc = new JavaSparkContext(conf);
org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
JavaHBaseContext jhbc = new JavaHBaseContext(sc, hconf);
Scan scan1 = new Scan();
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getBytes());
// Create RDD
rdd = jhbc.hbaseRDD(tableName, scan1, new Function<Tuple2<ImmutableBytesWritable, Result>, Tuple2<ImmutableBytesWritable, Result>>() {
@Override
public Tuple2<ImmutableBytesWritable, Result> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
return immutableBytesWritableResultTuple2;
}
});
// Create streaming context and queue
JavaSparkStreamingContext ssc = new JavaSparkStramingContext(sc);
Queue<JavaRDD<Tuple2<ImmutableBytesWritable, Result> >> queue =new Queue<JavaRDD<Tuple2<ImmutableBytesWritable, Result>>>( );
queue.enqueue(rdd);
JavaDStream<Tuple2<ImmutableBytesWritable, Result>> ssc.queueStream(queue);
PS:您可以只使用 Spark(无需流式传输)
Splice Machine(开源)有一个展示火花流的演示 运行。
http://community.splicemachine.com/category/tutorials/data-ingestion-streaming/
这是此用例的示例代码。