django csv自动导入

q3aa0525  于 2021-06-17  发布在  Mysql
关注(0)|答案(0)|浏览(294)

我是来自意大利的马塞洛,是python和django的新手。我想请教一个关于我的django项目的问题。
我创建了一个有两个表的项目,我的问题是如何从csv自动获取数据,我将创建一个ftp服务器来存储csv.txt并更新表(例如每天)。
现在我是通过django import/export手动更新数据的。我想让django自动读取csv文件、更新数据库,然后把该文件删除。
非常感谢大家的支持!!!
我的models.py:

from django.db import models
from django.urls import reverse

class AnagraficaCliente(models.Model):
    """Customer record keyed by ``codice_cliente``.

    Every field is declared exactly once. The original class body assigned
    ``ragione_sociale`` and ``ragione_sociale_dest`` twice; in a class body
    the later assignment replaces the earlier one, so only the surviving
    (later) definitions are kept here and the effective schema is unchanged.
    """

    codice_cliente = models.CharField(max_length=20, primary_key=True,
                                      null=False, unique=True)
    nome = models.CharField(max_length=20, blank=True)
    cognome = models.CharField(max_length=20, blank=True)
    # max_length=20 is the value that was actually in effect (the shadowed
    # duplicate declared 40).
    ragione_sociale = models.CharField(max_length=20)
    indirizzo = models.TextField(blank=True)
    cap = models.CharField(max_length=5, blank=True)
    piva = models.CharField(max_length=11, blank=True)
    vatnumber = models.CharField(max_length=13, blank=True)
    ragione_sociale_dest = models.CharField(max_length=40)
    # Nullable: may be None on instances loaded from the database.
    indirizzo_dest = models.TextField(null=True)

    def __str__(self):
        """Return "<ragione_sociale_dest> <indirizzo_dest>" for display."""
        # indirizzo_dest is nullable; fall back to "" so that str() does not
        # raise TypeError when concatenating None.
        return self.ragione_sociale_dest + " " + (self.indirizzo_dest or "")

    class Meta:
        verbose_name = "AnagraficaCliente"
        verbose_name_plural = "AnagraficaClienti"

class Tracking(models.Model):
    """Tracking entry identified by ``track`` and linked to one AnagraficaCliente."""

    # Primary key of the table (at most 10 characters).
    track = models.CharField(max_length=10, null=False, unique=True, 
                             primary_key=True)
    # Five optional free-text columns.
    # NOTE(review): presumably successive tracking steps - confirm.
    passaggio1 = models.CharField(max_length=50, blank=True)
    passaggio2 = models.CharField(max_length=50, blank=True)
    passaggio3 = models.CharField(max_length=50, blank=True)
    passaggio4 = models.CharField(max_length=50, blank=True)
    passaggio5 = models.CharField(max_length=50, blank=True)
    # Allowed values for `consegnato`: (stored value, human-readable label).
    # 'C' = "Consegnato" (delivered), 'N' = "Non consegnato" (not delivered).
    consegna = (
        ('C', 'Consegnato'),
        ('N', 'Non consegnato'),
    )
    consegnato = models.CharField(max_length=1, choices=consegna)

    #consegnato = models.BooleanField(blank=True)
    #esito = models.CharField(max_length=10, blank=True)
    flag = models.CharField(max_length=2, blank=True)
    # Foreign key to the customer model. The attribute has the same name as
    # the model class, so the database column is `AnagraficaCliente_id`.
    # CASCADE: deleting a customer also deletes its tracking rows.
    # related_name='trackings' exposes them as `<customer>.trackings`.
    AnagraficaCliente = models.ForeignKey(AnagraficaCliente, 
    on_delete=models.CASCADE, related_name='trackings')

    def __str__(self):
        # Display a row by its tracking code.
        return self.track

    class Meta:
        # Names shown for this model (e.g. in the Django admin).
        verbose_name = "Tracking"
        verbose_name_plural = "Trackings"

