티스토리 뷰

  • Shuffle단을 수정

  • map과 reduce사이에 shuffle이라 부르는 처리가 프레임워크에 의해 자동을 처리된다.

  • 리듀스의 전처리 작업으로 사용자가 직접 정의하지 않지만 필요에 의해서 (복잡한 input데이터 활용해서 조인하거나 사용자 정의 기준으로 정렬하거나..) 커스터마이징 할 수 있어야 한다.

  • map작업 ---------> shuffle작업 -----------> reduce작업

    • map처리 후 데이터를 정렬해서 같은 키를 가진 데이터를 같은 장소에 모은다.

    • 이때 슬레이브 서버 간에 네트워크를 통한 전송이 발생

    • shuffle단에서 발생하는 이러한 작업을 프레임워크 내부에서 자동으로 처리

      • shuffle단에서 네트워크를 통한 전송이 발생

정렬

  • 정렬의 종류

    • 보조정렬

    • 부분정렬

    • 전체정렬

  • 보조정렬

    • 기존의 맵리듀스에서 정렬되는 기본 키 방식과 다르게 정렬 기준을 추가해서 작업

      1) 복합키(사용자정의 키)

      • Writable or WritableComparable를 상속 받아야 한다.

      • hadoop내부에서 인식하는 key나 value는 WritableComparable의 하위로 작성해서 인식하도록 해야 한다.

      • 사용자가 정렬하고 싶은 기준 키를 정의

      • 상위 클래스가 가지고 있는 readFields, write, compareTo메소드를 오버라이딩 해야 한다.

      • readFields, write메소드는 직렬화와 역직렬화를 위해 필요한 메소드

      • 직렬화와 역직렬화는 어떤 객체를 네트워크를 통해서 전송해서 하드디스크에 저장될 수 있는 타입으로 변환해주는 과정

      • readFields : 역직렬화 될 때 호출되는 메소드

      • write : 직렬화 될 때 호출되는 메소드

      2) 복합키 비교기

      • WritableComparator

      3) 파티셔너

      • Partitioner의 하위

      • 파티셔너는 맵 태스크의 출력데이터를 어떤 리듀서태스크의 입력데이터로 보낼지 결정. 이에 대한 결정은 맵태스크의 출력데이터 키값에 따라서 결정이 되어 정렬된다. 우리가 작성할 파티셔너는 연도별로 그룹핑을 해서 파티셔닝을 수행한다.

      4) 파티셔너 그룹키 비교기

      • WritableComparator

      5) 매퍼

      • Mapeer

      6) 리듀서

      • Reducer

      7) 드라이버

      • 맵리듀스를 실행하기 위한 Application

보조정렬

처리과정

  1. 정렬하려고 하는 기준을 정의한 사용자 키 클래스 작성 - CustomKey

  2. Mapper클래스의 map메소드에서 사용자정의키가 outputkey로 출력될 수 있도록 정의

  3. Reducer태스크에 분배할 수 있도록 하기 위해서 Partitioner를 정의

    • 같은 키를 갖고 있는 Mapper의 출력데이터를 같은 리듀서태스크로 보내기 위해서 해시코드를 이용해서 계산

  4. Reducer태스크로 보내기 전에 같은 그룹으로 그룹핑을 적용할 수 있도록 객체를 정의

    • 그룹키 비교기(GroupKeyComparator)

    • ex) air데이터에서는 같은 년도별로 데이터를 분류

  5. 4번에서 같은 그룹으로 정의한 데이터들의 내부에서 두 번째 기준을 적용해서 비교할 수 있도록 객체를 정의

    • 사용자정의키 비교기(복합키 비교기)

    • 1번에서 정의한 복합키를 기준으로 데이터를 비교해서 정렬하기 위해서

  6. 그룹별로 같은 키를 가지고 있는 객체가 집계되도록 reducer수정

  7. 드라이버에 shuffle단에서 실행될 클래스를 등록

    • CustomKey

    • AirSortPartitioner

    • GroupKeyComparator

    • CustomKeyComparator

  • 사용자 정의 키 클래스 작성

< Customkey.java >

package mapred.exam.air.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;


/*
 *	맵리듀스 프레임워크 내부에서 key와 value는 네트워크에서 주고 받는 값이므로 네트워크
 *	전송을 하기 위해 제공되는 Writable의 하위로 반드시 작성해야 한다. 
 */
public class Customkey implements WritableComparable<Customkey>{
	// 1. 비교할 키를 멤버변수로 정의
	private String year;
	private Integer month;
	
	public Customkey() {
	}
	
	
	public Customkey(String year, Integer month) {
		super();
		this.year = year;
		this.month = month;
	}

	public String getYear() {
		return year;
	}


	public void setYear(String year) {
		this.year = year;
	}


	public Integer getMonth() {
		return month;
	}


	public void setMonth(Integer month) {
		this.month = month;
	}


	@Override
	public String toString() {
		return (new StringBuffer())
				.append(year)
				.append(",")
				.append(month)
				.toString();
	}

	// 2. 데이터를 쓰고 읽는 작업을 처리
	// 새로운 메소드를 작성하는 것이 아니라 hadoop프레임워크 내부에서 이런 작업을
	// 처리하기 위해 호출되는 메소드를 오버라이딩
	
	// 직렬화 될때 호출
	@Override
	public void write(DataOutput out) throws IOException {
		WritableUtils.writeString(out, year);
		out.writeInt(month);
	}
	
	// 역직렬화 될 때 호출
	@Override
	public void readFields(DataInput in) throws IOException {
		year = WritableUtils.readString(in);
		month = in.readInt();
	}
	
	// 사용자가 만들어 놓은 키를 기준으로 정렬하기 위해서 비교하게 될 메소드를 구현
	// year로 비교 year가 같으면 month로 비교
	@Override
	public int compareTo(Customkey keyObj) {
		int result = year.compareTo(keyObj.year);
		if(result==0) { // year가 같다.
			result = month.compareTo(keyObj.month);
		}
		return result;
	}
		
}
  • Mapper클래스의 map메소드에서 사용자정의키가 outputkey로 출력될 수 있도록 정의

< AirSortMapper.java >

package mapred.exam.air.sort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AirSortMapper extends Mapper<LongWritable, Text, Customkey, IntWritable>{
	private final static IntWritable one = new IntWritable(1);
	private Customkey outputKey = new Customkey();
	
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Customkey, IntWritable>.Context context)
			throws IOException, InterruptedException {
		if(key.get()>0) {
			String[] line = value.toString().split(",");
			if(line!=null & line.length>0) {
				if(!line[15].equals("NA") && Integer.parseInt(line[15]) > 0) {
					outputKey.setYear(line[0]);
					outputKey.setMonth(Integer.parseInt(line[1]));
					context.write(outputKey, one);
				}
			}
		}
	}
}
  • Reducer태스크에 분배할 수 있도록 하기 위해서 Partitioner를 정의

< AirSortPartitioner.java >

package mapred.exam.air.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Partitioner인터페이스를 상속하는 경우 Mapper에 전달되는 key와 value의 
// 타입을 generic으로 명시
// year를 기준으로 해시코드를 구해서 같은 year를 갖고 있는 데이터를 같은 리듀서에서
// 작업할 수 있도록 분배
// => 같은 것 끼리 메모리버퍼에 쌓았다가 한번에 전송
public class AirSortPartitioner 
					extends Partitioner<Customkey, IntWritable>{
	
	// numPatition은 리듀스태스크의 갯수
	public int getPartition(Customkey key, IntWritable value, int numPartition) {
		
		return key.getYear().hashCode()%numPartition;
	}
}
  • Reducer태스크로 보내기 전에 같은 그룹으로 그룹핑을 적용할 수 있도록 객체를 정의

< GroupkeyCompareator.java >

