xml—如何使用python将数据从hadoop保存到数据库

jk9hmnmh  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(340)

我用hadoop处理一个xml文件,所以我用python编写了mapper文件,reducer文件。
假设需要处理的输入是test.xml

<report>
 <report-name name="ALL_TIME_KEYWORDS_PERFORMANCE_REPORT"/>
 <date-range date="All Time"/>
 <table>
  <columns>
   <column name="campaignID" display="Campaign ID"/>
   <column name="adGroupID" display="Ad group ID"/>
  </columns>
  <row campaignID="79057390" adGroupID="3451305670"/>
  <row campaignID="79057390" adGroupID="3451305670"/>
 </table>
</report>

mapper.py文件

import sys
import cStringIO
import xml.etree.ElementTree as xml

if __name__ == '__main__':
    buff = None
    intext = False
    for line in sys.stdin:
        line = line.strip()
        if line.find("<row") != -1:
        .............
        .............
        .............
        print '%s\t%s'%(campaignID,adGroupID )

reducer.py文件

import sys
if __name__ == '__main__':
    for line in sys.stdin:
        print line.strip()

我用以下命令运行hadoop

bin/hadoop jar contrib/streaming/hadoop-streaming-1.0.4.jar 
- file /path/to/mapper.py file -mapper /path/to/mapper.py file 
-file /path/to/reducer.py file -reducer /path/to/reducer.py file 
-input /path/to/input_file/test.xml 
-output /path/to/output_folder/to/store/file

当我运行上面的命令时,hadoop正在以我们前面提到的格式在输出路径中创建一个输出文件 reducer.py 用所需数据正确归档
现在我要做的是,当我运行上述命令时,我不想将输出数据存储在haddop默认创建的文本文件中,而是想将数据保存到 MYSQL 数据库
所以我用 reducer.py 将数据直接写入的文件 MYSQL 数据库,并尝试通过删除输出路径来运行上述命令,如下所示

bin/hadoop jar contrib/streaming/hadoop-streaming-1.0.4.jar 
- file /path/to/mapper.py file -mapper /path/to/mapper.py file 
-file /path/to/reducer.py file -reducer /path/to/reducer.py file 
-input /path/to/input_file/test.xml

我得到的错误如下

12/11/08 15:20:49 ERROR streaming.StreamJob: Missing required option: output
Usage: $HADOOP_HOME/bin/hadoop jar \
          $HADOOP_HOME/hadoop-streaming.jar [options]
Options:
  -input    <path>     DFS input file(s) for the Map step
  -output   <path>     DFS output directory for the Reduce step
  -mapper   <cmd|JavaClassName>      The streaming command to run
  -combiner <cmd|JavaClassName> The streaming command to run
  -reducer  <cmd|JavaClassName>      The streaming command to run
  -file     <file>     File/dir to be shipped in the Job jar file
  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
  -outputformat TextOutputFormat(default)|JavaClassName  Optional.
   .........................
   .........................

毕竟我的疑问是如何将数据保存在 Database 在处理完文件之后?
在哪个文件中(mapper.py/reducer.py?)我们可以编写将数据写入数据库的代码吗
哪个命令用于运行hadoop将数据保存到数据库,因为当我在hadoop命令中删除输出文件夹路径时,它显示了一个错误。
有人能帮我解决以上问题吗。。。。。。。。。。。。。
编辑
已处理跟踪
创建 mapper 以及 reducer 读取xml文件并在某个文件夹中创建文本文件 hadoop command 例如:下面是文本文件(使用hadoop命令处理xml文件的结果)所在的文件夹
/home/local/user/hadoop/xml\u处理/xml\u输出/part-00000
这里的xml文件大小是 1.3 GB 用hadoop处理后 text file 已创建 345 MB 现在我想做的就是 reading the text file in the above path and saving data to the mysql database 尽可能快。
我已经用basic python尝试过了,但是正在采取一些措施 350 sec 处理文本文件并保存到mysql数据库。
如nichole所示,现在下载了sqoop并在下面的路径中解压
/home/local/user/sqoop-1.4.2.bin\uhadoop-0.20
加入到 bin 文件夹并键入 ./sqoop 我收到了下面的错误

sh-4.2$ ./sqoop
Warning: /usr/lib/hbase does not exist! HBase imports will fail.
Please set $HBASE_HOME to the root of your HBase installation.
Warning: $HADOOP_HOME is deprecated.

Try 'sqoop help' for usage.

我也试过了

./sqoop export --connect jdbc:mysql://localhost/Xml_Data --username root --table PerformaceReport --export-dir /home/local/user/Hadoop/xml_processing/xml_output/part-00000 --input-fields-terminated-by '\t'

