从JSON对象组成的列表中高效提取项目

8iwquhpp  于 2023-08-08  发布在  其他
关注(0)|答案(1)|浏览(98)

我需要从某些URL中提取,这些URL由JSON项目组成-对应某些条件的项目。比如item['status'] == 6,和item ['creationDate'] == 'today'。目标是使程序更快地检索数据并将其传递给用户。在url上有10 mb的数据。在我的代码中,我首先检索整个列表,然后使用某些条件对它进行排序。之后,我将该结果的job_ids保存到'checkpoint.txt'中,这样程序就可以检查这些对象是否已经被解析过,并且不会在输出中重复它们。下面是我的代码:

def save_checkpoint(job_id):
    with open(checkpoint_file, 'a') as f:
        f.write(job_id + '\n')

def load_checkpoint():
    try:
        with open(checkpoint_file, 'r') as f:
            job_ids = set(f.read().strip().split('\n'))
    except FileNotFoundError:
        job_ids = set()
    return job_ids

def get_feed_items():
    json_data = get_json_from_url(feed_url)
    try:
        data = json.loads(json_data)
    except data == []:
        json_data == get_json_from_url(sub_url)
    items = []
    checkpoint = load_checkpoint()

    sorted_data = sorted(
        data,
        key=lambda sorted_data: datetime.strptime(data['job']['creationDate'], '%d.%m.%Y %H:%M:%S'),
        reverse=True)

    for item in sorted_data[:20]:
        job_id = item['job']['id']
        if item['job']['status'] == 6 and job_id not in checkpoint:
            try:
                long_name = item['job']['name']
                name = long_name.split('_')[1]
                if item:
                    for history_item in item['tasks'][0]['history']:
                        item_data = {
                            'name': name,
                            'job_id': job_id,
                            'renderer': item['job']['renderer'],
                            'status': item['job']['status'],
                            'comment': history_item['comment'],
                        }
                    #print(f"Found new item: {item_data}")
                    items.append(item_data)
                save_checkpoint(job_id)
            except KeyError as a:
                raise a
        else:
            pass
    return items

字符串
我的任务的最佳解决方案是什么?什么是最好的方式来排序数据和检索所需的项目?我的机器人将在本地机器上工作,数据服务器每5秒更新一次。
为什么我首先检索一切的原因-是因为项目是随机添加的。而且没有机会跟踪新的项目,也不是通过索引,也不是通过其他任何东西(正如我所想的))。
这就是为什么我首先检索所有数据,然后按创建日期和状态排序,然后将批准的数据保存到检查点。下一次程序运行时,它将检查除了之前解析的项目之外是否有任何新的项目,如果存在,则添加它们。如果不是-sleep
如何优化我的代码?或者在这种情况下使用不同的解决方案或算法?

kjthegm6

kjthegm61#

你的代码有严重的问题,没有数据,我无法帮助你进一步。但我已经修复了你的代码中的一些问题。
首先,您没有导入所需的模块,请将import jsonfrom datetime import datetime添加到脚本的顶部,否则您将得到NameError。我想你已经知道了,但你没有把它们贴在这里,你是在强迫别人做额外的工作。
然后,您的代码缺少几个函数定义,并且一些小写变量传递给了内部函数,但没有传递给外部函数的参数。get_json_from_url是什么?你应该把它的定义贴出来,真的。这一点很重要,因为没有它,我们就无法运行您的代码,而且我们也无法使用它。checkpoint_filefeed_urlsub_url都应该是大写的,因为它们是全局的。
然后在get_feed_items函数中,你用的是except data == [],这是错误的。空列表不会导致异常,except只能捕获异常,data == []是一个条件,这里不会引发异常,所以except块没有任何作用。
请考虑下列程式码:

try:
    one = 1
except one == 1:
    one = 0

字符串
one的值是多少?
相反,您应该使用if,如下所示:

data = json.loads(json_data)
if not data:
    json_data = get_json_from_url(SUB_URL)


你要抓住KeyError,然后再加注,总的效果是你没有抓住它。你最好别抓住它。如果要忽略该异常,请使用pass而不是raise a
你的else: pass块根本没有任何作用,如果一个条件不满足并且没有其他块存在,Python自动不对case进行任何操作。为简洁起见,请将其删除。
items.append(item_data)超出了它上面的for循环,它不是循环的一部分,如果这样做,您将只得到最后一项。您需要通过将其缩进另一个级别来将其移动到循环内部。
你的if item没有任何效果,因为如果它包含你以前检查过的键,那么它不是空的,把它去掉。
最后,每次迭代都要将数据保存到磁盘,这是错误的,保存到磁盘非常慢,您应该只在循环结束时保存到磁盘。

import json
from datetime import datetime

def save_checkpoint(job_ids):
    with open(CHECKPOINT, 'a') as f:
        f.write('\n'.join(job_ids) + '\n')

def load_checkpoint():
    try:
        with open(CHECKPOINT, 'r') as f:
            job_ids = set(f.read().strip().split('\n'))
    except FileNotFoundError:
        job_ids = set()
    return job_ids

def get_feed_items():
    json_data = get_json_from_url(FEED_URL)
    data = json.loads(json_data)
    if not data:
        json_data = get_json_from_url(SUB_URL)
    items = []
    job_ids = set()
    checkpoint = load_checkpoint()

    sorted_data = sorted(
        data,
        key=lambda row: datetime.strptime(row['job']['creationDate'], '%d.%m.%Y %H:%M:%S'),
        reverse=True)

    for item in sorted_data[:20]:
        job_id = item['job']['id']
        if item['job']['status'] == 6 and job_id not in checkpoint:
            name = item['job']['name'].split('_')[1]
            items.extend(
                {
                    'name': name,
                    'job_id': job_id,
                    'renderer': item['job']['renderer'],
                    'status': item['job']['status'],
                    'comment': history_item['comment'],
                }
                for history_item in item['tasks'][0]['history']
            )
        job_ids.add(job_id)
    save_checkpoint(job_ids)

    return items

相关问题