I am using Flink 1.17.1 and running a Flink SQL application such as:
CREATE TABLE enrich_table (
  col1 STRING,
  col2 ....
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'enrich_table',
  'properties.bootstrap.servers' = '${KAFKA_BROKERS}',
  'properties.group.id' = '${CONSUMER_GROUP_ID}',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = '${SCHEMA_REGISTRY_URL}',
  'key.format' = 'avro-confluent',
  'key.avro-confluent.url' = '${SCHEMA_REGISTRY_URL}'
);
INSERT INTO enrich_table
SELECT
  col1, col2 ...
FROM
  table1 t1
  INNER JOIN table2 t2 ON ..
  INNER JOIN table3 t3 ON ...
  .....
I set LZ4 compression for RocksDB by creating a custom options factory:
package my.org.abc

import org.apache.flink.configuration.{ConfigOption, ConfigOptions, ReadableConfig}
import org.apache.flink.configuration.description.Description
import org.apache.flink.configuration.description.LinkElement.link
import org.apache.flink.contrib.streaming.state.{ConfigurableRocksDBOptionsFactory, DefaultConfigurableOptionsFactory, RocksDBOptionsFactory}
import org.rocksdb.{ColumnFamilyOptions, CompressionType, DBOptions}

class RocksDBTuningJobOptionsFactory extends ConfigurableRocksDBOptionsFactory {
  private val defaultFactory = new DefaultConfigurableOptionsFactory
  private var configuration: ReadableConfig = _

  private val COMPRESSION: ConfigOption[CompressionType] =
    ConfigOptions.key("state.backend.rocksdb.custom.compression")
      .enumType(classOf[CompressionType])
      .noDefaultValue()
      .withDescription(
        Description.builder()
          .text("Configures RocksDB compression")
          .linebreak()
          .text(
            "For more information, please refer to %s",
            link("https://github.com/facebook/rocksdb/wiki/Compression")
          )
          .build()
      )

  override def configure(configuration: ReadableConfig): RocksDBOptionsFactory = {
    defaultFactory.configure(configuration)
    this.configuration = configuration
    this
  }

  override def createDBOptions(currentOptions: DBOptions, handlesToClose: java.util.Collection[AutoCloseable]): DBOptions =
    defaultFactory.createDBOptions(currentOptions, handlesToClose)

  override def createColumnOptions(currentOptions: ColumnFamilyOptions, handlesToClose: java.util.Collection[AutoCloseable]): ColumnFamilyOptions = {
    // Apply the default factory first, then set the configured compression
    // on the options object that is actually returned.
    val updatedOptions = defaultFactory.createColumnOptions(currentOptions, handlesToClose)
    configuration.getOptional(COMPRESSION).ifPresent(updatedOptions.setCompressionType)
    updatedOptions
  }
}
and providing configuration such as:
........
state.backend.rocksdb.use-bloom-filter: "true"
state.backend.rocksdb.options-factory: "my.org.abc.RocksDBTuningJobOptionsFactory"
state.backend.rocksdb.custom.compression: "LZ4_COMPRESSION"
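As an alternative to wiring the factory through flink-conf.yaml, the same factory can be registered programmatically on the state backend. This is a sketch, not from the original post; it assumes a DataStream job with a `StreamExecutionEnvironment` named `env` and the factory class above on the classpath:

```scala
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Register the custom options factory from the snippet above.
val backend = new EmbeddedRocksDBStateBackend()
backend.setRocksDBOptions(new my.org.abc.RocksDBTuningJobOptionsFactory)
env.setStateBackend(backend)
```

For a pure SQL deployment (no main class you control), the flink-conf.yaml route shown above remains the practical option.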
Q#1: Per the discussion here, LZ4 improves performance. Just wondering: what does the community use for Flink SQL based applications (no temporal joins) with large state and high volume and velocity?
Q#2: To set LZ4 compression, do we have to extend ConfigurableRocksDBOptionsFactory, or is there a better way?
Q#3: Are there any implications of switching compression to LZ4 for an existing job that is already running and has checkpoints?
1 Answer
From the Speedb hive:
Q1: The default compression is Snappy. Q2: That is a Flink question. Q3: RocksDB and Speedb allow changing the compression type at any time. Where Flink does not expose this, Speedb provides a backdoor to change any mutable option at runtime.
If you have more questions or need additional information, you can find the Speedb hive here and, after registering, a link to the thread containing your question here.
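On Q#3: each block in a RocksDB SST file records the compression it was written with, so files produced under Snappy stay readable after the switch; only newly written files use LZ4. In practice the rollout is just a config change plus a restore, sketched below under the assumption that the custom factory from this question is on the classpath:

```yaml
# flink-conf.yaml: change the compression setting, then restart the job
# from its latest checkpoint/savepoint. Existing SST files remain
# readable; new SST files (flushes/compactions) are written with LZ4.
state.backend.rocksdb.options-factory: "my.org.abc.RocksDBTuningJobOptionsFactory"
state.backend.rocksdb.custom.compression: "LZ4_COMPRESSION"
```

Over time, normal compaction rewrites old files, so the state gradually converges to LZ4 without any manual migration.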