在 MapReduce 的 数据序列化类型 中,介绍了几种常见的 Hadoop 序列化类,实现了一个基础的 WordCount
Demo,使用到了 Long、String、Integer 对应的序列化类,那么接下来就需要了解一下 Hadoop 具体的怎么序列化的。
Hadoop 序列化
序列化概述
什么是序列化、反序列化
序列化:就是把内存中的对象,转换为字节序列
或其他数据传输协议,以便存储到磁盘或网络传输。
反序列化:就是将收到的字节序列
或其他数据传输协议
或磁盘的持久化数据
,转换成内存中的对象。
为什么要序列化?
一般来说,对象
只能生存在内存中,断电即消失。而且,对象
只能由本地进程使用,不能被发送到网络上的另外一台计算机中。
然而,序列化
可以存储 对象
,且可以将对象 发送到远程计算机
。
为什么不用 Java 自身的序列化?
Java 的序列化是一个重量级的框架(Serializable),一个对象被序列化后,别额外附带很多信息,如:校验信息、Header、继承体系等,不便于在网络中高效传输。基于此,Hadoop 开发了一套属于自己的序列化机制:Writable。
Hadoop 序列化的特点
- 紧凑:高效使用存储空间
- 快速:读写数据的额外开销小
- 可扩展:随着通信协议的升级而升级
- 互操作:支持多语言交互
自定义实现序列化
实现步骤:
- 实现 Writable 接口
- 反序列化时,需要反射调用空参构造函数
- 重写序列化方法
- 重写反序列化 方法
- 反序列化的顺序和序列化的顺序保持一致
- 重写 toString
- 实现 Comparable 接口(MapReduce 的 Shuffle 过程要求对 key 必须能排序;当需要排序的时候才做)
序列化 Demo
需求:根据 测试文件,统计每个手机号的上行流量
、下行流量
、总流量
。
文件中,倒数第三列为上行流量,倒数第二列为下行流量,最后一列为网络请求状态码。
创建统计流量的 Bean 对象
创建一个统计流量的 Bean 对象,并实现序列化操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| public class FlowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;
public FlowBean() { }
public void set(long upFlow, long downFlow){ this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; }
@Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; }
public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); }
public void readFields(DataInput dataInput) throws IOException { upFlow = dataInput.readLong(); downFlow = dataInput.readLong(); sumFlow = dataInput.readLong(); } }
|
MapReduce 程序
Mapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
private FlowBean flowBean = new FlowBean(); private Text outKey = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString(); String[] fields = line.split("\t"); outKey.set(fields[1]);
int length = fields.length; long upFlow = Long.parseLong(fields[length - 3]); long downFlow = Long.parseLong(fields[length - 2]);
flowBean.setUpFlow(upFlow); flowBean.setDownFlow(downFlow);
context.write(outKey, flowBean); } }
|
Reducer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
private FlowBean flowBean = new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long sumUpFlow = 0; long sumDownFlow = 0;
for (FlowBean value : values) { sumUpFlow += value.getUpFlow(); sumDownFlow += value.getDownFlow(); }
flowBean.set(sumUpFlow, sumDownFlow); context.write(key, flowBean); } }
|
Deiver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration);
job.setJarByClass(FlowCountDriver.class);
job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class);
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean succeed = job.waitForCompletion(true);
System.exit(succeed ? 0 : 1); }
|
运行测试
设置输入输出路径:
查看输出结果: