将mysql中的大数据加载到spark中

ghhkc1vu  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(421)

寻找Spark理解。。。
我正在将mysql中的大量数据加载到spark中,它一直在消亡:-(

org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)

这是我的密码

val query =
  s"""
     (
      select 
      mod(act.AccountID, ${parts}) part,
      p.Value name, event.EventTime eventTime, act.AccountID accountID, act.UserGoal goalID,event.ActivityID activityID, id.CountryID countryID, arr.ConsumerID consumerID
      from DimIdentity as id
      join FactArrival as arr on  arr.IdentityID=id.IdentityID
      join FactActivityEvent as event on event.ArrivalID=arr.ArrivalID
      join DimAccount as act on  act.AccountID=event.AccountID
      join DimAccountRoleTypeMatch as role on role.AccountID=act.AccountID
      join DimDateTime as d on event.DateTimeID=d.DateTimeID
      join DimProperty as p on p.PropertyID=event.EventTypeID
      where
        id.Botness=0 and 
        d.DayOfYear>=${from} and d.DayOfYear<${to} and d.Year=${year} and
        (role.AccountRoleTypeID=1 or role.AccountRoleTypeID=2)
  ) a
  """.stripMargin

val events = sqlContext.read.format("jdbc").
  option("url", sqlURL).
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("zeroDateTimeBehavior", "round").
  option("continueBatchOnError", "true").
  option("useSSL", "false").
  option("dbtable", query).
  option("user", sqlUser).
  option("password", sqlPassword).
  option("partitionColumn", "part").
  option("lowerBound", "0").
  option("upperBound", s"${parts - 1}").
  option("numPartitions", s"${parts}").
  load().as[Activity].toDF

请注意,我使用的是partitioncolumn、lowerbound、upperbound和numpartitions,这是其他答案中推荐的
我试着将分区从4设置为512,但总是死掉。从文件或mongo读取相同数量的数据没有问题。这是mysql连接器的问题吗?有解决办法吗?
请注意,我找到了一个建议避免spark的答案,并将查询读入hdfs上的一个文件,然后加载该文件
spark rdd中的多个分区
这真的是最好的方法吗?

s8vozzvw

s8vozzvw1#

您可以尝试增加获取大小,而不使用动态分区进行读取。

sqlContext.read.options(options).jdbc(
url=sqlURL, table=query, columnName="part",
fetchSize=1000000,connectionProperties=new java.util.Properties())
eoxn13cs

eoxn13cs2#

这是我得到的答案。。。
对我来说,答案是避免spark的mysql连接:-(我发现避免分区导致的崩溃太困难了。mysql连接需要手动调整分区,并且不会提高速度。更容易编写将数据读入大文本文件的非spark代码,并在文本文件上调用spark。spark非常适合大多数数据源,但不适合mysql。。。至少还没有

相关问题