Cloud Computing and Apache Hadoop Applications

2009-01-01

With the evolution of information technology and the spread of the Internet, a new architecture for sharing computing resources has taken shape: "cloud computing". Cloud computing decouples application systems from the underlying hardware, making them platform-independent and storage-independent. The entire hardware estate becomes a resource pool (Resource Pool), so that applications no longer stall for lack of resources, building a truly service-oriented runtime environment.

In today's IT architectures, servers are mostly organized by function, for example:

  • Database servers
  • Application servers
  • Web servers
  • and so on …

Large enterprises go further and organize by business line, each business owning its own set of the servers above, for example:

  • Enterprise Resource Planning (ERP) server farms
  • Customer Relationship Management (CRM) server farms
  • Supply chain server farms
  • and so on …

Within these server farms, the following situations are common:

  • Varying amounts of surplus computing capacity sitting idle
  • Varying amounts of surplus storage space sitting idle
  • Peak demand hitting different servers at different times
  • Large volumes of data to process, but no dedicated high-end server to handle them

Many enterprises are organized into cost centers, each of which uses the server farms above to a different degree. The IT infrastructure should therefore include a metering mechanism that allocates the cost of IT services fairly across cost centers, putting a "user pays" principle into practice inside the enterprise, raising the utilization of IT services, and in turn lowering overall operating costs. This is where the Platform as a Service (PaaS) concept comes in: IT services are billed the way water and electricity are, making each cost center's output and expenses transparent.

Many new Internet business models are also maturing, for example:

  • Content sharing services (Content Sharing Service): billed by category (pictures, music, video, files …), data volume, and number of uses
  • Software as a Service (SaaS): billed by feature, data volume, and amount of computation
  • Computing services: Google, Yahoo, and Amazon offer application services of various kinds, including PaaS rented as a stand-alone platform.
  • and so on …

To satisfy the needs and solutions described above, existing enterprise IT architectures should evolve in the following directions:

  • Server Consolidation: merge servers with similar functions into fewer, more capable machines, so that hardware resources are used efficiently.

  • Server Virtualization: build virtual machines (Virtual Machine) per operating mode or per business line, so that they run independently while sharing the overall pool of resources.

  • Storage Pool: cut back on each server's internal disks in favor of network-attached storage (NAS) and storage area networks (SAN); this simplifies and strengthens backup and data-protection mechanisms, gives every computing platform uniform storage performance, and keeps application service levels predictable rather than hostage to uneven storage hardware.

  • System Metering: meter CPU, memory, and disk usage per server and additionally per virtual machine, and aggregate usage per server farm so it can be settled against cost centers.

After Google introduced the Map/Reduce model of cloud computing, Apache released the Hadoop cloud-computing platform, and Yahoo and Amazon quickly lined up behind it; for a while Hadoop led the field and became a movement. The Map/Reduce model is shown in the figure below:
[Figure: the Map/Reduce computing model]
I imagine the following scenario and propose a Hadoop-based solution:

  • Hostile, friendly, and commercial aircraft approach from every direction
  • Air-defense radars at various sites collect each aircraft's coordinates
  • The distance between each aircraft and a strategic target is computed on the fly

The air-defense radars output aircraft coordinates in the following format:

  • Each radar writes the coordinates it collects to a file of its own
  • Each line of the file records one aircraft's coordinates
  • Record format: aircraft ID=X distance to target:Y distance to target:Z distance to target
  • Sample data:
    a01=10:10:10
    a02=12:12:12
    a03=14:14:14
    a04=16:16:16
    a05=18:18:18
    a06=9:9:9
    a07=8:8:8
    a08=7:7:7
    a09=6:6:6
    a01=1:2:3
    a10=5:5:5
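
As a quick sanity check on this record format, the same distance computation can be done with a one-line awk script, assuming the sample above is saved locally as Radar.txt (the file name is my assumption):

# Split each record on '=' and ':', then print aircraft ID and distance
awk -F'[=:]' '{ printf "%s\t%.4f\n", $1, sqrt($2*$2 + $3*$3 + $4*$4) }' Radar.txt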

The cloud-computing requirements, spread across multiple Hadoop servers, are:

  • Read each radar's file
  • Read each aircraft's coordinates
  • Compute the distance to the strategic target

Under Hadoop's distributed file system layout:

  • /usr/hadoop/input holds the radar files, whose names start with the prefix Radar-.
  • /usr/hadoop/output collects the computed results from the Hadoop nodes.
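
Getting the radar files into HDFS is a one-time setup step. A minimal sketch using the standard hadoop fs commands (the local file names are assumptions):

HADOOP_HOME=~/hadoop-0.20.0
# Create the input directory and upload the per-radar capture files
$HADOOP_HOME/bin/hadoop fs -mkdir input
$HADOOP_HOME/bin/hadoop fs -put Radar-*.txt input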

Below I present Hadoop applications written in Java, C, and Perl:

Java program built on the Hadoop Map/Reduce framework
package com.emprogria;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser; 

/**
 *
 * @author Rich Lee
 */
public class Radar {
    public static class Map
                        extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        private Text cartian = new Text();
        private DoubleWritable dist = new DoubleWritable();
        @Override
        public void map(LongWritable key, Text value, Context context)
                         throws IOException, InterruptedException {
            String line = value.toString();
            String[] objectNamePostion = line.split("=");
            // Guard against malformed lines before indexing the split result.
            if (objectNamePostion.length == 2) {
                String[] valueList = objectNamePostion[1].split(":");
                if (valueList.length == 3) {
                    double x = Double.parseDouble(valueList[0]);
                    double y = Double.parseDouble(valueList[1]);
                    double z = Double.parseDouble(valueList[2]);
                    double _dist = Math.sqrt(x * x + y * y + z * z);
                    dist.set(_dist);
                    cartian.set(objectNamePostion[0]);
                    context.write(cartian, dist);
                }
            }
        }
    }

    public static class Reduce
           extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                        throws IOException, InterruptedException {
            // Pass every distance for this aircraft ID straight through; the
            // framework has already grouped and key-sorted the map outputs.
            for (DoubleWritable val : values) {
                context.write(key, val);
            }
        }
    } 

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Radar <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "Radar");
        job.setJarByClass(Radar.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
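Before the shell script below can run, the job has to be compiled and packaged as Radar.jar. A minimal build sketch, assuming JDK 6 and the hadoop-0.20.0-core.jar that ships at the top of HADOOP_HOME (the directory layout is my assumption, chosen to match the script):

HADOOP_HOME=~/hadoop-0.20.0
mkdir -p classes ../dist
javac -classpath $HADOOP_HOME/hadoop-0.20.0-core.jar -d classes Radar.java
# "e" records com.emprogria.Radar as the jar entry point, so the
# hadoop jar command below needs no explicit class name
jar cfe ../dist/Radar.jar com.emprogria.Radar -C classes .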
Shell script
HADOOP_HOME=~/hadoop-0.20.0
# Remove the output directory left over from the previous run
$HADOOP_HOME/bin/hadoop fs -rmr output
# Run the Radar job
$HADOOP_HOME/bin/hadoop jar ../dist/Radar.jar input/Radar.txt output
# Inspect the results
$HADOOP_HOME/bin/hadoop fs -cat output/part-r-00000
Run log and results
09/08/26 16:37:53 INFO input.FileInputFormat: Total input paths to process : 1
09/08/26 16:37:54 INFO mapred.JobClient: Running job: job_200908261633_0001
09/08/26 16:37:55 INFO mapred.JobClient: map 0% reduce 0%
09/08/26 16:38:13 INFO mapred.JobClient: map 100% reduce 0%
09/08/26 16:38:28 INFO mapred.JobClient: map 100% reduce 100%
09/08/26 16:38:30 INFO mapred.JobClient: Job complete: job_200908261633_0001
09/08/26 16:38:30 INFO mapred.JobClient: Counters: 17
09/08/26 16:38:30 INFO mapred.JobClient: Job Counters
09/08/26 16:38:30 INFO mapred.JobClient: Launched reduce tasks=1
09/08/26 16:38:30 INFO mapred.JobClient: Launched map tasks=1
09/08/26 16:38:30 INFO mapred.JobClient: Data-local map tasks=1
09/08/26 16:38:30 INFO mapred.JobClient: FileSystemCounters
09/08/26 16:38:30 INFO mapred.JobClient: FILE_BYTES_READ=146
09/08/26 16:38:30 INFO mapred.JobClient: HDFS_BYTES_READ=115
09/08/26 16:38:30 INFO mapred.JobClient: FILE_BYTES_WRITTEN=324
09/08/26 16:38:30 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=227
09/08/26 16:38:30 INFO mapred.JobClient: Map-Reduce Framework
09/08/26 16:38:30 INFO mapred.JobClient: Reduce input groups=0
09/08/26 16:38:30 INFO mapred.JobClient: Combine output records=10
09/08/26 16:38:30 INFO mapred.JobClient: Map input records=10
09/08/26 16:38:30 INFO mapred.JobClient: Reduce shuffle bytes=0
09/08/26 16:38:30 INFO mapred.JobClient: Reduce output records=0
09/08/26 16:38:30 INFO mapred.JobClient: Spilled Records=20
09/08/26 16:38:30 INFO mapred.JobClient: Map output bytes=120
09/08/26 16:38:30 INFO mapred.JobClient: Combine input records=10
09/08/26 16:38:30 INFO mapred.JobClient: Map output records=10
09/08/26 16:38:30 INFO mapred.JobClient: Reduce input records=10
a01 17.320508075688775
a02 20.784609690826528
a03 24.24871130596428
a04 27.712812921102035
a05 31.176914536239792
a06 15.588457268119896
a07 13.856406460551018
a08 12.12435565298214
a09 10.392304845413264
a10 8.660254037844387
C programs (Mapper/Reducer) invoked through the Hadoop Streaming mechanism
Hadoop Streaming Mapper
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <math.h>

#define BUF_SIZE        64

int
getObjectInfo(char *buffer, char *objectName, char *objectCartisan) {
        int ret;

        ret = sscanf(buffer, "%[^'=']=%s", objectName, objectCartisan);
        return ret;
}

int
getObjectCartisanInfo(
        char *objectCartisan,
        double *x, double *y, double *z) {
        int ret;
        char X[16], Y[16], Z[16];

        memset(X, 0, sizeof(X));
        memset(Y, 0, sizeof(Y));
        memset(Z, 0, sizeof(Z));

        ret = sscanf(objectCartisan, "%[^':']:%[^':']:%s", X, Y, Z);
        if (ret==3) {
                *x = atof(X);
                *y = atof(Y);
                *z = atof(Z);
        }

        return ret;
}
int main(int argc, char *argv[]) {
        char buffer[BUF_SIZE];

        while (fgets(buffer, BUF_SIZE-1, stdin)) {
                int len = strlen(buffer);

                if (buffer[len-1] == '\n') {
                        buffer[len-1] = 0;
                }

                char objectName[16];
                char objectCartisan[48];

                memset(objectName, 0, sizeof(objectName));
                memset(objectCartisan, 0, sizeof(objectCartisan));

                if (getObjectInfo(buffer, objectName, objectCartisan)!=2)
                        continue;
                double x=0.0, y=0.0, z=0.0;
                if (getObjectCartisanInfo(objectCartisan, &x, &y, &z)!=3)
                        continue;

                double dist = 0.0;
                dist = sqrt(x*x + y*y + z*z);
                printf("%s\t%.4f\n", objectName, dist);
        }
        return 0;
}
Hadoop Streaming Reducer
/*
    Reducer
*/

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#define BUF_SIZE        1024

int main(int argc, char *argv[]) {
        char buffer[BUF_SIZE];
        while (fgets(buffer, BUF_SIZE - 1, stdin)) {
                int len = strlen(buffer);
                if (buffer[len-1] == '\n') {
                        buffer[len-1] = 0;
                }

                char objectName[16];
                char dist[16];
                memset(objectName, 0, sizeof(objectName));
                memset(dist, 0, sizeof(dist));
                if (sscanf(buffer, "%s %s", objectName, dist)==2) {
                        printf("%s\t%s\n", objectName, dist);
                }
        }
        return 0;
}
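Both programs compile as ordinary stand-alone executables; only the mapper needs the math library, for sqrt(). A minimal build sketch, assuming the sources are saved as RadarMapper.c and RadarReducer.c (the file names are my assumption):

TASK_HOME=/home/hadoop/NetBeansProjects/Radar/C
mkdir -p $TASK_HOME/bin
gcc -O2 -o $TASK_HOME/bin/RadarMapper RadarMapper.c -lm
gcc -O2 -o $TASK_HOME/bin/RadarReducer RadarReducer.c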
Shell script
HADOOP_HOME=~/hadoop-0.20.0
TASK_HOME=/home/hadoop/NetBeansProjects/Radar/C
#
$HADOOP_HOME/bin/hadoop fs -rmr output
# Invoke the C Mapper and Reducer through the Hadoop Streaming mechanism
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.0-streaming.jar \
-mapper $TASK_HOME/bin/RadarMapper \
-reducer $TASK_HOME/bin/RadarReducer \
-file $TASK_HOME/bin/RadarMapper \
-file $TASK_HOME/bin/RadarReducer \
-input input/Radar.txt \
-output output
#
$HADOOP_HOME/bin/hadoop fs -cat output/part-00000
Run log and results
packageJobJar: [/home/hadoop/NetBeansProjects/Radar/C/bin/RadarMapper, /home/hadoop/NetBeansProjects/Radar/C/bin/RadarReducer, /tmp/hadoop-hadoop/hadoop-unjar6892254036758685595/] [] /tmp/streamjob6608472248507193138.jar tmpDir=null
09/08/26 16:41:07 INFO mapred.FileInputFormat: Total input paths to process : 1
09/08/26 16:41:08 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hadoop/mapred/local]
09/08/26 16:41:08 INFO streaming.StreamJob: Running job: job_200908261633_0002
09/08/26 16:41:08 INFO streaming.StreamJob: To kill this job, run:
09/08/26 16:41:08 INFO streaming.StreamJob: /home/hadoop/hadoop-0.20.0/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_200908261633_0002
09/08/26 16:41:08 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_200908261633_0002
09/08/26 16:41:09 INFO streaming.StreamJob: map 0% reduce 0%
09/08/26 16:41:29 INFO streaming.StreamJob: map 50% reduce 0%
09/08/26 16:41:32 INFO streaming.StreamJob: map 100% reduce 0%
09/08/26 16:41:44 INFO streaming.StreamJob: map 100% reduce 100%
09/08/26 16:41:47 INFO streaming.StreamJob: Job complete: job_200908261633_0002
09/08/26 16:41:47 INFO streaming.StreamJob: Output: output
a01 17.3205
a02 20.7846
a03 24.2487
a04 27.7128
a05 31.1769
a06 15.5885
a07 13.8564
a08 12.1244
a09 10.3923
a10 8.6603
Perl programs (Mapper/Reducer) invoked through the Hadoop Streaming mechanism
Hadoop Mapper
#!/usr/bin/perl

open(FH, "-"); 
while ($_=<FH>) {
	my $line = $_;
	chomp($line);

	my @objectInfo = split(/=/, $line); 
	if ($#objectInfo == 1) {
		my @objectCartisan = split(/:/, $objectInfo[1]); 

		if ($#objectCartisan == 2) {
			my ($dist, $x, $y, $z);
			$x = $objectCartisan[0];
			$y = $objectCartisan[1];
			$z = $objectCartisan[2];
			$dist = sqrt($x*$x + $y*$y + $z*$z);
			printf("%s\t%.4f\n", $objectInfo[0], $dist);
		}
	}
}

close FH;
Hadoop Reducer
#!/usr/bin/perl

open(FH, "-"); 
while ($_=<FH>) {
	my $line = $_;
	chomp($line);

	my @objectInfo = split(/\t/, $line);
	if ($#objectInfo == 1) {
		my ($objectName, $dist);
		$objectName = $objectInfo[0];
		$dist = $objectInfo[1];
		printf("%s\t%.4f\n", $objectName, $dist);
	}
}

close FH;
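
Because Streaming talks to the scripts purely over stdin/stdout, the whole pipeline can be rehearsed locally with ordinary pipes, with sort standing in for the shuffle phase. A quick check, assuming the sample data is saved as Radar.txt:

chmod +x RadarMapper.pl RadarReducer.pl
cat Radar.txt | ./RadarMapper.pl | sort | ./RadarReducer.pl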
Shell script
HADOOP_HOME=~/hadoop-0.20.0
TASK_HOME=/home/hadoop/NetBeansProjects/Radar/perl
#
$HADOOP_HOME/bin/hadoop fs -rmr output
# Invoke the Perl Mapper and Reducer through the Hadoop Streaming mechanism
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.0-streaming.jar \
-mapper $TASK_HOME/RadarMapper.pl \
-reducer $TASK_HOME/RadarReducer.pl \
-file $TASK_HOME/RadarMapper.pl \
-file $TASK_HOME/RadarReducer.pl \
-input input/Radar.txt \
-output output
#
$HADOOP_HOME/bin/hadoop fs -cat output/part-00000
Run log and results
packageJobJar: [/home/hadoop/NetBeansProjects/Radar/perl/RadarMapper.pl, /home/hadoop/NetBeansProjects/Radar/perl/RadarReducer.pl, /tmp/hadoop-hadoop/hadoop-unjar5113620553380180845/] [] /tmp/streamjob8665651981311112320.jar tmpDir=null
09/08/26 16:45:16 INFO mapred.FileInputFormat: Total input paths to process : 1
09/08/26 16:45:17 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hadoop/mapred/local]
09/08/26 16:45:17 INFO streaming.StreamJob: Running job: job_200908261633_0003
09/08/26 16:45:17 INFO streaming.StreamJob: To kill this job, run:
09/08/26 16:45:17 INFO streaming.StreamJob: /home/hadoop/hadoop-0.20.0/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_200908261633_0003
09/08/26 16:45:17 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_200908261633_0003
09/08/26 16:45:18 INFO streaming.StreamJob: map 0% reduce 0%
09/08/26 16:45:35 INFO streaming.StreamJob: map 100% reduce 0%
09/08/26 16:45:45 INFO streaming.StreamJob: map 100% reduce 17%
09/08/26 16:45:50 INFO streaming.StreamJob: map 100% reduce 100%
09/08/26 16:45:56 INFO streaming.StreamJob: Job complete: job_200908261633_0003
09/08/26 16:45:56 INFO streaming.StreamJob: Output: output
a01 17.3205
a02 20.7846
a03 24.2487
a04 27.7128
a05 31.1769
a06 15.5885
a07 13.8564
a08 12.1244
a09 10.3923
a10 8.6603
 

Under the Map/Reduce architecture, the files in the input directory are fed line by line into the Mapper, which processes each record and emits its result as a <Key, Value> pair to the Reducer. The Reducer gathers the processed data from all the nodes, sorts it, and writes the result to the output directory.
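
To make the flow concrete, here is how one record from the sample data travels through the job (the distance matches the run output above):

input line:  a06=9:9:9
map output:  (a06, 15.5885)    where 15.5885 = sqrt(81 + 81 + 81)
shuffle:     all values are grouped by key, and the keys are sorted
reduce:      a06  15.5885      written to output/part-r-00000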

When the nodes differ too much in computing power, the Mappers finish at very different times and the Reducer is left waiting for the slowest Mapper's output, dragging down parallel efficiency. Likewise, the time a Mapper spends on actual processing should far outweigh the time spent serializing the Java classes. This is especially true of the Hadoop Streaming mechanism, where every record is piped through the Mapper, and the Mapper is an external executable, so both the runtime and the resource consumption are substantial. If the Mapper's per-record work is too brief, Map/Reduce is not worth using; a parallel-computing mechanism such as MPI/OpenMP (Message Passing Interface / Open Multi-Processing) is then the better fit.
[Figure: parallel computing]
OpenMP is also relatively simple to use: compiler directives are all it takes to make a loop run in parallel (threading). Below is a solution to the aircraft-distance computation in C using the OpenMP parallel-computing mechanism:

OpenMP C program
#include <omp.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <math.h>

#define BUF_SIZE	64
#define MAX_RECORD_NUM	64

int
getObjectInfo(char *buffer, char *objectName, char *objectCartisan) {
	int ret;
	ret = sscanf(buffer, "%[^'=']=%s", objectName, objectCartisan);
	return ret;
}

int
getObjectCartisanInfo(
	char *objectCartisan,
	double *x, double *y, double *z) {
	int ret;
	char X[16], Y[16], Z[16];
	memset(X, 0, sizeof(X));
	memset(Y, 0, sizeof(Y));
	memset(Z, 0, sizeof(Z));
	ret = sscanf(objectCartisan, "%[^':']:%[^':']:%s", X, Y, Z);
	if (ret==3) {
		*x = atof(X);
		*y = atof(Y);
		*z = atof(Z);
	}
	return ret;
}

int recordCount = 0;
char record[MAX_RECORD_NUM][BUF_SIZE];	/* up to MAX_RECORD_NUM records of BUF_SIZE chars each */

int
loadDataFromFile(const char *fileName) {
	char buffer[BUF_SIZE];
	recordCount = 0;
	memset(record, 0, sizeof(record));
	FILE *fp = fopen(fileName, "rt");
	if (fp == NULL) return 0;
	memset(buffer, 0, sizeof(buffer));
	while (fgets(buffer, BUF_SIZE-1, fp) && (recordCount<MAX_RECORD_NUM-1)) {
		int len = strlen(buffer);
		if (buffer[len-1] == '\n') {
			buffer[len-1] = 0;
		}

		strcpy(record[recordCount++], buffer);
		memset(buffer, 0, sizeof(buffer));
	}
	fclose(fp);
	return recordCount;
}

int main(int argc, char *argv[]) {
	int i=0;
	char objectName[16];
	char objectCartisan[48];
	double x=0.0, y=0.0, z=0.0, dist=0.0, avrg_dist=0.0;
	if (argc == 1) return 0;
	if (!loadDataFromFile(argv[1])) return 0;

	/* The for-loop below runs in parallel across threads (Threading).  */
	/* Variables in private(...) are private to each thread.            */
	/* The reduction(+:...) variable gets one private copy per thread;  */
	/* the copies are summed when the threads join.                     */
	#pragma omp parallel for private(objectName, objectCartisan, x, y, z, dist) reduction(+:avrg_dist) schedule(static, 1)
	for (i=0; i<recordCount; i++) {
		memset(objectName, 0, sizeof(objectName));
		memset(objectCartisan, 0, sizeof(objectCartisan));
		if (getObjectInfo(record[i], objectName, objectCartisan)!=2)
			continue;
		x = y = z = dist = 0.0;
		if (getObjectCartisanInfo(objectCartisan, &x, &y, &z)!=3)
			continue;
		dist = sqrt(x*x + y*y + z*z);
		avrg_dist += dist;
		printf("%4d\t%s\t%8.4f\n", omp_get_thread_num(), objectName, dist);
	}
	printf("Average:\t%8.4f\n", avrg_dist/recordCount);
	return 0;
}
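A minimal build-and-run sketch, assuming GCC and the source saved as Radar.c (the file name is my assumption); OMP_NUM_THREADS pins the thread count to two, matching the thread IDs in the results below:

gcc -fopenmp -O2 -o Radar Radar.c -lm
OMP_NUM_THREADS=2 ./Radar Radar.txt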
Results
1 a02 20.7846
1 a04 27.7128
1 a06 15.5885
1 a08 12.1244
1 a10 8.6603
0 a01 17.3205
0 a03 24.2487
0 a05 31.1769
0 a07 13.8564
0 a09 10.3923
  Average 18.1865