Reading Cassandra data into pandas with Python

Posted by 68bkxrlz on 2022-11-23 in Cassandra

What is the correct and fastest way to read Cassandra data into pandas? Right now I use the code below, but it is very slow...

import pandas as pd

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

df = pd.DataFrame()

for row in session.execute(sql_query):
    df = df.append(pd.DataFrame(row, index=[0]))

df = df.reset_index(drop=True).fillna(pd.np.nan)

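Reading 1,000 rows takes about a minute, and I have "a bit more" than that... If I run the same query in DBeaver, I get the whole result set (~40k rows) within a minute.
Thanks!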


#1 whhtz7ly

I got an answer on the official mailing list (it works perfectly):
Hello,
Try defining your own pandas row factory:

def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

session.row_factory = pandas_factory
session.default_fetch_size = None

query = "SELECT ..."
rslt = session.execute(query, timeout=None)
df = rslt._current_rows

That is how I do it, and it should be faster...
If you find a faster way, I'm interested :)
Michael
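
For illustration: the driver calls a row factory with two arguments, the column names and the rows of the current page. A minimal sketch on stand-in data shows what pandas_factory returns. The column names and values below are made up, and no cluster is involved:

```python
import pandas as pd


def pandas_factory(colnames, rows):
    # Same factory as above: one DataFrame per page of results.
    return pd.DataFrame(rows, columns=colnames)


# Hypothetical stand-ins for what the driver would pass in.
colnames = ["id", "name"]
rows = [(1, "alpha"), (2, "beta")]

df = pandas_factory(colnames, rows)
print(df)
```

Note that _current_rows is a private attribute of the driver's result object, so this approach may need adjusting between driver versions.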


#2 rbl8hiat

What I do (in Python 3) is:

query = "SELECT ..."
df = pd.DataFrame(list(session.execute(query)))
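
A likely reason this is much faster than the loop in the question: the DataFrame is built once from all rows, whereas df.append returns a new DataFrame on every iteration and copies everything accumulated so far. The sketch below uses hypothetical namedtuple rows as a stand-in for what the driver's default row factory yields, so it runs without a cluster:

```python
from collections import namedtuple

import pandas as pd

# Hypothetical row type and data, standing in for list(session.execute(query)).
Row = namedtuple("Row", ["id", "name"])
rows = [Row(1, "alpha"), Row(2, "beta"), Row(3, "gamma")]

# One construction call; pandas takes the column names from the namedtuple fields.
df = pd.DataFrame(rows)
print(df)
```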

#3 iezvtpos

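I have been working on moving data from Cassandra to MSSQL, using the answers given here as a reference. I was able to move the data, but my source table in Cassandra is large and my query receives a timeout error from Cassandra. We cannot increase the timeout, so my only option is to select rows in batches in the query. My code also converts Cassandra collection data types to str, because I want to insert them into MSSQL and parse them afterwards. Please let me know if anyone has faced a similar issue. The code I built is shown below: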

import sys
import pandas as pd
import petl as etl
import pyodbc
import sqlalchemy
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from sqlalchemy import *
from cassandra.query import SimpleStatement

def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

# SQLAlchemy engine for the target SQL Server database
engine = sqlalchemy.create_engine('sql_server_connection string')

cluster = Cluster(
    contact_points=['cassandra_host'], 
    auth_provider = PlainTextAuthProvider(username='username', password='passwrd')
)

session = cluster.connect('keyspace',wait_for_all_pools=True)

session.row_factory = pandas_factory
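# NOTE: this plain variable is never passed to the driver, so it does not change any timeout
request_timeout = 60000
query = "SELECT * FROM cassandratable"
# fetch_size makes the driver fetch the result in pages of 5000 rows
statement = SimpleStatement(query, fetch_size=5000)
rows = session.execute(statement)

# With pandas_factory, _current_rows is a DataFrame holding the current (first) page only
df = rows._current_rows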
df['attributes'] = df.attributes.astype(str)
df['attributesgenerated'] = df.attributesgenerated.astype(str)
df['components'] = df.components.astype(str)
df['distributioncenterinfo'] = df.distributioncenterinfo.astype(str)
df['images'] = df.images.astype(str)
df['itemcustomerzonezoneproductids'] = df.itemcustomerzonezoneproductids.astype(str)
df['itempodconfigids'] = df.itempodconfigids.astype(str)
df['keywords'] = df.keywords.astype(str)
df['validationmessages'] = df.validationmessages.astype(str)
df['zones'] = df.zones.astype(str)
#error_bad_lines=False
#print(df)
df.to_sql(
           name='mssql_table_name',
           con=engine,
           index=False,
           if_exists='append',
           chunksize=1
         )
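
With fetch_size set, the result arrives in pages, and _current_rows holds one page at a time. A minimal sketch of collecting every page, assuming the session uses pandas_factory so that each page is a DataFrame. result_set_to_frame is a hypothetical helper name; it relies on the driver's has_more_pages and fetch_next_page() and on the same private _current_rows attribute used above:

```python
import pandas as pd


def result_set_to_frame(result_set):
    """Concatenate all pages of a paged result into a single DataFrame.

    Assumes each page is already a DataFrame (row factory = pandas_factory).
    """
    frames = [result_set._current_rows]
    while result_set.has_more_pages:
        # Fetches the next page synchronously and replaces _current_rows.
        result_set.fetch_next_page()
        frames.append(result_set._current_rows)
    return pd.concat(frames, ignore_index=True)
```

It would be called as df = result_set_to_frame(session.execute(statement)). For a very large table, writing each page to MSSQL inside the loop instead of concatenating would keep memory use bounded.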

#4 tyu7yeag

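The fastest way to read Cassandra data into pandas is to let the driver iterate over the pages automatically. Create a dictionary, append each row's values to it while iterating over all pages, and then build the DataFrame from that dictionary.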

import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

dictionary ={"column1":[],"column2":[]}

for row in session.execute(sql_query):
    # dict_factory yields plain dicts, so columns are read by key
    dictionary["column1"].append(row["column1"])
    dictionary["column2"].append(row["column2"])

df = pd.DataFrame(dictionary)
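
The same idea can be written without hard-coding the column names. This is a sketch under the assumption that every row is a dict with the same keys, as dict_factory produces. rows_to_frame and the sample rows are hypothetical and stand in for session.execute(sql_query):

```python
from collections import defaultdict

import pandas as pd


def rows_to_frame(rows):
    """Build a DataFrame from an iterable of dict rows, one list per column."""
    columns = defaultdict(list)
    for row in rows:
        for name, value in row.items():
            columns[name].append(value)
    return pd.DataFrame(columns)


# Hypothetical stand-in for the rows returned with dict_factory.
sample_rows = [
    {"column1": 1, "column2": "a"},
    {"column1": 2, "column2": "b"},
]
df = rows_to_frame(sample_rows)
print(df)
```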

#5 23c0lvtd

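I used the row_factory solution for a few weeks, then ran into data type problems when trying to write the DataFrame to another table with the same structure. pandas infers a float dtype for int columns that have many empty fields. During the write, the Cassandra driver complained about a type mismatch.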

TypeError: Received an argument of invalid type for column "frequency". Expected: <class 'cassandra.cqltypes.Int32Type'>, Got: <class 'float'>; (required argument is not an integer)

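pandas' default int columns do not support NaN or None, so the best option is probably to store the column as Python objects.
A quick hack is to adjust pandas_factory to bypass pandas' type inference. This is not an ideal blanket strategy: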

def pandas_factory(colnames, rows):
    df = pd.DataFrame(rows, columns=colnames, dtype=object)
    return df

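I also found that I can do: df = pandas.DataFrame(result.all()) (if no row factory is needed).
As an interim solution, I would like a robust result_to_df() function that uses result.column_types (e.g. cassandra.cqltypes.Int32Type) and makes good guesses about how to convert them to Python objects or NumPy types. If/when I have time to write it, I will edit this answer. A pandas read_cql/to_cql would be ideal, but is probably beyond my bandwidth.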
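
One possible alternative to dtype=object, not part of the original answer, is pandas' nullable integer dtype "Int64", which keeps whole numbers and represents missing values as <NA>. The sketch below uses a made-up frequency column and converts it back to plain Python int/None values before they would be bound to an insert statement. Whether this fits depends on the pandas version and on the rest of the pipeline:

```python
import pandas as pd

# Hypothetical column with a missing value, as it might come back from Cassandra.
df = pd.DataFrame({"frequency": [1, None, 3]})
inferred = str(df["frequency"].dtype)  # pandas infers a float dtype here

# The nullable integer dtype keeps whole numbers and marks the gap as <NA>.
df["frequency"] = df["frequency"].astype("Int64")

# Plain Python ints and None, which avoids handing floats to an int column.
values = [None if pd.isna(v) else int(v) for v in df["frequency"]]
print(inferred, values)
```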


#6 tpgth1q7

Simply running a loop over the results inside the pandas DataFrame constructor does the job!

import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
        auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
data = session.execute("SELECT * FROM <table_name>;")

df = pd.DataFrame([d for d in data])
