使用SQL Alchemy导入数据时,For循环不会更新Python Pandas数据框

6psbrbz9  于 2023-01-19  发布在  Python
关注(0)|答案(2)|浏览(109)

我有一个for循环,它应该从postgres表中更新一个Pandas Dataframe ,该表每5秒由另一个线程更新一次。
如果我不使用for循环来运行代码,我会得到我想要的结果,也就是最近的更新时间。2然而,如果我使用for循环来运行代码,结果不会更新,并且停留在第一个结果上。
为什么会发生这种情况?我该如何解决这个问题?

metadata = MetaData(bind=None)
table = Table(
    'datastore', 
    metadata, 
    autoload=True, 
    autoload_with=engine
)

stmt = select([
    table.columns.date,
    table.columns.open,
    table.columns.high,
    table.columns.low,
    table.columns.close
]).where(and_(table.columns.date_ == datetime.today().strftime('%Y-%m-%d') and table.columns.close != 0))
#]).where(and_(table.columns.date_ == '2023-01-12' and table.columns.close != 0))
    
connection = engine.connect()

for x in range(1000000):
    data_from_db = pd.DataFrame(connection.execute(stmt).fetchall())  
    data_from_db = data_from_db[data_from_db['close'] != 0]
    print(data_from_db.date.iloc[-1])
    time.sleep(5)

我也尝试了psycopg2库,问题总是存在:

for x in range(1000000):
    conn = psycopg2.connect(
                           host='localhost', 
                           database='ABC',  
                           user='postgres',
                           password='*******')
    cur = conn.cursor()
    cur.execute("select max(date) from public.datastore")

    y = cur.fetchall()
    print(y)
    time.sleep(5)
9gm1akwq

9gm1akwq1#

此问题可能由以下某些原因引起:
1.* 事务隔离级别 *(意味着更新表的其他线程可能有一个会话尚未关闭,因此当前脚本读取的是 * 旧 * 数据)
1.您可以对相同的语句应用 * 缓存
对于第一个因素,尝试将isolation_level="READ UNCOMMITTED"设置为具有 “脏”/新读取

engine = create_engine(
    "your_dsn path",
    isolation_level="READ UNCOMMITTED"
)

with engine.connect() as conn:
    for x in range(1000000):
        data_from_db = pd.DataFrame(conn.execute(stmt).fetchall()) 
        print(data_from_db.date.iloc[-1])
        time.sleep(5)

对于第二个因素,您可以尝试:

# disable caching for this connection
with engine.connect().execution_options(compiled_cache=None) as conn:
    # your loop here
    data_from_db = pd.DataFrame(conn.execute(stmt).fetchall())
xa9qqrwz

xa9qqrwz2#

我用的是木星笔记本。
一个单元从API插入数据,另一个单元使用不同的线程从同一个数据库阅读数据。
看起来,如果我用两个不同的笔记本电脑执行这两个操作,所有的工作。
我只是想用线。

import psycopg2
import time

def function_test():
    while True:
        while True:
            conn_ = psycopg2.connect(
                                   host='#####', 
                                   database='####',  
                                   user='####',
                                   password='#####')
            cur = conn_.cursor()
            cur.execute("SELECT date FROM public.datastore order by date desc limit 1")
            query_results = cur.fetchone()
            print(query_results)
            del(query_results)
            del(cur)
            break
        conn_.commit()
        conn_.close()
        time.sleep(5)
 
thread1 = Thread(target = function_test, args = ())
thread1.start()

相关问题