Hadoop MR Programming
Published: 2019-06-06


Developing a Hadoop job requires defining a Mapper, a Reducer, and a Job driver (which launches the MR job and passes in its parameters). The sample code below implements two things:

1) convert a comma-delimited file into a "|"-delimited file;

2) merge small files, so that the output is consolidated into reduceNum files.
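For illustration, here is a minimal sketch of the record transformation (the sample line and field values are made up; only the replace/split/key-selection logic mirrors the mapper shown below):

public class TransformSketch {
    public static void main(String[] args) {
        // Hypothetical comma-delimited input record; the 7th field plays the role of objid.
        String line = "1,20171225,570,a,b,c,OBJ0001,d,e,f";

        // Same transformation the mapper performs: "," -> "|" plus two trailing NULL columns.
        String newValue = line.replace(",", "|") + "|NULL|NULL";
        String[] fields = newValue.split("\\|");

        System.out.println(newValue);  // 1|20171225|570|a|b|c|OBJ0001|d|e|f|NULL|NULL
        System.out.println(fields[6]); // OBJ0001 -> used as the map output key (objid)
    }
}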

DataMap.java

package com.dx.fpd_load;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class DataMap extends Mapper<LongWritable, Text, Text, Text> {
    private final Text key = new Text();

    @Override
    protected void map(LongWritable longWritable, Text value, Context context) throws IOException, InterruptedException {
        // Skip empty records.
        if (value.getLength() == 0) {
            return;
        }

        String newValue = value.toString().replace(",", "|") + "|NULL|NULL";
        String[] newValues = newValue.split("\\|");

        // Path of the input split being processed.
        String filePath = context.getInputSplit().toString().toUpperCase();

        // Only process files whose path contains "fpd_bak"; skip everything else.
        if (filePath.contains("fpd_bak".toUpperCase()) && newValues.length > 10) {
            key.set(newValues[6]); // objid
            context.write(key, new Text(newValue));
        }
    }
}
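The filter above relies on context.getInputSplit().toString(), which for file-based splits normally contains the file path. A slightly more explicit alternative (a sketch, not part of the original code) is to cast the split to FileSplit and read the path directly; the fragment below would replace the filePath line inside map():

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Inside map(): take the path of the file this split was created from.
String filePath = ((FileSplit) context.getInputSplit()).getPath().toString().toUpperCase();
if (filePath.contains("FPD_BAK") && newValues.length > 10) {
    // same processing as above
}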


DataReducer.java

package com.dx.fpd_load;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

public class DataReducer extends Reducer<Text, Text, NullWritable, Text> {
    public MultipleOutputs<NullWritable, Text> multipleOutputs;
    public final Text key = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text text : values) {
            String data = text.toString();

            // p_day / p_city are placed on the Configuration by the job driver.
            String[] p_days = context.getConfiguration().getStrings("p_day");
            String[] p_cities = context.getConfiguration().getStrings("p_city");
            String p_day = "p_day";
            if (p_days != null) {
                p_day = p_days[0];
            }
            String p_city = "p_city";
            if (p_cities != null) {
                p_city = p_cities[0];
            }

            // Write through the named output "fpdload" directly into the Hive partition directory.
            multipleOutputs.write("fpdload", NullWritable.get(), new Text(data),
                    "/thetenet/my_hive_db/fpd_new/p_day=" + p_day + "/p_city=" + p_city + "/fpd_data");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}
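The p_day and p_city values read here are set on the Configuration by the job driver below. A minimal, self-contained sketch of that round trip (the values are made up; Configuration.getStrings splits a comma-separated property, which is why the reducer takes element [0]):

import org.apache.hadoop.conf.Configuration;

public class ConfRoundTripSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("p_day", "20171225");  // done in the driver via conf.set("p_day", p_day)
        conf.set("p_city", "570");

        // Read the values back the way the reducer does.
        String[] pDays = conf.getStrings("p_day");          // ["20171225"]
        String pDay = (pDays != null) ? pDays[0] : "p_day"; // falls back to the literal default
        System.out.println(pDay);
    }
}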

DataJob.java

package com.dx.fpd_load;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DataJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Arguments: p_city p_day reducerNum inputPath outputPath
        String p_city = otherArgs[0];
        String p_day = otherArgs[1];
        String reducerNum = otherArgs[2];
        String inputPath = otherArgs[3];
        String outputPath = otherArgs[4];

        if (p_day == null) {
            throw new Exception("p_day is null");
        }
        conf.set("p_day", p_day);
        if (p_city == null) {
            throw new Exception("p_city is null");
        }
        conf.set("p_city", p_city);

        Job job = Job.getInstance(conf);
        job.setJobName("LoadDataIntoFPD_p_city" + p_city + "_p_day_" + p_day);
        job.setJarByClass(DataJob.class);
        job.setMapperClass(DataMap.class);
        job.setReducerClass(DataReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // reducerNum reduce tasks -> the merged output ends up in reducerNum files.
        job.setNumReduceTasks(Integer.parseInt(reducerNum));

        // Register the named output used by MultipleOutputs in the reducer.
        MultipleOutputs.addNamedOutput(job, "fpdload", TextOutputFormat.class, NullWritable.class, Text.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
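Because every record is written through MultipleOutputs rather than the job's default output, the regular output directory (OUT_DIR in the script below) typically ends up containing only empty part-r-xxxxx files. An optional addition to the driver (not in the original code) is to register the output format lazily so those empty files are not created at all:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Optional: only create part files in the default output directory if something is
// actually written to it; all real data still goes through MultipleOutputs.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);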

Submission script:

#!/usr/bin/env bash
source /app/mylinux/login.sh
# ./submit_fpdload.sh 20171225 570 400

DAY=$1
CITY=$2
REDUCER_NUMBER=$3

JAR="/app/mylinux/service/dx-1.0-SNAPSHOT.jar"
MAIN_CLASS="com.dx.fpd_load.DataJob"
INPUT_PATH="/thetenet/my_hive_db/fpd_bak/p_day=$DAY/p_city=$CITY/"
OUT_DIR="/thetenet/my_hive_db/fpd_load_out/"

hadoop fs -rm -r /thetenet/my_hive_db/fpd_new/p_day=$DAY/p_city=$CITY/
hadoop fs -rm -r $OUT_DIR

time yarn jar $JAR $MAIN_CLASS $CITY $DAY $REDUCER_NUMBER $INPUT_PATH $OUT_DIR

#beeline -e "
#alter table my_hive_db.fpd_new add if not exists partition(p_day=$DAY,p_city=$CITY)
#location '/thetenet/my_hive_db/fpd_new/p_day=$DAY/p_city=$CITY/';"

echo "Complete..."
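The script is invoked as ./submit_fpdload.sh <DAY> <CITY> <REDUCER_NUMBER>, e.g. ./submit_fpdload.sh 20171225 570 400 as in the comment at the top. Note that the driver expects p_city before p_day, so the script swaps the order when passing $CITY $DAY to yarn jar. The commented-out beeline statement would register the freshly written directory as a partition of my_hive_db.fpd_new once the job finishes.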


Reposted from: https://www.cnblogs.com/yy3b2007com/p/8555107.html
