MapReduce development: multi-file output
Published: 2019-06-26


Several ways to implement it

1. Subclass MultipleOutputFormat (old API); MultipleTextOutputFormat and MultipleSequenceFileOutputFormat are its two concrete implementations.

Write your own subclass of MultipleOutputFormat and override the generateFileNameForKeyValue method to control which file each record goes to.

2. Because MultipleOutputFormat (old API) is marked as deprecated and the new API does not include it, write a new MultipleOutputFormat class of your own, modeled on the old one.

3. Use MultipleOutputs (available in both the old and the new API).

4. What I call the Writable approach (borrowed from Nutch).

Note: the old API provides both MultipleOutputFormat and MultipleOutputs; the new API provides only MultipleOutputs, although the new MultipleOutputs class combines the functionality of the two old classes.

However, the new MultipleOutputs class still does not offer flexible control over output file names, which is why a new MultipleOutputFormat class is implemented below (approach 2).

 

Implementation examples

1. Subclass MultipleOutputFormat; MultipleTextOutputFormat and MultipleSequenceFileOutputFormat are its two concrete implementations.

Write your own subclass of MultipleOutputFormat and override the generateFileNameForKeyValue method to control which file each record goes to.

package com.zxw.hadoop.driver;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Progressable;

public class WordCount_Old {

    public static class TokenizerMapper extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable count = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, count);
            }
        }
    }

    public static class IntSumReducer extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static class WordCountOutputFormat extends
            MultipleOutputFormat<Text, IntWritable> {
        private TextOutputFormat<Text, IntWritable> output = null;

        @Override
        protected RecordWriter<Text, IntWritable> getBaseRecordWriter(
                FileSystem fs, JobConf job, String name, Progressable progressable)
                throws IOException {
            if (output == null) {
                output = new TextOutputFormat<Text, IntWritable>();
            }
            return output.getRecordWriter(fs, job, name, progressable);
        }

        @Override
        protected String generateFileNameForKeyValue(Text key, IntWritable value,
                String name) {
            // words starting with a-z go to <letter>.txt, everything else to result.txt
            char c = key.toString().toLowerCase().charAt(0);
            if (c >= 'a' && c <= 'z') {
                return c + ".txt";
            }
            return "result.txt";
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(WordCount_Old.class);
        job.setJobName("WordCount_Old");
        String[] otherArgs = new GenericOptionsParser(job, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        job.setJarByClass(WordCount_Old.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormat(WordCountOutputFormat.class); // use the multi-file output format
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        RunningJob runningJob = JobClient.runJob(job);
        runningJob.waitForCompletion();
    }
}

 

2. Because MultipleOutputFormat is marked as deprecated and the new API does not include it, implement a new MultipleOutputFormat class of your own, modeled on the old one.

Code overview: two classes are defined, LineRecordWriter and MultipleOutputFormat, and the standard wordcount example is used for testing.

package com.zxw.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Originally an inner class of TextOutputFormat; extracted here so that
 * other classes can reuse it.
 *
 * @author connor
 *
 * @param <K>
 * @param <V>
 */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    static {
        try {
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling Text as a special case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}

 

package com.zxw.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class MultipleOutputFormat<K, V> extends FileOutputFormat<K, V> {
    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /** Determine the output file name (including extension) from the key, value and conf. */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** Cache of RecordWriters, one per output file name */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** Output directory */
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // determine the output file name for this record
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}

 

package com.zxw.hadoop.driver;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.zxw.hadoop.mapreduce.lib.output.MultipleOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static class AlphabetOutputFormat extends
            MultipleOutputFormat<Text, IntWritable> {
        @Override
        protected String generateFileNameForKeyValue(Text key, IntWritable value,
                Configuration conf) {
            // words starting with a-z go to <letter>.txt, everything else to other.txt
            char c = key.toString().toLowerCase().charAt(0);
            if (c >= 'a' && c <= 'z') {
                return c + ".txt";
            }
            return "other.txt";
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(AlphabetOutputFormat.class); // use the multi-file output format
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 

3. Using MultipleOutputs

MultipleOutputs adds extra named outputs on top of the output that the job already specifies. Compared with MultipleOutputFormat, it is multi-file output in the true sense.

package com.zxw.hadoop.driver;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestwithMultipleOutputs extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {
        private MultipleOutputs<Text, IntWritable> mos;

        protected void setup(Context context) throws IOException, InterruptedException {
            this.mos = new MultipleOutputs<Text, IntWritable>(context);
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] tokens = line.split("-");
            // 1 => MOSInt-m-00000
            mos.write("MOSInt", new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[1])));
            // 2 => MOSText-m-00000
            mos.write("MOSText", new Text(tokens[0]), new Text(tokens[2]));
            // 3 => tokens[0]/-m-00000
            mos.write("MOSText", new Text(tokens[0]), new Text(line), tokens[0] + "/");
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "word count with MultipleOutputs");
        job.setJarByClass(TestwithMultipleOutputs.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0);
        MultipleOutputs.addNamedOutput(job, "MOSInt", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "MOSText", TextOutputFormat.class, Text.class, Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TestwithMultipleOutputs(), args);
        System.exit(res);
    }
}

If you do not need the default output and only want the named outputs, set the job's OutputFormat to NullOutputFormat when defining the job, remove the default output.collect / context.write calls in the reducer, and keep only the named-output writes, as sketched below.
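A rough, driver-side sketch of that setup follows. It is not from the original post, the class name is illustrative, and whether the named-output files are committed correctly with NullOutputFormat can depend on the Hadoop version (LazyOutputFormat is a commonly used alternative for suppressing empty default files):

// Hypothetical driver sketch: suppress the default part-r-xxxxx output and rely on named outputs only.
// The mapper/reducer classes (omitted) are assumed to write exclusively through MultipleOutputs.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class NamedOutputOnlyDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "named outputs only");
        job.setJarByClass(NamedOutputOnlyDriver.class);
        // set mapper/reducer classes here; they write only via MultipleOutputs
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // named outputs land under this path
        job.setOutputFormatClass(NullOutputFormat.class);       // no default output files
        MultipleOutputs.addNamedOutput(job, "MOSText", TextOutputFormat.class, Text.class, IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}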

The example above uses MultipleOutputs in the mapper; it can be used in the reducer in exactly the same way (a minimal sketch follows). Note that when it is used in the mapper, only the default output is sent on to the reducer stage as reducer input; the named outputs do not take part in the shuffle.
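For completeness, here is a minimal, hypothetical reducer-side sketch (the class and output names are illustrative, not from the original post). It follows the same lifecycle: create MultipleOutputs in setup(), write in reduce(), close in cleanup(). It assumes the driver registered the named output with MultipleOutputs.addNamedOutput(job, "MOSText", TextOutputFormat.class, Text.class, IntWritable.class):

// Hypothetical reducer using MultipleOutputs alongside the default output.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class SumReducerWithMos extends Reducer<Text, IntWritable, Text, IntWritable> {
    private MultipleOutputs<Text, IntWritable> mos;

    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        // default output -> part-r-xxxxx files
        context.write(key, new IntWritable(sum));
        // extra named output -> MOSText-r-xxxxx files
        mos.write("MOSText", key, new IntWritable(sum));
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}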

4. What I call the Writable approach

I mentioned this approach in "A few special points of MapReduce usage in Nutch" in Hadoop Developer (issue 2); it is the approach that Nutch's FetcherOutputFormat demonstrates.

FetcherOutputFormat needs to store three different data structures separately after the reducer: Content, CrawlDatum, and ParseImpl. The output of a conventional MapReduce job cannot do this, so Nutch wraps the three objects in a single unified type, NutchWritable:

 

public class NutchWritable extends GenericWritableConfigurable {

    private static Class<? extends Writable>[] CLASSES = null;

    static {
        CLASSES = (Class<? extends Writable>[]) new Class[] {
            org.apache.hadoop.io.NullWritable.class,
            // ……
            org.apache.nutch.crawl.CrawlDatum.class,
            // ……
            org.apache.nutch.parse.ParseImpl.class,
            // ……
        };
    }

    public NutchWritable() { }

    public NutchWritable(Writable instance) {
        set(instance);
    }
}

When the actual RecordWriter writes the data, it unwraps each NutchWritable back into the original Content, CrawlDatum, or ParseImpl object and, depending on the object's type, writes it to a different file, which is how the multi-file output is achieved.

 

public void write(Text key, NutchWritable value) throws IOException {
    Writable w = value.get();
    if (w instanceof CrawlDatum)
        fetchOut.append(key, w);
    else if (w instanceof Content)
        contentOut.append(key, w);
    else if (w instanceof Parse)
        parseOut.write(key, (Parse) w);
}

Because the examples earlier in this article have a single, uniform input data structure, the Writable approach is not a good fit for them, so no experimental results are given here.
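Outside Nutch, the same pattern can be reproduced with Hadoop's own GenericWritable. The following is a hypothetical sketch (all class and field names are illustrative, not from the original post): a GenericWritable subclass unifies the possible value types, and a custom RecordWriter dispatches each record to a type-specific SequenceFile:

// Hypothetical, Nutch-independent sketch of the Writable approach.
import java.io.IOException;

import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Wrapper that unifies the value types a job may emit.
class UnionWritable extends GenericWritable {
    @SuppressWarnings("unchecked")
    private static final Class<? extends Writable>[] CLASSES =
            (Class<? extends Writable>[]) new Class[] {
                IntWritable.class,
                Text.class,
            };

    @Override
    protected Class<? extends Writable>[] getTypes() {
        return CLASSES;
    }
}

// RecordWriter that unwraps the value and appends it to a type-specific file.
class UnionRecordWriter extends RecordWriter<Text, UnionWritable> {
    private final SequenceFile.Writer intOut;   // receives IntWritable values
    private final SequenceFile.Writer textOut;  // receives Text values

    UnionRecordWriter(SequenceFile.Writer intOut, SequenceFile.Writer textOut) {
        this.intOut = intOut;
        this.textOut = textOut;
    }

    @Override
    public void write(Text key, UnionWritable value) throws IOException {
        Writable w = value.get();
        if (w instanceof IntWritable) {
            intOut.append(key, w);
        } else {
            textOut.append(key, w);
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        intOut.close();
        textOut.close();
    }
}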

Finally, borrowing a table from Hadoop: The Definitive Guide, here are the differences among the three approaches:

| | MultipleOutputFormat | MultipleOutputs | Writable |
| --- | --- | --- | --- |
| Flexible control of output file names | yes | no | yes |
| Output types may differ | no | yes | yes |
| Usable in the mapper or the reducer | no | yes | no |
| Works with various OutputFormats | no (apart from TextOutputFormat and SequenceFileOutputFormat, others must be custom-written) | yes | yes |
| Multiple outputs per record | no (it really just partitions records across files) | yes | no |

 

 

Reposted from: https://www.cnblogs.com/coptimt/p/3591602.html
