I am trying to read a file from S3 using Flink in IntelliJ and I get the following exception:
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
My code looks like this:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory
class ParquetSourceFunction extends SourceFunction[String] {

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val inputPath = "s3a://path-to-bucket/"
    val outputPath = "s3a://path-to-output-bucket/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    // Read the footer first to get the file metadata and schema.
    val readFooter = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(inputPath), conf))
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)
    // val parquetFileReader2 = new ParquetFileReader(new Path(inputPath), ParquetReadOptions)

    var pages: PageReadStore = null
    try {
      // Iterate over the row groups and emit one field per record.
      while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group = recordReader.read()
          val myString = group.getString("field_name", 0)
          ctx.collect(myString)
        }
      }
    } finally {
      parquetFileReader.close()
      readFooter.close()
    }
  }

  override def cancel(): Unit = ???
}
object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment
    lazy val stream = env.addSource(new ParquetSourceFunction)
    stream.print()
    env.execute()
  }
}
sbt dependencies:
val flinkVersion = "1.12.1"
val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-parquet" % flinkVersion)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0",
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" // % "provided"
  )
1 Answer
S3 is only supported by adding the respective flink-s3-fs-hadoop jar to your plugins folder, as described in the plugins documentation. For a local setup in the IDE, the root directory that is expected to contain the plugins directory is the working directory by default; you can override that with the environment variable FLINK_PLUGINS_DIR.
To get flink-s3-fs-hadoop into the plugins folder, I guess some sbt glue is necessary (or you do it manually once); see the sketch below. In Gradle, I would define a plugin scope and copy the jars into the plugin dir in a custom task.
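A minimal sketch of that sbt glue, assuming a hypothetical copyS3Plugin task name and a plugins/s3-fs-hadoop target directory (both are my choices, not anything Flink or sbt prescribes). It resolves flink-s3-fs-hadoop in a hidden ivy configuration so the jar never lands on the compile classpath, then copies it into the plugins directory that a local run picks up:

// build.sbt (sketch) — reuses flinkVersion and flinkDependencies from the build above
lazy val S3Plugin = config("s3plugin").hide

lazy val copyS3Plugin = taskKey[Unit]("Copy flink-s3-fs-hadoop into the local plugins dir")

lazy val root = (project in file("."))
  .configs(S3Plugin)
  .settings(
    libraryDependencies ++= flinkDependencies,
    // resolved only in the hidden s3plugin configuration, not on the compile classpath
    libraryDependencies += "org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion % S3Plugin,
    copyS3Plugin := {
      // pick only the jars resolved for the s3plugin configuration
      val jars = update.value.select(configurationFilter(S3Plugin.name))
      // Flink looks for <working dir>/plugins by default; FLINK_PLUGINS_DIR can point elsewhere
      val pluginDir = baseDirectory.value / "plugins" / "s3-fs-hadoop"
      IO.createDirectory(pluginDir)
      jars.foreach(jar => IO.copyFile(jar, pluginDir / jar.getName))
      streams.value.log.info(s"copied ${jars.size} jar(s) to $pluginDir")
    }
  )

Run sbt copyS3Plugin once before starting the job from IntelliJ, and make sure the run configuration's working directory is the project root (or point FLINK_PLUGINS_DIR at the directory containing plugins/).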