I'm currently writing code to process a single image using Hadoop, so my input is just one file (a .png). I have working code that will run the job, but instead of running several mappers it only ever runs one mapper, and the other mappers are never spawned.

I've created my own extensions of the FileInputFormat and RecordReader classes, intending to create (what I thought would be) "n" custom splits and therefore "n" map tasks.

I've been searching the web like crazy for examples of this kind to learn from, but everything I've found either handles an entire file as one split (and therefore exactly one mapper) or assigns a fixed number of lines of a text file (three, say) to each map task.

What I'm trying to do is send a pair of coordinates ((x1, y1), (x2, y2)) to each mapper, where the coordinates correspond to the top-left/bottom-right pixels of some rectangle in the image.

Any suggestions/guidance/examples/links to examples would be greatly appreciated.
Custom FileInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;

public class FileInputFormat1 extends FileInputFormat<KeyChunk1, NullWritable> {

    @Override
    public RecordReader<KeyChunk1, NullWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        return new RecordReader1();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return true;
    }
}
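For reference, the configuration values that RecordReader1.initialize() reads back out (imageName, v_slices, and so on) have to be set on the job before it is submitted. A minimal driver sketch, assuming a hypothetical ImageMapper class and purely illustrative parameter values:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ImageJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // These keys are exactly what RecordReader1.initialize() reads back out.
        conf.set("imageName", "input.png");            // illustrative values only
        conf.setInt("v_slices", 4);
        conf.setInt("h_slices", 4);
        conf.setInt("kernel_rad", 2);
        conf.setInt("grad_rad", 1);
        conf.set("hdfs_address", "hdfs://localhost:9000");
        conf.set("local_directory", "/tmp/image_work");

        Job job = Job.getInstance(conf, "image chunks");
        job.setJarByClass(ImageJobDriver.class);
        job.setInputFormatClass(FileInputFormat1.class);
        job.setMapperClass(ImageMapper.class);         // hypothetical mapper class
        job.setOutputKeyClass(KeyChunk1.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);                      // map-only for this sketch
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}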
Custom RecordReader
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;

public class RecordReader1 extends RecordReader<KeyChunk1, NullWritable> {

    private KeyChunk1 key;
    private NullWritable value;
    private ImagePreprocessor IMAGE;

    public RecordReader1() {
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return IMAGE.getProgress();
    }

    @Override
    public KeyChunk1 getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public NullWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Emits one key per image chunk. Note that every chunk is produced
        // inside whichever single split this reader was handed.
        boolean gotNextValue = IMAGE.hasAnotherChunk();
        if (gotNextValue) {
            if (key == null) {
                key = new KeyChunk1();
            }
            if (value == null) {
                value = NullWritable.get();
            }
            int[] data = IMAGE.getChunkIndicesAndIndex();
            key.setChunkIndex(data[2]);
            key.setStartRow(data[0]);
            key.setStartCol(data[1]);
            key.setChunkWidth(data[3]);
            key.setChunkHeight(data[4]);
        } else {
            key = null;
            value = null;
        }
        return gotNextValue;
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Job parameters are passed in through the Configuration set by the driver.
        Configuration config = taskAttemptContext.getConfiguration();
        IMAGE = new ImagePreprocessor(
                config.get("imageName"),
                config.getInt("v_slices", 1),
                config.getInt("h_slices", 1),
                config.getInt("kernel_rad", 2),
                config.getInt("grad_rad", 1),
                config.get("hdfs_address"),
                config.get("local_directory")
        );
    }
}
ImagePreprocessor class (used by the custom RecordReader; only the relevant parts are shown)
import java.awt.image.BufferedImage;
import java.io.IOException;

public class ImagePreprocessor {

    private String filename;
    private int num_v_slices;
    private int num_h_slices;
    private int minSize;
    private int width, height;
    private int chunkWidth, chunkHeight;
    private int indexI, indexJ;
    String hdfs_address, local_directory;

    public ImagePreprocessor(String filename, int num_v_slices, int num_h_slices, int kernel_radius,
                             int gradient_radius, String hdfs_address, String local_directory) throws IOException {
        this.hdfs_address = hdfs_address;
        this.local_directory = local_directory;
        // all "validate" methods throw errors if input data is invalid
        checkValidFilename(filename);
        checkValidNumber(num_v_slices, "vertical strips");
        this.num_v_slices = num_v_slices;
        checkValidNumber(num_h_slices, "horizontal strips");
        this.num_h_slices = num_h_slices;
        checkValidNumber(kernel_radius, "kernel radius");
        checkValidNumber(gradient_radius, "gradient radius");
        this.minSize = 1 + 2 * (kernel_radius + gradient_radius);
        getImageData(); // loads image and saves width/height to class variables
        validateImageSize();
        chunkWidth = validateWidth((int) Math.ceil(((double) width) / num_v_slices));
        chunkHeight = validateHeight((int) Math.ceil(((double) height) / num_h_slices));
        indexI = 0;
        indexJ = 0;
    }

    public boolean hasAnotherChunk() {
        return indexI < num_h_slices;
    }

    public int[] getChunkIndicesAndIndex() {
        int[] ret = new int[5];
        ret[0] = indexI;                         // grid row of this chunk
        ret[1] = indexJ;                         // grid column of this chunk
        ret[2] = indexI * num_v_slices + indexJ; // linear (row-major) chunk index
        ret[3] = chunkWidth;
        ret[4] = chunkHeight;
        indexJ += 1;                             // advance column first,
        if (indexJ >= num_v_slices) {            // then wrap to the next row
            indexJ = 0;
            indexI += 1;
        }
        return ret;
    }
}
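To make the traversal concrete: the two index fields walk the chunk grid row by row, column first. A standalone trace of the same logic as hasAnotherChunk()/getChunkIndicesAndIndex(), outside of Hadoop, for a 2x3 grid:

// Standalone trace of the chunk iteration, with h_slices=2, v_slices=3.
public class ChunkOrderDemo {
    public static void main(String[] args) {
        int numHSlices = 2, numVSlices = 3;
        int indexI = 0, indexJ = 0;
        while (indexI < numHSlices) {              // hasAnotherChunk()
            int linear = indexI * numVSlices + indexJ;
            System.out.printf("row=%d col=%d chunkIndex=%d%n", indexI, indexJ, linear);
            indexJ++;                              // advance column first,
            if (indexJ >= numVSlices) {            // then wrap to the next row
                indexJ = 0;
                indexI++;
            }
        }
        // prints chunkIndex 0..5 in row-major order
    }
}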
谢谢你的时间!
1 Answer
You should override the getSplits method in your FileInputFormat1 class. Since your code uses the new org.apache.hadoop.mapreduce API, the signature to override is public List<InputSplit> getSplits(JobContext context); the public InputSplit[] getSplits(JobConf job, int numSplits) form belongs to the old mapred API and will never be called here. As your code stands, the default FileInputFormat.getSplits produces a single split for your small .png, and your RecordReader then emits every chunk inside that one split, which is exactly why only one mapper ever runs. Along with that, create your own InputSplit class that carries the rectangle coordinates, so that inside the RecordReader you can retrieve this information and return the correct key/value pairs to the mapper. The implementation of getSplits in FileInputFormat may be a helpful reference here.
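A minimal sketch of what that could look like with the new API. The RectangleSplit class name and the image_width/image_height configuration keys are assumptions, not from the original post (the driver would have to set those keys), and the RecordReader would also need to be reworked to take its rectangle from the split rather than enumerating every chunk itself:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

// A split that carries the corner coordinates of one image rectangle.
// It must implement Writable so the framework can ship it to the task.
public class RectangleSplit extends InputSplit implements Writable {

    private int x1, y1, x2, y2; // top-left and bottom-right pixel coordinates

    public RectangleSplit() {
        // no-arg constructor required for deserialization
    }

    public RectangleSplit(int x1, int y1, int x2, int y2) {
        this.x1 = x1; this.y1 = y1; this.x2 = x2; this.y2 = y2;
    }

    @Override
    public long getLength() {
        return (long) (x2 - x1) * (y2 - y1); // "size" of the split, in pixels
    }

    @Override
    public String[] getLocations() {
        return new String[0]; // no data-locality hint for a pixel rectangle
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(x1); out.writeInt(y1); out.writeInt(x2); out.writeInt(y2);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        x1 = in.readInt(); y1 = in.readInt(); x2 = in.readInt(); y2 = in.readInt();
    }
}

// Added to FileInputFormat1: one split per rectangle -> one mapper per rectangle.
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    Configuration conf = context.getConfiguration();
    int vSlices = conf.getInt("v_slices", 1);
    int hSlices = conf.getInt("h_slices", 1);
    int width = conf.getInt("image_width", 0);   // assumed keys, set by the driver
    int height = conf.getInt("image_height", 0);
    int chunkWidth = (int) Math.ceil((double) width / vSlices);
    int chunkHeight = (int) Math.ceil((double) height / hSlices);
    List<InputSplit> splits = new ArrayList<>();
    for (int i = 0; i < hSlices; i++) {
        for (int j = 0; j < vSlices; j++) {
            int x1 = j * chunkWidth;
            int y1 = i * chunkHeight;
            int x2 = Math.min(x1 + chunkWidth, width);   // clamp the last column
            int y2 = Math.min(y1 + chunkHeight, height); // clamp the last row
            splits.add(new RectangleSplit(x1, y1, x2, y2));
        }
    }
    return splits;
}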