Spark Streaming Kafka Consumer

I'm trying to set up a simple Spark Streaming application that reads messages from a Kafka topic.

After a lot of work I've got this far, but I get the exceptions shown below.

Code:

public static void main(String[] args) throws Exception {

    String brokers = "my.kafka.broker" + ":" + "6667";
    String topics = "MyKafkaTopic";

    // Create context with a 2 seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("StreamingE")
            .setMaster("local[1]")
            ;
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", brokers);
    System.out.println("Brokers: " + brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
    );

    System.out.println("Message received: " + messages);

    // Start the computation
    jssc.start();
    jssc.awaitTermination();

}

This throws:

[WARNING] 
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
    at org.apache.spark.streaming.StreamingContext.liftedTree1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
    at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:66)

Out of desperation, I tried connecting to ZooKeeper instead:

String brokers = "my.kafka.zookeeper" + ":" + "2181";
String topics = "MyKafkaTopic";

But that throws:

[WARNING] 
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
    at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:53)

The relevant dependencies are:

<properties>
  <spark.version>1.6.2</spark.version>
  <kafka.version>0.8.2.1</kafka.version>
</properties>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>${kafka.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>

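My questions:

Should I connect to the Kafka brokers or to the ZooKeeper servers?

What is my code doing wrong that prevents it from connecting to Kafka and receiving messages?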

Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

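Most transformations in Spark are lazy. For the graph to execute, you need to register an output operation (an "output transformation"). On a DStream these include print() and foreachRDD(), among others. Passing the stream to System.out.println only prints the DStream object itself; it registers nothing, so jssc.start() fails validation.

Instead of using println, call DStream.print():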

// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);

messages.print();

// Start the computation
jssc.start();
jssc.awaitTermination();
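
To make the failure mode concrete, here is a toy model in plain Java with no Spark dependency. ToyContext and ToyStream are hypothetical stand-ins, not Spark classes. The sketch only mimics the idea that start() refuses to run when no output operation was registered, and that printing the stream object registers nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: these classes are hypothetical and are NOT Spark's implementation.
final class LazyGraphSketch {

    /** Stands in for a streaming context that tracks registered output operations. */
    static final class ToyContext {
        private final List<Runnable> outputOps = new ArrayList<>();

        ToyStream source(List<String> records) {
            return new ToyStream(this, records);
        }

        /** Imitates the validation that fails in the question. */
        void start() {
            if (outputOps.isEmpty()) {
                throw new IllegalArgumentException(
                        "requirement failed: No output operations registered, so nothing to execute");
            }
            outputOps.forEach(Runnable::run);
        }
    }

    /** Stands in for a DStream: holding or printing a reference to it executes nothing. */
    static final class ToyStream {
        private final ToyContext ctx;
        private final List<String> records;

        ToyStream(ToyContext ctx, List<String> records) {
            this.ctx = ctx;
            this.records = records;
        }

        /** Output operation: registers work that runs only once the context starts. */
        void foreach(Consumer<String> action) {
            ctx.outputOps.add(() -> records.forEach(action));
        }
    }

    public static void main(String[] args) {
        // Same shape as the question: the stream is created and printed, nothing is registered.
        ToyContext broken = new ToyContext();
        ToyStream messages = broken.source(Arrays.asList("a", "b"));
        System.out.println("Message received: " + messages);
        try {
            broken.start();
        } catch (IllegalArgumentException e) {
            System.out.println("start() failed: " + e.getMessage());
        }

        // Registering an output operation before start() lets the graph run.
        ToyContext working = new ToyContext();
        working.source(Arrays.asList("a", "b")).foreach(r -> System.out.println("record: " + r));
        working.start();
    }
}
```

In the real application, messages.print() plays the role that foreach plays in this sketch.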

As for Kafka, metadata.broker.list must contain the addresses of your Kafka broker nodes. There is a separate key, zookeeper.connect, for the ZooKeeper addresses.
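
As a rough sketch of that distinction, the helper below builds the parameter map for the direct stream from a broker list only. KafkaParamsSketch and directStreamParams are hypothetical names introduced for illustration, and the host:port format check is an added assumption rather than something KafkaUtils asks you to do yourself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spark or Kafka.
final class KafkaParamsSketch {

    /**
     * Builds parameters for the direct stream, which is given broker addresses
     * (host:port, comma separated) rather than a ZooKeeper quorum.
     */
    static Map<String, String> directStreamParams(String brokerList) {
        for (String entry : brokerList.split(",")) {
            String[] hostPort = entry.trim().split(":");
            if (hostPort.length != 2 || hostPort[0].isEmpty()) {
                throw new IllegalArgumentException("Expected host:port but got: " + entry);
            }
            // Throws NumberFormatException (an IllegalArgumentException) for a non-numeric port.
            Integer.parseInt(hostPort[1]);
        }
        Map<String, String> params = new HashMap<>();
        params.put("metadata.broker.list", brokerList);
        return params;
    }

    public static void main(String[] args) {
        // Broker address taken from the question; the resulting map is what
        // would be passed to KafkaUtils.createDirectStream as kafkaParams.
        System.out.println(directStreamParams("my.kafka.broker:6667"));
    }
}
```

With the direct approach used in the question, no zookeeper.connect entry is needed. Pointing metadata.broker.list at the ZooKeeper port (2181) is the likely cause of the EOFException in the second stack trace.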

import static org.apache.spark.streaming.kafka.KafkaUtils.createStream;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

import kafka.serializer.StringDecoder;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import scala.Tuple2;

public class KafkaKerberosReader {

    // Spark information
    private static SparkConf conf;
    private static String appName = "KafkaKerberosReader";
    private static JavaStreamingContext context;
    private static final Logger logger = LoggerFactory.getLogger(KafkaKerberosReader.class.getSimpleName());

    // Kafka information
    private static String zkQuorum = "";
    private static String kfkQuorum = "";
    private static String group = "";
    private static Integer threads = 1;
    private static Map<String, String> kafkaParams = new HashMap<String, String>();

    public static void loadProps() {
        Properties prop = new Properties();
        // try-with-resources closes the file even if loading fails
        try (InputStream input = new FileInputStream("config.properties")) {
            logger.info("------------------------------loadProps");
            prop.load(input);
            System.out.println("loadProps loaded:" + prop);

            appName = prop.getProperty("app.name");
            autoOffsetReset = prop.getProperty("auto.offset.reset");
            secProtocol = prop.getProperty("security.protocol");
            kfkQuorum = bServers = prop.getProperty("bootstrap.servers");
            zkQuorum = zServers = prop.getProperty("zookeeper.connect");
            group = kGroupId = prop.getProperty("group.id");
            kKeyTabFile = prop.getProperty("kerberos.keytabfile");
            kJaas = prop.getProperty("kerberos.jaas");
            kTopic = prop.getProperty("kafka.topic");
            kPrincipal = prop.getProperty("kerberos.principal");
            logger.info("loadProps:Props:zk:" + zServers + ",issecure:" + secProtocol + ",autoOffsetReset:"
                    + autoOffsetReset + ",bServers:" + bServers + ",kJaas:" + kJaas + ",keytab:" + kKeyTabFile
                    + ", kTopic:" + kTopic + ", kPrincipal" + kPrincipal);

            if (kPrincipal != null && kKeyTabFile != null) {
                logger.info("---------------------Logging into Kerberos");
                // Named hadoopConf so it does not shadow the static SparkConf field "conf"
                org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
                hadoopConf.set("hadoop.security.authentication", "Kerberos");
                UserGroupInformation.setConfiguration(hadoopConf);
                UserGroupInformation.loginUserFromKeytabAndReturnUGI(kPrincipal, kKeyTabFile);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        logger.info("------------------------------main:START");
        loadProps();
        // Configure the application
        configureSpark();

        // Create the context
        context = createContext(kTopic);

        // Start the application and block until it terminates
        context.start();
        context.awaitTermination();
        logger.info("main:END");
    }

    /**
     * ----------------------------------------------- | This is the kernel of
     * the spark application | -----------------------------------------------
     *
     */
    private static JavaStreamingContext createContext(String topic) {

        logger.info("-------------------------------------------------------");
        logger.info("|            Starting: {}             |", appName);
        logger.info("|            kafkaParams: {}           |", kafkaParams);
        logger.info("-------------------------------------------------------");

        // Create the spark streaming context
        context = new JavaStreamingContext(conf, Seconds.apply(5));

        // Read from a Kerberized Kafka
        JavaPairReceiverInputDStream<String, String> kafkaStream = createStream(context, zkQuorum, "Default",
                ImmutableMap.of(topic, threads), StorageLevel.MEMORY_AND_DISK_SER());

        kafkaStream.print();
        JavaDStream<String> lines = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        lines.print();

        // kafkaStream.map(message -> message._2.toLowerCase()).print();
        logger.info("-------------------------------------------------------");
        logger.info("|            Finished: {}             |", appName);
        logger.info("-------------------------------------------------------");

        return context;
    }

    /**
     * Create a SparkConf and configure it.
     *
     */
    private static void configureSpark() {
        logger.info("------------------------------Initializing '{}'.", appName);
        conf = new SparkConf().setAppName(appName);

        if (group != null && group.trim().length() != 0) {
            kafkaParams.put("group.id", group);
        }
        kafkaParams.put("auto.offset.reset", autoOffsetReset);
        kafkaParams.put("security.protocol", secProtocol);
        kafkaParams.put("bootstrap.servers", kfkQuorum);
        kafkaParams.put("zookeeper.connect", zkQuorum);

        logger.info(">- Configuration done with the follow properties:");
        logger.info(conf.toDebugString());
    }

    static String autoOffsetReset, secProtocol, bServers, zServers, kGroupId, kKeyTabFile, kJaas, kTopic, kPrincipal;

}
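
The class above copies every property into kafkaParams even when it is missing from the file, which stores null values in the map. Below is a minimal sketch of a stricter loader, assuming the same property keys. KafkaPropsSketch and its kafkaParams method are hypothetical names, and the method reads from any Reader so it can be tried without a config.properties file.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration; the key names are the ones used in the class above.
final class KafkaPropsSketch {

    private static final String[] KAFKA_KEYS = {
        "group.id", "auto.offset.reset", "security.protocol", "bootstrap.servers", "zookeeper.connect"
    };

    /** Copies only the Kafka keys that are present and non-blank into the parameter map. */
    static Map<String, String> kafkaParams(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read properties", e);
        }
        Map<String, String> params = new HashMap<>();
        for (String key : KAFKA_KEYS) {
            String value = props.getProperty(key);
            if (value != null && !value.trim().isEmpty()) {
                params.put(key, value.trim());
            }
        }
        return params;
    }

    public static void main(String[] args) {
        // Lines starting with '#' are comments in the properties format and are ignored.
        String sample = "group.id=Default\n"
                + "#kerberos.principal=hello@EXAMPLE.COM\n"
                + "bootstrap.servers=sandbox.hortonworks.com:6667\n";
        System.out.println(kafkaParams(new StringReader(sample)));
    }
}
```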

Properties (config.properties):

app.name=KafkaKerberosReader
auto.offset.reset=smallest
security.protocol=PLAINTEXTSASL
bootstrap.servers=sandbox.hortonworks.com:6667
zookeeper.connect=sandbox.hortonworks.com:2181
group.id=Default
kafka.topic=ifinboundprecint
#kerberos.keytabfile=/etc/hello.keytab
#kerberos.jaas=/etc/kafka/conf/kafka_client_jaas.conf
#kerberos.principal=hello@EXAMPLE.COM

Invocation:

spark-submit --master yarn --deploy-mode client --num-executors 3 --executor-memory 500M --executor-cores 3 --class com.my.spark.KafkaKerberosReader ~/SparkStreamKafkaTest-1.0-SNAPSHOT.jar