为什么Kafka的数据到我的Spark流应用程序没有保存到 cassandra ?

ozxc1zmp  于 2022-11-05  发布在  Cassandra
关注(0)|答案(1)|浏览(135)

我的流媒体应用程序不保存数据到CAS,我尝试了不同的方法,使用foreachRDDstream.print来找出为什么它不工作,但它不打印任何东西。
对于输入数据,我使用kafka-console-producer.sh

object letsRun extends App {
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark._
  import com.datastax.spark.connector._
  import org.apache.spark.sql._
  import com.datastax.spark.connector.writer._
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.StreamingContext._
  import com.datastax.spark.connector.streaming._
  import org.apache.spark.streaming.kafka010._
  import org.apache.kafka.clients.consumer.ConsumerRecord
  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

  val conf = new SparkConf().setMaster("local[*]").setAppName("test").set("spark.cassandra.connection.host", "192.168.1.44")
  //val sc = new SparkContext(conf)
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.sparkContext.setLogLevel("WARN")

  val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.46:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val topics = Set[String]("testTopic")
  val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
  implicit val sqlRowWriter = SqlRowWriter.Factory

  val messages = stream.map(record => (record.key, record.value))
  messages.saveToCassandra("ks", "tb", SomeColumns("key", "value"))
  ssc.start()
  ssc.awaitTermination()
}

Eclipse中的输出:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/06/12 20:23:25 INFO SparkContext: Running Spark version 2.1.1
17/06/12 20:23:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/06/12 20:23:26 WARN Utils: Your hostname, dev resolves to a loopback address: 127.0.0.1; using 192.168.1.41 instead (on interface wlo1)
17/06/12 20:23:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/06/12 20:23:26 INFO SecurityManager: Changing view acls to: dev
17/06/12 20:23:26 INFO SecurityManager: Changing modify acls to: dev
17/06/12 20:23:26 INFO SecurityManager: Changing view acls groups to: 
17/06/12 20:23:26 INFO SecurityManager: Changing modify acls groups to: 
17/06/12 20:23:26 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(dev); groups with view permissions: Set(); users  with modify permissions: Set(dev); groups with modify permissions: Set()
17/06/12 20:23:26 INFO Utils: Successfully started service 'sparkDriver' on port 39585.
17/06/12 20:23:26 INFO SparkEnv: Registering MapOutputTracker
17/06/12 20:23:26 INFO SparkEnv: Registering BlockManagerMaster
17/06/12 20:23:26 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/06/12 20:23:26 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/06/12 20:23:26 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8eb85c8e-216f-4b69-a567-3e7833cd675d
17/06/12 20:23:26 INFO MemoryStore: MemoryStore started with capacity 870.9 MB
17/06/12 20:23:26 INFO SparkEnv: Registering OutputCommitCoordinator
17/06/12 20:23:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/06/12 20:23:27 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.41:4040
17/06/12 20:23:27 INFO Executor: Starting executor ID driver on host localhost
17/06/12 20:23:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43841.
17/06/12 20:23:27 INFO NettyBlockTransferService: Server created on 192.168.1.41:43841
17/06/12 20:23:27 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/06/12 20:23:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.41:43841 with 870.9 MB RAM, BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 WARN KafkaUtils: overriding enable.auto.commit to false for executor
17/06/12 20:23:27 WARN KafkaUtils: overriding auto.offset.reset to none for executor
17/06/12 20:23:27 WARN KafkaUtils: overriding executor group.id to spark-executor-use_a_separate_group_id_for_each_stream
17/06/12 20:23:27 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
rt4zxlrg

rt4zxlrg1#

对不起,问题是我自己解决的

ssc.sparkContext.setLogLevel("DEBUG")

Spark试图解析我的虚拟机的主机名,但在config Kafka address中我使用了IP,所以将地址添加到/etc/hosts中,它就工作了!

相关问题