Confluent source code for Kafka to HDFS

l7wslrjt · posted 2021-06-08 in Kafka

For my project I need to build, from the Confluent Java source code, a class that writes data from a Kafka topic to the HDFS file system.
This already works from the CLI with connect-standalone, but I need to do the same thing programmatically against the source code, which builds successfully.
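For context, this is roughly the standalone setup being reproduced. A minimal sketch of the connector properties, assuming a quickstart-style file (the file names here are placeholders; the topic `test` and the settings mirror the Java code below):

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test
hdfs.url=hdfs://localhost:9000
flush.size=3
format.class=io.confluent.connect.hdfs.avro.AvroFormat

launched with something like connect-standalone worker.properties hdfs-sink.properties.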
My problem is with the SinkTask and HdfsSinkConnector classes: an exception is thrown in the put method.
Here is the code for my class:

package io.confluent.connect.hdfs;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

import io.confluent.connect.hdfs.avro.AvroFormat;
import io.confluent.connect.storage.common.StorageCommonConfig;

public class main {
    // These constants were missing from the original snippet; TopicPartition
    // needs the topic name and partition number the records belong to.
    protected static final String TOPIC = "test";
    protected static final int PARTITION = 0;
    protected static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);

    private static Map<String, String> props = new HashMap<>();
    protected static String url = "hdfs://localhost:9000";

    // Outside the Connect runtime nothing supplies a SinkTaskContext, and the
    // task dereferences it during start()/put(), so a minimal stub stands in.
    protected static SinkTaskContext context = new SinkTaskContext() {
        private final Set<TopicPartition> assignment = Collections.singleton(TOPIC_PARTITION);
        @Override public Map<String, String> configs() { return props; }
        @Override public void offset(Map<TopicPartition, Long> offsets) {}
        @Override public void offset(TopicPartition tp, long offset) {}
        @Override public void timeout(long timeoutMs) {}
        @Override public Set<TopicPartition> assignment() { return assignment; }
        @Override public void pause(TopicPartition... partitions) {}
        @Override public void resume(TopicPartition... partitions) {}
        @Override public void requestCommit() {}
    };

    public static void main(String[] args) {
        HdfsSinkConnector hk = new HdfsSinkConnector();
        HdfsSinkTask h = new HdfsSinkTask();
        props.put(StorageCommonConfig.STORE_URL_CONFIG, url);
        props.put(HdfsSinkConnectorConfig.HDFS_URL_CONFIG, url);
        props.put(HdfsSinkConnectorConfig.FLUSH_SIZE_CONFIG, "3");
        props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG, AvroFormat.class.getName());
        try {
            hk.start(props);
            Collection<SinkRecord> sinkRecords = new ArrayList<>();
            SinkRecord record = new SinkRecord(TOPIC, PARTITION, null, null, null, null, 0);
            sinkRecords.add(record);
            // Lifecycle order matters: initialize() and start() must both run
            // before put(). The original code never called h.start(props), so
            // put() ran against an uninitialized internal writer, which is the
            // usual cause of the exception.
            h.initialize(context);
            h.start(props);
            h.open(context.assignment());
            h.put(sinkRecords);
            h.close(context.assignment());
            h.stop();
            hk.stop();
        } catch (Exception e) {
            throw new ConnectException("Couldn't start HdfsSinkConnector due to a configuration error", e);
        }
    }
}
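One more thing worth checking once the lifecycle is fixed: with format.class set to AvroFormat, a record whose value schema and value are both null may still not be serializable to an Avro file. A minimal sketch of a record the Avro writer can handle, using the standard Kafka Connect Schema/Struct API (the field names are illustrative only, not from the original post); add these imports and swap the record construction inside main():

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// A value schema with a single string field (names are illustrative only).
Schema valueSchema = SchemaBuilder.struct().name("record")
        .field("name", Schema.STRING_SCHEMA)
        .build();
Struct value = new Struct(valueSchema).put("name", "example");

// Same constructor as before, now with a real schema and value.
SinkRecord record = new SinkRecord(TOPIC, PARTITION, null, null, valueSchema, value, 0);

Note that with flush.size set to "3", nothing is committed to HDFS until three records have been buffered for the partition.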