我写了一个脚本:先读取trackingtest.csv,把其中的外键列存入result_str_csv_anag_index_8_list;然后读取anagraficatest.csv并插入mysql;之后查询tracking_anagraficacliente表(数据此前已写入mysql),把查询得到的字段数据存入result_str_query_anag_index_8_list;最后检查result_str_csv_anag_index_8_list中的值是否出现在result_str_query_anag_index_8_list中,以确认外键是否正确。
所有这些都可以在linux中使用cron,但我希望在django内部(在后端)实现alert(例如,如果insert工作正常,则使用红色或绿色按钮)。
我希望能很好地解释我所做的事情,我是初学者;)
谢谢您。


#!/usr/bin/python3

import csv
import MySQLdb, time

# Upsert into tracking_anagraficacliente: insert the row, or, when the primary
# key (codice_cliente) already exists, overwrite every non-key column with the
# incoming values. Parameters are bound by the driver (%s placeholders).
sql_anag= """
        INSERT INTO tracking_anagraficacliente 
        (codice_cliente,nome,cognome,ragione_sociale,indirizzo,cap,piva,vatnumber,ragione_sociale_dest,indirizzo_dest)
        VALUES 
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) 
        ON DUPLICATE KEY UPDATE 
                                                    -- no primary key  aggiornata

        nome   = VALUES(nome),
        cognome   = VALUES(cognome),
        ragione_sociale   = VALUES(ragione_sociale),
        indirizzo   = VALUES(indirizzo),
        cap   = VALUES(cap),
        piva   = VALUES(piva),
        vatnumber   = VALUES(vatnumber),
        ragione_sociale_dest   = VALUES(ragione_sociale_dest),
        indirizzo_dest   = VALUES(indirizzo_dest);
        """

# Upsert into tracking_tracking, same pattern as above. The statement must
# start at column 0: the original assignment was indented, which is an
# IndentationError at module level.
# The column list is spelled out so the nine values no longer depend on the
# physical column order of the table. The order mirrors the field order of the
# Tracking model, which is the order the CSV rows are assumed to follow
# (NOTE(review): confirm against the real CSV layout).
sql_track = """
        INSERT INTO tracking_tracking 
        (track,passaggio1,passaggio2,passaggio3,passaggio4,passaggio5,consegnato,flag,AnagraficaCliente_id)
        VALUES 
        (%s, %s, %s, %s, %s, %s, %s, %s, %s) 
        ON DUPLICATE KEY UPDATE 
                                                    -- no primary key  aggiornata

        passaggio1   = VALUES(passaggio1),
        passaggio2   = VALUES(passaggio2),
        passaggio3   = VALUES(passaggio3),
        passaggio4   = VALUES(passaggio4),
        passaggio5   = VALUES(passaggio5),
        consegnato   = VALUES(consegnato),
        flag   = VALUES(flag),
        AnagraficaCliente_id  = VALUES(AnagraficaCliente_id);
        """

# Fetches every customer code currently stored.
query_id_anag = """ select codice_cliente from tracking_anagraficacliente; """

# query_id_track = """ select AnagraficaCliente_id from tracking_tracking; """

# Single connection and cursor shared by all functions in this script.
db = MySQLdb.connect(
        host="localhost",    # your host
        user="xxxxxxx",         # your username
        passwd="xxxxxx",  # your password
        db="lm_cms")        # name of the data base

cur = db.cursor()

# Collect column 8 (the customer foreign key) of every data row of
# trackingtest.csv. The body of the `with` statement must be indented; as
# pasted it was not, which is a syntax error.
# newline='' is what the csv module expects for files it reads.
with open('trackingtest.csv', newline='') as csvfile:
    readCSVQuery = csv.reader(csvfile, delimiter=',')
    next(readCSVQuery, None)  # skip the header line; tolerate an empty file
    list_csv_tracking = [row[8] for row in readCSVQuery]

# Flatten the list through its repr and split on spaces. Each resulting token
# still carries the quote characters of the list repr (e.g. "'C001'"); this
# format is kept unchanged on purpose.
result_str_csv_track_index_8 = (
    str(list_csv_tracking).replace(",", "").replace("[", "").replace("]", "")
)
result_str_csv_anag_index_8_list = result_str_csv_track_index_8.split(' ')
# The label was split across two source lines inside the string literal
# (unterminated string); it is restored to a single-line literal.
print("Risultato CSV Tracking Foreign Key", result_str_csv_anag_index_8_list)

