kafka 2.9.1 producer 0.8.2.1编译与运行时依赖关系

yuvru6vn  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(407)

因此,在api 0.8.2中,生产者的kakfa配置属性发生了变化;在处理完这个问题并让我的java生产者编译之后,我得到了一个异常。生产者是针对我的kafka_2.9.1-0.8.2.1集群的节点,我得到了关于 DefaultSerializer 不示例化:

  1. Exception in thread "main" org.apache.kafka.common.KafkaException: Could not instantiate class kafka.serializer.DefaultEncoder Does it have a public no-argument constructor?
  2. at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:235)
  3. at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:136)
  4. at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:216)
  5. ........

考虑到这是在kakfa中实现的,我想知道使用kafka作为依赖项进行编译是否不够,因为在运行时可能需要打包一个或多个kafka jar。我还没有找到关于这个的文档(最新的或者其他的)。我丢了一个生产者运行时jar吗?
作为参考,我把我的 build.gradle 这里(有点乱)。编译中的排除项是在已经获得此错误之后的新添加项,因此无论依赖项块中是否包含这些行,都会发生错误。我试着只依靠 kafka-client 0.8.2的模块,但我认为这对制作人不适用。文件如下:

  1. buildscript {
  2. repositories {
  3. mavenCentral()
  4. }
  5. dependencies {
  6. classpath 'com.google.protobuf:protobuf-gradle-plugin:0.7.0'
  7. }
  8. }
  9. group 'lamblin'
  10. version '0.1-SNAPSHOT'
  11. apply plugin: 'application'
  12. apply plugin: "com.google.protobuf"
  13. sourceCompatibility = 1.7
  14. targetCompatibility = 1.7
  15. // Eliminates bootstrap class warning from javac
  16. //tasks.withType(Compile) {
  17. // options.bootClasspath = "$JDK6_HOME/jre/lib/rt.jar"
  18. //}
  19. repositories {
  20. mavenCentral()
  21. }
  22. dependencies {
  23. testCompile group: 'junit', name: 'junit', version: '4.11'
  24. compile group: 'com.google.guava', name: 'guava', version: '18.0'
  25. compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.0.0-beta-1'
  26. compile group: 'com.google.transit', name: 'gtfs-realtime-bindings', version: '0.0.4'
  27. compile group: 'com.offbytwo', name: 'docopt', version: '0.6.0.20150202'
  28. //compile group: 'org.apache.kafka', name: 'kafka_2.9.1', version: '0.8.2.1' {
  29. compile ('org.apache.kafka:kafka_2.9.1:0.8.2.1') {
  30. exclude group: 'com.sun.jmx', module: 'jmxri'
  31. exclude group: 'javax.jmx', module: 'jms'
  32. exclude group: 'com.sun.jdmk', module: 'jmxtools'
  33. }
  34. }
  35. protobuf {
  36. generateProtoTasks {
  37. all().each { task ->
  38. task.builtins {
  39. python { }
  40. }
  41. }
  42. }
  43. protoc {
  44. // artifact = 'com.google.protobuf:protoc:3.0.0-alpha-3'
  45. artifact = 'com.google.protobuf:protoc:2.6.1'
  46. }
  47. }
  48. // First Application Script
  49. mainClassName = "com.insight.lamblin.GtfsToJson"
  50. applicationName = "gtfsToJson"
  51. // Subsequent Scripts
  52. task createAllStartScripts() << {
  53. // This task is added to by a loop over the scripts array creating-sub tasks
  54. }
  55. def scripts = [ 'gtfsToJson': 'com.insight.lamblin.GtfsToJson',
  56. 'rawGtfsKafkaProducer': 'com.insight.lamblin.RawGtfsKafkaProducer'
  57. ]
  58. scripts.each() { scriptName, className ->
  59. def t = tasks.create(name: scriptName+'StartScript', type: CreateStartScripts) {
  60. mainClassName = className
  61. applicationName = scriptName
  62. outputDir = new File(project.buildDir, 'scripts')
  63. classpath = jar.outputs.files + project.configurations.runtime
  64. }
  65. applicationDistribution.into("bin") {
  66. from(t)
  67. fileMode = 0755
  68. }
  69. createAllStartScripts.dependsOn(t)
  70. }
dohp0rv5

dohp0rv51#

场景:在一个不起眼的教堂地下室里,一圈折叠金属椅上坐着各种各样的中年男子和少数妇女,他们中的大多数人都戴着眼镜,显得漠不关心。有一盒咖啡和一些半甜甜圈依次排列在一个塑料板上,在靠近入口的一堵墙旁边的一张有缺口的折叠桌上。
丹尼尔:嗨,我叫丹尼尔,我是(呜咽)。。。文档略读器。
小组(慢慢地):欢迎丹尼尔。
那个剧本是因为我的Kafka问题似乎只吸引了蟋蟀,所以我在这里保持一点有趣。。。感到寂寞。
在我的辩护中,有10个看似权威的kafka.apache.org文档是关于生产者的属性设置的。这个 kafka.serializer.DefaultSerializer 在几乎所有设置属性的示例中都非常突出和常见,而java producer示例完全缺乏关于属性或运行示例代码的详细信息。
此外,尽管名称为“default”,但此属性没有默认值,因此需要设置它。这似乎是一个愚蠢的细节,但它必须有意义的一些人在Kafka开发团队。
当运行用java编写的kafka生产者时,生产者应该从可用的少数java特定编码器中设置编码器。前面提到的一个似乎是scala特有的。对于您感兴趣的java org.apache.kafka.common.serialization 与默认序列化程序等价的是: ByteArraySerializer . 如果你设置 key.serializer 以及 value.serializer 应该有用。设置这些的更好方法是在中使用静态字符串 ProducerConfig 就像 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG .
设置有点像:

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. ...
  5. import java.util.Properties;
  6. ...
  7. Properties props = new Properties();
  8. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  9. "172.31.22.7:9092,172.31.22.6:9092,172.31.22.5:9092,172.31.22.4:9092");
  10. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  11. "org.apache.kafka.common.serialization.ByteArraySerializer");
  12. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  13. "org.apache.kafka.common.serialization.ByteArraySerializer");
  14. KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
  15. ...
展开查看全部

相关问题