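There are several ways to implement this:
1. Implement a subclass of MultipleOutputFormat (old API); MultipleTextOutputFormat and MultipleSequenceFileOutputFormat are its two concrete implementations.
Subclass MultipleOutputFormat yourself and override generateFileNameForKeyValue to choose the output file name.
2. Because MultipleOutputFormat (old API) is marked as deprecated and the new API does not include it, write a new MultipleOutputFormat class for the new API, modeled on the old one.
3. Use MultipleOutputs (available in both the old and the new API).
4. What I call the Writable method (quoted from another article).
Note: the old API contains both MultipleOutputFormat and MultipleOutputs. The new API contains only MultipleOutputs, but the new MultipleOutputs class covers the functionality of both old-API classes.
The new MultipleOutputs class also does not offer flexible control over output file names, which is why a new MultipleOutputFormat class is implemented here.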
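Implementation examples:
1. Implement a subclass of MultipleOutputFormat; MultipleTextOutputFormat and MultipleSequenceFileOutputFormat are its two concrete implementations.
Subclass MultipleOutputFormat yourself and override generateFileNameForKeyValue to choose the output file name.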
package com.zxw.hadoop.driver;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Progressable;

public class WordCount_Old {

    /** Emits (word, 1) for every whitespace-separated token of the input line. */
    public static class TokenizerMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable count = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, count);
            }
        }
    }

    /** Sums the counts for each word. */
    public static class IntSumReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    /** Routes each record to a file named after the first letter of its key. */
    public static class WordCountOutputFormat
            extends MultipleOutputFormat<Text, IntWritable> {
        private TextOutputFormat<Text, IntWritable> output = null;

        @Override
        protected RecordWriter<Text, IntWritable> getBaseRecordWriter(
                FileSystem fs, JobConf job, String name, Progressable progressable)
                throws IOException {
            if (output == null) {
                output = new TextOutputFormat<Text, IntWritable>();
            }
            return output.getRecordWriter(fs, job, name, progressable);
        }

        @Override
        protected String generateFileNameForKeyValue(Text key, IntWritable value, String name) {
            // Words starting with a-z go to a.txt ... z.txt; everything else goes to result.txt.
            char c = key.toString().toLowerCase().charAt(0);
            if (c >= 'a' && c <= 'z') {
                return c + ".txt";
            }
            return "result.txt";
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(WordCount_Old.class);
        job.setJobName("WordCount_Old");
        String[] otherArgs = new GenericOptionsParser(job, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        job.setJarByClass(WordCount_Old.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormat(WordCountOutputFormat.class); // set the output format
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // JobClient.runJob blocks until the job finishes, so this extra wait returns immediately.
        RunningJob runningJob = JobClient.runJob(job);
        runningJob.waitForCompletion();
    }
}
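The file-naming rule in generateFileNameForKeyValue can be tried out without a Hadoop cluster. The following is a minimal sketch in plain Java, assuming keys are ordinary strings. The class FileNameByKeyDemo and its methods are hypothetical names introduced only for illustration and are not part of Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Hadoop class: reproduces the naming rule only.
class FileNameByKeyDemo {

    // Keys starting with a-z (case-insensitive) map to "<letter>.txt"; all others to "result.txt".
    static String fileNameFor(String key) {
        if (key.isEmpty()) {
            return "result.txt"; // guard added for this sketch; the listing assumes non-empty tokens
        }
        char c = Character.toLowerCase(key.charAt(0));
        if (c >= 'a' && c <= 'z') {
            return c + ".txt";
        }
        return "result.txt";
    }

    // Groups keys by the file they would be written to.
    static Map<String, List<String>> group(List<String> keys) {
        Map<String, List<String>> files = new TreeMap<>();
        for (String key : keys) {
            files.computeIfAbsent(fileNameFor(key), n -> new ArrayList<>()).add(key);
        }
        return files;
    }

    public static void main(String[] args) {
        System.out.println(group(Arrays.asList("Apple", "avocado", "banana", "42")));
    }
}
```

Keeping the rule in a small pure function like this makes it easy to unit-test before wiring it into an OutputFormat.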
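2. Because MultipleOutputFormat is marked as deprecated and the new API does not include it, write a new MultipleOutputFormat class for the new API, modeled on the old one.
Code notes: two classes are defined, LineRecordWriter and MultipleOutputFormat, and the wordcount example is used for testing.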
package com.zxw.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Originally an inner class of TextOutputFormat; extracted here so that other classes can use it.
 * @author connor
 *
 * @param <K> key type
 * @param <V> value type
 */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    static {
        try {
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling Text as a special case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
package com.zxw.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class MultipleOutputFormat<K, V> extends FileOutputFormat<K, V> {
    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /** Determines the output file name (including extension) from key, value, and conf. */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** Cache of RecordWriters, keyed by output file name. */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** Output directory. */
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // Determine the output file name for this record.
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
package com.zxw.hadoop.driver;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.zxw.hadoop.mapreduce.lib.output.MultipleOutputFormat;

public class WordCount {
    public static class TokenizerMapper extends Mapper
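The core idea of MultiRecordWriter is a cache of writers keyed by file name: each record asks the naming function for its file, and a writer is created only the first time that name is seen. The following is a minimal sketch of that routing in plain Java, with in-memory StringWriter objects standing in for HDFS files. MultiWriterSketch, its fileNamer parameter, and the String/int record types are assumptions made for this illustration and are not part of Hadoop or of the classes above.

```java
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Hypothetical stand-in for MultiRecordWriter: StringWriters replace HDFS output streams.
class MultiWriterSketch {
    // Cache of open writers, keyed by output file name.
    private final Map<String, StringWriter> writers = new HashMap<>();
    // Plays the role of generateFileNameForKeyValue.
    private final BiFunction<String, Integer, String> fileNamer;
    private final String separator;

    MultiWriterSketch(BiFunction<String, Integer, String> fileNamer, String separator) {
        this.fileNamer = fileNamer;
        this.separator = separator;
    }

    // Looks up (or lazily creates) the writer for this record's file, then appends one line.
    void write(String key, int value) {
        String baseName = fileNamer.apply(key, value);
        StringWriter writer = writers.computeIfAbsent(baseName, n -> new StringWriter());
        writer.write(key + separator + value + "\n");
    }

    // Returns the content of every "file" and empties the cache, like close() on the real writer.
    Map<String, String> close() {
        Map<String, String> contents = new TreeMap<>();
        for (Map.Entry<String, StringWriter> e : writers.entrySet()) {
            contents.put(e.getKey(), e.getValue().toString());
        }
        writers.clear();
        return contents;
    }

    public static void main(String[] args) {
        MultiWriterSketch sketch =
                new MultiWriterSketch((k, v) -> k.substring(0, 1) + ".txt", ",");
        sketch.write("apple", 3);
        sketch.write("bee", 2);
        sketch.write("avocado", 1);
        System.out.println(sketch.close());
    }
}
```

Creating writers lazily means only files that actually receive records are created, at the cost of holding one open writer per distinct file name until close.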
3. Use MultipleOutputs
MultipleOutputs adds extra outputs on top of the output specified for the job. Compared with MultipleOutputFormat, it is multi-file output in the true sense.
package com.zxw.hadoop.driver;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestwithMultipleOutputs extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {
        private MultipleOutputs<Text, IntWritable> mos;

        protected void setup(Context context) throws IOException, InterruptedException {
            this.mos = new MultipleOutputs<Text, IntWritable>(context);
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is split on "-" and is expected to have at least three fields,
            // the second of which is an integer.
            String line = value.toString();
            String[] tokens = line.split("-");
            // 1 => MOSInt-m-00000
            mos.write("MOSInt", new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[1])));
            // 2 => MOSText-m-00000
            mos.write("MOSText", new Text(tokens[0]), new Text(tokens[2]));
            // 3 => tokens[0]/-m-00000
            mos.write("MOSText", new Text(tokens[0]), new Text(line), tokens[0] + "/");
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // new Job(conf, name) is deprecated in Hadoop 2 and later in favor of Job.getInstance(conf, name).
        Job job = new Job(conf, "word count with MultipleOutputs");
        job.setJarByClass(TestwithMultipleOutputs.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0); // map-only job
        MultipleOutputs.addNamedOutput(job, "MOSInt", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "MOSText", TextOutputFormat.class, Text.class, Text.class);
        // Return the status so that ToolRunner and main decide the exit code.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TestwithMultipleOutputs(), args);
        System.exit(res);
    }
}
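The comments in the mapper above show the resulting file names: a named output or base output path is followed by -m- (map task) or -r- (reduce task) and a zero-padded partition number. The following is a minimal sketch of that naming pattern in plain Java, assuming five-digit padding as in the comments. NamedOutputFileNames.uniqueFile is a hypothetical helper written for this illustration. It does not call Hadoop, and the real file names are produced inside Hadoop's output format classes.

```java
import java.util.Locale;

// Hypothetical helper, not Hadoop API: reproduces the naming pattern seen in the comments above.
class NamedOutputFileNames {

    // baseName is the named output ("MOSInt") or a base output path ("apple/").
    static String uniqueFile(String baseName, boolean mapTask, int partition, String extension) {
        String taskType = mapTask ? "m" : "r";
        return String.format(Locale.ROOT, "%s-%s-%05d%s", baseName, taskType, partition, extension);
    }

    public static void main(String[] args) {
        System.out.println(uniqueFile("MOSInt", true, 0, ""));
        System.out.println(uniqueFile("apple/", true, 0, ""));
        System.out.println(uniqueFile("MOSText", false, 12, ".gz"));
    }
}
```

A base output path ending in "/" therefore produces a subdirectory holding a file whose name starts with "-m-", which is why case 3 in the mapper yields tokens[0]/-m-00000.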
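If the regular job output is not needed and only the named outputs are, set the job's OutputFormat to NullOutputFormat when defining the job, remove the output.collect call from the reducer, and keep only the OutputCollector of the named output.
The above covered MultipleOutputs in the reducer. It can also be used in the mapper in much the same way. Note, however, that of the mapper's outputs only the regular output is passed on to the reducer as its input; named outputs do not take part.
4. What I call the Writable method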
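I mentioned this method in the article "Several special points of MapReduce usage in Nutch" in issue 2 of "Hadoop Developer" (《hadoop 开发者》). It is the approach that Nutch's FetcherOutputFormat demonstrates.
FetcherOutputFormat needs to store three different data structures separately after the reducer: Content, CrawlDatum, and ParseImpl. A conventional MapReduce job output cannot do this, so Nutch uses NutchWritable as a single wrapper type that can hold any of the three: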
public class NutchWritable extends GenericWritableConfigurable {

    private static Class<? extends Writable>[] CLASSES = null;

    static {
        CLASSES = (Class<? extends Writable>[]) new Class[] {
            org.apache.hadoop.io.NullWritable.class,
            // ...
            org.apache.nutch.crawl.CrawlDatum.class,
            // ...
            org.apache.nutch.parse.ParseImpl.class,
            // ...
        };
    }

    public NutchWritable() { }

    public NutchWritable(Writable instance) {
        set(instance);
    }
}
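When the actual RecordWriter writes the data, it unwraps the NutchWritable back into the original Content, CrawlDatum, or ParseImpl object and writes it to a different file depending on its type, which achieves multi-file output.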
public void write(Text key, NutchWritable value) throws IOException {
    Writable w = value.get();
    if (w instanceof CrawlDatum)
        fetchOut.append(key, w);
    else if (w instanceof Content)
        contentOut.append(key, w);
    else if (w instanceof Parse)
        parseOut.write(key, (Parse) w);
}
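The same wrap-then-dispatch pattern can be sketched without Nutch or Hadoop. In the following plain Java illustration, a common interface plays the role of the NutchWritable wrapper and the writer routes each record to a different sink according to its runtime type. All names here (TypeDispatchSketch, Record, Datum, Page, ParsedText) are hypothetical and only loosely mirror CrawlDatum, Content, and Parse.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of type-based routing; lists stand in for the three output files.
class TypeDispatchSketch {

    // Common supertype, playing the role of the single wrapper value type.
    interface Record { }

    static final class Datum implements Record {
        final String status;
        Datum(String status) { this.status = status; }
    }

    static final class Page implements Record {
        final String html;
        Page(String html) { this.html = html; }
    }

    static final class ParsedText implements Record {
        final String text;
        ParsedText(String text) { this.text = text; }
    }

    final List<String> datumOut = new ArrayList<>();
    final List<String> pageOut = new ArrayList<>();
    final List<String> parseOut = new ArrayList<>();

    // Inspects the concrete type and appends the record to the matching sink.
    void write(String key, Record value) {
        if (value instanceof Datum) {
            datumOut.add(key + "\t" + ((Datum) value).status);
        } else if (value instanceof Page) {
            pageOut.add(key + "\t" + ((Page) value).html);
        } else if (value instanceof ParsedText) {
            parseOut.add(key + "\t" + ((ParsedText) value).text);
        }
    }

    public static void main(String[] args) {
        TypeDispatchSketch out = new TypeDispatchSketch();
        out.write("http://example.com/", new Datum("fetched"));
        out.write("http://example.com/", new Page("<html></html>"));
        out.write("http://example.com/", new ParsedText("hello"));
        System.out.println(out.datumOut + " " + out.pageOut + " " + out.parseOut);
    }
}
```

The job still declares a single value type, so the framework sees one output; the split into several files happens only inside the writer.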
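Because the input data in the earlier examples has a single structure, the Writable method is not a good fit for it, so no experimental results are given for this method.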
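Finally, we borrow a table from "Hadoop: The Definitive Guide" to compare the three approaches:
Feature | MultipleOutputFormat | MultipleOutputs | Writable |
Output file names can be set flexibly | yes | no | yes |
Output types may differ | no | yes | yes |
Can be used in either the mapper or the reducer | no | yes | no |
Supports various OutputFormats | No; apart from TextOutputFormat and SequenceFileOutputFormat, others must be custom-written | yes | yes |
Multiple outputs per record | No; it actually partitions the records | yes | no |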