MapReduce的输出格式是怎样的
MapReduce的输出格式是怎样的
本篇内容主要讲解“MapReduce的输出格式是怎样的”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MapReduce的输出格式是怎样的”吧!
MapReduce的输出格式
Hadoop 都有相应的输出格式。默认情况下只有一个 Reduce,输出只有一个文件,默认文件名为 part-r-00000,输出文件的个数与 Reduce 的个数一致。 如果有两个Reduce,输出结果就有两个文件,第一个为part-r-00000,第二个为part-r-00001,依次类推。
OutputFormat 接口
OutputFormat主要用于描述输出数据的格式,它能够将用户提供的key/value对写入特定格式的文件中。 通过OutputFormat 接口,实现具体的输出格式,过程有些复杂也没有这个必要。Hadoop 自带了很多 OutputFormat 的实现,它们与InputFormat实现相对应,足够满足我们业务的需要。 OutputFormat 类的层次结构如下图所示。
OutputFormat 是 MapReduce 输出的基类,所有实现 MapReduce 输出都实现了 OutputFormat 接口。 我们可以把这些实现接口类分为以下几种类型,分别一一介绍。
文本输出
默认的输出格式是 TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为 TextOutputFormat 调用 toString() 方法把它们转换为字符串。 每个键/值对由制表符进行分割,当然也可以设定 mapreduce.output.textoutputformat.separator 属性(旧版本 API 中的 mapred.textoutputformat.separator)改变默认的分隔符。 与 TextOutputFormat 对应的输入格式是 KeyValueTextInputFormat,它通过可配置的分隔符将键/值对文本分割。
可以使用 NullWritable 来省略输出的键或值(或两者都省略,相当于 NullOutputFormat 输出格式,后者什么也不输出)。 这也会导致无分隔符输出,以使输出适合用 TextInputFormat 读取。
二进制输出
1、关于SequenceFileOutputFormat
顾名思义,SequenceFileOutputFormat 将它的输出写为一个顺序文件。如果输出需要作为后续 MapReduce 任务的输入,这便是一种好的输出格式, 因为它的格式紧凑,很容易被压缩。
2、关于SequenceFileAsBinaryOutputFormat
SequenceFileAsBinaryOutputFormat 把键/值对作为二进制格式写到一个 SequenceFile 容器中。
3、关于MapFileOutputFormat
MapFileOutputFormat 把 MapFile 作为输出。MapFile 中的键必须顺序添加,所以必须确保 reducer 输出的键已经排好序。
多个输出
上面我们提到,默认情况下只有一个 Reduce,输出只有一个文件。有时可能需要对输出的文件名进行控制或让每个 reducer 输出多个文件。 我们有两种方式实现reducer输出多个文件。
1、Partitioner
我们考虑这样一个需求:按学生的年龄段,将数据输出到不同的文件路径下。这里我们分为三个年龄段:小于等于20岁、大于20岁小于等于50岁和大于50岁。
我们采用的方法是每个年龄段对应一个 reducer。为此,我们需要通过以下两步实现。
第一步:把作业的 reducer 数设为年龄段数即为3。
job.setPartitionerClass(PCPartitioner.class);//设置Partitioner类job.setNumReduceTasks(3);//reduce个数设置为3
第二步:写一个 Partitioner,把同一个年龄段的数据放到同一个分区。
publicstaticclassPCPartitionerextendsPartitioner<Text,Text>{@OverridepublicintgetPartition(Textkey,Textvalue,intnumReduceTasks){//TODOAuto-generatedmethodstubString[]nameAgeScore=value.toString().split("\t");Stringage=nameAgeScore[1];//学生年龄intageInt=Integer.parseInt(age);//按年龄段分区//默认指定分区0if(numReduceTasks==0)return0;//年龄小于等于20,指定分区0if(ageInt<=20){return0;}//年龄大于20,小于等于50,指定分区1if(ageInt>20&&ageInt<=50){return1%numReduceTasks;}//剩余年龄,指定分区2elsereturn2%numReduceTasks;}}
这种方法实现多文件输出,也只能满足此种需求。很多情况下是无法实现的,因为这样做存在两个缺点。
第一,需要在作业运行之前需要知道分区数和年龄段的个数,如果分区数很大或者未知,就无法操作。
第二,一般来说,让应用程序来严格限定分区数并不好,因为可能导致分区数少或分区不均。
2、MultipleOutputs 类
MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件。 采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,其中 name 是由程序设定的任意名字, nnnnn 是一个指明块号的整数(从 0 开始)。块号保证从不同块(mapper 或 reducer)写的输出在相同名字情况下不会冲突。
假如这里有一份邮箱数据文件,我们期望统计邮箱出现次数并按照邮箱的类别,将这些邮箱分别输出到不同文件路径下。数据集示例如下所示。
wolys@21cn.comzss1984@126.com294522652@qq.comsimulateboy@163.comzhoushigang_123@163.comsirenxing424@126.comlixinyu23@qq.comchenlei1201@gmail.com370433835@qq.comcxx0409@126.comviv093@sina.comq62148830@163.com65993266@qq.comsummeredison@sohu.comzhangbao-autumn@163.comdiduo_007@yahoo.com.cnfxh852@163.com
下面我们编写 MapReduce 程序,实现上述业务需求。
importjava.io.IOException;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.conf.Configured;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.mapreduce.lib.output.MultipleOutputs;importorg.apache.hadoop.util.Tool;importorg.apache.hadoop.util.ToolRunner;publicclassEmailextendsConfiguredimplementsTool{publicstaticclassMailMapperextendsMapper<LongWritable,Text,Text,IntWritable>{privatefinalstaticIntWritableone=newIntWritable(1);@Overrideprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{context.write(value,one);}}publicstaticclassMailReducerextendsReducer<Text,IntWritable,Text,IntWritable>{privateIntWritableresult=newIntWritable();privateMultipleOutputs<Text,IntWritable>multipleOutputs;@Overrideprotectedvoidsetup(Contextcontext)throwsIOException,InterruptedException{multipleOutputs=newMultipleOutputs<Text,IntWritable>(context);}protectedvoidreduce(TextKey,Iterable<IntWritable>Values,Contextcontext)throwsIOException,InterruptedException{intbegin=Key.toString().indexOf("@");intend=Key.toString().indexOf(".");if(begin>=end){return;}//获取邮箱类别,比如qqStringname=Key.toString().substring(begin+1,end);intsum=0;for(IntWritablevalue:Values){sum+=value.get();}result.set(sum);multipleOutputs.write(Key,result,name);}@Overrideprotectedvoidcleanup(Contextcontext)throwsIOException,InterruptedException{multipleOutputs.close();}}@Overridepublicintrun(String[]args)throwsException{Configurationconf=newConfiguration();//读取配置文件Pathmypath=newPath(args[1]);FileSystemhdfs=mypath.getFileSystem(conf);//创建输出路径if(hdfs.isDirectory(mypath)){hdfs.delete(mypath,true);}Jobjob=Job.getInstance();//新建一个任务job.setJarByClass(Email.class);//主类FileInputFormat.addInputPath(job,newPath(args[0]));//输入路径FileOutputFormat.setOutputPath(job,newPath(args[1]));//输出路径job.setMapperClass(MailMapper.class);//Mapperjob.setReducerClass(MailReducer.class);//Reducerjob.setOutputKeyClass(Text.class);//key输出类型job.setOutputValueClass(IntWritable.class);//value输出类型job.waitForCompletion(true);return0;}publicstaticvoidmain(String[]args)throwsException{String[]args0={"hdfs://single.hadoop.dajiangtai.com:9000/junior/mail.txt","hdfs://single.hadoop.dajiangtai.com:9000/junior/mail-out/"};intec=ToolRunner.run(newConfiguration(),newEmail(),args0);System.exit(ec);}}
在 reducer 中,在 setup() 方法中构造一个 MultipleOutputs 的实例并将它赋给一个实例变量。在 reduce() 方法中使用 MultipleOutputs 实例来写输出, 而不是 context 。write() 方法作用于键、值、和名字。
程序运行之后,输出文件的命名如下所示。
/mail-out/163-r-00000/mail-out/126-r-00000/mail-out/21cn-r-00000/mail-out/gmail-r-00000/mail-out/qq-r-00000/mail-out/sina-r-00000/mail-out/sohu-r-00000/mail-out/yahoo-r-00000/mail-out/part-r-00000
在 MultipleOutputs 的 write() 方法中指定的基本路径相当于输出路径进行解释,因为它可以包含文件路径分隔符(/), 创建任意深度的子目录是有可能的。
数据库输出
DBOutputFormat 适用于将作业输出数据(中等规模的数据)转存到Mysql、Oracle等数据库。
到此,相信大家对“MapReduce的输出格式是怎样的”有了更深的了解,不妨来实际操作一番吧!这里是恰卡编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!