How ReduceJoin works:
Map side: tag each KV pair so that records coming from different tables or files can be told apart, use the join field as the key and the remaining fields plus the tag as the value, then emit the pair.
Reduce side: by the time reduce() runs, grouping by the join-field key has already happened; within each group, separate the records that came from different files, then merge them.
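To make the flow concrete, here is a sketch with hypothetical rows (an order 1001 for product 01, and a product record naming 01 "apple"; these values are illustrative, not from the original data):

```
map output (key = pid):
  01 -> (id=1001, amount=1, flag="order")
  01 -> (pname="apple", flag="pd")

reduce input group for key 01:
  [order(1001, 1), pd("apple")]  ->  joined output: 1001  apple  1
```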
ReduceJoin
Example
Requirement: the input consists of two tables, orders and product info; merge the product info into the order records by product pid.
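The tab-separated layouts assumed by the Mapper below are order(id, pid, amount) and pd(pid, pname). Hypothetical sample files (not the original data):

```
order.txt          pd.txt
1001  01  1        01  apple
1002  02  2        02  banana
1003  03  3        03  cherry
```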
TableBean
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class TableBean implements Writable {
    // Initialized to "" because writeUTF(null) throws a NullPointerException.
    private String id = "";
    private String pid = "";
    private int amount;
    private String pname = "";
    private String flag = "";

    // Serialize the fields in a fixed order.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pname);
        dataOutput.writeUTF(flag);
    }

    // Deserialize in exactly the same order as write().
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        id = dataInput.readUTF();
        pid = dataInput.readUTF();
        amount = dataInput.readInt();
        pname = dataInput.readUTF();
        flag = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    // Getters and setters for id, pid, amount, pname and flag (required by the
    // Mapper/Reducer and by BeanUtils.copyProperties) are omitted for brevity.
}
```
Mapper
```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private TableBean table = new TableBean();
    private String fileName;
    private Text outKey = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Remember which input file this split came from, so map() can tag records.
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        fileName = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        if (fileName.contains("order")) {
            // Order record: id, pid, amount.
            table.setId(fields[0]);
            table.setPid(fields[1]);
            table.setAmount(Integer.parseInt(fields[2]));
            table.setPname(""); // reset: the bean instance is reused across records
            table.setFlag("order");
        } else {
            // Product record: pid, pname.
            table.setId("");    // reset fields that only exist on the order side
            table.setAmount(0);
            table.setPid(fields[0]);
            table.setPname(fields[1]);
            table.setFlag("pd");
        }
        // The join field (pid) becomes the key so both sides meet in one reduce group.
        outKey.set(table.getPid());
        context.write(outKey, table);
    }
}
```
Reducer
```java
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context)
            throws IOException, InterruptedException {
        // Hadoop reuses the same TableBean instance while iterating over values,
        // so each value must be deep-copied before it is stored.
        List<TableBean> tableBeans = Lists.newArrayList();
        for (TableBean value : values) {
            TableBean tableBean = new TableBean();
            try {
                BeanUtils.copyProperties(tableBean, value);
                tableBeans.add(tableBean);
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
        }
        // Split the group by tag: order records on one side, product records on the other.
        List<TableBean> orders = tableBeans.stream()
                .filter(tableBean -> tableBean.getFlag().equals("order"))
                .collect(Collectors.toList());
        Map<String, String> pd = tableBeans.stream()
                .filter(tableBean -> tableBean.getFlag().equals("pd"))
                .collect(Collectors.toMap(TableBean::getPid, TableBean::getPname));
        // Complete the join: fill in the product name for every order record.
        for (TableBean order : orders) {
            order.setPname(pd.get(order.getPid()));
            context.write(order, NullWritable.get());
        }
    }
}
```
The Driver is omitted here; run the job and check the output.
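For completeness, a minimal sketch of what such a Driver could look like (the class name TableDriver and the I/O paths are assumptions, not from the original):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver for the ReduceJoin job above; paths are placeholders.
public class TableDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);
        // Map output types differ from the final output types, so both must be set.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("D:\\dev\\join\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\dev\\join\\output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```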
Drawbacks
The merge is done entirely in the Reduce phase, so the reducers carry almost all of the load while the map nodes do very little work; resource utilization is poor, and the Reduce phase is highly prone to data skew. MapJoin is recommended instead.
MapJoin
Applicable scenario: one table is very small and the other is very large.
Advantages
Cache the small table(s) on the Map side and perform the join logic there: this adds work to the mappers, relieves pressure on the Reduce side, and minimizes data skew.
Approach
1. In the Mapper's setup() phase, read the cached file into an in-memory collection.
2. In the driver, add the file to the distributed cache.
Implementation
The input data is the same as in the previous example, and the output should be identical as well.
Driver
Since MapJoin needs no Reduce phase, the setMapOutputKeyClass, setMapOutputValueClass and setReducerClass calls can be dropped from the Driver.
```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapJoinDriver {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Only the big table (orders) goes through the normal input path.
        FileInputFormat.setInputPaths(job, new Path("D:\\dev\\join\\order.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\dev\\join\\output1"));
        // Ship the small table to every mapper via the distributed cache.
        job.addCacheFile(new URI("file:///d:/dev/join/pd.txt"));
        // No reduce phase: map output is written directly to the output files.
        job.setNumReduceTasks(0);
        boolean succeed = job.waitForCompletion(true);
        System.exit(succeed ? 0 : 1);
    }
}
```
Mapper
```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> fieldMap = new HashMap<>();
    private Text outKey = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Read the cached small table (pd.txt) into an in-memory map: pid -> pname.
        String path = context.getCacheFiles()[0].getPath();
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));
        String line;
        while (StringUtils.isNotBlank(line = reader.readLine())) {
            String[] fields = line.split("\t");
            fieldMap.put(fields[0], fields[1]);
        }
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input record is an order line: id, pid, amount.
        String line = value.toString();
        String[] fields = line.split("\t");
        String pid = fields[1];
        // Join against the cached table; no shuffle or reduce phase is needed.
        String pname = fieldMap.get(pid);
        outKey.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        context.write(outKey, NullWritable.get());
    }
}
```
Check the output.
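With the hypothetical sample data above, both the ReduceJoin and the MapJoin job would produce the same joined lines (id, pname, amount):

```
1001  apple   1
1002  banana  2
1003  cherry  3
```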