티스토리 뷰
-
Shuffle단을 수정
-
map과 reduce사이에 shuffle이라 부르는 처리가 프레임워크에 의해 자동을 처리된다.
-
리듀스의 전처리 작업으로 사용자가 직접 정의하지 않지만 필요에 의해서 (복잡한 input데이터 활용해서 조인하거나 사용자 정의 기준으로 정렬하거나..) 커스터마이징 할 수 있어야 한다.
-
map작업 ---------> shuffle작업 -----------> reduce작업
-
map처리 후 데이터를 정렬해서 같은 키를 가진 데이터를 같은 장소에 모은다.
-
이때 슬레이브 서버 간에 네트워크를 통한 전송이 발생
-
shuffle단에서 발생하는 이러한 작업을 프레임워크 내부에서 자동으로 처리
-
shuffle단에서 네트워크를 통한 전송이 발생
-
-
정렬
-
정렬의 종류
-
보조정렬
-
부분정렬
-
전체정렬
-
-
보조정렬
-
기존의 맵리듀스에서 정렬되는 기본 키 방식과 다르게 정렬 기준을 추가해서 작업
1) 복합키(사용자정의 키)
-
Writable or WritableComparable를 상속 받아야 한다.
-
hadoop내부에서 인식하는 key나 value는 WritableComparable의 하위로 작성해서 인식하도록 해야 한다.
-
사용자가 정렬하고 싶은 기준 키를 정의
-
상위 클래스가 가지고 있는 readFields, write, compareTo메소드를 오버라이딩 해야 한다.
-
readFields, write메소드는 직렬화와 역직렬화를 위해 필요한 메소드
-
직렬화와 역직렬화는 어떤 객체를 네트워크를 통해서 전송해서 하드디스크에 저장될 수 있는 타입으로 변환해주는 과정
-
readFields : 역직렬화 될 때 호출되는 메소드
-
write : 직렬화 될 때 호출되는 메소드
2) 복합키 비교기
-
WritableComparator
3) 파티셔너
-
Partitioner의 하위
-
파티셔너는 맵 태스크의 출력데이터를 어떤 리듀서태스크의 입력데이터로 보낼지 결정. 이에 대한 결정은 맵태스크의 출력데이터 키값에 따라서 결정이 되어 정렬된다. 우리가 작성할 파티셔너는 연도별로 그룹핑을 해서 파티셔닝을 수행한다.
4) 파티셔너 그룹키 비교기
-
WritableComparator
5) 매퍼
-
Mapeer
6) 리듀서
-
Reducer
7) 드라이버
-
맵리듀스를 실행하기 위한 Application
-
-
보조정렬
처리과정
-
정렬하려고 하는 기준을 정의한 사용자 키 클래스 작성 - CustomKey
-
Mapper클래스의 map메소드에서 사용자정의키가 outputkey로 출력될 수 있도록 정의
-
Reducer태스크에 분배할 수 있도록 하기 위해서 Partitioner를 정의
-
같은 키를 갖고 있는 Mapper의 출력데이터를 같은 리듀서태스크로 보내기 위해서 해시코드를 이용해서 계산
-
-
Reducer태스크로 보내기 전에 같은 그룹으로 그룹핑을 적용할 수 있도록 객체를 정의
-
그룹키 비교기(GroupKeyComparator)
-
ex) air데이터에서는 같은 년도별로 데이터를 분류
-
-
4번에서 같은 그룹으로 정의한 데이터들의 내부에서 두 번째 기준을 적용해서 비교할 수 있도록 객체를 정의
-
사용자정의키 비교기(복합키 비교기)
-
1번에서 정의한 복합키를 기준으로 데이터를 비교해서 정렬하기 위해서
-
-
그룹별로 같은 키를 가지고 있는 객체가 집계되도록 reducer수정
-
드라이버에 shuffle단에서 실행될 클래스를 등록
-
CustomKey
-
AirSortPartitioner
-
GroupKeyComparator
-
CustomKeyComparator
-
-
사용자 정의 키 클래스 작성
< Customkey.java >
package mapred.exam.air.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
/*
* 맵리듀스 프레임워크 내부에서 key와 value는 네트워크에서 주고 받는 값이므로 네트워크
* 전송을 하기 위해 제공되는 Writable의 하위로 반드시 작성해야 한다.
*/
public class Customkey implements WritableComparable<Customkey>{
// 1. 비교할 키를 멤버변수로 정의
private String year;
private Integer month;
public Customkey() {
}
public Customkey(String year, Integer month) {
super();
this.year = year;
this.month = month;
}
public String getYear() {
return year;
}
public void setYear(String year) {
this.year = year;
}
public Integer getMonth() {
return month;
}
public void setMonth(Integer month) {
this.month = month;
}
@Override
public String toString() {
return (new StringBuffer())
.append(year)
.append(",")
.append(month)
.toString();
}
// 2. 데이터를 쓰고 읽는 작업을 처리
// 새로운 메소드를 작성하는 것이 아니라 hadoop프레임워크 내부에서 이런 작업을
// 처리하기 위해 호출되는 메소드를 오버라이딩
// 직렬화 될때 호출
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeString(out, year);
out.writeInt(month);
}
// 역직렬화 될 때 호출
@Override
public void readFields(DataInput in) throws IOException {
year = WritableUtils.readString(in);
month = in.readInt();
}
// 사용자가 만들어 놓은 키를 기준으로 정렬하기 위해서 비교하게 될 메소드를 구현
// year로 비교 year가 같으면 month로 비교
@Override
public int compareTo(Customkey keyObj) {
int result = year.compareTo(keyObj.year);
if(result==0) { // year가 같다.
result = month.compareTo(keyObj.month);
}
return result;
}
}
-
Mapper클래스의 map메소드에서 사용자정의키가 outputkey로 출력될 수 있도록 정의
< AirSortMapper.java >
package mapred.exam.air.sort;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class AirSortMapper extends Mapper<LongWritable, Text, Customkey, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Customkey outputKey = new Customkey();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Customkey, IntWritable>.Context context)
throws IOException, InterruptedException {
if(key.get()>0) {
String[] line = value.toString().split(",");
if(line!=null & line.length>0) {
if(!line[15].equals("NA") && Integer.parseInt(line[15]) > 0) {
outputKey.setYear(line[0]);
outputKey.setMonth(Integer.parseInt(line[1]));
context.write(outputKey, one);
}
}
}
}
}
-
Reducer태스크에 분배할 수 있도록 하기 위해서 Partitioner를 정의
< AirSortPartitioner.java >
package mapred.exam.air.sort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
// Partitioner인터페이스를 상속하는 경우 Mapper에 전달되는 key와 value의
// 타입을 generic으로 명시
// year를 기준으로 해시코드를 구해서 같은 year를 갖고 있는 데이터를 같은 리듀서에서
// 작업할 수 있도록 분배
// => 같은 것 끼리 메모리버퍼에 쌓았다가 한번에 전송
public class AirSortPartitioner
extends Partitioner<Customkey, IntWritable>{
// numPatition은 리듀스태스크의 갯수
public int getPartition(Customkey key, IntWritable value, int numPartition) {
return key.getYear().hashCode()%numPartition;
}
}
-
Reducer태스크로 보내기 전에 같은 그룹으로 그룹핑을 적용할 수 있도록 객체를 정의
< GroupkeyCompareator.java >
package mapred.exam.air.sort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
// 데이터를 그룹핑하기 위해서 필요한 객체
// 즉, 같은 년도끼리 데이터를 그룹화 하기 위해서 필요한 객체
public class GroupKeyComparator extends WritableComparator {
public GroupKeyComparator() {
super(Customkey.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable obj1, WritableComparable obj2) {
Customkey key1 = (Customkey) obj1;
Customkey key2 = (Customkey) obj2;
return key1.getYear().compareTo(key2.getYear());
}
}
-
같은 그룹으로 정의한 데이터들의 내부에서 두 번째 기준을 적용해서 비교할 수 있도록 객체를 정의
< CustomKeyComparator.java >
package mapred.exam.air.sort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
// 사용자정의 키 객체를 비교하는 비교기 - 키가 같은지 비교하는 역할
public class CustomKeyComparator extends WritableComparator{
protected CustomKeyComparator() {
super(Customkey.class, true);
}
// WritableComparable의 타입이 정확하지 않기 때문에 warning발생
// 타입에 대한 부분을 무시하고 체크하지 않고 처리하겠다는 의미
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable obj1, WritableComparable obj2) {
Customkey key1 = (Customkey) obj1;
Customkey key2 = (Customkey) obj2;
return key1.compareTo(key2);
}
}
-
그룹별로 같은 키를 가지고 있는 객체가 집계되도록 reducer수정
< AirSortReducer.java >
package mapred.exam.air.sort;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class AirSortReducer
extends Reducer<Customkey, IntWritable, Customkey, IntWritable>{
private IntWritable resultVal = new IntWritable();
Customkey resultKey = new Customkey();
@Override
protected void reduce(Customkey key, Iterable<IntWritable> values,
Reducer<Customkey, IntWritable, Customkey, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
Integer beforeMonth = key.getMonth();
for (IntWritable value : values) {
//기존에 추출한 키의 month값이 다른 경우 더한 값을 내보내기
if(beforeMonth!=key.getMonth()) {
resultKey.setYear(key.getYear());
resultKey.setMonth(beforeMonth);
resultVal.set(sum);
context.write(resultKey, resultVal);
sum = 0;
}
sum = sum + value.get();
beforeMonth = key.getMonth();
}
if(key.getMonth()==beforeMonth) {
resultVal.set(sum);
resultKey.setYear(key.getYear());
resultKey.setMonth(key.getMonth());
context.write(resultKey, resultVal);
}
}
}
-
드라이버에 shuffle단에서 실행될 클래스를 등록
< AirSortDriver.java >
package mapred.exam.air.sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class AirSortDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "air_sort");
job.setMapperClass(AirSortMapper.class);
job.setReducerClass(AirSortReducer.class);
job.setJarByClass(AirSortDriver.class);
//Shuffle단
job.setPartitionerClass(AirSortPartitioner.class);
job.setGroupingComparatorClass(GroupKeyComparator.class);
job.setSortComparatorClass(CustomKeyComparator.class);
job.setMapOutputKeyClass(Customkey.class);
job.setMapOutputValueClass(IntWritable.class);
//3. input데이터와 output데이터의 포맷을 정의(hdfs에 텍스트 파일의 형태로 input/output)
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//4. 리듀서의 출력데이터에 대한 key와 value의 타입을 명시
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//5. hdfs에 저장된 파일을 읽어오고 처리결과를 저장할 수 있도록 path정보를 설정
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//6. 1번 부터 5번 까지 설정한 내용을 기반으로 실제 job이 실행될 수 있도록 명령
job.waitForCompletion(true);
}
}
-
jar파일로 export
-
build path 추가
-
명령행 매개변수 실행
-
실행
-
결과
-
정렬 전
-
정렬 후
'Hadoop' 카테고리의 다른 글
빅데이터 플랫폼 구축 #17 - Spark : flatMap (0) | 2020.10.23 |
---|---|
빅데이터 플랫폼 구축 #16 - Spark : 설치 및 테스트 (0) | 2020.10.22 |
빅데이터 플랫폼 구축 #14 - 커스터마이징(1) : Combiner (0) | 2020.10.18 |
빅데이터 플랫폼 구축 #13 - Flume (2) (0) | 2020.10.11 |
빅데이터 플랫폼 구축 #12 - Flume (1) (0) | 2020.10.10 |
- Total
- Today
- Yesterday
- Flume
- I/O Mechanisms
- 하둡
- File Protection
- Free space management
- springboot
- Disk System
- SQL
- mapreduce
- hadoop
- Java
- 빅데이터
- oracle
- HDFS
- JSON
- maven
- Spring
- linux
- Disk Scheduling
- Allocation methods
- jdbc
- aop
- I/O Services of OS
- gradle
- Replacement Strategies
- SPARK
- vmware
- 빅데이터 플랫폼
- Variable allocation
- RAID Architecture
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |