Why can't I run this Python script as the mapper of a Hadoop job?

myzjeezk · posted 2021-06-02 in Hadoop

I am trying to insert data into a Cassandra database using Hadoop MapReduce. The Hadoop cluster consists of 6 data nodes and one name node. The sudo nodetool status command lists all 7 instances. Here is the mapper script:


#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import logging

from cassandra.cluster import Cluster

log = logging.getLogger()
log.setLevel('INFO')

cassandra_client = None

class CassandraClient:
    session = None
    insert_page_statement = None

    def connect(self, nodes):
        cluster = Cluster(nodes)
        metadata = cluster.metadata
        self.session = cluster.connect()
        log.info('Connected to cluster: ' + metadata.cluster_name)

    def close(self):
        # Shut down the session before the cluster that owns it.
        self.session.shutdown()
        self.session.cluster.shutdown()
        log.info('Connection closed.')

    def insertPage(self):
        self.session.execute(
            """
            INSERT INTO mykeyspace.mytable ( id )
            VALUES ( uuid() )
            """)

def readLoop():
    # Hadoop Streaming feeds each input split to the mapper on stdin.
    for line in sys.stdin:
        cassandra_client.insertPage()

def main():
    global cassandra_client
    logging.basicConfig()
    cassandra_client = CassandraClient()
    cassandra_client.connect(['172.31.16.105'])  # namenode ip
    readLoop()
    cassandra_client.close()

if __name__ == "__main__":
    main()

The script runs fine when I invoke it like this:

cat sample.txt | ./mapper.py

and I get the following output:

INFO:cassandra.policies:Using datacenter 'dc1' for DCAwareRoundRobinPolicy (via host '172.31.16.105'); if incorrect, please specify a local_dc to the constructor, or limit contact points to local cluster nodes
INFO:cassandra.cluster:New Cassandra host <Host: 172.31.25.194 dc1> discovered
INFO:cassandra.cluster:New Cassandra host <Host: 172.31.20.151 dc1> discovered
INFO:cassandra.cluster:New Cassandra host <Host: 172.31.27.94 dc1> discovered
INFO:cassandra.cluster:New Cassandra host <Host: 172.31.29.41 dc1> discovered
INFO:cassandra.cluster:New Cassandra host <Host: 172.31.31.135 dc1> discovered
INFO:cassandra.cluster:New Cassandra host <Host: 172.31.24.228 dc1> discovered
INFO:root:Connected to cluster: ojcluster
INFO:root:Connection closed.
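For context, Hadoop Streaming runs the mapper in exactly the same way as this pipe: records arrive on stdin, and anything printed to stdout is collected as tab-separated key/value pairs. A minimal sketch of that contract (the key/value scheme below is made up purely for illustration and is not part of my script):

```python
import sys

def map_line(line):
    # Hypothetical example: emit each input line as a key with count 1,
    # in the tab-separated format Hadoop Streaming reads from stdout.
    return '%s\t%s' % (line.rstrip('\n'), '1')

if __name__ == '__main__':
    for line in sys.stdin:
        print(map_line(line))
```

My actual mapper emits nothing, which is allowed for a map-only job; it only consumes stdin and writes to Cassandra.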

However, when I run it as a Hadoop job with the following command:

hadoop jar /usr/hdp/2.3.6.0-3796/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.6.0-3796.jar -files "mapper.py"  -D mapred.reduce.tasks=0 -input sample.txt -output output_folder -mapper mapper.py

I start getting errors like the following:

17/02/21 18:06:24 INFO mapreduce.Job:  map 0% reduce 0%
17/02/21 18:06:29 INFO mapreduce.Job: Task Id : attempt_1487537772279_0007_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

17/02/21 18:06:29 INFO mapreduce.Job: Task Id : attempt_1487537772279_0007_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

Finally, the job ends with Streaming Command Failed. What am I doing wrong?
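For comparison, an equivalent submission that names the interpreter explicitly, so the task does not depend on the mapper's shebang line or executable bit on the worker nodes, would look like this (all paths copied from the command above; whether this matters on this cluster is an assumption):

```shell
hadoop jar /usr/hdp/2.3.6.0-3796/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.6.0-3796.jar \
    -D mapred.reduce.tasks=0 \
    -files mapper.py \
    -input sample.txt \
    -output output_folder \
    -mapper "python mapper.py"
```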