结果

Warning: /usr/lib/hbase does not exist! HBase imports will fail.
Please set $HBASE_HOME to the root of your HBase installation.
Warning: $HADOOP_HOME is deprecated.

12/11/27 11:54:57 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
12/11/27 11:54:57 INFO tool.CodeGenTool: Beginning code generation
12/11/27 11:54:57 ERROR sqoop.Sqoop: Got exception running Sqoop: java.lang.RuntimeException: Could not load db driver class: com.mysql.jdbc.Driver
java.lang.RuntimeException: Could not load db driver class: com.mysql.jdbc.Driver
    at org.apache.sqoop.manager.SqlManager.makeConnection(SqlManager.java:636)
    at org.apache.sqoop.manager.GenericJdbcManager.getConnection(GenericJdbcManager.java:52)
    at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:525)
    at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:548)
    at org.apache.sqoop.manager.SqlManager.getColumnTypesForRawQuery(SqlManager.java:191)
    at org.apache.sqoop.manager.SqlManager.getColumnTypes(SqlManager.java:175)
    at org.apache.sqoop.manager.ConnManager.getColumnTypes(ConnManager.java:262)
    at org.apache.sqoop.orm.ClassWriter.getColumnTypes(ClassWriter.java:1235)
    at org.apache.sqoop.orm.ClassWriter.generate(ClassWriter.java:1060)
    at org.apache.sqoop.tool.CodeGenTool.generateORM(CodeGenTool.java:82)
    at org.apache.sqoop.tool.ExportTool.exportTable(ExportTool.java:64)
    at org.apache.sqoop.tool.ExportTool.run(ExportTool.java:97)
    at org.apache.sqoop.Sqoop.run(Sqoop.java:145)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:181)
    at org.apache.sqoop.Sqoop.runTool(Sqoop.java:220)
    at org.apache.sqoop.Sqoop.runTool(Sqoop.java:229)
    at org.apache.sqoop.Sqoop.main(Sqoop.java:238)
    at com.cloudera.sqoop.Sqoop.main(Sqoop.java:57)

上面的sqoop命令对于读取文本文件并保存到数据库的功能是否有用,因为我们必须处理从文本文件和插入到数据库!!!!

uqxowvwt

uqxowvwt1#

将数据写入数据库的最佳位置是outputformat。减速机级别的写作可以做,但不是最好的事情。
如果用java编写了mapper和reducer,就可以使用dboutputformat。
因此,您可以编写一个符合reducer数据输出格式(key,value)的自定义outputformat,将数据下沉到mysql。
阅读雅虎开发者网络上关于如何编写自定义输出格式的教程

kyvafyod

kyvafyod2#

我用python编写了所有hadoop mr jobs。我只想说,您不需要使用python来移动数据。使用sqoop:http://sqoop.apache.org/
sqoop是一个开源工具,允许用户将关系数据库中的数据提取到hadoop中进行进一步处理。而且使用起来非常简单。你要做的就是
下载并配置sqoop
创建mysql表模式
指定hadoop hdfs文件名、结果表名和列分隔符。
阅读以下内容了解更多信息:http://archive.cloudera.com/cdh/3/sqoop/sqoopuserguide.html
使用sqoop的优点是,我们现在可以用一行命令将hdfs数据转换为任何类型的关系数据库(mysql、derby、hive等),反之亦然
对于您的用例,请进行必要的更改:
Map器.py


# !/usr/bin/env python

import sys
for line in sys.stdin:
        line = line.strip()
        if line.find("<row") != -1 :
            words=line.split(' ')
            campaignID=words[1].split('"')[1]
            adGroupID=words[2].split('"')[1]
            print "%s:%s:"%(campaignID,adGroupID)

流命令

bin/hadoop jar contrib/streaming/hadoop-streaming-1.0.4.jar - file /path/to/mapper.py file -mapper /path/to/mapper.py file -file /path/to/reducer.py file -reducer /path/to/reducer.py file -input /user/input -output /user/output

mysql数据库

create database test;
use test;
create table testtable ( a varchar (100), b varchar(100) );

sqoop公司

./sqoop export --connect jdbc:mysql://localhost/test --username root --table testnow --export-dir /user/output --input-fields-terminated-by ':'

注:
请根据您的需要更换Map
我在mapper和sqoop命令中都使用了“:”作为列分隔符。根据需要更换。
sqoop教程:我亲自跟随hadoop:the definitive 指南(奥雷利)以及http://archive.cloudera.com/cdh/3/sqoop/sqoopuserguide.html.

相关问题