Values from System.getenv and a properties file are null

ykejflvf · posted 2021-06-07 · in Kafka
Follow (0) | Answers (2) | Views (432)

I have a multi-node Spark cluster and I submit my Spark program from the node where the master runs.
When the job runs on the worker (slave) nodes, HOSTNAME comes back as null. This is the line where it is read as null; System.getenv("HOSTNAME") is not resolved on the worker nodes:

System.out.println("line 76 System.getenv(HOSTNAME)=" + System.getenv("HOSTNAME"));

AUDIT_USER and AUDIT_PASSWORD are also null when read (both are defined in the properties file).
If I submit the job on a single node, these parameters cause no problem, but when I submit the job in standalone mode across 6 nodes I hit this issue.
I created the same folder for the properties file on all the nodes.
Here is my code. Can you tell me why System.getenv and my properties return null?

package com.fb.cpd.myapp;

import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;

import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;

public class GenericLogic implements Serializable {
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LogManager.getLogger(GenericLogic.class);
    private PropertiesConfiguration props;
    private Producer<String, String> producer = null;
    private Future<RecordMetadata> receipt = null;
    private RecordMetadata receiptInfo = null;
    private ConnectToRDBMS auditor = null;
    private ConnectToRDBMS df = null;

    private static String myId = null;

    private Map<TopicAndPartition, Long> getOffsets(String topic) throws SQLException {
        String appName = "myapp";
        String TopicName = topic;
        Map<TopicAndPartition, Long> topicMap = new HashMap<>(); //
        System.out.println("line 64 before making connection");

        try {
            props = new PropertiesConfiguration("/app/lock/conf/empty.properties");
        } catch (ConfigurationException e) { // TODO Auto-generated catch block
            System.out.println("Line 70");
            e.printStackTrace();
        }

        try {
            System.out.println("line 76 System.getenv(HOSTNAME)=" + System.getenv("HOSTNAME"));
            auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "lockSparkCollector", null, null, null, null, null,
                    0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"),
                    props.getString("AUDIT_DB_URL"));
        } catch (SQLException e) {
            logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB " + e.getMessage());
        }
        System.out.println("line 64 after making connection");

        Statement stmt = null;

        String query = "select va_application, topic_name, partition_id, from_offset,until_offset from lock_spark_offsets where va_application = "
                + "'" + appName + "'" + " and topic_name= " + "'" + TopicName + "'";
        System.out.println("query" + query);
        System.out.println("before query exection");
        try {
            stmt = auditor.dbConnection.createStatement();
            System.out.println("line 81");

            ResultSet rs = stmt.executeQuery(query);
            System.out.println("line 83");
            while (rs.next()) {
                System.out.println("pass 1 of Resultset");
                System.out.println("getOffsets=" + topic.trim() + " " + rs.getInt("partition_id") + " "
                        + rs.getString("until_offset") + " " + rs.getString("until_offset"));
                Integer partition = rs.getInt("partition_id");

                TopicAndPartition tp = new TopicAndPartition(topic.trim(), partition);
                System.out.println("102");
                Long.parseLong(rs.getString("until_offset"));
                topicMap.put(tp, Long.parseLong(rs.getString("until_offset")));
                System.out.println("105");

            }
            System.out.println("after populating topic map");

        } catch (SQLException e) {
            System.out.println("printing exception");
            e.printStackTrace();
        } finally {
            if (stmt != null) {
                System.out.println("closing statement");
                stmt.close();
            }
        }
        return topicMap;
    }

    public void setDefaultProperties() {
        FileChangedReloadingStrategy strategy = new FileChangedReloadingStrategy();
        strategy.setRefreshDelay(10000);
        System.out.println("Line 45");
        // supply the properties file.
        try {
            props = new PropertiesConfiguration("/app/lock/conf/empty.properties");
        } catch (ConfigurationException e) {
            // TODO Auto-generated catch block
            System.out.println("Line 51");
            e.printStackTrace();
        }
        props.setReloadingStrategy(strategy);
        System.out.println("Line 56");

        // Producer configs
        if (!props.containsKey("acks")) {
            props.setProperty("acks", "1");
        }

        if (!props.containsKey("retries")) {
            props.setProperty("retries", "1000");
        }

        if (!props.containsKey("compression.type")) {
            props.setProperty("compression.type", "gzip");
        }

        if (!props.containsKey("request.timeout.ms")) {
            props.setProperty("request.timeout.ms", "600000");
        }

        if (!props.containsKey("batch.size")) {
            props.setProperty("batch.size", "32768");
        }

        if (!props.containsKey("buffer.memory")) {
            props.setProperty("buffer.memory", "134217728");
        }

        if (!props.containsKey("block.on.buffer.full")) {
            props.setProperty("block.on.buffer.full", "true");
        }

        if (!props.containsKey("SHUTDOWN")) {
            props.setProperty("SHUTDOWN", "false");
        }

        if (!props.containsKey("producer.topic")) {
            props.setProperty("producer.topic", "mytopic1");
        }

        Properties producer_props = ConfigurationConverter.getProperties(props);

        producer_props.setProperty("bootstrap.servers", props.getString("target.bootstrap.servers"));
        producer_props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer_props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // ????

        this.producer = new KafkaProducer<String, String>(producer_props);
        System.out.println("Line 107");

    }

    public void PublishMessages(String st) {

        try {
            System.out.println("Line 111");
            String key = UUID.randomUUID().toString().replace("-", "");
            System.out.println("Started Producing...");

            receipt = producer.send(new ProducerRecord<String, String>(props.getString("producer.topic"), key, // Key
                    st));
            System.out.println("After Completion of Producing Producing");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Exception in PublishMessages ");
        }

    }

    public void DBConnect() {
        try {
            auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "myapp", props.getString("consumer.topic"), null,
                    null, null, null, 0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"),
                    props.getString("AUDIT_DB_URL"));
        } catch (SQLException e) {
            logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB " + e.getMessage());
            return;
        }
    }

    private void writeToDB(Long startTime, Integer partnId, String fromOffset, String untilOffset, Integer count) {
        this.auditor.audit(startTime, partnId, fromOffset, untilOffset, count);

    }

    /**
     * Entry point: reads stored offsets, creates the streaming context and starts the job.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        String topicNames = "MySourceTopic";
        GenericLogic ec = new GenericLogic();
        Map<TopicAndPartition, Long> topicMap = null;
        try {

            topicMap = ec.getOffsets("MySourceTopic");

        } catch (SQLException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        boolean clusterMode = false;

        Integer batchDuration = Integer.parseInt("30000");
        JavaSparkContext sparkConf = new JavaSparkContext("abcd.net:7077", "Kafka-Spark-Integration");

        sparkConf.getConf().set("spark.local.ip", "lock-dt-a4d.xyz.com");
        sparkConf.getConf().set("spark.eventLog.enabled", "false");
        sparkConf.getConf().set("spark.shuffle.blockTransferService", "nio");

        JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(10000));
        Map<String, String> kafkaParams = new HashMap<String, String>();
        String pollInterval = "10000";
        String zookeeper = "lock-dt-a5d.xyz.com:2181,lock-dt-a6d.xyz.com:2181";

        kafkaParams.put("metadata.broker.list", "lock-dt-a5d.xyz.com:9092,lock-dt-a6d.xyz.com:9092");
        kafkaParams.put("group.id", "Consumer");
        kafkaParams.put("client.id", "Consumer");
        kafkaParams.put("zookeeper.connect", zookeeper);

        JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class,
                StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap,
                (Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message);

        directKafkaStream.foreachRDD(rdd -> {
            if (rdd.isEmpty()) {
                System.out.println("No events polled in last " + pollInterval + " milli seconds");
                return;
            }

            rdd.foreachPartition(itr -> {
                Integer partnId = TaskContext.get().partitionId();
                Long systime = System.nanoTime();
                Map<String, String> hmap = new HashMap<String, String>();

                GenericLogic ec2 = new GenericLogic();
                ec2.setDefaultProperties();
                ec2.DBConnect();

                try {

                    while (itr.hasNext()) {
                        System.out.println("232");
                    }

                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }

            });
        });
        jsc.start();
        jsc.awaitTermination();
    }

}

y0u0uwnf1#

Could you let us know the operating system of all the nodes, and whether you have made sure that HOSTNAME is actually exported on the master node? It would be easier to answer your question with those OS details.
System.getenv("HOSTNAME") may not provide the hostname on every platform (for example Ubuntu or Mac).
Better still, why not export HOSTNAME explicitly?
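As a rough illustration of that point, a minimal sketch of a fallback that does not depend on HOSTNAME being exported on every worker could look like the class below (HostNameResolver is a hypothetical helper name, and the InetAddress fallback is an assumption, not something from the question):

import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostNameResolver {
    // Prefer the HOSTNAME environment variable when it is exported,
    // otherwise fall back to JVM-level name resolution.
    public static String resolveHostName() {
        String host = System.getenv("HOSTNAME");
        if (host == null || host.isEmpty()) {
            try {
                host = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                host = "unknown-host"; // last-resort placeholder
            }
        }
        return host;
    }
}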
Note: I assume you have already checked whether props is null or empty? If not, debug and check that the properties file is actually loaded, and, if it is loaded, that it is not an empty file, so that the properties really do come from it.
Looking at your problem (not only the environment variable but the properties are not returned either), there may be an issue with the properties file or its relative location on the different machines. If it is not an exact copy placed on every machine, also check that it is a Linux-friendly file (not written and edited on Windows and then copied to Linux).
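To verify that second point, a small check run on each node could confirm that the properties file is present and actually contains the expected keys before PropertiesConfiguration reads it. The path is the one from the question; PropsCheck is only a debugging sketch, not part of the job:

import java.io.File;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;

public class PropsCheck {
    public static void main(String[] args) {
        File f = new File("/app/lock/conf/empty.properties");
        // Confirm the file was really copied to this machine and is readable.
        System.out.println("exists=" + f.exists() + " readable=" + f.canRead() + " size=" + f.length());
        try {
            PropertiesConfiguration props = new PropertiesConfiguration(f);
            // If these print false, the file loaded but does not contain the keys.
            System.out.println("AUDIT_USER present=" + props.containsKey("AUDIT_USER"));
            System.out.println("AUDIT_PASSWORD present=" + props.containsKey("AUDIT_PASSWORD"));
        } catch (ConfigurationException e) {
            e.printStackTrace();
        }
    }
}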


ghhkc1vu2#

I had started the slaves with start-slaves.sh, and that was the problem. I had to start the workers by supplying the master's address.
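For reference, in Spark standalone mode a worker is typically attached to a specific master by passing the master URL when the worker is started; the host and port below are taken from the code above and are only illustrative (in newer Spark releases the equivalent script is start-worker.sh):

./sbin/start-slave.sh spark://abcd.net:7077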
