csv 在Python中比较设备序列号

b4wnujal  于 2023-02-27  发布在  Python
关注(0)|答案(1)|浏览(146)

Python非常非常新,我一直在尝试构建一个工具来帮助工作中的设备协调。我们基本上有相同设备的四个不同系统,需要进行比较才能完成协调报告。
到目前为止,它在清理两个系统中的数据并输出两个新的包含清理过的数据的CSV方面效果非常好。
项目的下一部分是我遇到的问题,我尝试了很多不同的方法,我希望它做的是从一个特定的键中获取值,将它们与另一个系统字典中的特定键进行比较,然后将数据组合输出到CSV中以匹配值。
因此,如果system_1 ={'serial':aaaaaa,“类型”:iPad、“imei”:12345}且系统_2 ={“串行”:aaaa,“订单号”:1} -当它迭代时,它会根据序列号匹配数据,然后输出一个CSV,其中每个序列号匹配都包含“sys 1 serial”、“sys 2 serial”、“sys 2 order number”、“sys 1 type”、“sys 1 imei”。
以下是我到目前为止所做的工作,其中一些尝试被注解掉了:

import csv
import re

digits = r'[0-9]'

# Create the list of headers for the new file
jamf_headers = [
    'JAMF_LOCATION',
    'JAMF_SERIAL',
    'JAMF_IMEI',
    'JAMF_PHONE',
    'JAMF_MODEL',
    'JAMF_LAST_CHECKIN',
    'JAMF_LAST_INVENTORY'
]

abm_headers = [
    'ABM_SERIAL',
    'ABM_IMEI',
    'ABM_MODEL',
    'ABM_ORDER_NO',
    'ABM_DATE_REMOVED',
    'ABM_MDM'
]

combo_headers = [
    'JAMF_LOCATION',
    'JAMF_SERIAL',
    'ABM_SERIAL',
    'JAMF_IMEI',
    'ABM_IMEI',
    'JAMF_PHONE',
    'JAMF_MODEL',
    'ABM_ORDER_NO',
    'ABM_MDM',
    'ABM_DATE_REMOVED',
    'JAMF_LAST_CHECKIN',
    'JAMF_LAST_INVENTORY'
]

jamf_values = {}

# Open the files using the 'with' statement
with open('JAMF_Devices.csv', 'r') as jamf_data,\
     open('ABM_Devices.csv', 'r') as abm_data,\
     open('jamf_data_clean.csv', 'w', newline = '') as jamf_clean,\
     open('abm_data_clean.csv', 'w', newline = '') as abm_clean,\
     open('combined_data.csv', 'w', newline = '') as combo:

# Define our CSV readers and writers    
    jamf_reader = csv.reader(jamf_data)
    abm_reader = csv.reader(abm_data)
    jamf_writer = csv.DictWriter(jamf_clean, fieldnames = jamf_headers)
    abm_writer = csv.DictWriter(abm_clean, fieldnames = abm_headers)
    combined_writer = csv.DictWriter(combo, fieldnames = combo_headers)

# Skip header values
    next(jamf_reader)
    next(abm_reader)

    jamf_writer.writeheader()
    abm_writer.writeheader()
    combined_writer.writeheader()

# Loop through lines of the JAMF CSV reader and store to keys
    for line in jamf_reader:
        model = re.sub(digits, '', line[4])
        jamf_values = [{
        'JAMF_LOCATION': line[2].lower().strip(),
        'JAMF_SERIAL': line[0].strip(),
        'JAMF_IMEI': line[36].replace(' ', '').strip(),
        'JAMF_PHONE': line[34].replace('1', '', 1).strip(),
        'JAMF_MODEL': model.replace(',', ''),
        'JAMF_LAST_CHECKIN': line[13].split('T')[0],
        'JAMF_LAST_INVENTORY': line[14].split('T')[0]
        }]
#       print(jamf_values)
#       jamf_writer.writerows(jamf_values)
#       print(jamf_values.get('JAMF_SERIAL'))

        for line in abm_reader:
            abm_values = [{
            'ABM_SERIAL': line[0].strip(),
            'ABM_IMEI': line[1].strip(),
            'ABM_MODEL': line[2].strip(),
            'ABM_ORDER_NO': line[8].strip().split('_')[2],
            'ABM_DATE_REMOVED': line[10].strip().split('T')[0],
            'ABM_MDM': line[11].strip()
            }]
#           print(abm_values)
#           abm_writer.writerows(abm_values)            
#           print(abm_values.get('ABM_SERIAL')) 
    
