在flink操作中终止对数据库的请求

bmvo0sr5  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(160)

我想和Flink和Cassandra合作。两者都是大规模并行环境,但我很难让它们一起工作。
现在我需要通过不同的令牌范围对cassandra进行并行读取操作,并在读取n个对象之后终止查询。
批处理模式更适合我,但数据流也是可能的。我尝试了longcounter(见下文),但它不会像我预期的那样工作。我没能从他们那里得到全局和。仅本地值。
异步模式是不必要的,因为这个操作cassandrarequester是在大约64或128并行化的并行上下文中执行的。
这是我的尝试

class CassandraRequester<T> (val klass: Class<T>, private val context: FlinkCassandraContext):
        RichFlatMapFunction<CassandraTokenRange, T>() {

    companion object {
        private val session = ApplicationContext.session!!
        private var preparedStatement: PreparedStatement? = null
        private val manager = MappingManager(session)
        private var mapper: Mapper<*>? = null
        private val log = LoggerFactory.getLogger(CassandraRequesterStateless::class.java)

        public const val COUNTER_ROWS_NUMBER = "flink-cassandra-select-count"
    }

    private lateinit var counter: LongCounter

    override fun open(parameters: Configuration?) {
        super.open(parameters)

        if(preparedStatement == null)
            preparedStatement = session.prepare(context.prepareQuery()).setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
        if(mapper == null) {
            mapper = manager.mapper<T>(klass)
        }
        counter = runtimeContext.getLongCounter(COUNTER_ROWS_NUMBER)

    }

    override fun flatMap(tokenRange: CassandraTokenRange, collector: Collector<T>) {

        val bs = preparedStatement!!.bind(tokenRange.start, tokenRange.end)

        val rs = session.execute(bs)
        val resultSelect = mapper!!.map(rs)
        val iter = resultSelect.iterator()
        while (iter.hasNext()) when {
            this.context.maxRowsExtracted == 0L || counter.localValue < context.maxRowsExtracted -> {
                counter.add(1)
                collector.collect(iter.next() as T)
            }
            else -> {
                collector.close()
                return
            }
        }
    }

}

在这种情况下是否可以终止查询?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题