티스토리 뷰
<< 사용자 정의 옵션 활용 >>
1. GenericOptionParser 활용
- hadoop을 실행할 때 -D옵션과 함께 속성=속성값을 입력하면 Mapper에서 이 정보를 사용할 수 있도록 정의
- commons-cli-xxx.jar 라이브러리를 추가(하둡 설치폴더 안 lib에 있음)
1) Mapper작성
- setup메소드 : Mapper가 실행될 때 한 번만 실행되는 메소드
- 하둡을 실행할 때 -D옵션과 함께 입력한 속성명을 지정하면 입력했던 속성값을 추출할 수 있다.
- Configuration객체를 이용해서 작업
2) Reducer
- 동일
3) Driver
- 실행할 때 사용자가 입력한 옵션을 이용할 수 있도록 설정해야 하므로 기존 방식을 모두 변경
- 사용자정의 옵션을 사용하기 위한 작업(command line에 사용자가 입력하는 옵션을 mapper가
사용할 수 있도록 하기 위한 조건)
- Driver클래스를 정의할 때 Tool인터페이스를 구현
- Configuration클래스를 상속
- Tool인터페이스의 run메소드를 오버라이딩
=> 기존의 Driver에서 정의했던 내용을 구현
=> GenericOptionParser를 활용해서 사용자가 입력한 명령행매개변수에서
- D옵션과 함께 입력한 값을 분리해서 Mapper로 전달하도록 작성
- 나머지 입력값은 기존의 명령행 매개변수와 동일하게 처리
- ToolRunner의 run메소드를 호출해서 Driver에 구현한 run메소드를 실행하도록 작성
하둡을 실행하면서 명령행에 입력하는 옵션을 Mapper에 전달하기 위해 ToolRunner의
run메소드를 호출해서 작업
ToolRunner.run(Configuration conf, Tool tool, String[] args)
=> 기존 Driver를 Tool타입으로 변경해야 한다.
=> ToolRunner클래스의 run메소드 내부에서 Tool객채의 run을 호출해서 실행하므로
스펙에 맞게 작성해 놓아야 실행이 된다.
NASDAQ 데이터 분석
-
이전에 만들었던 옵션에 따라서 상승마감, 하락마감을 선택 할 수 있도록 구현
-
Mapper
< StockOptionMapper.java >
package mapreduce.exam.option.stock;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class StockOptionMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text outputKey = new Text();
private String jobType; // 하둡 실행할 때 입력한 옵션값
@Override
protected void setup(
Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// context.getConfiguration().get("옵션명"); -D옵션과 같이 입력하는 옵션명
jobType = context.getConfiguration().get("jobType");
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
if(key.get() > 0) { //제목행을 제외하고 작업을 수행
String[] line = value.toString().split(",");
if(line!=null && line.length>0) {
// 상승마감
if(jobType.equals("up")) {
// 년도 데이터를 output data의 키로 설정
outputKey.set(line[2].substring(0,4));
// 상승마감을 판단해서 값을 추출
float resultValue = Float.parseFloat(line[6]) - Float.parseFloat(line[3]);
if(resultValue > 0) {
// 상승마감 - Shuffle단으로 내보내기
context.write(outputKey, one);
}
//하락 마감
} else if(jobType.equals("down")) {
// 년도 데이터를 output data의 키로 설정
outputKey.set(line[2].substring(0,4));
// 상승마감을 판단해서 값을 추출
float resultValue = Float.parseFloat(line[6]) - Float.parseFloat(line[3]);
if(resultValue < 0) {
// 상승마감 - Shuffle단으로 내보내기
context.write(outputKey, one);
}
}
}
}
}
}
-
Driver
< StockOptionDriver.java >
package mapreduce.exam.option.stock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class StockOptionDriver extends Configured implements Tool{
@Override
public int run(String[] optionlist) throws Exception {
// 1. 사용자가 입력한 옵션을 처리하기 위해서 GenericOptionParser객체를 활용
GenericOptionsParser optionParser =
new GenericOptionsParser(getConf(), optionlist);
// optionlist에 포함된 -D옵션 이외에 사용자가 입력한 명령행 매개변수와 나머지 공통옵션들은
// 따로 분리하는 작업을 해야 한다. jobType은 mapper로 전달할 옵션이고 inputpath와
// outputpath는 일반 매개변수로 처리
// GenericOptionsParser의 getRemainingArgs메소드를 호출하면 공통옵션과
// 사용자가 입력한 매개변수를 따로 분리해서 리턴
// otherArgs : 아래와 같은 옵션과 같이 입력하지 않은 명령형 매개변수
// -D : 사용자가 입력한 값을 mapper에 전달
// -fs
// -conf
// -files...
String[] otherArgs = optionParser.getRemainingArgs();
// 2. job생성
Job job = new Job(getConf(), "stockOption");
job.setMapperClass(StockOptionMapper.class);
job.setReducerClass(StockOptionReducer.class);
job.setJarByClass(StockOptionDriver.class);
// 3. input, output 포맷정의
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 4. 리듀서의 출력데이터 key, value의 타입
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 5. hdfs에 저장된 파일을 읽고 처리결과를 저장할 수 있도록 path정의
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new StockOptionDriver(), args);
}
}
-
jar 파일 export 후 hadoop01로 복사
-
상승마감
-
하락마감
항공 데이터 분석
-
1987년 데이터를 월별로 옵션값을 선택하여 항공 출발 지연 데이터, 도착 지연 데이터가 몇 건인지 출력
-
mapper
< AirOptionMapper.java >
package mapred.exam.option.air;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class AirOptionMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text outputKey = new Text();
private String delayType;
@Override
protected void setup(
Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
delayType = context.getConfiguration().get("delayType");
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
if(key.get()>0) {
String[] line = value.toString().split(",");
if(line!=null & line.length>0) {
// 출발 지연
if(delayType.equals("DepDelay")) {
if(!line[15].equals("NA") && Integer.parseInt(line[15]) > 0) {
outputKey.set(line[1]);
context.write(outputKey, one);
}
// 도착 지연
} else if(delayType.equals("ArrDelay")) {
if(!line[14].equals("NA") && Integer.parseInt(line[14]) > 0) {
outputKey.set(line[1]);
context.write(outputKey, one);
}
}
}
}
}
}
-
Driver
< AirOptionDriver.java >
package mapred.exam.option.air;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class AirOptionDriver extends Configured implements Tool {
@Override
public int run(String[] optionlist) throws Exception {
GenericOptionsParser optionParser =
new GenericOptionsParser(getConf(), optionlist);
String[] otherArgs = optionParser.getRemainingArgs();
Job job = new Job(getConf(), "delayOption");
job.setMapperClass(AirOptionMapper.class);
job.setReducerClass(AirOptionReducer.class);
job.setJarByClass(AirOptionDriver.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new AirOptionDriver(), args);
}
}
-
jar 파일로 export 하여 hadoop01로 복사
-
출발 지연
-
도착지연
'Hadoop' 카테고리의 다른 글
빅데이터 플랫폼 구축 #10 - Mapreduce : 다중 값 출력 (0) | 2020.10.07 |
---|---|
빅데이터 플랫폼 구축 #9 - 이클립스에서 namenode 연결, 하둡 사용 (0) | 2020.10.07 |
빅데이터 플랫폼 구축 #7 - MapReduce 연습 (0) | 2020.10.04 |
빅데이터 플랫폼 구축 #6 - MapReduce (0) | 2020.10.04 |
빅데이터 플랫폼 구축 #5 - HDFS 활용 (0) | 2020.10.04 |
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- Free space management
- SQL
- linux
- hadoop
- springboot
- jdbc
- 빅데이터
- File Protection
- oracle
- Variable allocation
- Replacement Strategies
- Spring
- aop
- I/O Services of OS
- 하둡
- Java
- JSON
- 빅데이터 플랫폼
- Disk Scheduling
- Flume
- I/O Mechanisms
- Allocation methods
- SPARK
- mapreduce
- vmware
- gradle
- HDFS
- Disk System
- RAID Architecture
- maven
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
글 보관함