java 如何使用方解石优化和重写SQL从一个DB引擎到另一个

s6fujrry  于 2023-11-15  发布在  Java
关注(0)|答案(1)|浏览(177)

我试图在一个通用的系统上工作,它暴露了下面的API,

val outputSql: String = SQLRewriter
  .inputSql(query = " <query> ", engine = Engine.SparkSQL, connectionHook = < connection >)
  .rewriteTo(engine = Engine.MySQL)

字符串
这个系统应该,
1.能够读取一些支持引擎的输入SQL查询字符串
1.优化目标引擎的查询(项目下推)
1.将目标引擎的优化查询重写为字符串
我认为Apache Calcite非常适合这一点,或者它已经具备了这些功能。为此,我尝试在代码中查看文档,博客和文档字符串,但我觉得我在兜圈子。
我想知道,
1.方解石已经具备了这些能力
1.如果它很适合此用例,
1.有什么代码样本可以给我指出来吗
有Maven能帮我吗?谢谢。

j2cgzkjk

j2cgzkjk1#

我们的想法是,

  • 通用规划结构中的代码
  • 优化(基于成本)此结构,几乎不配置
  • 改写成任何方言

今天,我又在玩弄同样的想法,并成功地让它在一个示例查询中工作-

  • 在Calcite DSL中编写了一个随机连接+过滤器查询,
  • 使用基于成本的VolcanoPlanner进行优化
  • 将此优化计划转换为SparkSql,Terrace,Snowflake等方言。
    我设法得到一个像这样的工作POC,不确定这是否是最好的方法,虽然.任何方解石Maven想审查这种方法?
import org.apache.calcite.adapter.enumerable.EnumerableConvention
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.volcano.VolcanoPlanner
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.externalize.RelWriterImpl
import org.apache.calcite.rel.rel2sql.RelToSqlConverter
import org.apache.calcite.sql.dialect.*
import org.apache.calcite.test.CalciteAssert
import org.apache.calcite.tools.{Frameworks, RelBuilder}

import java.io.PrintWriter

   
val rootSchema = CalciteSchema.createRootSchema(true).plus()
val config = Frameworks
  .newConfigBuilder()
  .defaultSchema(CalciteAssert.addSchema(rootSchema, CalciteAssert.SchemaSpec.HR))
  .build()
val builder = RelBuilder.create(config)

// Create a example plan using calcite, this should be replaced with real business logic
val opTree: RelNode = builder
  .scan("emps") // scan table 1
  .scan("depts") // scan table 2
  .join(JoinRelType.INNER, "deptno") // inner join between the 2 tables on deptno
  .filter(builder.equals(builder.field("empid"), builder.literal(100))) // filter on empid
  .build

val rw = new RelWriterImpl(new PrintWriter(System.out, true))

// Print basic Logical Plan
opTree.explain(rw)

val cluster = opTree.getCluster
val planner = cluster.getPlanner.asInstanceOf[VolcanoPlanner]

val desiredTraits = cluster.traitSet.replace(EnumerableConvention.INSTANCE)
val newRoot = planner.changeTraits(opTree, desiredTraits)
planner.setRoot(newRoot)

val optimized: RelNode = planner.findBestExp

// Print optimized Logical Plan
// filter happens before join to reduce the amount of data joined. Rules can be configured.
optimized.explain(rw)

// Rewrite Logical Plan as SQL queries based on different dialects
// Each of these dialects can be configured, with UDFs/ procedures etc.
val sqlDialects = Seq(
  SparkSqlDialect.DEFAULT,
  MysqlSqlDialect.DEFAULT,
  PostgresqlSqlDialect.DEFAULT,
  SnowflakeSqlDialect.DEFAULT,
  TeradataSqlDialect.DEFAULT,
  RedshiftSqlDialect.DEFAULT,
  HiveSqlDialect.DEFAULT)

sqlDialects.foreach(dialect => {
  // print name of dialect
  println(dialect)

  // print SQL as per the dialect. dialect parser can be heavily configured.
  val conv = RelToSqlConverter(dialect)
  println(conv.visitRoot(optimized).asQueryOrValues().toString)
})

字符串

相关问题