I am trying to write data to HDFS using the storm-hdfs connector 0.1.3 (GitHub URL: https://github.com/ptgoetz/storm-hdfs). I have added this dependency to my Maven project:
<dependency>
<groupId>com.github.ptgoetz</groupId>
<artifactId>storm-hdfs</artifactId>
<version>0.1.3-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
The storm-hdfs project itself ships an example topology that writes data to HDFS; I only modified it to match my file locations. My HdfsFileTopology is:
package my.company.app;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.AbstractHdfsBolt;
import org.apache.storm.hdfs.bolt.SequenceFileBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.hdfs.common.rotation.MoveFileAction;
import org.yaml.snakeyaml.Yaml;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class HdfsFileTopology {
static final String SENTENCE_SPOUT_ID = "sentence-spout";
static final String BOLT_ID = "my-bolt";
static final String TOPOLOGY_NAME = "test-topology";
public static void main(String[] args) throws Exception {
Config config = new Config();
config.setNumWorkers(1);
SentenceSpout spout = new SentenceSpout();
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// rotate files every minute
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES);
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/users/storm/")
.withExtension(".txt");
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");
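// Load HDFS client settings from the YAML file passed as the second argument
// and store them in the topology config; withConfigKey("hdfs.config") below
// tells the HdfsBolt to apply this map to its Hadoop Configuration.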
Yaml yaml = new Yaml();
InputStream in = new FileInputStream(args[1]);
Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
in.close();
config.put("hdfs.config", yamlConf);
HdfsBolt bolt = new HdfsBolt()
.withConfigKey("hdfs.config")
.withFsUrl(args[0])
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy)
.addRotationAction(new MoveFileAction().toDestination("/dest2/"));
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
// SentenceSpout --> MyBolt
builder.setBolt(BOLT_ID, bolt, 4)
.shuffleGrouping(SENTENCE_SPOUT_ID);
if (args.length == 2) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
waitForSeconds(120);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
System.exit(0);
} else if (args.length == 3) {
StormSubmitter.submitTopology(args[2], config, builder.createTopology());
} else {
System.out.println("Usage: HdfsFileTopology [hdfs url] [yaml config file] <topology name>");
}
}
public static void waitForSeconds(int seconds) {
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
}
}
public static class SentenceSpout extends BaseRichSpout {
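// Tracks emitted tuples by message id until they are acked, so that
// failed tuples can be re-emitted from this map in fail().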
private ConcurrentHashMap<UUID, Values> pending;
private SpoutOutputCollector collector;
private String[] sentences = {
"my dog has fleas",
"i like cold beverages",
"the dog ate my homework",
"don't have a cow man",
"i don't think i like fleas"
};
private int index = 0;
private int count = 0;
private long total = 0L;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence", "timestamp"));
}
public void open(Map config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
this.pending = new ConcurrentHashMap<UUID, Values>();
}
public void nextTuple() {
Values values = new Values(sentences[index], System.currentTimeMillis());
UUID msgId = UUID.randomUUID();
this.pending.put(msgId, values);
this.collector.emit(values, msgId);
index++;
if (index >= sentences.length) {
index = 0;
}
count++;
total++;
if(count > 20000){
count = 0;
System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
}
Thread.yield();
}
public void ack(Object msgId) {
this.pending.remove(msgId);
}
public void fail(Object msgId) {
System.out.println("****RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
}
}
public static class MyBolt extends BaseRichBolt {
private HashMap<String, Long> counts = null;
private OutputCollector collector;
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.counts = new HashMap<String, Long>();
this.collector = collector;
}
public void execute(Tuple tuple) {
collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// this bolt does not emit anything
}
@Override
public void cleanup() {
}
}
}
I compile this project with Maven (groupId: my.company.app) and the build succeeds, but when I submit the jar file to Storm, it throws this error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/storm/hdfs/bolt/format/FileNameFormat
Caused by: java.lang.ClassNotFoundException: org.apache.storm.hdfs.bolt.format.FileNameFormat
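For reference, the submit command looks roughly like this (the jar path, namenode host, and file paths are placeholders, not values from my actual setup):
storm jar target/hdfs_example-1.0-SNAPSHOT.jar my.company.app.HdfsFileTopology hdfs://namenode:8020 /path/to/hdfs-config.yaml my-topology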
Why does it throw a class-not-found error when I have included this class? Any help with writing data to HDFS from Storm would be appreciated.
As requested, here is the pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mycompany.app</groupId>
<artifactId>hdfs_example</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hdfs_example</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<repositories>
<repository>
<id>Codehaus</id>
<url>http://repository.codehaus.org</url>
</repository>
<repository>
<id>Codehaus.Snapshots</id>
<url>http://snapshots.repository.codehaus.org</url>
<snapshots><enabled>true</enabled></snapshots>
</repository>
<repository>
<id>github-releases</id>
<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
</repository>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.8.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert-core</artifactId>
<version>2.0M8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock</artifactId>
<version>2.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.ptgoetz</groupId>
<artifactId>storm-hdfs</artifactId>
<version>0.1.3-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>test/main/java</testSourceDirectory>
<resources>
<resource>
<directory>${basedir}/multilang</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!--
<plugin>
<groupId>com.theoryinpractise</groupId>
<artifactId>clojure-maven-plugin</artifactId>
<version>1.3.12</version>
<extensions>true</extensions>
<configuration>
<sourceDirectories>
<sourceDirectory>src/clj</sourceDirectory>
</sourceDirectories>
</configuration>
<executions>
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass>${storm.topology}</mainClass>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
1 Answer
I had the same problem. After going through the pom.xml in detail, I realized that in the storm-hdfs dependency
<version>0.1.3-SNAPSHOT</version>
the scope is defined as provided. I believe this means Maven assumes the runtime will supply the jar and therefore does not package it into the topology jar. I changed the version to a release that is available in the Maven repository and removed the scope, which forces Maven to download the jar during the build and include it in the topology jar.
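The corrected dependency looks roughly like this (the release number below is an assumption; substitute whatever non-SNAPSHOT version is actually published):
<dependency>
<groupId>com.github.ptgoetz</groupId>
<artifactId>storm-hdfs</artifactId>
<!-- assumed release number; use the latest version published in the repository -->
<version>0.1.2</version>
<!-- no scope element: the default compile scope lets the shade plugin bundle it -->
</dependency>
With the default compile scope, the maven-shade-plugin packages the org.apache.storm.hdfs.* classes into the shaded topology jar, so they are on the classpath when Storm runs the topology.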
Here is the relevant part of my pom.xml for reference (some basic details removed):
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
<includes>
<include>core-site.xml</include>
<include>hdfs-site.xml</include>
</includes>
</resource>
</resources>
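Bundling core-site.xml and hdfs-site.xml as project resources presumably puts the Hadoop cluster configuration on the topology's classpath, where the Hadoop client loads it by default.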