Spark Streaming Kafka Consumer
I'm trying to set up a simple Spark Streaming application that reads messages from a Kafka topic.
After a lot of work I've reached this stage, but I get the exceptions shown below.
Code:
public static void main(String[] args) throws Exception {
String brokers = "my.kafka.broker" + ":" + "6667";
String topics = "MyKafkaTopic";
// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("StreamingE")
        .setMaster("local[1]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
System.out.println("Brokers: " + brokers);
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
System.out.println("Message received: " + messages);
// Start the computation
jssc.start();
jssc.awaitTermination();
}
Throws:
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:66)
Out of desperation, I tried connecting to ZooKeeper instead:
String brokers = "my.kafka.zookeeper" + ":" + "2181";
String topics = "MyKafkaTopic";
But that throws:
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:53)
The relevant dependencies are:
<properties>
<spark.version>1.6.2</spark.version>
<kafka.version>0.8.2.1</kafka.version>
</properties>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
My questions:
Should I connect to the Kafka brokers or to the ZooKeeper servers?
What am I doing wrong in my code that it can't connect/listen for incoming messages?
Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
The way Spark works is that most transformations are lazy: when you want the graph to actually execute, you have to register an output transformation. Output transformations come in the form of foreachRDD, print, collect, or count (and more).
Instead of using println, call DStream.print():
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.print();
// Start the computation
jssc.start();
jssc.awaitTermination();
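If you want to handle each micro-batch yourself (say, to write it out to an external store), foreachRDD also registers an output operation. A minimal sketch, whose body only counts records and is purely illustrative:
messages.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
    @Override
    public void call(JavaPairRDD<String, String> rdd) {
        // Any RDD action here forces the batch to execute;
        // the count is just a stand-in for real processing.
        System.out.println("Messages in this batch: " + rdd.count());
    }
});
(This assumes imports of org.apache.spark.api.java.JavaPairRDD and org.apache.spark.api.java.function.VoidFunction.)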
As for Kafka, metadata.broker.list needs to be given the addresses of your Kafka broker nodes, not ZooKeeper. There is a separate key named zookeeper.connect for supplying the ZooKeeper addresses.
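A minimal sketch of the parameter map (the host names are the placeholders from the question):
Map<String, String> kafkaParams = new HashMap<>();
// Addresses of the Kafka brokers themselves:
kafkaParams.put("metadata.broker.list", "my.kafka.broker:6667");
// ZooKeeper, where a consumer needs it, goes under its own key:
kafkaParams.put("zookeeper.connect", "my.kafka.zookeeper:2181");
For reference, here is a fuller receiver-based example (KafkaUtils.createStream) that reads from a Kerberized Kafka: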
import static org.apache.spark.streaming.kafka.KafkaUtils.createStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import kafka.serializer.StringDecoder;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import scala.Tuple2;
public class KafkaKerberosReader {
// Spark information
private static SparkConf conf;
private static String appName = "KafkaKerberosReader";
private static JavaStreamingContext context;
private static final Logger logger = LoggerFactory.getLogger(KafkaKerberosReader.class.getSimpleName());
// Kafka information
private static String zkQuorum = "";
private static String kfkQuorum = "";
private static String group = "";
private static Integer threads = 1;
private static Map<String, String> kafkaParams = new HashMap<String, String>();
public static void loadProps() {
Properties prop = new Properties();
try {
logger.info("------------------------------loadProps");
InputStream input = new FileInputStream("config.properties");
prop.load(input);
System.out.println("loadProps loaded:" + prop);
appName = prop.getProperty("app.name");
autoOffsetReset = prop.getProperty("auto.offset.reset");
secProtocol = prop.getProperty("security.protocol");
kfkQuorum = bServers = prop.getProperty("bootstrap.servers");
zkQuorum = zServers = prop.getProperty("zookeeper.connect");
group = kGroupId = prop.getProperty("group.id");
kKeyTabFile = prop.getProperty("kerberos.keytabfile");
kJaas = prop.getProperty("kerberos.jaas");
kTopic = prop.getProperty("kafka.topic");
kPrincipal = prop.getProperty("kerberos.principal");
logger.info("loadProps:Props:zk:" + zServers + ",issecure:" + secProtocol + ",autoOffsetReset:"
+ autoOffsetReset + ",bServers:" + bServers + ",kJaas:" + kJaas + ",keytab:" + kKeyTabFile
+ ", kTopic:" + kTopic + ", kPrincipal" + kPrincipal);
if (kPrincipal != null && kKeyTabFile != null) {
logger.info("---------------------Logging into Kerberos");
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("hadoop.security.authentication", "Kerberos");
UserGroupInformation.setConfiguration(conf);
// loginUserFromKeytab applies the Kerberos login to the current user;
// the ...AndReturnUGI variant only returns a UGI without setting it.
UserGroupInformation.loginUserFromKeytab(kPrincipal, kKeyTabFile);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
logger.info("------------------------------main:START");
loadProps();
// Configure the application
configureSpark();
// Create the context
context = createContext(kTopic);
// Start the application and block until termination
context.start();
context.awaitTermination();
logger.info("main:END");
}
/**
* ----------------------------------------------- | This is the kernel of
* the spark application | -----------------------------------------------
*
*/
private static JavaStreamingContext createContext(String topic) {
logger.info("-------------------------------------------------------");
logger.info("| Starting: {} |", appName);
logger.info("| kafkaParams: |", kafkaParams);
logger.info("-------------------------------------------------------");
// Create the spark streaming context
context = new JavaStreamingContext(conf, Seconds.apply(5));
// Read from a Kerberized Kafka, passing kafkaParams so that group.id,
// security.protocol, auto.offset.reset and zookeeper.connect are actually used
JavaPairReceiverInputDStream<String, String> kafkaStream = createStream(context,
        String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, ImmutableMap.of(topic, threads), StorageLevel.MEMORY_AND_DISK_SER());
kafkaStream.print();
JavaDStream<String> lines = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
lines.print();
// kafkaStream.map(message -> message._2.toLowerCase()).print();
logger.info("-------------------------------------------------------");
logger.info("| Finished: {} |", appName);
logger.info("-------------------------------------------------------");
return context;
}
/**
* Create a SparkConf and configure it.
*
*/
private static void configureSpark() {
logger.info("------------------------------Initializing '%s'.", appName);
conf = new SparkConf().setAppName(appName);
if (group != null && group.trim().length() != 0) {
kafkaParams.put("group.id", group);
}
kafkaParams.put("auto.offset.reset", autoOffsetReset);
kafkaParams.put("security.protocol", secProtocol);
kafkaParams.put("bootstrap.servers", kfkQuorum);
kafkaParams.put("zookeeper.connect", zkQuorum);
logger.info(">- Configuration done with the follow properties:");
logger.info(conf.toDebugString());
}
static String autoOffsetReset, secProtocol, bServers, zServers, kGroupId, kKeyTabFile, kJaas, kTopic, kPrincipal;
}
Properties:
app.name=KafkaKerberosReader
auto.offset.reset=smallest
security.protocol=PLAINTEXTSASL
bootstrap.servers=sandbox.hortonworks.com:6667
zookeeper.connect=sandbox.hortonworks.com:2181
group.id=Default
kafka.topic=ifinboundprecint
#kerberos.keytabfile=/etc/hello.keytab
#kerberos.jaas=/etc/kafka/conf/kafka_client_jaas.conf
#kerberos.principal=hello@EXAMPLE.COM
Invocation:
spark-submit --master yarn --deploy-mode client \
  --num-executors 3 --executor-memory 500M --executor-cores 3 \
  --class com.my.spark.KafkaKerberosReader \
  ~/SparkStreamKafkaTest-1.0-SNAPSHOT.jar