我在配置单元中有一个表,其中有351837条(110mb大小)记录,我正在使用python读取这个表并将其写入sqlserver。
在这个过程中,从Hive读取数据到PandasDataframe需要很长时间。当我加载整个记录(351k)时,需要90分钟。
为了提高性能,我采用了以下方法,比如从配置单元读取一次10k行,然后写入SQLServer。但是一次从配置单元读取10k行并将其分配给Dataframe就需要4-5分钟的时间。
def execute_hadoop_export():
"""
This will run the steps required for a Hadoop Export.
Return Values is boolean for success fail
"""
try:
hql='select * from db.table '
# Open Hive ODBC Connection
src_conn = pyodbc.connect("DSN=****",autocommit=True)
cursor=src_conn.cursor()
#tgt_conn = pyodbc.connect(target_connection)
# Using SQLAlchemy to dynamically generate query and leverage dataframe.to_sql to write to sql server...
sql_conn_url = urllib.quote_plus('DRIVER={ODBC Driver 13 for SQL Server};SERVER=Xyz;DATABASE=Db2;UID=ee;PWD=*****')
sql_conn_str = "mssql+pyodbc:///?odbc_connect={0}".format(sql_conn_url)
engine = sqlalchemy.create_engine(sql_conn_str)
# read source table.
vstart=datetime.datetime.now()
for df in pandas.read_sql(hql, src_conn,chunksize=10000):
vfinish=datetime.datetime.now()
print 'Finished 10k rows reading from hive and it took', (vfinish-vstart).seconds/60.0,' minutes'
# Get connection string for target from Ctrl.Connnection
df.to_sql(name='table', schema='dbo', con=engine, chunksize=10000, if_exists="append", index=False)
print 'Finished 10k rows writing into sql server and it took', (datetime.datetime.now()-vfinish).seconds/60.0, ' minutes'
vstart=datetime.datetime.now()
cursor.Close()
except Exception, e:
print str(e)
输出:
在python中读取配置单元表数据的最快方法是什么?
更新配置单元表结构
CREATE TABLE `table1`(
`policynumber` varchar(15),
`unitidentifier` int,
`unitvin` varchar(150),
`unitdescription` varchar(100),
`unitmodelyear` varchar(4),
`unitpremium` decimal(18,2),
`garagelocation` varchar(150),
`garagestate` varchar(50),
`bodilyinjuryoccurrence` decimal(18,2),
`bodilyinjuryaggregate` decimal(18,2),
`bodilyinjurypremium` decimal(18,2),
`propertydamagelimits` decimal(18,2),
`propertydamagepremium` decimal(18,2),
`medicallimits` decimal(18,2),
`medicalpremium` decimal(18,2),
`uninsuredmotoristoccurrence` decimal(18,2),
`uninsuredmotoristaggregate` decimal(18,2),
`uninsuredmotoristpremium` decimal(18,2),
`underinsuredmotoristoccurrence` decimal(18,2),
`underinsuredmotoristaggregate` decimal(18,2),
`underinsuredmotoristpremium` decimal(18,2),
`umpdoccurrence` decimal(18,2),
`umpddeductible` decimal(18,2),
`umpdpremium` decimal(18,2),
`comprehensivedeductible` decimal(18,2),
`comprehensivepremium` decimal(18,2),
`collisiondeductible` decimal(18,2),
`collisionpremium` decimal(18,2),
`emergencyroadservicepremium` decimal(18,2),
`autohomecredit` tinyint,
`lossfreecredit` tinyint,
`multipleautopoliciescredit` tinyint,
`hybridcredit` tinyint,
`goodstudentcredit` tinyint,
`multipleautocredit` tinyint,
`fortyfivepluscredit` tinyint,
`passiverestraintcredit` tinyint,
`defensivedrivercredit` tinyint,
`antitheftcredit` tinyint,
`antilockbrakescredit` tinyint,
`perkcredit` tinyint,
`plantype` varchar(100),
`costnew` decimal(18,2),
`isnocontinuousinsurancesurcharge` tinyint)
CLUSTERED BY (
policynumber,
unitidentifier)
INTO 50 BUCKETS
注意:我也尝试过sqoop导出选项,但是我的配置单元表已经是bucketing格式了。
2条答案
按热度按时间gv8xihay1#
我尝试过多处理,我可以减少8-10分钟,从2小时。请查看以下脚本。
---------query.py文件-------
---------写入\u tosql.py文件---------
任何其他的解决办法都能帮助我及时减少开支。
ssgvzors2#
在使用cmd.get\u results之后,用pandas读取磁盘输出的最佳方法是什么(e、 从Hive命令)。例如,考虑以下因素:
如果在成功运行查询后,我检查results.csv的前几个字节,我会看到以下内容:
$ head -c 300 results.csv b'flight_uid\twinning_price\tbid_price\timpressions_source_timestamp\n'b'0FY6ZsrnMy\x012000\x012270.0\x011427243278000\n0FamrXG9AW\x01710\x01747.0\x011427243733000\n0FY6ZsrnMy\x012000\x012270.0\x011427245266000\n0FY6ZsrnMy\x012000\x012270.0\x011427245088000\n0FamrXG9AW\x01330\x01747.0\x011427243407000\n0FamrXG9AW\x01710\x01747.0\x011427243981000\n0FamrXG9AW\x01490\x01747.0\x011427245289000\n
当我试着在Pandas中打开它时:它显然不起作用(我得到一个空的Dataframe),因为它没有正确格式化为csv文件。虽然我可以尝试打开results.csv并在pandas中打开它之前对其进行后期处理(删除b'等),但这将是一种非常黑客的加载方式。我是否正确使用接口?这是使用qds\ U sdk的最新版本:1.4.2,三小时前的一个版本。