flink streaming job中如何读写HBase
How to read and write to HBase in flink streaming job
如果我们必须在流式应用程序中读取和写入 HBASE,我们该怎么做。我们通过 open 方法打开连接进行写入,我们如何打开连接进行读取。
object test {
if (args.length != 11) {
//print args
System.exit(1)
}
val Array() = args
println("Parameters Passed " + ...);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", metadataBrokerList)
properties.setProperty("zookeeper.connect", zkQuorum)
properties.setProperty("group.id", group)
val messageStream = env.addSource(new FlinkKafkaConsumer08[String](topics, new SimpleStringSchema(), properties))
messageStream.map { x => getheader(x) }
def getheader(a: String) {
//Get header and parse and split the headers
if (metadata not available hit HBASE) { //Device Level send(Just JSON)
//How to read from HBASE here .
}
//If the resultset is not available in Map fetch from phoenix
else {
//fetch from cache
}
}
}
messageStream.writeUsingOutputFormat(new HBaseOutputFormat());
env.execute()
}
现在在方法 getheader
中,如果我想从 if(metadata not available hit HBASE)
中的 HBASE 读取我该怎么做。我不想在这里打开一个连接,想法是为一个线程维护一个连接并重用它,就像 flink 对 HBASE sink 和 open() 方法所做的那样,或者 spark 如何对 foreachpartition 做的。我试过 this 但我无法将 StreamExecutionEnvironment 传递给方法。我怎样才能做到这一点,有人可以提供一个片段吗?
您想从流式用户函数读取/写入 Apache HBase。您链接的 HBaseReadExample 正在做一些不同的事情:它将 HBase table 读入数据集(Flink 的批处理抽象)。在用户函数中使用此代码意味着从 Flink 程序中启动 Flink 程序。
对于您的用例,您需要在用户函数中直接创建一个 HBase 客户端并与之交互。执行此操作的最佳方法是使用 RichFlatMapFunction
并在 open()
方法中创建到 HBase 的连接。
下一版本的 Flink (1.2.0) 将在用户函数中支持 asynchronous I/O operations,这将显着提高应用程序的吞吐量。
如果我们必须在流式应用程序中读取和写入 HBASE,我们该怎么做。我们通过 open 方法打开连接进行写入,我们如何打开连接进行读取。
object test {
if (args.length != 11) {
//print args
System.exit(1)
}
val Array() = args
println("Parameters Passed " + ...);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", metadataBrokerList)
properties.setProperty("zookeeper.connect", zkQuorum)
properties.setProperty("group.id", group)
val messageStream = env.addSource(new FlinkKafkaConsumer08[String](topics, new SimpleStringSchema(), properties))
messageStream.map { x => getheader(x) }
def getheader(a: String) {
//Get header and parse and split the headers
if (metadata not available hit HBASE) { //Device Level send(Just JSON)
//How to read from HBASE here .
}
//If the resultset is not available in Map fetch from phoenix
else {
//fetch from cache
}
}
}
messageStream.writeUsingOutputFormat(new HBaseOutputFormat());
env.execute()
}
现在在方法 getheader
中,如果我想从 if(metadata not available hit HBASE)
中的 HBASE 读取我该怎么做。我不想在这里打开一个连接,想法是为一个线程维护一个连接并重用它,就像 flink 对 HBASE sink 和 open() 方法所做的那样,或者 spark 如何对 foreachpartition 做的。我试过 this 但我无法将 StreamExecutionEnvironment 传递给方法。我怎样才能做到这一点,有人可以提供一个片段吗?
您想从流式用户函数读取/写入 Apache HBase。您链接的 HBaseReadExample 正在做一些不同的事情:它将 HBase table 读入数据集(Flink 的批处理抽象)。在用户函数中使用此代码意味着从 Flink 程序中启动 Flink 程序。
对于您的用例,您需要在用户函数中直接创建一个 HBase 客户端并与之交互。执行此操作的最佳方法是使用 RichFlatMapFunction
并在 open()
方法中创建到 HBase 的连接。
下一版本的 Flink (1.2.0) 将在用户函数中支持 asynchronous I/O operations,这将显着提高应用程序的吞吐量。