How can I get rid of empty cells in a CSV file that I download from an API using Python?

9o685dep · posted on 2023-07-31 · in Python

I am working on a complex project for ArcGIS Pro and ArcGIS GeoEvent Server that has to continuously download CSV files from an API (one CSV file is generated every 10 minutes). The download itself works perfectly, but some cells are empty, and because of that I cannot use the CSV files with GeoEvent Server.
So, basically, I need to be able to pick out the fields (cells) that are empty and drop them automatically when the CSV file is downloaded, because I do not want to do this manually (see the sketch after the first script below).
The CSV file then needs to be converted to JSON so I can work with it, but there I have another problem: the file name, because the JSON needs to have the same name as the CSV file, and I do not know how to do that automatically. I tried reading the whole contents of the folder (filtered by the .csv extension) with the os module to get the file name and then copy it to the JSON, but I could not get it to work (see the sketch after the second script).
This is the code I use to download the CSV files:

import os
import yaml
import logging
from datetime import datetime, timedelta
import sys
import copy
import json
import csv
import requests
import time

from csvToJSON_01 import *
from exceptions import MaxRetries, ConnectionLost
from requests.adapters import HTTPAdapter
from requests.exceptions import RetryError
from requests.packages.urllib3.util.retry import Retry
from apscheduler.schedulers.background import BackgroundScheduler

log = logging.getLogger(__name__)

target_updates = []
time_from = None

def reset_bucket():
    global target_updates

    target_updates = []  # Reset to an empty list.

def export_to_csv_job():
    global time_from
    global target_updates

    to_process = copy.deepcopy(target_updates)
    old_time_from = copy.deepcopy(time_from)
    time_from = datetime.now()  # To get the current local time and date.

    reset_bucket()  # Reset target_updates.

    if len(to_process) > 0:  # If there is at least one target to process.
        print(to_process[0])
        data_file = open(
            f"csvFiles/data_{old_time_from.strftime('%m_%d_%Y_%H_%M_%S')}_{datetime.now().strftime('%m_%d_%Y_%H_%M_%S')}.csv",
            "w",
            newline="",  # Recommended for csv.writer so extra blank lines are not inserted on Windows.
        )  # Creates a "data_<from>_<to>.csv" file; strftime() formats the datetimes as strings.

        csv_writer = csv.writer(
            data_file
        )  # Writes rows to the CSV file created above.

        most_keys = max(
            to_process, key=lambda item: len(item.keys())
        )  # Use the target dict with the most keys as the CSV header row.
        csv_writer.writerow(most_keys.keys())

        for elem in to_process:
            csv_writer.writerow(
                map(lambda key: elem.get(key, ""), most_keys.keys())
            )  # Write the data row by row; missing keys become empty cells.
        data_file.close()

def listen_to_stream(timeout=None):
    global time_from
    reset_bucket()
    if timeout is not None:
        timeout = datetime.now() + timedelta(0, timeout)

    scheduler = BackgroundScheduler()
    retry_strategy = Retry(
        # 10 retries before throwing exception.
        total=10,
        backoff_factor=3,
        status_forcelist=[429, 500, 502, 503, 504, 422],
        allowed_methods=["HEAD", "GET", "OPTIONS"],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    http = requests.Session()
    http.mount("https://", adapter)
    http.mount("http://", adapter)

    try:
        response = http.get(
            "https://api.airsafe.spire.com/v2/targets/stream?compression=none",
            headers={"Authorization": f"Bearer {os.environ['AVIATION_TOKEN']}"},
            stream=True,
        )
    except RetryError as e:
        log.warning(e)
        raise MaxRetries()

    if response.status_code == 401:  # If it's unauthorized.
        print("Unauthorized, token might be invalid")
        sys.exit()  # Exit: retrying will not help with an invalid token.

    try:
        scheduler.add_job(
            export_to_csv_job,
            "cron",
            minute="*/10",
            id="airsafe_stream_csv",
        )  # APScheduler runs this job every 10 minutes (cron-style trigger).
        time_from = datetime.now()  # I start counting.
        scheduler.start()
    except Exception as e:
        log.warning(e)
        print("failed to start scheduler")
        raise ConnectionLost()

    try:
        for line in response.iter_lines(decode_unicode=True):
            if timeout is not None and datetime.now() >= timeout:
                scheduler.remove_job("airsafe_stream_csv")
                scheduler.shutdown()
                export_to_csv_job()
                response.close()
                sys.exit()
            if line and '"target":{' in line:
                target = json.loads(line)["target"]
                target_updates.append(target)
    except Exception as e:
        log.warning(e)
        scheduler.remove_job("airsafe_stream_csv")
        scheduler.shutdown()
        export_to_csv_job()
        raise ConnectionLost()

def connection_manager():
    try:
        # If you wish to listen for a specific time:
        # listen_to_stream(70) will listen for 70 seconds.
        listen_to_stream()
    except MaxRetries:
        print("stream failed to connect multiple times, will retry in 30mn")
        time.sleep(60 * 30)
        connection_manager()
    except ConnectionLost:
        print("Connection was lost retrying now ...")
        connection_manager()

csvJSON = csv_to_json(
    csvFilePath, jsonFilePath
)  # Here I try to call the function from csvToJSON_01.py, but csvFilePath and jsonFilePath are not defined in this file.

if __name__ == "__main__":  # Run only when this file is executed directly.
    config = yaml.load(
        open("env.yaml"), Loader=yaml.FullLoader
    )  # Load env.yaml; it provides AVIATION_TOKEN.
    os.environ.update(config)

    connection_manager()
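
For the empty-cell problem, one option is to filter at write time: skip any record that would produce an empty cell instead of writing it. Below is a minimal sketch, assuming the same list-of-dicts structure as to_process above (write_complete_rows is a hypothetical helper name, not part of the original script):

import csv

def write_complete_rows(rows, csv_path):
    # Hypothetical helper: write only the rows that have a non-empty value
    # for every header key, so the resulting CSV has no empty cells.
    if not rows:
        return
    header = list(max(rows, key=lambda item: len(item.keys())).keys())
    with open(csv_path, "w", newline="") as data_file:
        csv_writer = csv.writer(data_file)
        csv_writer.writerow(header)
        for elem in rows:
            values = [elem.get(key, "") for key in header]
            if all(value != "" for value in values):  # Skip any row with an empty cell.
                csv_writer.writerow(values)

Calling something like this in place of the writing loop inside export_to_csv_job would keep only complete records; whether dropping incomplete records is acceptable depends on how GeoEvent Server consumes the feed.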

The other one is the script I use to convert to JSON:

import csv
import json
import time
import os

directory = "../csvFiles"

def csv_to_json(csvFilePath, jsonFilePath):
    jsonArray = []

    # Read csv file.
    with open(csvFilePath, encoding="utf-8") as csvf:
        # Load csv file data using csv library's dictionary reader.
        csvReader = csv.DictReader(csvf)

        # Convert each csv row into python dict.
        for row in csvReader:
            # Add this python dict to json array.
            jsonArray.append(row)

    # Convert python jsonArray to JSON String and write to file.
    with open(jsonFilePath, "w", encoding="utf-8") as jsonf:
        jsonString = json.dumps(jsonArray, indent=4)
        jsonf.write(jsonString)

for file in os.listdir(directory):
    if file.endswith(".csv"):
        # Only CSV files present in the csvFiles folder are picked up.
        # csvFilePath = os.path.splitext(file)[0]
        csvFilePath = os.path.join(directory, file)  # Full path, not just the file name.
        # print(csvFilePath)
        jsonFilePath = r"../csvFiles/data.json"

start = time.perf_counter()
csv_to_json(csvFilePath, jsonFilePath)
finish = time.perf_counter()

print(f"Conversion completed successfully in {finish - start:0.4f} seconds")
