티스토리 뷰
<mapreduce> |
1. MapReduce프레임워크 내부에서 사용되는 데이터 타입 분산 환경에서 처리되므로 데이터 타입이 일반 자바에서 사용하는 기본 데이터 타입이 아니라 하둡 내부에서 작성된 인터페이스(Writable)를 상속하는 특별한 클래스타입이어야 한다. int - IntWritable long - LongWritable String - Text 2. 기본작업 1) Mapper - Mapper를 상속 Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> KEYIN : mapper에 input되는 데이터의 key타입 (byte offset이나 라인넘버로 생각 - LongWritable타입) VALUEIN : mapper에 input되는 데이터의 value타입 KEYOUT : mapper에서 output되는 데이터의 key타입 VALUEOUT : mapper에서 output되는 데이터의 value타입 (무조건 1이므로 상수로 정의 - IntWritable타입) - map메소드를 오버라이딩 protected void map(key, value, Context객체){ // key : 입력키 // value : 입력데이터 // Context객체 : hadoop MapReduce프레임워크 내부에서 통신을 담당하는 객체 맵리듀스가 통신하면서 맵의 출력데이터를 기록하고 맵의 출력데이터를 Shuffle하기 위해서 내보내는 작업을 수행 메시지갱신, 처리, 프레임워크 내부에서 통신할때 필요한 여러가지 기본 작업을 처리하는 객체 } 2) Reducer - 집계작업 - Reducer를 상속 - 리듀서에 전달된 입력값과 value를 활용해서 집계할 수 있도록 코드를 작성 - reduce메소드를 오버라이딩 proteted void reduce(key, value, Context객체){ // reduce메소드에 전달되는 key의 타입, value타입 // book, [1, 1, 1, 1, 1, ...] key : Text value : Iterable 타입 } 3) Driver - 맵리듀스를 실행하기 위한 처리를 정의하는 클래스 1. 맵리듀스를 실행하기 위한 job을 생성 2. job을 처리할 실제 클래스에 대한 정보를 정의(Mapper, Reducer, Driver) 3. input데이터와 output데이터의 포맷을 정의(hdfs에 텍스트 파일의 형태로 input/output) 4. 리듀서의 출력데이터에 대한 key와 value의 타입을 명시 5. hdfs에 저장된 파일을 읽어오고 처리결과를 저장할 수 있도록 path정보를 설정 6. 1번 부터 5번 까지 설정한 내용을 기반으로 실제 job이 실행될 수 있도록 명령 |
wordcount 만들기
-
Mapper
< WordCountMapper.java >
package mapreduce.basic;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
// map메소드의 실행 결과 중 map의 output value를 저장할 변수
private final static IntWritable one = new IntWritable(1);
private Text outputKey = new Text(); // map메소드 실행 결과 중 key를 저장할 변수
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// value에 한 라인에 해당하는 문장이 전달된다. ex) read a book
StringTokenizer st = new StringTokenizer(value.toString());
while(st.hasMoreElements()) {
String token = st.nextToken();
outputKey.set(token); // 분리한 단어를 ouputKey라는 Text타입의 변수에 세팅
// Context객체에 값을 셋팅
context.write(outputKey, one);
}
}
}
-
Reducer
< WordCountReducer.java >
package mapreduce.basic;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// book, [1, 1, 1, 1, ...]
// a, [1, 1, 1, ...]
public class WordCountReducer
extends Reducer<Text, IntWritable, Text, IntWritable>{
// 결과값을 저장할 변수
private IntWritable resultVal = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum = sum + value.get();
}
resultVal.set(sum);
context.write(key, resultVal);
}
}
-
Driver
< WordCountDriver.java >
package mapreduce.basic;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws Exception {
//1. 맵리듀스를 실행하기 위한 job을 생성
Configuration conf = new Configuration();
Job job = new Job(conf, "mywordcount");
//2. job을 처리할 실제 클래스에 대한 정보를 정의(Mapper, Reducer, Driver)
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setJarByClass(WordCountDriver.class);
//3. input데이터와 output데이터의 포맷을 정의(hdfs에 텍스트 파일의 형태로 input/output)
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//4. 리듀서의 출력데이터에 대한 key와 value의 타입을 명시
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//5. hdfs에 저장된 파일을 읽어오고 처리결과를 저장할 수 있도록 path정보를 설정
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//6. 1번 부터 5번 까지 설정한 내용을 기반으로 실제 job이 실행될 수 있도록 명령
job.waitForCompletion(true);
}
}
-
bulid.xml 을 Ant로 실행
-
생성된 jar파일을 hadoop01의 /home/hadoop 폴더에 복사
-
실행
-
결과
'Hadoop' 카테고리의 다른 글
빅데이터 플랫폼 구축 #8 - Mapreduce : 사용자 정의 옵션 활용 (0) | 2020.10.05 |
---|---|
빅데이터 플랫폼 구축 #7 - MapReduce 연습 (0) | 2020.10.04 |
빅데이터 플랫폼 구축 #5 - HDFS 활용 (0) | 2020.10.04 |
빅데이터 플랫폼 구축 #4 - Java,Hadoop , mapreduce (0) | 2020.10.03 |
빅데이터 플랫폼 구축 #3 - 가상머신 복제, 각 머신 연결 (0) | 2020.10.03 |
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- HDFS
- Free space management
- 하둡
- 빅데이터 플랫폼
- JSON
- oracle
- SPARK
- Replacement Strategies
- SQL
- File Protection
- jdbc
- hadoop
- vmware
- I/O Mechanisms
- Java
- aop
- Disk System
- mapreduce
- 빅데이터
- Allocation methods
- springboot
- linux
- Variable allocation
- gradle
- Flume
- Spring
- maven
- Disk Scheduling
- I/O Services of OS
- RAID Architecture
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
글 보관함