I want to make Flink and Cassandra work together. Both are massively parallel environments, but I am having a hard time getting them to cooperate.
Right now I need to run parallel read operations against Cassandra over different token ranges, and terminate the query after n objects have been read.
Batch mode suits me better, but streaming is also an option. I tried a LongCounter (see below), but it does not behave the way I expected: I could not get a global sum out of it, only the local values.
Async mode is unnecessary, because this operation, CassandraRequester, is executed in a parallel context with a parallelism of about 64 or 128.
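To make the setup concrete: the token ranges are produced from the driver metadata, roughly like this (a minimal sketch assuming the DataStax 3.x Java driver; CassandraTokenRange is shown here simply as a plain holder for the unwrapped range boundaries):

import com.datastax.driver.core.Session

// A plain holder for the two boundaries of an unwrapped token range.
data class CassandraTokenRange(val start: Any, val end: Any)

fun tokenRanges(session: Session): List<CassandraTokenRange> =
    session.cluster.metadata.tokenRanges
        .flatMap { it.unwrap() }                                   // split wrap-around ranges
        .map { CassandraTokenRange(it.start.value, it.end.value) }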
And here is my attempt at the reading function itself:
import com.datastax.driver.core.ConsistencyLevel
import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.mapping.Mapper
import com.datastax.driver.mapping.MappingManager
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory

class CassandraRequester<T>(val klass: Class<T>, private val context: FlinkCassandraContext) :
    RichFlatMapFunction<CassandraTokenRange, T>() {

    companion object {
        private val session = ApplicationContext.session!!
        private var preparedStatement: PreparedStatement? = null
        private val manager = MappingManager(session)
        private var mapper: Mapper<*>? = null
        private val log = LoggerFactory.getLogger(CassandraRequester::class.java)

        const val COUNTER_ROWS_NUMBER = "flink-cassandra-select-count"
    }

    private lateinit var counter: LongCounter

    override fun open(parameters: Configuration?) {
        super.open(parameters)
        // The statement and the mapper are prepared once per JVM and shared by all subtasks.
        if (preparedStatement == null)
            preparedStatement = session.prepare(context.prepareQuery())
                .setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
        if (mapper == null) {
            mapper = manager.mapper(klass)
        }
        // Accumulator registered per subtask; Flink only merges it after the job has finished.
        counter = runtimeContext.getLongCounter(COUNTER_ROWS_NUMBER)
    }

    override fun flatMap(tokenRange: CassandraTokenRange, collector: Collector<T>) {
        // Read one token range and emit mapped rows until the limit is reached.
        val bs = preparedStatement!!.bind(tokenRange.start, tokenRange.end)
        val rs = session.execute(bs)
        val resultSelect = mapper!!.map(rs)
        val iter = resultSelect.iterator()
        while (iter.hasNext()) when {
            context.maxRowsExtracted == 0L || counter.localValue < context.maxRowsExtracted -> {
                counter.add(1)
                @Suppress("UNCHECKED_CAST")
                collector.collect(iter.next() as T)
            }
            else -> {
                // Stop emitting: this only sees the subtask-local count, not a global one.
                collector.close()
                return
            }
        }
    }
}
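For completeness, this is roughly how the job is wired up in batch mode (again only a sketch: MyEntity, the sink path and the job name are placeholders, and the FlinkCassandraContext construction is elided):

import org.apache.flink.api.java.ExecutionEnvironment

fun main() {
    val env = ExecutionEnvironment.getExecutionEnvironment()
    env.setParallelism(64)

    val context = FlinkCassandraContext(/* query template, maxRowsExtracted, ... */)
    val ranges = tokenRanges(ApplicationContext.session!!)         // helper sketched above

    env.fromCollection(ranges)
        .flatMap(CassandraRequester(MyEntity::class.java, context))
        .returns(MyEntity::class.java)                             // T is erased, so a type hint is needed
        .writeAsText("/tmp/cassandra-out")                         // placeholder sink
    val result = env.execute("cassandra-parallel-read")

    // The counter is only merged into a global sum here, after the job has finished;
    // while flatMap is running, each subtask only ever sees its own localValue.
    val totalRows: Long = result.getAccumulatorResult(CassandraRequester.COUNTER_ROWS_NUMBER)
}

So the earliest point where the counter becomes a global sum is after execute(), which is too late to stop the per-range queries.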
Is it possible to terminate the query in this situation?