如何在apache flink中创建外部目录表

blmhpbnm  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(665)

我尝试在apache flink表中创建和使用externalcatalog。我创建并添加到flink表环境(这里是官方文档)。由于某些原因,“目录”中唯一存在的外部表在扫描期间找不到。我在上面的代码中遗漏了什么?

val catalogName = s"externalCatalog$fileNumber"
  val ec: ExternalCatalog = getExternalCatalog(catalogName, 1, tableEnv)
  tableEnv.registerExternalCatalog(catalogName, ec)
  val s1: Table = tableEnv.scan("S_EXT")

  def getExternalCatalog(catalogName: String, fileNumber: Int, tableEnv: BatchTableEnvironment): ExternalCatalog = {
    val cat = new InMemoryExternalCatalog(catalogName)
    // external Catalog table
    val externalCatalogTableS = getExternalCatalogTable("S")
    // add external Catalog table
    cat.createTable("S_EXT", externalCatalogTableS, ignoreIfExists = false)
    cat
  }

  private def getExternalCatalogTable(fileName: String): ExternalCatalogTable = {
    // connector descriptor
    val connectorDescriptor = new FileSystem()
    connectorDescriptor.path(getFilePath(fileNumber, fileName))
    // format
    val fd = new Csv()
    fd.field("X", Types.STRING)
    fd.field("Y", Types.STRING)
    fd.fieldDelimiter(",")
    // statistic
    val statistics = new Statistics()
    statistics.rowCount(0)
    // metadata
    val md = new Metadata()
    ExternalCatalogTable.builder(connectorDescriptor)
      .withFormat(fd)
      .withStatistics(statistics)
      .withMetadata(md)
      .asTableSource()
  }

上面的例子是这个git测试文件的一部分。

41ik7eoe

41ik7eoe1#

这可能是命名空间问题。外部目录中的表由目录名(可能是模式)的列表标识,最后是表名。
在您的示例中,以下操作应起作用:

val s1: Table = tableEnv.scan("externalCatalog1", "S_EXT")

您可以查看externalcatalogtest以了解如何使用外部目录。

相关问题