def insert_anagrafica():
    """Upsert every data row of anagraficatest.csv into tracking_anagraficacliente.

    The first line of the file is treated as a header and skipped. Each
    remaining row is executed with ``sql_anag`` and committed on its own. A
    row rejected by MySQL is reported on stdout and rolled back, and the
    import continues with the next row.
    """
    # newline='' is what the csv module expects for files it reads.
    with open('anagraficatest.csv', newline='') as csvfile:
        readCSV = csv.reader(csvfile, delimiter=',')
        next(readCSV, None)  # skip the header line; tolerate an empty file
        for row_anag in readCSV:
            if not row_anag:
                # csv.reader yields [] for a blank line; nothing to insert.
                continue
            try:
                cur.execute(sql_anag, row_anag)
                db.commit()
                print(row_anag)
            except (MySQLdb.Error, MySQLdb.Warning) as e:
                print(e)
                # Roll back the failed statement only. The connection is NOT
                # closed here: it is the module-level connection that the
                # remaining rows and the later steps of the script still use.
                db.rollback()

def query_anag():
    """Return the codice_cliente of every row in tracking_anagraficacliente.

    Returns:
        list: one entry per table row, holding the value of the single
        selected column exactly as the driver returned it. Values are no
        longer passed through ``str()``/``replace()``/``split()``, which left
        quote characters around each value and broke values containing
        spaces, commas or parentheses.
    """
    cur.execute(query_id_anag)
    result_str_query_anag_index_8_list = [row[0] for row in cur.fetchall()]
    print ("Risultato Query anagrafica Codice cliente  ",result_str_query_anag_index_8_list)
    return result_str_query_anag_index_8_list


def insert_tracking():
    """Upsert the rows of trackingtest.csv whose customer code is known.

    Reads the module-level ``result_str_query_anag_index_8_list`` (the value
    returned by ``query_anag()``). For every data row of the CSV, column 8 is
    looked up in that list: rows with a known code are executed with
    ``sql_track`` and committed one by one; rows with an unknown or missing
    code are skipped with a warning. A row rejected by MySQL is reported and
    rolled back, and processing continues.

    The original nested loop consumed the whole CSV reader on the first
    matching code, so every row was sent to the database regardless of its
    own code; the check is now made per row.
    """
    # A set gives O(1) membership tests.
    # NOTE(review): assumes the driver returns codes as str, like csv does.
    known_codes = set(result_str_query_anag_index_8_list)

    # newline='' is what the csv module expects for files it reads.
    with open('trackingtest.csv', newline='') as csvfile:
        readCSV = csv.reader(csvfile, delimiter=',')
        next(readCSV, None)  # skip the header line; tolerate an empty file
        for row_track in readCSV:
            if not row_track:
                # csv.reader yields [] for a blank line; nothing to insert.
                continue
            if len(row_track) <= 8 or row_track[8] not in known_codes:
                print ("Attenzione, una o più righe non sono state inserite")
                continue
            try:
                cur.execute(sql_track, row_track)
                db.commit()
                print ("Insert effettuata con successo ", row_track)
            except (MySQLdb.Error, MySQLdb.Warning) as e:
                print (e, "Riga non inserita: ",row_track)
                # Same recovery as insert_anagrafica(): undo the failed
                # statement and keep the connection open for the next row.
                db.rollback()

# Script entry sequence; the order matters.
# 1) Upsert the customers from anagraficatest.csv.
insert_anagrafica()
time.sleep(2)
# 2) Read the customer codes back from the database. The result is bound to a
#    module-level name because insert_tracking() reads it as a global.
(result_str_query_anag_index_8_list) = query_anag() #value passed from def query_anag() to insert_tracking
time.sleep(2)
# 3) Upsert the tracking rows from trackingtest.csv.
insert_tracking()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题