apache storm flux kafka喷口记录转换器异常

gblwokeq  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(326)

我使用storm flux 1.2.2来部署拓扑。我正在将记录转换器传递给kafkaspoutconfig(从->https://github.com/apache/storm/blob/master/flux/flux-examples/src/main/resources/kafka_spout.yaml),但出现以下异常:

java.lang.ClassNotFoundException: org.apache.storm.flux.examples.OnlyValueRecordTranslator
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:342)
    at org.apache.storm.flux.FluxBuilder.buildComponents(FluxBuilder.java:421)
    at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:101)
    at com.gsl.saf.storm.flux.manager.CustomFluxManager.runFlux(CustomFluxManager.java:87)
    at com.gsl.saf.storm.flux.manager.StromTopologyManager.SubmitTopology(StromTopologyManager.java:185)
    at com.gsl.saf.storm.flux.manager.StromTopologyManager.submitTopology(StromTopologyManager.java:299)
    at com.gsl.saf.stormflux.App.main(App.java:36)

通量yaml配置:

components:

  - id: "windowDuration"
    className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
    constructorArgs:
      - 10
      - "SECONDS"
  - id: "onlyValueRecordTranslator"
    className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator"

  - id: "spoutConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
    constructorArgs:
      - "localhost:9092"
      - ["test_topic"]
    properties:
      - name: "firstPollOffsetStrategy"
        value: EARLIEST
      - name: "recordTranslator"
        ref: "onlyValueRecordTranslator"
    configMethods:
      - name: "setProp"
        args:
          - {
              "key.deserializer" : "org.apache.kafka.common.serialization.StringDeserializer",
              "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
            }

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
    constructorArgs:
      - ref: "spoutConfigBuilder"
config:
  topology.workers: 1

# spout definitions

spouts:
  - id: "spout-1"
    className: "org.apache.storm.kafka.spout.KafkaSpout"
    constructorArgs:
      - ref: "spoutConfig"

pom配置:

<dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.2</version>
            </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>flux-core</artifactId>
            <version>1.2.2</version>
        </dependency>

谢谢

but5z9lq

but5z9lq1#

OnlyValueRecordTranslator 只存在于 flux-examples 项目。看到了吗https://github.com/apache/storm/blob/master/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/onlyvaluerecordtranslator.java. 如果你愿意,你可以把它复制到你的项目中。

相关问题