题目:现有如此三份数据:
1、users.dat 数据格式为: 2::M::56::16::70072
对应字段为:UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
对应字段中文解释:用户id,性别,年龄,职业,邮政编码
2、movies.dat 数据格式为: 2::Jumanji (1995)::Adventure|Children's|Fantasy
对应字段为:MovieID BigInt, Title String, Genres String
对应字段中文解释:电影ID,电影名字,电影类型
3、ratings.dat 数据格式为: 1::1193::5::978300760
对应字段为:UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
对应字段中文解释:用户ID,电影ID,评分,评分时间戳
以上三份数据关联(join)后得到合并数据 totalFilmInfos.txt,每行以"::"分隔,字段依次为:用户ID,电影ID,评分,评分时间戳,性别,年龄,职业,邮政编码,电影名字,电影类型
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
(5)求好片(评分>=4.0)最多的那个年份的最好看的10部电影
思路:分四步.第一步,统计每个年份的好评(评分>=4)条数;第二步,按条数降序排序,取出好评最多的年份;第三步,统计该年份每部电影的好评条数;第四步,按好评条数降序排序,取前10部电影.主体代码如下:
/**
* @author: lpj
* @date: 2018年3月16日 下午7:16:47
* @Description:
*/
package lpj.filmCritic;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.time.Year;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import lpj.filmBean.GoodMoiveGroup;
import lpj.filmBean.GoodMoiveGroup2;
import lpj.filmBean.GoodMovieBean;
import lpj.filmBean.GoodMovieBean2;
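/**
 * Driver and mapper/reducer classes for a four-stage MapReduce pipeline that finds the
 * release year with the most good ratings (rating >= 4) and then the ten films of that
 * year with the most good ratings.
 */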
public class GoodMoiveMR {
/**
 * Configures the four chained MapReduce jobs and runs them through a {@link JobControl}.
 *
 * <ol>
 *   <li>job 1 counts good ratings (rating &gt;= 4) per release year;</li>
 *   <li>job 2 keeps the leading record of the per-year counts (ordering and grouping come
 *       from {@code GoodMovieBean} / {@code GoodMoiveGroup}, which are defined elsewhere);</li>
 *   <li>job 3 counts good ratings per film of the year written by job 2, which is shipped
 *       to the mappers as a cache file;</li>
 *   <li>job 4 keeps the ten leading records of the per-film counts (ordering and grouping
 *       come from {@code GoodMovieBean2} / {@code GoodMoiveGroup2}).</li>
 * </ol>
 *
 * <p>All paths are hard-coded. Existing output directories are deleted before the run.
 * If any job fails, the failures are printed to stderr and the JVM exits with status 1.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the file system cannot be reached, a job cannot be configured,
 *                   or the waiting thread is interrupted
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A single handle is enough: all four jobs are built from the same configuration.
    FileSystem fs = FileSystem.get(conf);

    Path joinedInput = new Path("/a/totalFilmInfos.txt");
    Path yearCountsDir = new Path("/a/homework11_5_1");
    Path topYearDir = new Path("/a/homework11_5_2");
    Path filmCountsDir = new Path("/a/homework11_5_3");
    Path topFilmsDir = new Path("/a/homework11_5_4");

    // FileOutputFormat refuses to write into an existing directory, so clear stale output.
    for (Path staleOutput : new Path[] {yearCountsDir, topYearDir, filmCountsDir, topFilmsDir}) {
        if (fs.exists(staleOutput)) {
            fs.delete(staleOutput, true);
        }
    }

    //--------------------------------- job 1: good ratings per year
    Job job = Job.getInstance(conf);
    job.setJarByClass(GoodMoiveMR.class);
    job.setMapperClass(GoodMoiveMR_Mapper.class);
    job.setReducerClass(GoodMoiveMR_Reducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, joinedInput);
    FileOutputFormat.setOutputPath(job, yearCountsDir);

    //--------------------------------- job 2: year with the most good ratings
    Job job2 = Job.getInstance(conf);
    job2.setJarByClass(GoodMoiveMR.class);
    job2.setMapperClass(GoodMoiveMR2_Mapper.class);
    job2.setReducerClass(GoodMoiveMR2_Reducer.class);
    job2.setOutputKeyClass(GoodMovieBean.class);
    job2.setOutputValueClass(NullWritable.class);
    job2.setGroupingComparatorClass(GoodMoiveGroup.class);
    // Job 3 reads exactly part-r-00000, and a global "top 1" needs every record in one
    // reducer, so do not depend on the cluster's default reducer count.
    job2.setNumReduceTasks(1);
    FileInputFormat.setInputPaths(job2, yearCountsDir);
    FileOutputFormat.setOutputPath(job2, topYearDir);

    //--------------------------------- job 3: good ratings per film of that year
    Job job3 = Job.getInstance(conf);
    job3.setJarByClass(GoodMoiveMR.class);
    job3.setMapperClass(GoodMoiveMR3_Mapper.class);
    job3.setReducerClass(GoodMoiveMR3_Reducer.class);
    job3.setMapOutputKeyClass(Text.class);
    job3.setMapOutputValueClass(Text.class);
    job3.setOutputKeyClass(Text.class);
    job3.setOutputValueClass(Text.class);
    // The file does not exist yet; it is produced by job 2, which job 3 depends on below.
    job3.addCacheFile(new URI("/a/homework11_5_2/part-r-00000"));
    FileInputFormat.setInputPaths(job3, joinedInput);
    FileOutputFormat.setOutputPath(job3, filmCountsDir);

    //--------------------------------- job 4: top ten films
    Job job4 = Job.getInstance(conf);
    job4.setJarByClass(GoodMoiveMR.class);
    job4.setMapperClass(GoodMoiveMR4_Mapper.class);
    job4.setReducerClass(GoodMoiveMR4_Reducer.class);
    job4.setOutputKeyClass(GoodMovieBean2.class);
    job4.setOutputValueClass(NullWritable.class);
    job4.setGroupingComparatorClass(GoodMoiveGroup2.class);
    // A global "top 10" likewise needs a single reducer.
    job4.setNumReduceTasks(1);
    FileInputFormat.setInputPaths(job4, filmCountsDir);
    FileOutputFormat.setOutputPath(job4, topFilmsDir);

    //--------------------------------- chain the jobs: 1 -> 2 -> 3 -> 4
    ControlledJob aJob = new ControlledJob(job.getConfiguration());
    ControlledJob bJob = new ControlledJob(job2.getConfiguration());
    ControlledJob cJob = new ControlledJob(job3.getConfiguration());
    ControlledJob dJob = new ControlledJob(job4.getConfiguration());
    aJob.setJob(job);
    bJob.setJob(job2);
    cJob.setJob(job3);
    dJob.setJob(job4);
    bJob.addDependingJob(aJob);
    cJob.addDependingJob(bJob);
    dJob.addDependingJob(cJob);

    JobControl jc = new JobControl("jc");
    jc.addJob(aJob);
    jc.addJob(bJob);
    jc.addJob(cJob);
    jc.addJob(dJob);

    Thread thread = new Thread(jc);
    thread.start();
    while (!jc.allFinished()) {
        // sleep is static: it pauses the current (main) thread, not the JobControl thread.
        Thread.sleep(1000);
    }

    // allFinished() is also true when jobs failed, so failures must be checked explicitly.
    boolean anyFailed = false;
    for (ControlledJob failedJob : jc.getFailedJobList()) {
        anyFailed = true;
        System.err.println("Job failed: " + failedJob.getJobName() + " - " + failedJob.getMessage());
    }
    jc.stop();
    if (anyFailed) {
        System.exit(1);
    }
}
/**
 * Job 1 mapper: emits {@code (release year, rating)} for every good rating (rating &gt;= 4).
 *
 * <p>Each input line is expected to hold the fields
 * {@code userid::movieId::rate::ts::gender::age::occupation::zipcode::movieName::movieType}.
 * The release year is taken from the four characters before the last character of the
 * film title, e.g. {@code 1995} in {@code Jumanji (1995)}.
 *
 * <p>Lines with too few fields, a title that is too short, or a non-numeric rating are
 * skipped and counted in the {@code GoodMoiveMR / MALFORMED_RECORDS} counter instead of
 * failing the task.
 */
public static class GoodMoiveMR_Mapper extends Mapper<LongWritable, Text, Text, Text> {
    private static final int RATE_FIELD = 2;
    private static final int TITLE_FIELD = 8;
    /** Length of the trailing {@code "1995)"} part of a title that the year is cut from. */
    private static final int YEAR_SUFFIX_LENGTH = 5;

    private final Text kout = new Text();
    private final Text valueout = new Text();

    /**
     * Emits the release year and the rating text when the rating is at least 4.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of the joined rating data
     * @param context task context used to emit output and to count skipped records
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] reads = value.toString().trim().split("::");
        if (reads.length <= TITLE_FIELD) {
            skipMalformed(context);
            return;
        }
        String moivename = reads[TITLE_FIELD];
        if (moivename.length() < YEAR_SUFFIX_LENGTH) {
            skipMalformed(context);
            return;
        }
        String year = moivename.substring(moivename.length() - YEAR_SUFFIX_LENGTH, moivename.length() - 1);

        // Parsed as double so that both "5" and "4.5" style ratings are accepted.
        double rate;
        try {
            rate = Double.parseDouble(reads[RATE_FIELD]);
        } catch (NumberFormatException e) {
            skipMalformed(context);
            return;
        }

        if (rate >= 4.0) {
            kout.set(year);
            // The reducer only counts values, so the original rating text is passed through.
            valueout.set(reads[RATE_FIELD]);
            context.write(kout, valueout);
        }
    }

    /** Records one skipped input line in a job counter so that data problems stay visible. */
    private static void skipMalformed(Context context) {
        context.getCounter("GoodMoiveMR", "MALFORMED_RECORDS").increment(1);
    }
}
/**
 * Job 1 reducer: writes, for each key, the number of values received for it.
 *
 * <p>With {@code GoodMoiveMR_Mapper} as the mapper, the key is a release year and the
 * result is the number of good ratings given to films of that year.
 */
public static class GoodMoiveMR_Reducer extends Reducer<Text, Text, Text, Text> {
    private final Text valueout = new Text();

    /**
     * Counts the values of one key and emits {@code (key, count)}.
     *
     * @param key     the grouping key
     * @param values  the values emitted for that key; only their number matters
     * @param context task context used to emit the result
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (Text ignored : values) {
            count++;
        }
        valueout.set(String.valueOf(count));
        context.write(key, valueout);
    }
}
//--------------------- Job 2: find the year ----------------------------------
/**
 * Job 2 mapper: turns each {@code year count} line into a {@link GoodMovieBean} key.
 *
 * <p>The two fields may be separated by any run of whitespace. Hadoop's
 * {@code TextOutputFormat} puts a tab between key and value by default, so splitting on a
 * single space would not separate them.
 */
public static class GoodMoiveMR2_Mapper
        extends Mapper<LongWritable, Text, GoodMovieBean, NullWritable> {
    private final GoodMovieBean gm = new GoodMovieBean();

    /**
     * Parses one line and emits the populated bean with a {@link NullWritable} value.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one {@code year count} line; blank lines are ignored
     * @param context task context used to emit output
     * @throws IOException          if a non-blank line has fewer than two fields, or if
     *                              writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return;
        }
        String[] reads = line.split("\\s+");
        if (reads.length < 2) {
            throw new IOException("Expected \"<year> <count>\" but got: " + line);
        }
        gm.setYear(reads[0]);
        gm.setNum(Integer.parseInt(reads[1]));
        context.write(gm, NullWritable.get());
    }
}
/**
 * Job 2 reducer: writes only the first record of each reduce group.
 *
 * <p>NOTE(review): which record comes first, and which records share a group, is decided
 * by {@code GoodMovieBean}'s ordering and by {@code GoodMoiveGroup}; neither is visible
 * here. The pipeline presumably sorts by count in descending order and puts all records
 * in one group, so the single record written is the year with the most good ratings.
 * Confirm against those classes.
 */
public static class GoodMoiveMR2_Reducer
        extends Reducer<GoodMovieBean, NullWritable, GoodMovieBean, NullWritable> {
    /** Number of leading records kept per group. */
    private static final int TOP_N = 1;

    /**
     * Emits the key once per value until {@code TOP_N} records have been written.
     *
     * @param key     the current key of the group; Hadoop reuses this object and refreshes
     *                it as the value iterator advances
     * @param values  the values of the group
     * @param context task context used to emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(GoodMovieBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        int emitted = 0;
        for (NullWritable ignored : values) {
            if (emitted == TOP_N) {
                return;
            }
            context.write(key, NullWritable.get());
            emitted++;
        }
    }
}
//-------------------------- Job 3: find the films of that year
/**
 * Job 3 mapper: emits {@code (year + " " + title, rating)} for every good rating
 * (rating &gt;= 4) of a film released in the year read from the first cache file.
 *
 * <p>Each input line is expected to hold the fields
 * {@code userid::movieId::rate::ts::gender::age::occupation::zipcode::movieName::movieType}.
 * Lines with too few fields, a title that is too short, or a non-numeric rating are
 * skipped and counted in the {@code GoodMoiveMR / MALFORMED_RECORDS} counter.
 */
public static class GoodMoiveMR3_Mapper extends Mapper<LongWritable, Text, Text, Text> {
    private static final int RATE_FIELD = 2;
    private static final int TITLE_FIELD = 8;
    /** Length of the trailing {@code "1995)"} part of a title that the year is cut from. */
    private static final int YEAR_SUFFIX_LENGTH = 5;

    private final Text kout = new Text();
    private final Text valueout = new Text();
    /** Year to keep; stays empty (matching nothing) when the cache file holds no data. */
    private String goodmovieyear = "";

    /**
     * Loads the target year from the first localized cache file.
     *
     * <p>The year is the first whitespace-delimited token of the last non-blank line.
     * NOTE(review): the file is presumably the single-line output of job 2; confirm.
     *
     * @param context task context giving access to the localized cache files
     * @throws IOException          if no cache file is available or it cannot be read
     * @throws InterruptedException declared by the overridden method
     */
    @SuppressWarnings("deprecation")
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Path[] paths = context.getLocalCacheFiles();
        if (paths == null || paths.length == 0) {
            throw new IOException("No cache file available; the target year cannot be determined");
        }
        // getPath() yields a plain file-system path even if the URI carries a scheme.
        String localPath = paths[0].toUri().getPath();
        try (BufferedReader bf = new BufferedReader(new FileReader(new File(localPath)))) {
            String readline;
            while ((readline = bf.readLine()) != null) {
                String trimmed = readline.trim();
                if (!trimmed.isEmpty()) {
                    goodmovieyear = trimmed.split("\\s+")[0];
                }
            }
        }
    }

    /**
     * Emits the year-and-title key with the rating text when the rating is at least 4 and
     * the film's release year equals the target year.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of the joined rating data
     * @param context task context used to emit output and to count skipped records
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] reads = value.toString().trim().split("::");
        if (reads.length <= TITLE_FIELD) {
            skipMalformed(context);
            return;
        }
        String moivename = reads[TITLE_FIELD];
        if (moivename.length() < YEAR_SUFFIX_LENGTH) {
            skipMalformed(context);
            return;
        }
        String year = moivename.substring(moivename.length() - YEAR_SUFFIX_LENGTH, moivename.length() - 1);

        // Parsed as double so that both "5" and "4.5" style ratings are accepted.
        double rate;
        try {
            rate = Double.parseDouble(reads[RATE_FIELD]);
        } catch (NumberFormatException e) {
            skipMalformed(context);
            return;
        }

        if (rate >= 4.0 && goodmovieyear.equals(year)) {
            kout.set(year + " " + moivename);
            // The reducer only counts values, so the original rating text is passed through.
            valueout.set(reads[RATE_FIELD]);
            context.write(kout, valueout);
        }
    }

    /** Records one skipped input line in a job counter so that data problems stay visible. */
    private static void skipMalformed(Context context) {
        context.getCounter("GoodMoiveMR", "MALFORMED_RECORDS").increment(1);
    }
}
/**
 * Job 3 reducer: writes, for each key, the number of values received for it.
 *
 * <p>With {@code GoodMoiveMR3_Mapper} as the mapper, the key is {@code year + " " + title}
 * and the result is the number of good ratings that film received.
 */
public static class GoodMoiveMR3_Reducer extends Reducer<Text, Text, Text, Text> {
    private final Text valueout = new Text();

    /**
     * Counts the values of one key and emits {@code (key, count)}.
     *
     * @param key     the grouping key
     * @param values  the values emitted for that key; only their number matters
     * @param context task context used to emit the result
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (Text ignored : values) {
            count++;
        }
        valueout.set(String.valueOf(count));
        context.write(key, valueout);
    }
}
//--------------------- Job 4: top 10 good films
/**
 * Job 4 mapper: turns each {@code year title count} line into a {@link GoodMovieBean2} key.
 *
 * <p>The year is the first whitespace-delimited token, the count is the last one, and the
 * title is everything in between. Titles such as {@code Matrix, The (1999)} contain
 * spaces, so the line cannot simply be split on a single separator character.
 */
public static class GoodMoiveMR4_Mapper
        extends Mapper<LongWritable, Text, GoodMovieBean2, NullWritable> {
    private final GoodMovieBean2 gm = new GoodMovieBean2();

    /**
     * Parses one line and emits the populated bean with a {@link NullWritable} value.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one {@code year title count} line; blank lines are ignored
     * @param context task context used to emit output
     * @throws IOException          if a non-blank line lacks a year, title or count, or if
     *                              writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return;
        }

        // End (exclusive) of the first token: the year.
        int yearEnd = 0;
        while (yearEnd < line.length() && !Character.isWhitespace(line.charAt(yearEnd))) {
            yearEnd++;
        }
        // Start of the last token: the count.
        int numStart = line.length();
        while (numStart > 0 && !Character.isWhitespace(line.charAt(numStart - 1))) {
            numStart--;
        }

        String name = yearEnd < numStart ? line.substring(yearEnd, numStart).trim() : "";
        if (name.isEmpty()) {
            throw new IOException("Expected \"<year> <title> <count>\" but got: " + line);
        }

        gm.setYear(line.substring(0, yearEnd));
        gm.setName(name);
        gm.setNum(Integer.parseInt(line.substring(numStart)));
        context.write(gm, NullWritable.get());
    }
}
/**
 * Job 4 reducer: writes at most the first ten records of each reduce group.
 *
 * <p>NOTE(review): which records come first, and which records share a group, is decided
 * by {@code GoodMovieBean2}'s ordering and by {@code GoodMoiveGroup2}; neither is visible
 * here. The pipeline presumably sorts by count in descending order and groups by year, so
 * the records written are the ten films with the most good ratings. Confirm against those
 * classes.
 */
public static class GoodMoiveMR4_Reducer
        extends Reducer<GoodMovieBean2, NullWritable, GoodMovieBean2, NullWritable> {
    /** Number of leading records kept per group. */
    private static final int TOP_N = 10;

    /**
     * Emits the key once per value until {@code TOP_N} records have been written.
     *
     * @param key     the current key of the group; Hadoop reuses this object and refreshes
     *                it as the value iterator advances, so each write emits a different record
     * @param values  the values of the group
     * @param context task context used to emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(GoodMovieBean2 key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        int emitted = 0;
        for (NullWritable ignored : values) {
            if (emitted == TOP_N) {
                return;
            }
            context.write(key, NullWritable.get());
            emitted++;
        }
    }
}
}
结果:1999 American Beauty (1999) 2853
1999 Matrix, The (1999) 2171
1999 Sixth Sense, The (1999) 2163
1999 Being John Malkovich (1999) 1759
1999 Toy Story 2 (1999) 1302
1999 Galaxy Quest (1999) 1145
1999 Star Wars: Episode I - The Phantom Menace (1999) 1132
1999 Election (1999) 1130
1999 Fight Club (1999) 1096
1999 Green Mile, The (1999) 981
总结:job3,job4 的逻辑与 job,job2 基本重复(都是先分组计数,再降序排序取前 N 条),可以考虑抽取公共代码加以复用.