python — how to read a CSV in parallel and write to Cassandra in parallel for high throughput?

7eumitmz · asked 2021-06-14 · in Cassandra

I have tried execute , execute_async , and execute_concurrent in Cassandra, but for 10M rows, the fastest I could index them into Cassandra was no less than 55 minutes. Note that I had set the concurrent threads to 1000 and had also raised the concurrent read/write limits in the YAML file to 10000. I tried replication factors of 0, 1, and 2 when creating the cluster. None of these indexed the file in less time. So I decided: instead of reading the CSV sequentially, appending the rows to a list, and then writing them to Cassandra in batch, concurrent, or async mode, why not read the CSV in parallel?! So I used dask to read the CSV file of 10M rows.
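For reference, a minimal sketch of the execute_concurrent_with_args attempt mentioned above (untested here; the keyspace, table, and the "0" column name mirror the dask script below, and the concurrency value is illustrative):

import csv
import json

from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args

cluster = Cluster(['localhost'])
session = cluster.connect('fri_athena_two')

insert = session.prepare(
    "INSERT INTO TenMillion (id, version, row) VALUES (?, ?, ?)")

def rows(path):
    # Yield (id, version, row) parameter tuples straight from the CSV;
    # assumes a column literally named "0", as in the dask script below.
    with open(path, newline='') as f:
        for r in csv.DictReader(f):
            yield (str(r['0']), 'version_1', json.dumps(r))

results = execute_concurrent_with_args(
    session, insert, rows('10gb.csv'),
    concurrency=1000, raise_on_first_error=False)
for ok, result in results:
    if not ok:
        print(result)  # the exception raised for that row

And here is the dask version: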

import json
import logging
from datetime import datetime

import dask
import dask.dataframe as dd
from cassandra.cluster import Cluster

class PythonCassandraExample:
    def __init__(self, version):
        self.cluster = None
        self.session = None
        self.keyspace = None
        self.log = None
        self.version = version

    def __del__(self):
        if self.cluster is not None:
            self.cluster.shutdown()

    def createsession(self):
        self.cluster = Cluster(['localhost'], connect_timeout=50)
        self.session = self.cluster.connect(self.keyspace)

    def getsession(self):
        return self.session

    # Add some log output so we can see what went wrong
    def setlogger(self):
        log = logging.getLogger()
        log.setLevel('INFO')
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            "%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        log.addHandler(handler)
        self.log = log

    def handle_error(self, exception):
        self.log.error("Failed to fetch user info: %s", exception)

    # Create a keyspace with the given name
    def createkeyspace(self, keyspace):
        """
        :param keyspace:  The Name of Keyspace to be created
        :return:
        """
        # Before creating a new keyspace, check whether it already exists; if so, drop it and recreate
        rows = self.session.execute(
            "SELECT keyspace_name FROM system_schema.keyspaces")
        if keyspace in [row[0] for row in rows]:
            self.log.info("dropping existing keyspace...")
            self.session.execute("DROP KEYSPACE " + keyspace)

        self.log.info("creating keyspace...")
        self.session.execute("""
                CREATE KEYSPACE %s
                WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '3' }
                """ % keyspace)

        self.log.info("setting keyspace...")
        self.session.set_keyspace(keyspace)

    def create_table(self, table_name):
        self.table_name = table_name
        c_sql = "CREATE TABLE IF NOT EXISTS {} (id varchar, version varchar, row varchar, PRIMARY KEY(id, version));".format(
            self.table_name)
        print("Query for creating table is: {}".format(c_sql))
        self.session.execute(c_sql)
        self.log.info("DP Table Created !!!")
        self.insert_sql = self.session.prepare(
            "INSERT INTO {} (id, version, row) VALUES (?, ?, ?)".format(
                self.table_name
            )
        )

    # insert one row with the prepared statement (no batching here)
    def insert_data(self, key, version, row):
        self.session.execute(
            self.insert_sql, [key, version, row]
        )

    @dask.delayed
    def print_a_block(self, d):
        # d is one pandas partition of the dask dataframe
        d = d.to_dict(orient='records')
        for row in d:
            key = str(row["0"])
            self.insert_data(key, self.version, json.dumps(row, default=str))

if __name__ == '__main__':
    start_time = datetime.utcnow()
    example1 = PythonCassandraExample(version="version_1")
    example1.createsession()
    example1.setlogger()
    example1.createkeyspace('fri_athena_two')
    example1.create_table('TenMillion')
    example1.log.info("Calling compute!")
    df = dd.read_csv("/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv")
    dask.compute(*[example1.print_a_block(d) for d in df.to_delayed()])
    print(datetime.utcnow() - start_time)

Even with dask, all the effort was in vain: an hour went by and the task of writing the rows into Cassandra still had not finished. What else should I do to reduce the time taken?
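For example, would a variant along these lines be the right direction — writing each dask partition with execute_concurrent_with_args instead of one synchronous execute per row? (Untested sketch; the per-partition session handling and the concurrency value are guesses, not taken from the script above.)

import json

import dask
import dask.dataframe as dd
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args

def write_partition(pdf):
    # A fresh session per task: Cluster/Session objects cannot be shared
    # across processes, and this also works under the threaded scheduler.
    cluster = Cluster(['localhost'])
    session = cluster.connect('fri_athena_two')
    insert = session.prepare(
        "INSERT INTO TenMillion (id, version, row) VALUES (?, ?, ?)")
    params = [(str(r["0"]), 'version_1', json.dumps(r, default=str))
              for r in pdf.to_dict(orient='records')]
    execute_concurrent_with_args(session, insert, params, concurrency=100)
    cluster.shutdown()
    return len(pdf)

df = dd.read_csv('/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv')
counts = dask.compute(*[dask.delayed(write_partition)(p) for p in df.to_delayed()])
print(sum(counts), 'rows written')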

No answers yet.
