I'm trying to write a parser/API for France's public drug database (https://base-donnees-publique.medicaments.gouv.fr/). It consists of eight CSV files (actually TSVs, since they use tabs), ranging from a few KB to 4 MB, the largest one having ~20,000 lines (each line representing a drug with its name, codes, prices, etc.).
Since these files may change periodically, I'd rather parse them directly than build a cleaner database out of them (which I'd probably have to recreate regularly anyway).
Importing these files takes a little while (about a second), so I tried to speed it up a bit and benchmarked a few approaches, and I was surprised to see that the most naive one also seems to be the fastest.
Here is my test code (sorry, it's long). Each file is associated with a dedicated class that parses its lines. Basically, each class is a namedtuple with a custom classmethod to parse dates, numbers, etc.
import pathlib
import enum
import datetime
from decimal import Decimal
from collections import namedtuple
from typing import Optional
import csv


def parse_date(date: str) -> datetime.date:
    return datetime.datetime.strptime(date, "%d/%m/%Y").date()


def parse_date_bis(date: str) -> datetime.date:
    return datetime.datetime.strptime(date, "%Y%m%d").date()


def parse_text(text):
    if not text:
        return ""
    return text.replace("<br>", "\n").strip()


def parse_list(raw):
    return raw.split(";")


def parse_price(price: str) -> Optional[Decimal]:
    if not price:
        return None
    # Handles cases like "4,417,08": the last comma is the decimal
    # separator, any others are thousands separators.
    price = '.'.join(price.rsplit(",", 1)).replace(',', '')
    return Decimal(price)


def parse_percentage(raw: str) -> Optional[int]:
    if not raw:
        return None
    return int(raw.replace("%", "").strip())
class StatutAdministratifPresentation(enum.Enum):
    ACTIVE = "Présentation active"
    ABROGEE = "Présentation abrogée"


class EtatCommercialisation(enum.Enum):
    DC = "Déclaration de commercialisation"
    S = "Déclaration de suspension de commercialisation"
    DAC = "Déclaration d'arrêt de commercialisation"
    AC = "Arrêt de commercialisation (le médicament n'a plus d'autorisation)"


class MotifAvisSMR(enum.Enum):
    INSCRIPTION = "Inscription (CT)"
    RENOUVELLEMENT = "Renouvellement d'inscription (CT)"
    EXT = "Extension d'indication"
    EXTNS = "Extension d'indication non sollicitée"
    REEV_SMR = "Réévaluation SMR"
    REEV_ASMR = "Réévaluation ASMR"
    REEV_SMR_ASMR = "Réévaluation SMR et ASMR"
    REEV_ETUDE = "Réévaluation suite à résultats étude post-inscript"
    REEV_SAISINE = "Réévaluation suite saisine Ministères (CT)"
    NOUV_EXAM = "Nouvel examen suite au dépôt de nouvelles données"
    MODIF_COND = "Modification des conditions d'inscription (CT)"
    AUTRE = "Autre demande"


class ImportanceSMR(enum.Enum):
    IMPORTANT = "Important"
    MODERE = "Modéré"
    FAIBLE = "Faible"
    INSUFFISANT = "Insuffisant"
    COMMENTAIRES = "Commentaires"
    NP = "Non précisé"


class ImportanceASMR(enum.Enum):
    COM = "Commentaires sans chiffrage de l'ASMR"
    I = "I"
    II = "II"
    III = "III"
    IV = "IV"
    V = "V"
    NP = "Non précisée"
    SO = "Sans objet"
class Specialite(namedtuple("Specialite", ("cis", "denomation", "forme", "voies_administration", "statut_amm", "type_amm", "commercialisation", "date_amm", "statut_bdm", "numero_autorisation_europeenne", "titulaire", "surveillance_renforcee"))):
    @classmethod
    def from_line(cls, line):
        line[2] = line[2].replace("  ", " ").strip()  # Collapse double spaces.
        line[3] = parse_list(line[3])
        line[7] = parse_date(line[7])
        line[10] = line[10].strip()  # There are often leading spaces here (like ' OPELLA HEALTHCARE FRANCE').
        return cls(*line)


class Presentation(namedtuple("Presentation", ("cis", "cip7", "libelle", "statut", "commercialisation", "date_commercialisation", "cip13", "agrement_collectivites", "taux_remboursement", "prix", "prix_hors_honoraires", "montant_honoraires", "indications_remboursement"))):
    @classmethod
    def from_line(cls, line):
        if line[3] == "Présentation active":
            line[3] = StatutAdministratifPresentation.ACTIVE
        else:
            line[3] = StatutAdministratifPresentation.ABROGEE
        line[4] = {
            "Déclaration de commercialisation": EtatCommercialisation.DC,
            "Déclaration de suspension de commercialisation": EtatCommercialisation.S,
            "Déclaration d'arrêt de commercialisation": EtatCommercialisation.DAC,
            "Arrêt de commercialisation (le médicament n'a plus d'autorisation)": EtatCommercialisation.AC
        }.get(line[4])
        line[5] = parse_date(line[5])
        line[7] = line[7] == "oui"
        line[8] = parse_percentage(line[8])
        line[9] = parse_price(line[9])
        line[10] = parse_price(line[10])
        line[11] = parse_price(line[11])
        line[12] = parse_text(line[12])
        return cls(*line)


class Composition(namedtuple("Composition", ("cis", "element", "code", "substance", "dosage", "ref_dosage", "nature_composant", "cle"))):
    @classmethod
    def from_line(cls, line):
        line.pop(-1)  # Drop the empty trailing field.
        return cls(*line)


class AvisSMR(namedtuple("AvisSMR", ("cis", "dossier_has", "motif", "date", "valeur", "libelle"))):
    @classmethod
    def from_line(cls, line):
        line[2] = MotifAvisSMR(line[2])
        line[3] = parse_date_bis(line[3])
        line[4] = ImportanceSMR(line[4])
        line[5] = parse_text(line[5])
        return cls(*line)


class AvisASMR(namedtuple("AvisASMR", ("cis", "dossier_has", "motif", "date", "valeur", "libelle"))):
    @classmethod
    def from_line(cls, line):
        line[2] = MotifAvisSMR(line[2])
        line[3] = parse_date_bis(line[3])
        line[4] = ImportanceASMR(line[4])
        line[5] = parse_text(line[5])
        return cls(*line)


class AvisCT(namedtuple("AvisCT", ("dossier_has", "lien"))):
    @classmethod
    def from_line(cls, line):
        return cls(*line)
FILE_MATCHES = {
    "CIS_bdpm.txt": Specialite,
    "CIS_CIP_bdpm.txt": Presentation,
    "CIS_COMPO_bdpm.txt": Composition,
    "CIS_HAS_ASMR_bdpm.txt": AvisASMR,
    "CIS_HAS_SMR_bdpm.txt": AvisSMR,
    "HAS_LiensPageCT_bdpm.txt": AvisCT
}


def sequential_import_file_data(filename, cls):
    result = {cls: []}
    with (pathlib.Path("data") / filename).open("r", encoding="latin1") as f:
        rows = csv.reader(f, delimiter="\t")
        for line in rows:
            data = cls.from_line(line)
            result[cls].append(data)
    return result


def import_data_sequential():
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(sequential_import_file_data(filename, cls))
from multiprocessing.pool import ThreadPool


def import_data_mp_tp(n=2):
    pool = ThreadPool(n)
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(pool.apply_async(
            sequential_import_file_data,
            (filename, cls)
        ))
    results = [r.get() for r in results]


from multiprocessing.pool import Pool


def import_data_mp_p(n=2):
    pool = Pool(n)
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(pool.apply_async(
            sequential_import_file_data,
            (filename, cls)
        ))
    results = [r.get() for r in results]
import asyncio
import aiofiles
from aiocsv import AsyncReader


async def async_import_file_data(filename, cls):
    results = {cls: []}
    async with aiofiles.open(
        (pathlib.Path("data") / filename),
        mode="r",
        encoding="latin1"
    ) as afp:
        async for line in AsyncReader(afp, delimiter="\t"):
            data = cls.from_line(line)
            results[cls].append(data)
    return results


def import_data_async():
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(asyncio.run(async_import_file_data(filename, cls)))
def main():
    import timeit
    print(
        "Sequential:",
        timeit.timeit(lambda: import_data_sequential(), number=10)
    )
    print(
        "Multi ThreadPool:",
        timeit.timeit(lambda: import_data_mp_tp(), number=10)
    )
    print(
        "Multi Pool:",
        timeit.timeit(lambda: import_data_mp_p(), number=10)
    )
    print(
        "Async:",
        timeit.timeit(lambda: import_data_async(), number=10)
    )


if __name__ == "__main__":
    main()
When I run it, I get the following results:
Sequential: 9.821639589001279
Multi ThreadPool: 10.137484730999859
Multi Pool: 12.531487682997977
Async: 30.953154197999538
The most naive solution, iterating over all the files and all their lines, also seems to be the fastest.
Am I doing something wrong that slows down the other imports? Is a time difference like this normal?
1 Answer
As usual: run a profiler on the code and see where it spends its time. (This one is PyCharm's, which wraps the stdlib cProfile.)
Sequential: 7.865187874995172
[Screenshot: profiler output for the sequential import]
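If you don't have PyCharm at hand, the same data can be gathered with the stdlib directly; a minimal sketch, profiling the sequential import from the question and printing the 20 most expensive calls:

import cProfile
import pstats

# Run one full sequential import under the profiler and save the stats.
cProfile.run("import_data_sequential()", "import.prof")
# Sort by cumulative time so the hot call chains float to the top.
pstats.Stats("import.prof").sort_stats("cumulative").print_stats(20)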
Hmm, okay. strptime, I can tell, is being called by datetime.datetime.strptime. The odd one out is getlocale though... why do we need the locale there? Clicking through the call graph shows that strptime actually looks up the current locale and grabs a bunch of locks along the way. What if we replace those strptime-based parse_date functions with our own implementation?
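The replacement isn't shown in the answer, but since the two formats are fixed ("%d/%m/%Y" and "%Y%m%d"), a locale-free version can be plain slicing and int(); a sketch, assuming the inputs are always well-formed:

import datetime

def parse_date(date: str) -> datetime.date:
    # "DD/MM/YYYY" -- no locale lookup, no regex, just slices.
    return datetime.date(int(date[6:10]), int(date[3:5]), int(date[0:2]))

def parse_date_bis(date: str) -> datetime.date:
    # "YYYYMMDD"
    return datetime.date(int(date[0:4]), int(date[4:6]), int(date[6:8]))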
Sequential: 3.8978060420195106
Okay, now we're cooking! A 52% improvement!
(It doesn't show in the screenshot here, because I'm a silly goose and cropped it out, but the re machinery strptime uses under the hood also dropped off the profile.) Now, let's assume there are plenty of repeated dates, slap @lru_cache(maxsize=None) (RAM is flexible, so an unbounded cache) on those hot parse_date_* functions, run the code, and print the cache info:
[Screenshot: cache statistics and new timing]
Looks good to me: another 15% off the previous number.
parse_price could obviously use a cache as well:
[Screenshot: cache statistics for parse_price]
Hey, who knew, there are only 4096 distinct price strings in the data.
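The same decorator works there too, since the str keys and the Decimal/None results are all immutable; a sketch:

from decimal import Decimal
from functools import lru_cache
from typing import Optional

@lru_cache(maxsize=None)
def parse_price(price: str) -> Optional[Decimal]:
    if not price:
        return None
    # Keep the last comma as the decimal point, drop the others.
    price = '.'.join(price.rsplit(",", 1)).replace(',', '')
    return Decimal(price)
# With only ~4096 distinct inputs, this cache stays small.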
If you have the RAM for it, the remaining parse functions could be cached as well, but with just a bit of profiling and caching it's now 2.7x faster [when running everything 10 times, which means those caches will be hot; the speedup of a single run is not as dramatic], no parallel processing needed. Magic!
To make the playing field more even, here is a hyperfine benchmark where the Python interpreter starts from scratch for every measurement (each interpreter invocation runs the import exactly once):
[Screenshot: hyperfine benchmark results]
So: a quick look at a profiler (plus some extra optimizations, such as not building mapping dicts inside the from_line functions) yields a 55% speedup.
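That last tweak isn't spelled out either; a sketch of hoisting the mapping out of Presentation.from_line, reusing the enum from the question, so the dict is built once instead of once per row:

import enum

class EtatCommercialisation(enum.Enum):
    DC = "Déclaration de commercialisation"
    S = "Déclaration de suspension de commercialisation"
    DAC = "Déclaration d'arrêt de commercialisation"
    AC = "Arrêt de commercialisation (le médicament n'a plus d'autorisation)"

# Built once at import time; values map back to their members.
ETAT_MAP = {member.value: member for member in EtatCommercialisation}

# Inside Presentation.from_line, the lookup then becomes:
#     line[4] = ETAT_MAP.get(line[4])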