HDFS Flink 1.11 StreamingFileSink将记录提前刷新到正在进行的文件

f5emj3cl  于 2022-12-09  发布在  HDFS
关注(0)|答案(3)|浏览(359)

我们有一个每小时一桶的StreamingFileSink(到HDFS),其中的记录相对不频繁。有没有办法配置Flink在记录到达时(少于1分钟)立即将其刷新到in-progress文件,而不是Flink将其保存在缓冲区中?
要求后续数据分析过程接近实时地读取in-progress文件。
我知道如何缩短InactivityInterval,但它最终会导致太多的小文件,这是不可取的。

lymnna71

lymnna711#

也许你可以看看write函数的实现。有几个实现会真实的地把数据写到hdfs文件中。StreamingFileSink本身不会把它们保存在缓冲区中,但是在FileOutputStream中会有一些缓冲区

public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {

/**
 * Write a element to the part file.
 *
 * @param element the element to be written.
 * @param currentTime the writing time.
 * @throws IOException Thrown if writing the element fails.
 */
void write(final IN element, final long currentTime) throws IOException;

/**
 * @return The state of the current part file.
 * @throws IOException Thrown if persisting the part file fails.
 */
InProgressFileRecoverable persist() throws IOException;

/**
 * @return The state of the pending part file. {@link Bucket} uses this to commit the pending
 *     file.
 * @throws IOException Thrown if an I/O error occurs.
 */
PendingFileRecoverable closeForCommit() throws IOException;

/** Dispose the part file. */
void dispose();

// ------------------------------------------------------------------------

/** A handle can be used to recover in-progress file.. */
interface InProgressFileRecoverable extends PendingFileRecoverable {}

/** The handle can be used to recover pending file. */
interface PendingFileRecoverable {}

}

7gcisfzg

7gcisfzg2#

StreamingFileSink时间数据真实的写入HDFS,不会保留在缓冲区中
为了支持“恰好一次"语义,进程中文件将仅在检查点期间重命名为正式文件,然后您可以执行后继数据分析
StreamingFileSink不支持实时数据查询,可以通过减小cp的间隔来提高数据可见性的实时性能,但如果cp间隔过小,则容易导致小文件问题

i2byvkas

i2byvkas3#

您是否考虑过启用文件压缩选项?https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#file-compaction

相关问题