티스토리 뷰

<< 사용자 정의 옵션 활용 >>

1. GenericOptionParser 활용

   - hadoop을 실행할 때 -D옵션과 함께 속성=속성값을 입력하면 Mapper에서 이 정보를 사용할 수 있도록 정의

   - commons-cli-xxx.jar 라이브러리를 추가(하둡 설치폴더 안 lib에 있음)
     1) Mapper작성

     - setup메소드 : Mapper가 실행될 때 한 번만 실행되는 메소드
     - 하둡을 실행할 때 -D옵션과 함께 입력한 속성명을 지정하면 입력했던 속성값을 추출할 수 있다.
   - Configuration객체를 이용해서 작업

   	2) Reducer
	- 동일

3) Driver

- 실행할 때 사용자가 입력한 옵션을 이용할 수 있도록 설정해야 하므로 기존 방식을 모두 변경
- 사용자정의 옵션을 사용하기 위한 작업(command line에 사용자가 입력하는 옵션을 mapper가
  사용할 수 있도록 하기 위한 조건)
  - Driver클래스를 정의할 때 Tool인터페이스를 구현
  - Configuration클래스를 상속
  - Tool인터페이스의 run메소드를 오버라이딩
  	=> 기존의 Driver에서 정의했던 내용을 구현
  	=> GenericOptionParser를 활용해서 사용자가 입력한 명령행매개변수에서
  		- D옵션과 함께 입력한 값을 분리해서 Mapper로 전달하도록 작성
  		- 나머지 입력값은 기존의 명령행 매개변수와 동일하게 처리
  - ToolRunner의 run메소드를 호출해서 Driver에 구현한 run메소드를 실행하도록 작성
  	하둡을 실행하면서 명령행에 입력하는 옵션을 Mapper에 전달하기 위해 ToolRunner의
  	run메소드를 호출해서 작업
  	ToolRunner.run(Configuration conf, Tool tool, String[] args)
  	=> 기존 Driver를 Tool타입으로 변경해야 한다.
  	=> ToolRunner클래스의 run메소드 내부에서 Tool객채의 run을 호출해서 실행하므로
  		스펙에 맞게 작성해 놓아야 실행이 된다.

NASDAQ 데이터 분석

  • 이전에 만들었던 옵션에 따라서 상승마감, 하락마감을 선택 할 수 있도록 구현

  • Mapper

< StockOptionMapper.java >

package mapreduce.exam.option.stock;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StockOptionMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	private final static IntWritable one = new IntWritable(1);
	private Text outputKey = new Text();
	private String jobType; // 하둡 실행할 때 입력한 옵션값
	
	@Override
	protected void setup(
			Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		// context.getConfiguration().get("옵션명"); -D옵션과 같이 입력하는 옵션명
		jobType = context.getConfiguration().get("jobType");
	
	}
	
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		if(key.get() > 0) { //제목행을 제외하고 작업을 수행
			String[] line = value.toString().split(",");
			if(line!=null && line.length>0) {
				// 상승마감
				if(jobType.equals("up")) {
					// 년도 데이터를 output data의 키로 설정
					outputKey.set(line[2].substring(0,4));
					// 상승마감을 판단해서 값을 추출
					float resultValue = Float.parseFloat(line[6]) - Float.parseFloat(line[3]);
					if(resultValue > 0) {
						// 상승마감 - Shuffle단으로 내보내기
						context.write(outputKey, one);	
					}
				//하락 마감	
				} else if(jobType.equals("down")) {
					// 년도 데이터를 output data의 키로 설정
					outputKey.set(line[2].substring(0,4));
					// 상승마감을 판단해서 값을 추출
					float resultValue = Float.parseFloat(line[6]) - Float.parseFloat(line[3]);
					if(resultValue < 0) {
						// 상승마감 - Shuffle단으로 내보내기
						context.write(outputKey, one);	
					}
				}		
			}
		}
	}
}
  • Driver

< StockOptionDriver.java >

package mapreduce.exam.option.stock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockOptionDriver extends Configured implements Tool{
	@Override
	public int run(String[] optionlist) throws Exception {
		// 1. 사용자가 입력한 옵션을 처리하기 위해서 GenericOptionParser객체를 활용
		GenericOptionsParser optionParser = 
				new GenericOptionsParser(getConf(), optionlist);
		
		// optionlist에 포함된 -D옵션 이외에 사용자가 입력한 명령행 매개변수와 나머지 공통옵션들은
		// 따로 분리하는 작업을 해야 한다. jobType은 mapper로 전달할 옵션이고 inputpath와
		// outputpath는 일반 매개변수로 처리
		// GenericOptionsParser의 getRemainingArgs메소드를 호출하면 공통옵션과
		// 사용자가 입력한 매개변수를 따로 분리해서 리턴
		// otherArgs : 아래와 같은 옵션과 같이 입력하지 않은 명령형 매개변수
		// -D : 사용자가 입력한 값을 mapper에 전달
		// -fs 
		// -conf
		// -files...
		String[] otherArgs = optionParser.getRemainingArgs();

		// 2. job생성
		Job job = new Job(getConf(), "stockOption");
		job.setMapperClass(StockOptionMapper.class);
		job.setReducerClass(StockOptionReducer.class);
		job.setJarByClass(StockOptionDriver.class);
		
		// 3. input, output 포맷정의
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// 4. 리듀서의 출력데이터 key, value의 타입
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 5. hdfs에 저장된 파일을 읽고 처리결과를 저장할 수 있도록 path정의
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		job.waitForCompletion(true);
		return 0;
	}
	
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new StockOptionDriver(), args);
	}
}
  • jar 파일 export 후 hadoop01로 복사

  •  

    상승마감

     

  • 하락마감

항공 데이터 분석

  • 1987년 데이터를 월별로 옵션값을 선택하여 항공 출발 지연 데이터, 도착 지연 데이터가 몇 건인지 출력

  • mapper

< AirOptionMapper.java >

package mapred.exam.option.air;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AirOptionMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	private final static IntWritable one = new IntWritable(1);
	private Text outputKey = new Text();
	private String delayType;
	
	@Override
	protected void setup(
			Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		delayType = context.getConfiguration().get("delayType");
	}
	
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		if(key.get()>0) {
			String[] line = value.toString().split(",");
			if(line!=null & line.length>0) {
				// 출발 지연
				if(delayType.equals("DepDelay")) {
					if(!line[15].equals("NA") && Integer.parseInt(line[15]) > 0) {
						outputKey.set(line[1]);
						context.write(outputKey, one);
					}
				// 도착 지연
				} else if(delayType.equals("ArrDelay")) {
					if(!line[14].equals("NA") && Integer.parseInt(line[14]) > 0) {
						outputKey.set(line[1]);
						context.write(outputKey, one);
					}
				}
			}
		}
	}
}
  • Driver

< AirOptionDriver.java >

package mapred.exam.option.air;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AirOptionDriver extends Configured implements Tool {
	
	@Override
	public int run(String[] optionlist) throws Exception {
		GenericOptionsParser optionParser = 
				new GenericOptionsParser(getConf(), optionlist);
		String[] otherArgs = optionParser.getRemainingArgs();
		
		Job job = new Job(getConf(), "delayOption");
		
		job.setMapperClass(AirOptionMapper.class);
		job.setReducerClass(AirOptionReducer.class);
		job.setJarByClass(AirOptionDriver.class);
		
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		job.waitForCompletion(true);
		return 0;
	}
	
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(),  new AirOptionDriver(), args);
	}
}
  • jar 파일로 export 하여 hadoop01로 복사

  • 출발 지연

  • 도착지연

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함