MapReduce案例11——影评分析5(求特定年份最好看的10部电影)

2019-04-14 21:13发布

题目:现有如此三份数据:
1、users.dat 数据格式为: 2::M::56::16::70072 对应字段为:UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String 对应字段中文解释:用户id,性别,年龄,职业,邮政编码
2、movies.dat 数据格式为: 2::Jumanji (1995)::Adventure|Children's|Fantasy 对应字段为:MovieID BigInt, Title String, Genres String 对应字段中文解释:电影ID,电影名字,电影类型
3、ratings.dat 数据格式为: 1::1193::5::978300760 对应字段为:UserID BigInt, MovieID BigInt, Rating Double, Timestamped String 对应字段中文解释:用户ID,电影ID,评分,评分时间戳
用户ID,电影ID,评分,评分时间戳,性别,年龄,职业,邮政编码,电影名字,电影类型
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
(5)求好片(评分>=4.0)最多的那个年份的最好看的10部电影
思路:分四步,先求好看的电影,第二步降序选择最好的对应的年份,通过年份求好看电影,然后降序求出前10.较为简单,直接上主体代码:

/**
 * @author: lpj
 * @date: 2018-03-16 19:16:47
 * @Description: finds the ten best movies of the release year that received the most good ratings.
 */
package lpj.filmCritic;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.time.Year;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import lpj.filmBean.GoodMoiveGroup;
import lpj.filmBean.GoodMoiveGroup2;
import lpj.filmBean.GoodMovieBean;
import lpj.filmBean.GoodMovieBean2;

/**
 * Driver that chains four MapReduce jobs over the joined rating file
 * ({@code userid::movieId::rate::ts::gender::age::occupation::zipcode::movieName::movieType}):
 * <ol>
 *   <li>count the good ratings (rate &gt;= 4) per movie release year;</li>
 *   <li>keep the first record of the sorted year/count beans (ordering and grouping are
 *       defined by {@code GoodMovieBean} / {@code GoodMoiveGroup}, which are not shown here);</li>
 *   <li>count the good ratings per movie, restricted to the year selected by job 2;</li>
 *   <li>keep the first ten records of the sorted year/movie/count beans (ordering and grouping
 *       are defined by {@code GoodMovieBean2} / {@code GoodMoiveGroup2}).</li>
 * </ol>
 * Intermediate files are written by the default text output format, i.e. key and value are
 * separated by a tab, so every stage that re-reads them splits on a tab.
 */
public class GoodMoiveMR {

    /** Field separator of the joined input file. */
    private static final String INPUT_SEPARATOR = "::";
    /** Separator used inside intermediate records (matches the text output key/value separator). */
    private static final String TAB = "\t";
    /** Position of the rating inside an input record. */
    private static final int RATE_INDEX = 2;
    /** Position of the movie title inside an input record. */
    private static final int MOVIE_NAME_INDEX = 8;
    /** Minimum rating for a movie to count as "good". */
    private static final int GOOD_RATE = 4;
    /** Counter group used to report skipped records. */
    private static final String COUNTER_GROUP = "GoodMoiveMR";

