带有zkstringserializer的scala包问题

wfsdck30  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(480)

我正在尝试使用类zkstringserializer,这是我得到的

import kafka.utils.ZKStringSerializer

根据整个互联网,甚至我自己的代码之前,我重新启动电脑,这应该允许我的代码工作。但是,我现在得到了一个非常混乱的编译错误,

object ZKStringSerializer in package utils cannot be accessed in package kafka.utils

这很混乱,因为这个文件不应该在任何包中,而且我没有在任何地方指定包。这是我的密码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.ZkConnection
import java.util.Properties

import org.apache.kafka.clients.admin
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SpeedTester {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.master("local[4]").appName("SpeedTester").config("spark.driver.memory", "8g").getOrCreate()
    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)
    import spark.implicits._
    val zookeeperConnect = "localhost:2181"
    val sessionTimeoutMs = 10000
    val connectionTimeoutMs = 10000
    val zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer)
    val topicName = "testTopic"
    val numPartitions = 8
    val replicationFactor = 1
    val topicConfig = new Properties
    val isSecureKafkaCluster = false
    val zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster)
    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig)

    // Create producer for topic testTopic and actually push values to the topic
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9592")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val TOPIC = "testTopic"
    for (i <- 1 to 50) {
      val record = new ProducerRecord(TOPIC, "key", s"hello $i")
      producer.send(record)
    }

    val record = new ProducerRecord(TOPIC, "key", "the end" + new java.util.Date)
    producer.send(record)
    producer.flush()
    producer.close()
  }
}
w8f9ii69

w8f9ii691#

我知道这已经太迟了,但是对于其他人来说,他们也会寻找同样的问题-
在最新版本的kafka中,kafka.utils被弃用。所以请使用kafka管理客户端api

相关问题