package mapred.exam.air.sort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// 데이터를 그룹핑하기 위해서 필요한 객체
// 즉, 같은 년도끼리 데이터를 그룹화 하기 위해서 필요한 객체
public class GroupKeyComparator extends WritableComparator {
	public GroupKeyComparator() {
		super(Customkey.class, true);
	}
	
	@SuppressWarnings("rawtypes")
	@Override
	public int compare(WritableComparable obj1, WritableComparable obj2) {
		Customkey key1 = (Customkey) obj1;
		Customkey key2 = (Customkey) obj2;
		return key1.getYear().compareTo(key2.getYear());
	}
}
  • 같은 그룹으로 정의한 데이터들의 내부에서 두 번째 기준을 적용해서 비교할 수 있도록 객체를 정의

< CustomKeyComparator.java >

package mapred.exam.air.sort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// 사용자정의 키 객체를 비교하는 비교기 - 키가 같은지 비교하는 역할
public class CustomKeyComparator extends WritableComparator{

	protected CustomKeyComparator() {
		super(Customkey.class, true);
	}

	// WritableComparable의 타입이 정확하지 않기 때문에  warning발생
	// 타입에 대한 부분을 무시하고 체크하지 않고 처리하겠다는 의미
	@SuppressWarnings("rawtypes")
	@Override
	public int compare(WritableComparable obj1, WritableComparable obj2) {
		Customkey key1 = (Customkey) obj1;
		Customkey key2 = (Customkey) obj2;
		return key1.compareTo(key2);
	}
}
  • 그룹별로 같은 키를 가지고 있는 객체가 집계되도록 reducer수정

< AirSortReducer.java >

package mapred.exam.air.sort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class AirSortReducer 
		extends Reducer<Customkey, IntWritable, Customkey, IntWritable>{
	private IntWritable resultVal = new IntWritable();
	Customkey resultKey = new Customkey();
	
	@Override
	protected void reduce(Customkey key, Iterable<IntWritable> values,
			Reducer<Customkey, IntWritable, Customkey, IntWritable>.Context context) throws IOException, InterruptedException {
		int sum = 0;
		Integer beforeMonth = key.getMonth();
		for (IntWritable value : values) {
			//기존에 추출한 키의 month값이 다른 경우 더한 값을 내보내기
			if(beforeMonth!=key.getMonth()) {
				resultKey.setYear(key.getYear());
				resultKey.setMonth(beforeMonth);
				resultVal.set(sum);
				context.write(resultKey, resultVal);
				sum = 0;
			}
			sum = sum + value.get();
			beforeMonth = key.getMonth();
		}
		if(key.getMonth()==beforeMonth) {
			resultVal.set(sum);
			resultKey.setYear(key.getYear());
			resultKey.setMonth(key.getMonth());
			context.write(resultKey, resultVal);
		}
		
	}
}
  • 드라이버에 shuffle단에서 실행될 클래스를 등록

< AirSortDriver.java >

package mapred.exam.air.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class AirSortDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "air_sort");
		
		job.setMapperClass(AirSortMapper.class);
		job.setReducerClass(AirSortReducer.class);
		job.setJarByClass(AirSortDriver.class);

		//Shuffle단
		job.setPartitionerClass(AirSortPartitioner.class);
		job.setGroupingComparatorClass(GroupKeyComparator.class);
		job.setSortComparatorClass(CustomKeyComparator.class);
		job.setMapOutputKeyClass(Customkey.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		//3. input데이터와 output데이터의 포맷을 정의(hdfs에 텍스트 파일의 형태로 input/output)
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		//4. 리듀서의 출력데이터에 대한 key와 value의 타입을 명시
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//5. hdfs에 저장된 파일을 읽어오고 처리결과를 저장할 수 있도록 path정보를 설정
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		//6. 1번 부터 5번 까지 설정한 내용을 기반으로 실제 job이 실행될 수 있도록 명령
		job.waitForCompletion(true);
	}
}
  • jar파일로 export

  • build path 추가

  • 명령행 매개변수 실행

  • 실행

  • 결과

  • 정렬 전

  • 정렬 후

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함