Python: CSV to dataclass

oyjwcjzk · posted 2023-03-05 in Python

I want to load a CSV into a dataclass in Python. The dataclass consists of strings and enums, and I want each field parsed accordingly. I'm aware there is a Python library that can do this, but it doesn't allow skipping malformed lines, which unfortunately do occur.
I've written a method for this that reads the file and looks like this:

def dataset_reader(handle):
    # expects an already-open file handle, e.g. dataset_reader(open(path, newline=""))
    reader = csv.reader(handle)
    header = next(reader)
    expected_order = fields(MyFancyDataclass)
    order_mapping = {field.name: index for index, field in enumerate(expected_order)}
    header_mapping = {colname: index for index, colname in enumerate(header)}
    order = [header_mapping.get(name) for name, _ in sorted(order_mapping.items(), key=lambda x: x[1])]
    types = [field.type for field in expected_order]
    for line in reader:
        try:
            # generic version (no hard-coded column count, but slower):
            # yield MyFancyDataclass(*[line[c] if t is str else t(line[c]) for c, t in zip(order, types)])
            yield MyFancyDataclass(line[order[0]], line[order[1]], line[order[2]], line[order[3]], SourceType(line[order[4]]), line[order[5]], line[order[6]], line[order[7]])
        except Exception:
            logging.error(line)

What I'm basically trying to do is avoid assuming the order in which the CSV was written. As long as the required columns are in the file, we parse it. To do that, I first read the header and build an index mapping of the columns. Then I do the same for the dataclass and derive the correct read order for the CSV.
Then I read the CSV line by line. You'll see two approaches: the commented-out one (more elegant, since it doesn't hard-code the number of columns) and a faster one.
The problem I have now is that it's still very slow, which matters because we're dealing with big data. Any good ideas for speeding this up? A no-go is assuming the column order in the CSV. Even though it should always arrive in the same order, we don't want to rely on that. Since essentially everything is just lookups and the yield, I don't see what else we could improve for speed.
Thanks in advance for any help!
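One direction worth considering (a sketch, not the original code: Row, the converter "plan", and the inline sample data are all illustrative) is to hoist every per-field decision out of the row loop by precomputing (column index, converter) pairs once, so each row costs only list indexing plus at most one call per field:

```python
import csv
import io
from dataclasses import dataclass, fields
from enum import Enum

class SourceType(Enum):
    a = "aaa"
    b = "bbb"

@dataclass
class Row:
    key: str
    value: SourceType

def dataset_reader(handle):
    reader = csv.reader(handle)
    header = next(reader)
    # Build the (csv column index, converter) plan once, outside the row loop.
    # str fields pass through untouched; other field types are called directly.
    plan = []
    for f in fields(Row):
        plan.append((header.index(f.name), None if f.type is str else f.type))
    for line in reader:
        try:
            yield Row(*[line[idx] if conv is None else conv(line[idx])
                        for idx, conv in plan])
        except (ValueError, IndexError):
            continue  # skip malformed rows instead of aborting

sample = "key,value\n123,aaa\n,ccc\n234,bbb\n"
rows = list(dataset_reader(io.StringIO(sample)))
```

The point is that the row loop no longer consults any mapping by name; whether a field needs conversion was decided once, before the first row.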
A CSV file for reproduction; call it test.csv:

key,value
123,aaa
234,bbb
12,aaa
1919191,bbb
12,
13,aaa
,bbb
,
123,bbb

A complete, minimal Python script for reproduction. Save it in the same folder as test.csv:

from dataclasses import fields, dataclass
import logging
import csv
from enum import Enum

class SourceType(Enum):
    a = "aaa"
    b = "bbb"

@dataclass
class MyFancyDataclass:
    key: str
    value: SourceType

def dataset_reader(handle):
    # expects an already-open file handle, e.g. dataset_reader(open("test.csv", newline=""))
    reader = csv.reader(handle)
    header = next(reader)
    expected_order = fields(MyFancyDataclass)
    order_mapping = {field.name: index for index, field in enumerate(expected_order)}
    header_mapping = {colname: index for index, colname in enumerate(header)}
    order = [header_mapping.get(name) for name, _ in sorted(order_mapping.items(), key=lambda x: x[1])]
    types = [field.type for field in expected_order]
    print(order)
    for line in reader:
        try:
            # generic version (no hard-coded column count, but slower):
            # yield MyFancyDataclass(*[line[c] if t is str else t(line[c]) for c, t in zip(order, types)])
            yield MyFancyDataclass(line[order[0]], SourceType(line[order[1]]))
        except Exception as e:
            print(e)
            logging.error(line)

if __name__ == "__main__":
    print(list(dataset_reader(open("test.csv", newline=""))))

fd3cxomn #1

I think you'd get a lot of mileage out of csv's DictReader, which takes the consistent-ordering concern off your hands:

with open(csv_fname, newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        yield Fancy(key=row["key"], value=row["value"])
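For instance, here's a self-contained sketch of that DictReader pattern which also skips the malformed rows from the question (the Fancy and Value definitions here are trimmed stand-ins, and the inline sample data is illustrative):

```python
import csv
import io
from dataclasses import dataclass
from enum import Enum

class Value(Enum):
    a = "aaa"
    b = "bbb"

@dataclass
class Fancy:
    key: str
    value: Value

def fancy_rows(f):
    # DictReader keys each cell by header name, so column order is irrelevant
    for row in csv.DictReader(f):
        try:
            yield Fancy(key=row["key"], value=Value(row["value"]))
        except ValueError:  # blank or unknown enum value -> skip the row
            continue

sample = "key,value\n123,aaa\n12,\n234,bbb\n"
rows = list(fancy_rows(io.StringIO(sample)))
```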

Next, I'm not sure exactly what you need, since your sample data has blank values (""), and I don't know how you want to handle a dataclass that has to accept both strings and enum values.
I'll offer the following code to show one way to handle blank values in the CSV (for brevity, I've also renamed the enum class to Value):

@dataclass
class Fancy:
    key: str | None
    value: Value | None

    def __init__(self, key: str, value: str) -> None:
        """Create a Fancy from CSV string values, including blanks ("")."""
        self.key = key if key else None
        self.value = Value(value) if value else None

What do you want to do if a non-enum string value (e.g. "ccc") is passed in?
I trimmed down your sample CSV file and named it input-kv.csv:
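By default that raises: Value("ccc") throws ValueError. One possible policy (a sketch; parse_value is not part of this answer's code) is to map blank and unknown strings alike to None:

```python
from enum import Enum

class Value(Enum):
    a = "aaa"
    b = "bbb"

def parse_value(s: str) -> "Value | None":
    """Map a CSV cell to a Value member; blanks and unknown strings become None."""
    try:
        return Value(s) if s else None
    except ValueError:
        return None

ok = parse_value("aaa")    # Value.a
bad = parse_value("ccc")   # None: "ccc" is not a member
blank = parse_value("")    # None
```

Whether unknown strings should be silently dropped like blanks, or logged, is a judgment call for your data.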

key,value
123,aaa
456,bbb
789,
,bbb
,

and also flipped the columns, input-vk.csv:

value,key
aaa,123
bbb,456
,789
bbb,
,

When I run the whole thing:

import csv

from dataclasses import dataclass
from enum import Enum
from typing import Generator

class Value(Enum):
    a = "aaa"
    b = "bbb"

@dataclass
class Fancy:
    key: str | None
    value: Value | None

    def __init__(self, key: str, value: str) -> None:
        """Create a Fancy from CSV string values, including blanks ("")."""
        self.key = key if key else None
        self.value = Value(value) if value else None

def fancy_iter(csv_fname: str) -> Generator[Fancy, None, None]:
    with open(csv_fname, newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield Fancy(key=row["key"], value=row["value"])

for kv, vk in zip(
    fancy_iter("input-kv.csv"),
    fancy_iter("input-vk.csv"),
):
    print(f"kv: {str(kv):<40}   vk: {str(vk):<40}")

I get:

kv: Fancy(key='123', value=<Value.a: 'aaa'>)   vk: Fancy(key='123', value=<Value.a: 'aaa'>)
kv: Fancy(key='456', value=<Value.b: 'bbb'>)   vk: Fancy(key='456', value=<Value.b: 'bbb'>)
kv: Fancy(key='789', value=None)               vk: Fancy(key='789', value=None)            
kv: Fancy(key=None, value=<Value.b: 'bbb'>)    vk: Fancy(key=None, value=<Value.b: 'bbb'>) 
kv: Fancy(key=None, value=None)                vk: Fancy(key=None, value=None)

That's not strictly accurate, since this Fancy constructor only accepts strings, but I think it produces the same, consistent result for both the flipped columns and the blank fields.


mfpqipee #2

If speed is paramount, you'll want to cut as much dynamism as you can out of your row-in-reader loop... at least I think so... I haven't timed or profiled this... I'm just going off your own analysis that DictReader is too slow, so...
The fastest way I can think of to deal with columns in arbitrary order: explicitly name the columns you expect, then get their row indexes from the header:

def fancy_iter(csv_fname:str) -> Generator[Fancy, None, None]:
    reader = csv.reader(open(csv_fname))
    header = next(reader)

    # Hard-coded list of columns your team knows and maintains
    idx_a = header.index("col_A")
    idx_b = header.index("col_B")
    idx_c = header.index("col_C")

    for row in reader:
        yield Fancy(
            a=row[idx_a],
            b=int(row[idx_b]),
            c=RomanNumeral(row[idx_c]),
        )

Otherwise, like you were doing before, you'll be performing some kind of field-to-column mapping lookup inside the row-in-reader loop, and I figure that really drags down performance over millions of rows... but that's also just my gut feeling/speculation.
My full test program (not reproduced here) again changed your values/semantics to try to better illustrate the scope of the problem.
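As a rough sketch of what that full program plausibly combined (the col_A/col_B/col_C names, the RomanNumeral values, and the inline sample are assumptions, not the author's listing):

```python
import csv
import io
from dataclasses import dataclass
from enum import Enum
from typing import Generator

class RomanNumeral(Enum):
    I = "I"
    II = "II"

@dataclass
class Fancy:
    a: str
    b: int
    c: RomanNumeral

def fancy_iter(f) -> Generator[Fancy, None, None]:
    reader = csv.reader(f)
    header = next(reader)
    # resolve the hard-coded column names to indexes once, up front
    idx_a = header.index("col_A")
    idx_b = header.index("col_B")
    idx_c = header.index("col_C")
    for row in reader:
        yield Fancy(a=row[idx_a], b=int(row[idx_b]), c=RomanNumeral(row[idx_c]))

# columns deliberately not in dataclass order, to show order-independence
sample = "col_B,col_A,col_C\n1,foo,I\n2,bar,II\n"
out = list(fancy_iter(io.StringIO(sample)))
```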
Based on my understanding of Cost of exception handlers, I also changed the try/except to an explicit if/continue: the if-block is faster overall, and definitely faster when some rows are expected to be invalid.
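A sketch of that if/continue shape (the specific validity checks here, an empty-key test and a set-membership test against the enum's values, are illustrative):

```python
import csv
import io
from enum import Enum

class Value(Enum):
    a = "aaa"
    b = "bbb"

VALID_VALUES = {m.value for m in Value}  # cheap set-membership test per row

def valid_keys(f):
    reader = csv.reader(f)
    header = next(reader)
    idx_k = header.index("key")
    idx_v = header.index("value")
    for row in reader:
        # explicit if/continue instead of wrapping the yield in try/except
        if not row[idx_k] or row[idx_v] not in VALID_VALUES:
            continue
        yield row[idx_k]

sample = "key,value\n123,aaa\n,bbb\n12,\n234,bbb\n"
result = list(valid_keys(io.StringIO(sample)))
```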
Your team will have to keep the field names in the dataclass and the idx_ variables in the function in sync, but that's the trade-off for runtime speed (in my opinion). You already seem to rely on type hints (and maybe a linter?), which will help catch mismatches. Some mismatches will at least produce runtime errors:

  • If the iter func is out of sync with the CSV itself, the column mapping can fail (if a name changes, or the CSV is missing a column)
  • If the iter func is out of sync with the dataclass, Fancy() init fails

And for now, you can just unit-test with the full set of columns.
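Such a unit test could be as small as comparing one shared name mapping against the dataclass fields (a sketch; CSV_COLUMNS and the trimmed Fancy are illustrative, not this answer's code):

```python
from dataclasses import dataclass, fields
from enum import Enum

class RomanNumeral(Enum):
    I = "I"

@dataclass
class Fancy:
    a: str
    b: int
    c: RomanNumeral

# One shared field-name -> CSV-column mapping that both the iter
# function and the test below can rely on.
CSV_COLUMNS = {"a": "col_A", "b": "col_B", "c": "col_C"}

def test_columns_cover_dataclass():
    assert set(CSV_COLUMNS) == {f.name for f in fields(Fancy)}

test_columns_cover_dataclass()
```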

**For fun**

I came up with a scheme where each field in the dataclass is followed by a docstring holding the CSV column name that matches the one in the iter function's index-mapping lines, e.g.:

a: str
"""col_A"""
b: int
"""col_B"""
c: RomanNumeral
"""col_C"""

and wrote a "linter" (my first attempt at using the ast module) that makes sure those docstrings match the string constants in the fancy iter func:

# Ensure that the fancy_iter() function "knows" the correct and
# complete mapping of CSV column names to the Fancy dataclass
# fieldnames.

import ast
import sys

MAIN_PY = "main2.py"

def get_dataclass_cols(dataclass_node: ast.ClassDef) -> set[str]:
    """
    Look in the Fancy dataclass for pairs of lines of
    fieldname-line and docstring-line (column name in CSV), like:

        class Fancy:
            a: str
            '''col_A'''
            b: int
            '''col_B'''

    and return a set of CSV column names, e.g., {'col_A', 'col_B'}
    """
    _node = dataclass_node

    cols: set[str] = set()

    # Looking for pairs of AST objects like:
    #   AnnAssign( ... )                  <-- an AnnAssign node
    #   ...                                   followed by...
    #   Expr(                             <-- an Expr node
    #     value=Constant(value='col_A'))  <--   w/a Constant w/a string value (the column name)

    for i in range(len(_node.body) - 1):
        # Verify "lines" 1 & 2 are AnnAssign and Expr
        node1 = _node.body[i]
        if not isinstance(node1, ast.AnnAssign):
            continue
        node2 = _node.body[i + 1]
        if not isinstance(node2, ast.Expr):
            continue
        expr = node2

        # Verify Expr has string Constant
        if not isinstance(expr.value, ast.Constant):
            continue
        const = expr.value
        if not isinstance(const.value, str):
            continue

        cols.add(const.value)

    return cols

def get_iterfunc_cols(func_node: ast.FunctionDef) -> set[str]:
    """
    Look in the CSV iter func for lines assigning column names to indexes,
    beginning with "idx_", like:

        idx_a = header.index("col_A")
        idx_b = header.index("col_B")

    and return a set of CSV column names, e.g., {'col_A', 'col_B'}
    """
    cols: set[str] = set()

    # Looking for AST objects like:
    #   Assign(                                <-- an Assign node
    #     targets=[                            <--   w/a target
    #       Name(id='idx_b', ctx=Store())],    <--     w/a Name that starts with 'idx_'
    #     value=Call(                          <--   and a Call node...
    #       ...
    #       args=[                             <--     w/an arg
    #         Constant(value='col_B') ],       <--       w/a Constant w/a string value (the column name)
    #   )

    for node in func_node.body:
        # Verify Assign with correct Name
        if not isinstance(node, ast.Assign):
            continue
        if len(node.targets) == 0:
            continue
        target = node.targets[0]
        if not isinstance(target, ast.Name):
            continue
        name = target
        if not name.id.startswith("idx_"):
            continue
        if not isinstance(node.value, ast.Call):
            continue

        # Verify Call with correct string Constant
        call = node.value
        if len(call.args) == 0:
            continue
        arg = call.args[0]
        if not isinstance(arg, ast.Constant):
            continue
        const = arg
        if not isinstance(const.value, str):
            continue

        cols.add(const.value)

    return cols

def error(msg: str):
    print("Error, " + msg, file=sys.stderr)
    sys.exit(1)

def main():
    iterfunc_cols: set[str] = set()
    dataclass_cols: set[str] = set()

    main_body = ast.parse(open(MAIN_PY).read()).body
    for node in main_body:
        if isinstance(node, ast.FunctionDef) and node.name == "fancy_iter":
            iterfunc_cols = get_iterfunc_cols(node)

        if isinstance(node, ast.ClassDef) and node.name == "Fancy":
            dataclass_cols = get_dataclass_cols(node)

    if len(dataclass_cols) == 0:
        error("did not find any columns in the dataclass")

    if len(iterfunc_cols) == 0:
        error("did not find any columns in the iter func")

    if iterfunc_cols != dataclass_cols:
        err_msg = "\n".join(
            [
                "columns do not match:",
                "  dataclass_cols: %s" % sorted(dataclass_cols),
                "  iterfunc_cols:  %s" % sorted(iterfunc_cols),
            ]
        )
        error(err_msg)

if __name__ == "__main__":
    main()

As long as your dataclass and iter func are in sync:

a: str             
"""col_A"""        idx_a = header.index("col_A")
b: int             
"""col_B"""        idx_b = header.index("col_B")
c: RomanNumeral    
"""col_C"""        idx_c = header.index("col_C")

the linter is happy. But as soon as the linter either can't find any columns in the dataclass or the iter func, or finds the two out of sync:

a: str             
"""col_A"""        idx_a = header.index("col_a")
b: int             
"""col_B"""        idx_b = header.index("col_B")
c: RomanNumeral    
"""col_C"""        idx_c = header.index("col_C")
Error, columns do not match:
  dataclass_cols: ['col_A', 'col_B', 'col_C']
  iterfunc_cols:  ['col_B', 'col_C', 'col_a']

a: str             
"""col_A"""        idx_a = header.index("col_A")
b: int             
"""col_B"""        idx_b = header.index("col_B")
c: RomanNumeral    
"""col_C"""        idx_c = header.index("col_C")
d: float
"""col_D"""
Error, columns do not match:
  dataclass_cols: ['col_A', 'col_B', 'col_C', 'col_D']
  iterfunc_cols:  ['col_A', 'col_B', 'col_C']

Both of these mistakes would also raise exceptions at runtime:

idx_a = header.index("col_a")
            ^^^^^^^^^^^^^^^^^^^^^
ValueError: 'col_a' is not in list

or:

yield Fancy(
          ^^^^^^
TypeError: Fancy.__init__() missing 1 required positional argument: 'd'
