티스토리 뷰

<mapreduce> 
1. MapReduce프레임워크 내부에서 사용되는 데이터 타입
분산 환경에서 처리되므로 데이터 타입이 일반 자바에서 사용하는 기본 데이터 타입이 아니라 하둡 내부에서
작성된 인터페이스(Writable)를 상속하는 특별한 클래스타입이어야 한다.
int - IntWritable
long - LongWritable
String - Text

2. 기본작업
1) Mapper
- Mapper를 상속
  Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  KEYIN : mapper에 input되는 데이터의 key타입
    (byte offset이나 라인넘버로 생각 - LongWritable타입)
  VALUEIN : mapper에 input되는 데이터의 value타입
  KEYOUT : mapper에서 output되는 데이터의 key타입
  VALUEOUT : mapper에서 output되는 데이터의 value타입
    (무조건 1이므로 상수로 정의 - IntWritable타입)
  
- map메소드를 오버라이딩
  protected void map(key, value, Context객체){
      // key : 입력키
      // value : 입력데이터
      // Context객체 : hadoop MapReduce프레임워크 내부에서 통신을 담당하는 객체
       맵리듀스가 통신하면서 맵의 출력데이터를 기록하고 맵의 출력데이터를
       Shuffle하기 위해서 내보내는 작업을 수행
        메시지갱신, 처리, 프레임워크 내부에서 통신할때 필요한 여러가지
        기본 작업을 처리하는 객체
  }
  
2) Reducer
- 집계작업
- Reducer를 상속
- 리듀서에 전달된 입력값과 value를 활용해서 집계할 수 있도록 코드를 작성
- reduce메소드를 오버라이딩
  proteted void reduce(key, value, Context객체){
   // reduce메소드에 전달되는 key의 타입, value타입
   // book, [1, 1, 1, 1, 1, ...]
   key : Text
   value : Iterable 타입
  }
  
3) Driver
- 맵리듀스를 실행하기 위한 처리를 정의하는 클래스
1. 맵리듀스를 실행하기 위한 job을 생성
2. job을 처리할 실제 클래스에 대한 정보를 정의(Mapper, Reducer, Driver)
3. input데이터와 output데이터의 포맷을 정의(hdfs에 텍스트 파일의 형태로 input/output)
4. 리듀서의 출력데이터에 대한 key와 value의 타입을 명시
5. hdfs에 저장된 파일을 읽어오고 처리결과를 저장할 수 있도록 path정보를 설정
6. 1번 부터 5번 까지 설정한 내용을 기반으로 실제 job이 실행될 수 있도록 명령

wordcount 만들기

  • Mapper

< WordCountMapper.java >

package mapreduce.basic;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	// map메소드의 실행 결과 중 map의 output value를 저장할 변수
	private final static IntWritable one = new IntWritable(1);
	private Text outputKey = new Text(); // map메소드 실행 결과 중 key를 저장할 변수
			
	@Override
	protected void map(LongWritable key, Text value, 
			Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		// value에 한 라인에 해당하는 문장이 전달된다. ex) read a book
		StringTokenizer st = new StringTokenizer(value.toString());
		while(st.hasMoreElements()) {
			String token = st.nextToken();
			outputKey.set(token); // 분리한 단어를 ouputKey라는 Text타입의 변수에 세팅
			// Context객체에 값을 셋팅
			context.write(outputKey, one);
		}
	}
}
  • Reducer

< WordCountReducer.java >

package mapreduce.basic;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// book, [1, 1, 1, 1, ...]
// a, [1, 1, 1, ...]
public class WordCountReducer 
		extends Reducer<Text, IntWritable, Text, IntWritable>{
	// 결과값을 저장할 변수
	private IntWritable resultVal = new IntWritable();
	
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable value : values) {
			sum = sum + value.get();
		}
		resultVal.set(sum);
		context.write(key, resultVal);
	}
}
  • Driver

< WordCountDriver.java >

package mapreduce.basic;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountDriver {
	public static void main(String[] args) throws Exception {
		//1. 맵리듀스를 실행하기 위한 job을 생성
		Configuration conf = new Configuration();
		Job job = new Job(conf, "mywordcount");
		
		//2. job을 처리할 실제 클래스에 대한 정보를 정의(Mapper, Reducer, Driver)
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		job.setJarByClass(WordCountDriver.class);
		
		//3. input데이터와 output데이터의 포맷을 정의(hdfs에 텍스트 파일의 형태로 input/output)
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		//4. 리듀서의 출력데이터에 대한 key와 value의 타입을 명시
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//5. hdfs에 저장된 파일을 읽어오고 처리결과를 저장할 수 있도록 path정보를 설정
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		//6. 1번 부터 5번 까지 설정한 내용을 기반으로 실제 job이 실행될 수 있도록 명령
		job.waitForCompletion(true);
	}
}
  • bulid.xml 을 Ant로 실행

  • 생성된 jar파일을 hadoop01의 /home/hadoop 폴더에 복사

  • 실행

  • 결과

 

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함