我的任务是从jdbc中获取表并将它们放到s3中。我已经用Slick模式代码生成器生成了这些表的类。如果我手动为每个表编写代码,它会完美地工作。就像这样
Slick
.source(Tables.table1.result)
.runWith(ParquetStreams.toParquetSingleFile(s"s3a://bucket/table1"))
.onComplete {
case _ =>
println("table1")
}
Slick
.source(Tables.table2.result)
.runWith(ParquetStreams.toParquetSingleFile(s"s3a://bucket/table2"))
.onComplete {
case _ =>
println("table2")
}
问题是,我有很多表,如果我可以迭代它们,这将更容易。
编码
val tbls = Map("table1" -> Tables.table1, "table2" -> Tables.table2)
tbls.foreach(table => {
val table_name = table._1
Slick
.source(table_2.result)
.runWith(ParquetStreams.toParquetSingleFile(s"s3a://bucket/$table_name"))
.onComplete {
case _ =>
println(table_name)
}
})
获取此错误。
could not find implicit value for evidence parameter of type com.github.mjakubowski84.parquet4s.ParquetRecordEncoder[_1#TableElementType]
[error] .runWith(ParquetStreams.toParquetSingleFile(s"s3a://bucket/$table_name"))
**编辑:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
谢谢你的澄清。但即使有建议,它也不起作用。应用这些东西后,同样的错误存在,很少有更多的出现。
found : slick.lifted.TableQuery[_1] where type _1 >: Tables.TableTwo with Tables.TableOne <: Tables.profile.Table[_ >: Tables.TableTwoRow with Tables.TableOneRow <: Product with java.io.Serializable]
有人问我简化的代码在这里。
import java.sql.Timestamp
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.alpakka.slick.javadsl.SlickSession
import akka.stream.alpakka.slick.scaladsl.Slick
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import com.github.mjakubowski84.parquet4s.{ParquetRecordEncoder, ParquetSchemaResolver, ParquetStreams}
object Tables extends {
val profile = slick.jdbc.MySQLProfile
} with Tables
/**Slick data model trait for extension, choice of backend or usage in the cake pattern. (Make sure to initialize this late.) */
trait Tables {
val profile: slick.jdbc.JdbcProfile
import profile.api._
case class TableOneRow(id: Int, values: Option[String] = None)
class TableOne(_tableTag: Tag) extends profile.api.Table[TableOneRow](_tableTag, Some("schema"), "table_one") {
def * = (id, values) <> (TableOneRow.tupled, TableOneRow.unapply)
val id: Rep[Int] = column[Int]("id", O.AutoInc, O.PrimaryKey)
val values: Rep[Option[String]] = column[Option[String]]("values", O.Default(None))
}
lazy val TableOne = new TableQuery(tag => new TableOne(tag))
case class TableTwoRow(id: Int, values: Option[Timestamp] = None)
class TableTwo(_tableTag: Tag) extends profile.api.Table[TableTwoRow](_tableTag, Some("schema"), "table_two") {
def * = (id, date) <> (TableTwoRow.tupled, TableTwoRow.unapply)
val id: Rep[Int] = column[Int]("id", O.AutoInc, O.PrimaryKey)
val date: Rep[Option[Timestamp]] = column[Option[Timestamp]]("date", O.Default(None))
}
lazy val TableTwo = new TableQuery(tag => new TableTwo(tag))
}
object Main extends App {
implicit val actorSystem: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "alpakka-sample")
implicit val executionContext: ExecutionContext = actorSystem.executionContext
implicit val session = SlickSession.forConfig("slick-mysql") // (1)
import session.profile.api._
case class TableWithRecordEncoder[A](
table: TableQuery[A])(
implicit val recordEncoder: ParquetRecordEncoder[A]
)
def doTheThings[A](table: TableWithRecordEncoder[A], path: String) = {
import table.recordEncoder
Slick
.source(table.table.result)
.runWith(ParquetStreams.toParquetSingleFile(path))
.onComplete {
case _ =>
println("Done. " + path)
}
}
import polymorphic._
def withRecordEncoder[A](table: TableQuery[A])(implicit recordEncoder: ParquetRecordEncoder[A])
: Exists[TableWithRecordEncoder]
= Exists(TableWithRecordEncoder(table))
import polymorphic.syntax.all._
Map("s3a://bucket/table_1" -> withRecordEncoder(Tables.TableOne),
"s3a://bucket/table_2" -> withRecordEncoder(Tables.TableTwo)).
foreach{ case (path, table) =>
doTheThings(table.value, path)
}
actorSystem.whenTerminated.map(_ => session.close())
Await.result(actorSystem.whenTerminated, Duration.Inf)
}
1条答案
按热度按时间4c8rllxm1#
当你把
Tables.table1
和Tables.table2
放到一个集合中时,编译器会找到它们的类型的最小上界,我们称之为T
。T
将是Map
的值类型。但是隐式搜索是在编译时执行的,并且没有适合T#TableElementType
的ParquetRecordEncoder
示例,只适合具体的子类型。因此,如果您想迭代这些表,您不仅需要将表添加到Map中,还需要添加所有相关的类型类示例。
//edit:这实际上有点棘手,所以我准备了一个例子来说明它是如何工作的。我希望你能从这里解决它。
为了使事情变得更简单,我使用了polymorphic库。
因此,
Table
和ParquetRecordEncoder
类型看起来如下所示:我们还需要一些示例:
现在我们需要一个类型来将这两种类型打包在一起:
我们需要一个函数来处理这些对象:
但是,由于类型参数
A
对于每个表都是不同的,我们不能将不同的TableWithRecordEncoder
对象填充到同一个Map
中!这就是polymorphic
库中的Exists[F[_]]
类型的用武之地。它将所有这些不同的类型压缩为一个。让我们定义一个小助手来生成这样的对象:请注意,表的元素类型根本不会出现在
withRecordEncoder
的返回类型中。无论表的元素类型是什么,函数都将返回Exists[TableWithRecordEncoder]
!这一点至关重要,因为现在我们可以在同一个Map中填充不同的表:希望这对你有帮助!