akka 迭代时evidence参数的隐式值

dhxwm5r4  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(134)

我的任务是从jdbc中获取表并将它们放到s3中。我已经用Slick模式代码生成器生成了这些表的类。如果我手动为每个表编写代码,它会完美地工作。就像这样

Slick
  .source(Tables.table1.result)
  .runWith(ParquetStreams.toParquetSingleFile(s"s3a://bucket/table1"))
  .onComplete {
    case _ =>
      println("table1")
  }
Slick
  .source(Tables.table2.result)
  .runWith(ParquetStreams.toParquetSingleFile(s"s3a://bucket/table2"))
  .onComplete {
    case _ =>
      println("table2")
  }

问题是,我有很多表,如果我可以迭代它们,这将更容易。
编码

val tbls = Map("table1" -> Tables.table1, "table2" -> Tables.table2)
tbls.foreach(table => {
  val table_name = table._1
  Slick
    .source(table_2.result)
    .runWith(ParquetStreams.toParquetSingleFile(s"s3a://bucket/$table_name"))
    .onComplete {
      case _ =>
        println(table_name)
    }
})

获取此错误。

could not find implicit value for evidence parameter of type com.github.mjakubowski84.parquet4s.ParquetRecordEncoder[_1#TableElementType]
[error]       .runWith(ParquetStreams.toParquetSingleFile(s"s3a://bucket/$table_name"))

**编辑:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

谢谢你的澄清。但即使有建议,它也不起作用。应用这些东西后,同样的错误存在,很少有更多的出现。

found   : slick.lifted.TableQuery[_1] where type _1 >: Tables.TableTwo with Tables.TableOne <: Tables.profile.Table[_ >: Tables.TableTwoRow with Tables.TableOneRow <: Product with java.io.Serializable]

有人问我简化的代码在这里。

import java.sql.Timestamp

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.alpakka.slick.javadsl.SlickSession
import akka.stream.alpakka.slick.scaladsl.Slick

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import com.github.mjakubowski84.parquet4s.{ParquetRecordEncoder, ParquetSchemaResolver, ParquetStreams}

object Tables extends {
  val profile = slick.jdbc.MySQLProfile
} with Tables

/**Slick data model trait for extension, choice of backend or usage in the cake pattern. (Make sure to initialize this late.) */
trait Tables {
  val profile: slick.jdbc.JdbcProfile
  import profile.api._

  case class TableOneRow(id: Int, values: Option[String] = None)
  class TableOne(_tableTag: Tag) extends profile.api.Table[TableOneRow](_tableTag, Some("schema"), "table_one") {
    def * = (id, values) <> (TableOneRow.tupled, TableOneRow.unapply)

    val id: Rep[Int] = column[Int]("id", O.AutoInc, O.PrimaryKey)
    val values: Rep[Option[String]] = column[Option[String]]("values", O.Default(None))
  }
  lazy val TableOne = new TableQuery(tag => new TableOne(tag))

  case class TableTwoRow(id: Int, values: Option[Timestamp] = None)
  class TableTwo(_tableTag: Tag) extends profile.api.Table[TableTwoRow](_tableTag, Some("schema"), "table_two") {
    def * = (id, date) <> (TableTwoRow.tupled, TableTwoRow.unapply)

    val id: Rep[Int] = column[Int]("id", O.AutoInc, O.PrimaryKey)
    val date: Rep[Option[Timestamp]] = column[Option[Timestamp]]("date", O.Default(None))
  }
  lazy val TableTwo = new TableQuery(tag => new TableTwo(tag))
}

object Main extends App {

  implicit val actorSystem: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "alpakka-sample")
  implicit val executionContext: ExecutionContext = actorSystem.executionContext
  implicit val session = SlickSession.forConfig("slick-mysql") // (1)

  import session.profile.api._

  case class TableWithRecordEncoder[A](
    table: TableQuery[A])(
    implicit val recordEncoder: ParquetRecordEncoder[A]
  )

  def doTheThings[A](table: TableWithRecordEncoder[A], path: String) = {
    import table.recordEncoder
    Slick
      .source(table.table.result)
      .runWith(ParquetStreams.toParquetSingleFile(path))
      .onComplete {
        case _ =>
          println("Done. " + path)
      }
  }

  import polymorphic._
  def withRecordEncoder[A](table: TableQuery[A])(implicit recordEncoder: ParquetRecordEncoder[A])
  : Exists[TableWithRecordEncoder]
  = Exists(TableWithRecordEncoder(table))

  import polymorphic.syntax.all._
  Map("s3a://bucket/table_1" -> withRecordEncoder(Tables.TableOne),
     "s3a://bucket/table_2" -> withRecordEncoder(Tables.TableTwo)).
     foreach{ case (path, table) =>
      doTheThings(table.value, path)
    }

  actorSystem.whenTerminated.map(_ => session.close())
  Await.result(actorSystem.whenTerminated, Duration.Inf)

}
4c8rllxm

4c8rllxm1#

当你把Tables.table1Tables.table2放到一个集合中时,编译器会找到它们的类型的最小上界,我们称之为TT将是Map的值类型。但是隐式搜索是在编译时执行的,并且没有适合T#TableElementTypeParquetRecordEncoder示例,只适合具体的子类型。
因此,如果您想迭代这些表,您不仅需要将表添加到Map中,还需要添加所有相关的类型类示例。
//edit:这实际上有点棘手,所以我准备了一个例子来说明它是如何工作的。我希望你能从这里解决它。
为了使事情变得更简单,我使用了polymorphic库。
因此,TableParquetRecordEncoder类型看起来如下所示:

trait Table[A] // details omitted
trait ParquetRecordEncoder[A]

我们还需要一些示例:

val intTable = new Table[Int] {}
val stringTable = new Table[String] {}
object ParquetRecordEncoder {
  implicit val int = new ParquetRecordEncoder[Int] {}
  implicit val string = new ParquetRecordEncoder[String] {}
}

现在我们需要一个类型来将这两种类型打包在一起:

case class TableWithRecordEncoder[A](
  table: Table[A])(
  implicit val recordEncoder: ParquetRecordEncoder[A]
)

我们需要一个函数来处理这些对象:

def doTheThings[A](table: TableWithRecordEncoder[A]) = {
    import table.recordEncoder // now we have a ParquetRecordEncoder[A]
                               // available as an implicit!

    ???                        // logic goes here
  }

但是,由于类型参数A对于每个表都是不同的,我们不能将不同的TableWithRecordEncoder对象填充到同一个Map中!这就是polymorphic库中的Exists[F[_]]类型的用武之地。它将所有这些不同的类型压缩为一个。让我们定义一个小助手来生成这样的对象:

import polymorphic._
def withRecordEncoder[A](table: Table[A])(implicit recordEncoder: ParquetRecordEncoder[A])
  : Exists[TableWithRecordEncoder]
  = Exists(TableWithRecordEncoder(table))

请注意,表的元素类型根本不会出现在withRecordEncoder的返回类型中。无论表的元素类型是什么,函数都将返回Exists[TableWithRecordEncoder]!这一点至关重要,因为现在我们可以在同一个Map中填充不同的表:

import polymorphic.syntax.all._
  Map(
    "int" -> withRecordEncoder(intTable),
    "string" -> withRecordEncoder(stringTable)
  ).foreach { case (name, table) =>
    doTheThings(table.value)
  }

希望这对你有帮助!

相关问题