mapreduce java程序搜索四叉树索引,并运行geometryengine.contains使用wkt文件确认多边形中的点

dm7nw8vv  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(366)

这篇文章是一个map-reduce实现,建议我回答上一个问题:“如何优化配置单元中1个巨大文件/表的扫描,以确认/检查wkt几何图形中是否包含lat long点。”
我不太擅长为map reduce编写java程序,我主要使用hive、pig或spark在hadoop生态系统中进行开发。给出手头任务的背景:我正在尝试将每个纬度/经度ping与相应的邮政编码相关联。我有一个wkt多多边形形状文件(500mb)与所有的zip信息。我已经在Hive中加载了它,可以使用stu contains(多边形,点)进行连接。然而,这需要很长时间才能完成。为了克服这个瓶颈,我试图利用esri中的示例(“https://github.com/esri/gis-tools-for-hadoop/tree/master/samples/point-in-polygon-aggregation-mr)通过建立四叉树索引来搜索从多边形中的lat long派生的点。
我已经成功地编写了代码,它阻塞了集群的java堆内存。任何关于改进代码或寻找不同方法的建议都将受到极大的赞赏:错误消息:error:java heap space container被applicationmaster杀死。按要求杀死集装箱。出口代码为143,集装箱出口代码为非零143
我的代码:

public class MapperClass extends Mapper<LongWritable, Text, Text, IntWritable> {

    // column indices for values in the text file
    int longitudeIndex;
    int latitudeIndex;
    int wktZip; 
    int wktGeom;
    int wktLineCount;
    int wktStateID;

    // in boundaries.wkt, the label for the polygon is "wkt"
    //creating ArrayList to hold details of the file
    ArrayList<ZipPolyClass> nodes = new ArrayList<ZipPolyClass>();

    String labelAttribute;
    EsriFeatureClass featureClass;
    SpatialReference spatialReference;
    QuadTree quadTree;
    QuadTreeIterator quadTreeIter;
    BufferedReader csvWkt;

    // class to store all the values from wkt file and calculate geometryFromWKT 
    public class ZipPolyClass {

        public String zipCode;
        public String wktPoly;
        public String stateID;
        public int indexJkey;
        public Geometry wktGeomObj; 

        public ZipPolyClass(int ijk, String z, String w, String s ){
            zipCode = z;
            wktPoly = w;
            stateID = s;
            indexJkey = ijk;
            wktGeomObj = GeometryEngine.geometryFromWkt(wktPoly, 0, Geometry.Type.Unknown);
        }

    }

    //building quadTree Index from WKT multiPolygon and creating an iterator
    private void buildQuadTree(){
        quadTree = new QuadTree(new Envelope2D(-180, -90, 180, 90), 8);

        Envelope envelope = new Envelope();

        int j=0;

        while(j<nodes.size()){
            nodes.get(j).wktGeomObj.queryEnvelope(envelope);
            quadTree.insert(j, new Envelope2D(envelope.getXMin(), envelope.getYMin(), envelope.getXMax(), envelope.getYMax()));
        }

        quadTreeIter = quadTree.getIterator();
    }

    /**
     * Query the quadtree for the feature containing the given point
     * 
     * @param pt point as longitude, latitude
     * @return index to feature in featureClass or -1 if not found
     */
    private int queryQuadTree(Point pt)
    {
        // reset iterator to the quadrant envelope that contains the point passed
        quadTreeIter.resetIterator(pt, 0);

        int elmHandle = quadTreeIter.next();

        while (elmHandle >= 0){
            int featureIndex = quadTree.getElement(elmHandle);

            // we know the point and this feature are in the same quadrant, but we need to make sure the feature
            // actually contains the point
            if (GeometryEngine.contains(nodes.get(featureIndex).wktGeomObj, pt, spatialReference)){
                return featureIndex;
            }

            elmHandle = quadTreeIter.next();
        }

        // feature not found
        return -1;
    }

    /**
     * Sets up mapper with filter geometry provided as argument[0] to the jar
     */
    @Override
    public void setup(Context context)
    {
        Configuration config = context.getConfiguration();

        spatialReference = SpatialReference.create(4326);

        // first pull values from the configuration     
        String featuresPath = config.get("sample.features.input");
        //get column reference from driver class 
        wktZip = config.getInt("sample.features.col.zip", 0);
        wktGeom = config.getInt("sample.features.col.geometry", 18);
        wktStateID = config.getInt("sample.features.col.stateID", 3);
        latitudeIndex = config.getInt("samples.csvdata.columns.lat", 5);
        longitudeIndex = config.getInt("samples.csvdata.columns.long", 6);

        FSDataInputStream iStream = null;

        try {
            // load the text WKT file provided as argument 0
            FileSystem hdfs = FileSystem.get(config);
            iStream = hdfs.open(new Path(featuresPath));
            BufferedReader br = new BufferedReader(new InputStreamReader(iStream));
            String wktLine ;
            int i=0;

            while((wktLine = br.readLine()) != null){
                String [] val = wktLine.split("\\|");
                String qtZip = val[wktZip];
                String poly = val[wktGeom];
                String stID = val[wktStateID];
                ZipPolyClass zpc = new ZipPolyClass(i, qtZip, poly, stID);
                nodes.add(i,zpc);
                i++; // increment in the loop before end
                }

        } 
        catch (Exception e)
        {
            e.printStackTrace();
        } 
        finally
        {
            if (iStream != null)
            {
                try {
                    iStream.close();
                } catch (IOException e) { }
            }
        }

        // build a quadtree of our features for fast queries
        if (!nodes.isEmpty()) {
            buildQuadTree();
        }
    }

    @Override
    public void map(LongWritable key, Text val, Context context)
            throws IOException, InterruptedException {

        /* 
         * The TextInputFormat we set in the configuration, by default, splits a text file line by line.
         * The key is the byte offset to the first character in the line.  The value is the text of the line.
         */

        String line = val.toString();
        String [] values = line.split(",");

        // get lat long from file and convert to float
        float latitude = Float.parseFloat(values[latitudeIndex]);
        float longitude = Float.parseFloat(values[longitudeIndex]);

        // Create our Point directly from longitude and latitude
        Point point = new Point(longitude, latitude);

        int featureIndex = queryQuadTree(point);

        // Each map only processes one record at a time, so we start out with our count 
                // as 1. Since we have a distinct record file we will not run reducer
                IntWritable one = new IntWritable(1);

        if (featureIndex >= 0){

            String zipTxt =nodes.get(featureIndex).zipCode;
            String stateIDTxt = nodes.get(featureIndex).stateID;
            String latTxt = values[latitudeIndex];
            String longTxt = values[longitudeIndex];
            String pointTxt = point.toString();
            String name;
            name = zipTxt+"\t"+stateIDTxt+"\t"+latTxt+"\t"+longTxt+ "\t" +pointTxt;

            context.write(new Text(name), one);
        } else {
            context.write(new Text("*Outside Feature Set"), one);
        }
    }
}
bpsygsoo

bpsygsoo1#

通过修改arraylist以保持arraylist类型,我能够解决内存不足的问题。
创建一个类对象(大约50k)来保存文本文件的每一行,会消耗所有java堆内存。在此更改之后,即使在单节点虚拟沙盒中,代码也可以正常运行。我能在大约6分钟的时间里处理大约4000万行。

相关问题