How do we connect an AMPS [CRANK UP THE AMPS] server to Apache Flink for a real-time stream?
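We are subscribing to real-time data from an AMPS [CRANK UP THE AMPS] server and want to use it as a source for Apache Flink. Any ideas on how to connect the two, the way it can be done with Kafka?
AMPS server: http://www.crankuptheamps.com/amps/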
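Currently, Apache Flink does not provide an out-of-the-box connector for AMPS, as you can see here. However, it does provide an extensible Source/Sink interface that can be used to plug in any custom source or sink.
You can create your own AMPS source connector by extending RichSourceFunction and passing it to the addSource method, as described in the Flink documentation. Refer to the Java Client API provided by crankuptheamps for how to connect to the server and subscribe to a topic for messages.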
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import com.crankuptheamps.client.Client;
import com.crankuptheamps.client.Message;

public class AMPSSource extends RichSourceFunction<String> {
    private static final long serialVersionUID = -8708182052610791593L;

    private String name, topic, connectionString;
    // Created in open() on the task manager, so it is not serialized with the function
    private transient Client client;

    public AMPSSource(String name, String connectionString, String topic) {
        this.name = name;
        this.topic = topic;
        this.connectionString = connectionString;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // We create a Client, then connect() and logon()
        client = new Client(this.name);
        client.connect(this.connectionString);
        client.logon();
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        /*
         * Here, we iterate over messages in the MessageStream returned by
         * subscribe method
         */
        for (Message message : client.subscribe(this.topic)) {
            sourceContext.collect(message.getData());
        }
    }

    @Override
    public void close() throws Exception {
        try {
            cancel();
        } finally {
            super.close();
        }
    }

    @Override
    public void cancel() {
        // client is still null if open() failed before the Client was created
        if (client != null) {
            client.close();
        }
    }
}
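The source above stops only when cancel() closes the client. Flink sources often also keep a volatile flag that run() checks on each iteration, so a cancel request from another thread is seen by the loop. The following is a minimal sketch of that pattern in plain Java; CancellableLoop is a hypothetical stand-in for the run()/cancel() pair and is not a Flink or AMPS class.

```java
import java.util.Iterator;
import java.util.function.Consumer;

/** Hypothetical stand-in for the run()/cancel() pair of a source; not a Flink class. */
final class CancellableLoop {
    // volatile so that a cancel() issued from another thread is visible to the loop in run()
    private volatile boolean running = true;

    /** Forwards messages to the collector until the input ends or cancel() is called. */
    void run(Iterator<String> messages, Consumer<String> collector) {
        while (running && messages.hasNext()) {
            collector.accept(messages.next());
        }
    }

    /** Asks the loop to stop before it reads the next message. */
    void cancel() {
        running = false;
    }
}
```

With a blocking stream, hasNext() may wait for the next message, so the flag alone is probably not enough; closing the client in cancel(), as the source above does, is still what unblocks the loop.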
This can be used as a source in a stream processor, as shown below:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamProcessor {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> ampsStream = env
                .addSource(new AMPSSource("flink-consumer", "tcp://127.0.0.1:9007/amps/json", "test-topic"));
        ampsStream.print();
        env.execute();
    }
}
Note: RichSourceFunction implementations have a parallelism of 1. To enable parallel execution, the user-defined source should implement org.apache.flink.streaming.api.functions.source.ParallelSourceFunction or extend org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction.
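One thing to watch with a parallel source: if every parallel instance subscribes to the same topic, each instance would likely receive every message, and the stream would contain duplicates. One possible approach, sketched here under the assumption that the job reads several topics, is to give each instance its own share of the topics. TopicAssignment and topicsForSubtask are hypothetical names, not part of Flink or the AMPS client. In a RichParallelSourceFunction the two integers would typically come from the runtime context (in many Flink versions, getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks()).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a list of topics round-robin across parallel source instances. */
final class TopicAssignment {
    /**
     * Returns the topics that the instance with the given index should subscribe to.
     * Topic i goes to instance (i % parallelism), so every topic is read exactly once.
     */
    static List<String> topicsForSubtask(List<String> topics, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException(
                    "invalid subtask " + subtaskIndex + " for parallelism " + parallelism);
        }
        List<String> assigned = new ArrayList<>();
        for (int i = subtaskIndex; i < topics.size(); i += parallelism) {
            assigned.add(topics.get(i));
        }
        return assigned;
    }
}
```

With three topics and a parallelism of 2, instance 0 gets the first and third topic and instance 1 gets the second. With more instances than topics, some instances get an empty list and should simply idle.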