'''     
    for line in jamf_reader:
        if line[0] in abm_reader:
            model = re.sub(digits, '', serial[4])
            combo_values = [{
            'JAMF_LOCATION': serial[2].lower().strip(),
            'JAMF_SERIAL': serial[0].strip(),
            'ABM_SERIAL': line[0].strip(),
            'JAMF_IMEI': serial[36].replace(' ', '').strip(),
            'ABM_IMEI': line[1].strip(),
            'JAMF_PHONE': serial[34].replace('1', '', 1).strip(),
            'JAMF_MODEL': model.replace(',', ''),
            'ABM_ORDER_NO': line[8].strip().split('_')[2],
            'ABM_MDM': line[11].strip(),
            'ABM_DATE_REMOVED': line[10].strip().split('T')[0],
            'JAMF_LAST_CHECKIN': serial[13].split('T')[0],
            'JAMF_LAST_INVENTORY': serial[14].split('T')[0]
            }]
            print(combo_values)
'''

#print(jamf_values)

我尝试过不嵌套第二个系统(abm-reader循环),但是由于作用域的原因,我无法访问jamf_reader值。
在jamf和abm循环中,我都尝试过从[{'key'中删除'[]':value}]定义,这样我就可以使用字典方法,但它不会写入CSV。
在最后一个代码块中,我尝试了嵌套,不是嵌套,字典方法等等......我根本不能让它迭代。我确实嵌套了一次,它确实完美地返回了一行,带有匹配的序列号和所有信息。我对此非常兴奋。然而,有大约1,400个序列号需要迭代。

mepcadol

mepcadol1#

为了让这个例子更容易理解,我使用了下面更简单的CSV示例:
a.csv

serial,imei,type
aaa,123,foo
bbb,456,bar
ccc,789,baz

b.csv

serial,order_no
aaa,x_y_1
bbb,x_y_2
ddd,x_y_5

我们可以定义几个函数来组织数据的加载和处理:

from csv import DictReader, DictWriter
from typing import Dict, List  # Using these in case you are using Python<3.8

SystemDataMap = Dict[str, Dict[str, Dict[str, str]]]

def normalize_row(row):
    # This could have an additional parameter for system name, if necessary, to change behavior accordingly
    try:
        row['order_no'] = row['order_no'].strip().split('_')[2]
    except KeyError:
        pass
    # ... and so on...
    return row

def load_data(system_path_map: Dict[str, str]) -> SystemDataMap:
    system_data_map = {}
    for system, path in system_path_map.items():
        with open(path, 'r') as f:
            # Making each of these be dicts makes it easier to look up the system-specific data for a given serial number later
            system_data_map[system] = {row['serial']: normalize_row(row) for row in DictReader(f)}
    return system_data_map

def combine_data(system_data_map: SystemDataMap) -> List[Dict[str, str]]:
    combined = []
    # Create the all_serials set, containing only 1 entry per unique value
    all_serials = {serial for data in system_data_map.values() for serial in data}
    for serial in all_serials:
        combined_row = {'serial': serial}
        for system, serial_data_map in system_data_map.items():
            try:
                data = serial_data_map[serial]
            except KeyError:  # This system doesn't have this serial number
                pass
            else:
                for key, val in data.items():
                    combined_row[f'{system}_{key}'] = val

        combined.append(combined_row)

    return combined

{row['serial']: normalize_row(row) for row in DictReader(f)}部分等效于:

serial_data_map = {}
for row in DictReader(f):  # each row is a dict of {column header: row value}
    serial_data_map[row['serial']] = normalize_row(row)

类似地,all_serials = {serial for data in system_data_map.values() for serial in data}行等效于:

all_serials = set()
for data in system_data_map.values():
    for serial in data:
        all_serials.add(serial)

所有嵌套的list/dict/set/etc解析都可以转换成一系列嵌套的传统for循环,其顺序与解析中for X in Y子句的顺序相同。
把碎片拼在一起:

system_path_map = {'a': 'a.csv', 'b': 'b.csv'}
system_data_map = load_data(system_path_map)

all_fields = ['serial']
# This ensures we capture all possible field names
# next(iter(...)) is very similar to some_list[0], but it works for non-sequences
all_fields.extend(f'{system}_{key}' for system, data in system_data_map.items() for key in next(iter(data.values())))

combined = combine_data(system_data_map)

with open('combined_data.csv', 'w') as f:
    writer = DictWriter(f, all_fields)
    writer.writeheader()
    writer.writerows(combined)

结果文件的内容:

serial,a_serial,a_imei,a_type,b_serial,b_order_no
ddd,,,,ddd,5
aaa,aaa,123,foo,aaa,1
bbb,bbb,456,bar,bbb,2
ccc,ccc,789,baz,,

使用此方法,您可以同时处理两个以上的源,并动态创建输出列名,而无需提前手动定义所有输出列名。
对于数据规范化步骤,可以在combine_dataload_data中处理,在combine_data中可以将给定行的规范化分派给特定于系统的规范化函数。

相关问题