    /**
     * Configures the four jobs, wires their dependencies and runs them through a {@link JobControl}.
     *
     * @param args unused
     * @throws Exception if a job cannot be configured or the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Local file system by default (unless the configuration points elsewhere).
        FileSystem fs = FileSystem.get(conf);

        // ---- job 1: number of good ratings per release year ----
        Job job = Job.getInstance(conf);
        job.setJarByClass(GoodMoiveMR.class);
        job.setMapperClass(GoodMoiveMR_Mapper.class);
        job.setReducerClass(GoodMoiveMR_Reducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        preparePaths(fs, job, "/a/totalFilmInfos.txt", "/a/homework11_5_1");

        // ---- job 2: pick the year with the most good ratings ----
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(GoodMoiveMR.class);
        job2.setMapperClass(GoodMoiveMR2_Mapper.class);
        job2.setReducerClass(GoodMoiveMR2_Reducer.class);
        job2.setOutputKeyClass(GoodMovieBean.class);
        job2.setOutputValueClass(NullWritable.class);
        job2.setGroupingComparatorClass(GoodMoiveGroup.class);
        // Job 3 reads exactly part-r-00000, so the selection must happen in a single reducer.
        job2.setNumReduceTasks(1);
        preparePaths(fs, job2, "/a/homework11_5_1", "/a/homework11_5_2");

        // ---- job 3: number of good ratings per movie of the selected year ----
        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(GoodMoiveMR.class);
        job3.setMapperClass(GoodMoiveMR3_Mapper.class);
        job3.setReducerClass(GoodMoiveMR3_Reducer.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        job3.addCacheFile(new URI("/a/homework11_5_2/part-r-00000"));
        preparePaths(fs, job3, "/a/totalFilmInfos.txt", "/a/homework11_5_3");

        // ---- job 4: first ten movies of the sorted result ----
        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(GoodMoiveMR.class);
        job4.setMapperClass(GoodMoiveMR4_Mapper.class);
        job4.setReducerClass(GoodMoiveMR4_Reducer.class);
        job4.setOutputKeyClass(GoodMovieBean2.class);
        job4.setOutputValueClass(NullWritable.class);
        job4.setGroupingComparatorClass(GoodMoiveGroup2.class);
        // A global "first ten" only makes sense when all records reach the same reducer.
        job4.setNumReduceTasks(1);
        preparePaths(fs, job4, "/a/homework11_5_3", "/a/homework11_5_4");

        // ---- chain the jobs: job -> job2 -> job3 -> job4 ----
        ControlledJob aJob = new ControlledJob(job.getConfiguration());
        ControlledJob bJob = new ControlledJob(job2.getConfiguration());
        ControlledJob cJob = new ControlledJob(job3.getConfiguration());
        ControlledJob dJob = new ControlledJob(job4.getConfiguration());
        aJob.setJob(job);
        bJob.setJob(job2);
        cJob.setJob(job3);
        dJob.setJob(job4);

        JobControl jc = new JobControl("jc");
        jc.addJob(aJob);
        jc.addJob(bJob);
        jc.addJob(cJob);
        jc.addJob(dJob);
        bJob.addDependingJob(aJob);
        cJob.addDependingJob(bJob);
        dJob.addDependingJob(cJob);

        Thread thread = new Thread(jc);
        thread.start();
        while (!jc.allFinished()) {
            Thread.sleep(1000);
        }
        jc.stop();

        // Surface failures instead of ending with a success exit code.
        if (!jc.getFailedJobList().isEmpty()) {
            System.err.println("Failed jobs: " + jc.getFailedJobList());
            System.exit(1);
        }
    }

    /**
     * Removes a stale output directory and registers the input/output paths on the job.
     *
     * @param fs     file system holding both paths
     * @param job    job to configure
     * @param input  input file or directory
     * @param output output directory; deleted recursively when it already exists
     * @throws IOException if the file system cannot be accessed
     */
    private static void preparePaths(FileSystem fs, Job job, String input, String output)
            throws IOException {
        Path outputPath = new Path(output);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, outputPath);
    }

    /**
     * Splits one line of the joined rating file.
     *
     * @param line raw input line
     * @return the fields, or {@code null} when the line is too short to contain a movie title
     */
    private static String[] splitRecord(String line) {
        String[] fields = line.trim().split(INPUT_SEPARATOR);
        return fields.length > MOVIE_NAME_INDEX ? fields : null;
    }

    /**
     * Extracts the four characters between the trailing parentheses of a title such as
     * {@code "Jumanji (1995)"}.
     *
     * @param movieName movie title
     * @return the year text, or {@code null} when the title is too short to hold one
     */
    private static String releaseYear(String movieName) {
        int length = movieName.length();
        if (length < 5) {
            return null;
        }
        return movieName.substring(length - 5, length - 1);
    }

    /** Job 1 mapper: emits {@code (year, rate)} for every good rating. */
    public static class GoodMoiveMR_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Text kout = new Text();
        private final Text valueout = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Fields: userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
            String[] reads = splitRecord(value.toString());
            String year = reads == null ? null : releaseYear(reads[MOVIE_NAME_INDEX]);
            if (year == null) {
                // Skip, but keep the number of dropped lines visible in the job counters.
                context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
                return;
            }
            int rate = Integer.parseInt(reads[RATE_INDEX]);
            if (rate >= GOOD_RATE) {
                kout.set(year);
                valueout.set(String.valueOf(rate));
                context.write(kout, valueout);
            }
        }
    }

    /** Job 1 reducer: emits {@code (year, number of good ratings)}. */
    public static class GoodMoiveMR_Reducer extends Reducer<Text, Text, Text, Text> {
        private final Text valueout = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (Text ignored : values) {
                count++;
            }
            valueout.set(String.valueOf(count));
            context.write(key, valueout);
        }
    }

    // --------------------- select the year ---------------------

    /** Job 2 mapper: turns each {@code year<TAB>count} line of job 1 into a {@code GoodMovieBean}. */
    public static class GoodMoiveMR2_Mapper
            extends Mapper<LongWritable, Text, GoodMovieBean, NullWritable> {
        private final GoodMovieBean gm = new GoodMovieBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Job 1 output is written as key<TAB>value.
            String[] reads = value.toString().trim().split(TAB);
            if (reads.length < 2) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_YEAR_COUNTS").increment(1);
                return;
            }
            gm.setYear(reads[0]);
            gm.setNum(Integer.parseInt(reads[1]));
            context.write(gm, NullWritable.get());
        }
    }

    /** Job 2 reducer: writes only the first record of each group. */
    public static class GoodMoiveMR2_Reducer
            extends Reducer<GoodMovieBean, NullWritable, GoodMovieBean, NullWritable> {

        @Override
        protected void reduce(GoodMovieBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            // The key is written inside the loop, once per value consumed.
            for (NullWritable ignored : values) {
                if (++count > 1) {
                    return;
                }
                context.write(key, NullWritable.get());
            }
        }
    }

    // --------------------- movies of the selected year ---------------------

    /**
     * Job 3 mapper: emits {@code (year<TAB>movieName, rate)} for good ratings of movies released
     * in the year read from the cached output of job 2.
     */
    public static class GoodMoiveMR3_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Text kout = new Text();
        private final Text valueout = new Text();
        /** Year selected by job 2; loaded once per task in {@link #setup}. */
        private String goodmovieyear = "";

        /**
         * Loads the selected year from the first cache file. The year is the first
         * whitespace-delimited token of the last non-blank line.
         *
         * @throws IOException if the cache file is missing or unreadable
         */
        @SuppressWarnings("deprecation")
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path[] paths = context.getLocalCacheFiles();
            if (paths == null || paths.length == 0) {
                throw new IOException("Cache file with the selected year is not available");
            }
            File cached = new File(paths[0].toUri().getPath());
            try (BufferedReader bf = new BufferedReader(new FileReader(cached))) {
                String readline;
                while ((readline = bf.readLine()) != null) {
                    String trimmed = readline.trim();
                    if (!trimmed.isEmpty()) {
                        // Works whether the bean prints its fields separated by tab or by space.
                        goodmovieyear = trimmed.split("\\s+")[0];
                    }
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Fields: userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
            String[] reads = splitRecord(value.toString());
            String moivename = reads == null ? null : reads[MOVIE_NAME_INDEX];
            String year = moivename == null ? null : releaseYear(moivename);
            if (year == null) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
                return;
            }
            int rate = Integer.parseInt(reads[RATE_INDEX]);
            if (rate >= GOOD_RATE && goodmovieyear.equals(year)) {
                // Titles contain spaces, so a tab keeps the record splittable in job 4.
                kout.set(year + TAB + moivename);
                valueout.set(String.valueOf(rate));
                context.write(kout, valueout);
            }
        }
    }

    /** Job 3 reducer: emits {@code (year<TAB>movieName, number of good ratings)}. */
    public static class GoodMoiveMR3_Reducer extends Reducer<Text, Text, Text, Text> {
        private final Text valueout = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (Text ignored : values) {
                count++;
            }
            valueout.set(String.valueOf(count));
            context.write(key, valueout);
        }
    }

    // --------------------- first ten movies ---------------------

    /**
     * Job 4 mapper: turns each {@code year<TAB>movieName<TAB>count} line of job 3 into a
     * {@code GoodMovieBean2}.
     */
    public static class GoodMoiveMR4_Mapper
            extends Mapper<LongWritable, Text, GoodMovieBean2, NullWritable> {
        private final GoodMovieBean2 gm = new GoodMovieBean2();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] reads = value.toString().trim().split(TAB);
            if (reads.length < 3) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_MOVIE_COUNTS").increment(1);
                return;
            }
            gm.setYear(reads[0]);
            gm.setName(reads[1]);
            gm.setNum(Integer.parseInt(reads[2]));
            context.write(gm, NullWritable.get());
        }
    }

    /** Job 4 reducer: writes at most the first ten records of each group. */
    public static class GoodMoiveMR4_Reducer
            extends Reducer<GoodMovieBean2, NullWritable, GoodMovieBean2, NullWritable> {

        @Override
        protected void reduce(GoodMovieBean2 key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            // The key is written inside the loop, once per value consumed.
            for (NullWritable ignored : values) {
                if (++count > 10) {
                    return;
                }
                context.write(key, NullWritable.get());
            }
        }
    }
}

结果:1999 American Beauty (1999) 2853 1999 Matrix, The (1999) 2171 1999 Sixth Sense, The (1999) 2163 1999
Being John Malkovich (1999) 1759 1999 Toy Story 2 (1999) 1302 1999 Galaxy Quest (1999) 1145 1999 Star Wars: Episode I - The Phantom Menace (1999) 1132 1999 Election (1999) 1130 1999 Fight Club (1999) 1096 1999 Green Mile, The (1999) 981
总结:job 与 job3 都是按键计数,job2 与 job4 都是排序后取前 N 条(分别取前 1 条和前 10 条),两组代码基本上重复。