我编写了一个python脚本,它读取csv并使用异步和并发将它们插入cassandra表中,但是并发比异步慢。我使用concurrent的目的是实现并行写入,从而加快在cassandra中索引csv文件的任务。
使用异步的代码:
for df in chunks:
futures = []
df = df.to_dict(orient='records')
chunk_counter += 1
for row in df:
key = str(row["0"])
row = json.dumps(row, default=str)
futures.append(
self.session.execute_async(
insert_sql, [key, "version_1", row]
)
)
# batch.add(insert_sql, (key, "version_1", row))
# self.session.execute(batch)
for future in futures:
self.log.debug(future)
continue
使用concurrents的代码:
for df in chunks:
futures = []
df = df.to_dict(orient='records')
chunk_counter += 1
for row in df:
key = str(row["0"])
row = json.dumps(row, default=str)
params = (key, row, )
futures.append(
(
insert_sql,
params
)
)
results = execute_concurrent(
self.session, futures, raise_on_first_error=False)
for (success, result) in results:
if not success:
self.handle_error(result) # result will be an Exception
1条答案
按热度按时间atmip9wb1#
你没有设定
concurrency
的参数execute_concurrent
,默认情况下使用100。根据文件:
concurrency参数控制将并发执行的语句数。什么时候
Cluster.protocol_version
如果设置为1或2,建议将其保持在每个主机的核心连接数乘以已连接主机数的100倍以下(请参阅Cluster.set_core_connections_per_host()
). 如果超过该数量,事件循环线程可能会尝试阻止创建新连接,从而严重影响吞吐量。如果protocol\u version为3或更高版本,则可以安全地尝试更高级别的并发性。