spark流Kafka消费者

kq0g1dla  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(381)

我试图建立一个Spark流简单的应用程序,将阅读Kafka主题的消息。
经过大量的工作,我在这个阶段,但得到如下所示的例外情况。
代码:

public static void main(String[] args) throws Exception {

    String brokers = "my.kafka.broker" + ":" + "6667";
    String topics = "MyKafkaTopic";

    // Create context with a 2 seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("StreamingE")
            .setMaster("local[1]")
            ;
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", brokers);
    System.out.println("Brokers: " + brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
    );

    System.out.println("Message received: " + messages);

    // Start the computation
    jssc.start();
    jssc.awaitTermination();

}

它抛出:

[WARNING] 
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
    at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:66)

出于绝望,我试着联系Zookeeper:

String brokers = "my.kafka.zookeeper" + ":" + "2181";
String topics = "MyKafkaTopic";

但这意味着:

[WARNING] 
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
    at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:53)

相关依赖项包括:

<properties>
  <spark.version>1.6.2</spark.version>
  <kafka.version>0.8.2.1</kafka.version>
</properties>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>${kafka.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>

我想问:
我应该连接到kafka代理还是zookeeper服务器?
我的代码中有什么错误,无法连接/侦听传入的消息?

gpfsuwkq

gpfsuwkq1#

import static org.apache.spark.streaming.kafka.KafkaUtils.createStream;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

import kafka.serializer.StringDecoder;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import scala.Tuple2;

public class KafkaKerberosReader {

    // Spark information
    private static SparkConf conf;
    private static String appName = "KafkaKerberosReader";
    private static JavaStreamingContext context;
    private static final Logger logger = LoggerFactory.getLogger(KafkaKerberosReader.class.getSimpleName());

    // Kafka information
    private static String zkQuorum = "";
    private static String kfkQuorum = "";
    private static String group = "";
    private static Integer threads = 1;
    private static Map<String, String> kafkaParams = new HashMap<String, String>();

    public static void loadProps() {
        Properties prop = new Properties();
        try {
            logger.info("------------------------------loadProps");
            InputStream input = new FileInputStream("config.properties");
            prop.load(input);
            System.out.println("loadProps loaded:" + prop);

            appName = prop.getProperty("app.name");
            autoOffsetReset = prop.getProperty("auto.offset.reset");
            secProtocol = prop.getProperty("security.protocol");
            kfkQuorum = bServers = prop.getProperty("bootstrap.servers");
            zkQuorum = zServers = prop.getProperty("zookeeper.connect");
            group = kGroupId = prop.getProperty("group.id");
            kKeyTabFile = prop.getProperty("kerberos.keytabfile");
            kJaas = prop.getProperty("kerberos.jaas");
            kTopic = prop.getProperty("kafka.topic");
            kPrincipal = prop.getProperty("kerberos.principal");
            logger.info("loadProps:Props:zk:" + zServers + ",issecure:" + secProtocol + ",autoOffsetReset:"
                    + autoOffsetReset + ",bServers:" + bServers + ",kJaas:" + kJaas + ",keytab:" + kKeyTabFile
                    + ", kTopic:" + kTopic + ", kPrincipal" + kPrincipal);

            if (kPrincipal != null && kKeyTabFile != null) {
                logger.info("---------------------Logging into Kerberos");
                org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
                conf.set("hadoop.security.authentication", "Kerberos");
                UserGroupInformation.setConfiguration(conf);
                UserGroupInformation.loginUserFromKeytabAndReturnUGI(kPrincipal, kKeyTabFile);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        logger.info("------------------------------main:START");
        loadProps();
        // Configure the application
        configureSpark();

        // Create the context
        context = createContext(kTopic);

        // Stop the application
        context.start();
        context.awaitTermination();
        logger.info("main:END");
    }

    /**
     * ----------------------------------------------- | This is the kernel of
     * the spark application | -----------------------------------------------
     *
     */
    private static JavaStreamingContext createContext(String topic) {

        logger.info("-------------------------------------------------------");
        logger.info("|            Starting: {}             |", appName);
        logger.info("|            kafkaParams:              |", kafkaParams);
        logger.info("-------------------------------------------------------");

        // Create the spark streaming context
        context = new JavaStreamingContext(conf, Seconds.apply(5));

        // Read from a Kerberized Kafka
        JavaPairReceiverInputDStream<String, String> kafkaStream = createStream(context, zkQuorum, "Default",
                ImmutableMap.of(topic, threads), StorageLevel.MEMORY_AND_DISK_SER());

        kafkaStream.print();
        JavaDStream<String> lines = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        lines.print();

        // kafkaStream.map(message -> message._2.toLowerCase()).print();
        logger.info("-------------------------------------------------------");
        logger.info("|            Finished: {}             |", appName);
        logger.info("-------------------------------------------------------");

        return context;
    }

    /**
     * Create a SparkConf and configure it.
     *
     */
    private static void configureSpark() {
        logger.info("------------------------------Initializing '%s'.", appName);
        conf = new SparkConf().setAppName(appName);

        if (group != null && group.trim().length() != 0) {
            kafkaParams.put("group.id", group);
        }
        kafkaParams.put("auto.offset.reset", autoOffsetReset);
        kafkaParams.put("security.protocol", secProtocol);
        kafkaParams.put("bootstrap.servers", kfkQuorum);
        kafkaParams.put("zookeeper.connect", zkQuorum);

        logger.info(">- Configuration done with the follow properties:");
        logger.info(conf.toDebugString());
    }

    static String autoOffsetReset, secProtocol, bServers, zServers, kGroupId, kKeyTabFile, kJaas, kTopic, kPrincipal;

}

属性:

app.name=KafkaKerberosReader

auto.offset.reset=smallest

security.protocol=PLAINTEXTSASL

bootstrap.servers=sandbox.hortonworks.com:6667

zookeeper.connect=sandbox.hortonworks.com:2181

group.id=Default

kafka.topic=ifinboundprecint

//#kerberos.keytabfile=/etc/hello.keytab

//#kerberos.jaas=/etc/kafka/conf/kafka_client_jaas.conf

//#kerberos.principal=hello@EXAMPLE.COM

打电话:
spark submit--master yarn--deploy mode client--num executors 3--executor memory 500m--executor cores 3--class com.my.spark.kafkakerberosreader~/sparkstreamkafkatest-1.0-snapshot.jar

falq053o

falq053o2#

原因:java.lang.illegalargumentexception:需求失败:没有注册输出操作,因此没有要执行的操作
spark的工作方式是,它的大多数转换都是懒惰的。当您想要执行一个图形时,您需要注册一个输出转换。输出转换以 foreachRDD , print , collect 或者 count (还有更多)。
而不是使用 println ,呼叫 DStream.print() :

// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);

messages.print();

// Start the computation
jssc.start();
jssc.awaitTermination();

关于Kafka, metadata.broker.list 需要提供kafka代理节点的地址。有一个单独的键名为 zookeeper.connect 提供Zookeeper的地址。

相关问题