4 基于Hadoop的平台搭建与MapReduce作业设计

4.1 基于Hadoop的伪分布式平台搭建

在搭建Hadoop分布式系统平台[1]时,我们有两种方式可以选择,分别是伪分布的模式和完全分布式模式。这两种模式的主要区别在于前者是在本地一台机器中运行Hadoop框架中的各种服务,是一种模拟分布式的集群环境。而完全分布式的环境就是在真实的多个主机上配置Hadoop,并搭建整个集群环境。

4.1.1 搭建Hadoop伪分布式平台

使用分布式的环境对全基因组测序的环境进行搭建,下面介绍整个分布式环境的搭建流程。
图4-1 Hadoop伪分布式集群架构图

1)安装并检查Java版本
必须确保Hadoop集群安装的是合适版本的Java。可以通过Hadoop wiki界面Hadoop wiki界面来查看具。通过键入以下命令,查看本机的java版本信息:

elon@longsl:~$ java -version
java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)

2)创建Unix用户账号
最好创建特定的Unix用户账号以区分各Hadoop进程,及区分同一机器上的其他服务。在我的主机上,我创建一个名叫elon的用户名来执行Hadoop程序。

3)安装Hadoop
Apache Hadoop发布页下载一个稳定版的二进制发布版本包(通常打包为一个tar.gz结尾的文件),再解压缩到本地文件系统。在我的主机上执行以下命令即可:
elon@longsl:~$ tar -zxvf hadoop-2.7.6.tar.gz -C .

4)SSH配置
SSH免密码登录的原理是首先在主机A的本地生成公钥和私钥,然后将本机的公钥发送到要免密登录的主机上,然后在访问目的主机时,本机的私钥和目的主机中的公钥配对成功,即可以免密登录。
在本次伪分布式环境中,只需配置本机用户自己和自己免密登陆以及自己和localhost用户免密登陆即可。我本机的主机名为longsl,因此我需要为longsl用户生成公私密钥对:

elon@longsl:~$ ssh-keygen -t rsa
# 并且配置ssh longsl和ssh localhost能免密登陆:
elon@longsl:~$ ssh-copy-id longsl
elon@longsl:~$ ssh-copy-id localhost

5)配置Hadoop
Hadoop集群的配置文件在$HADOOP_HOME/etc/hadoop目录下,主要修改五个配置文件,分别是slaves、core-site.xml、hdfs-site.xml、mapred-site.xml和yarn-site.xml。
修改slaves文件,这个文件表明该集群中的子节点的主机名,在伪分布式环境中,直接将localhost添加进去即可。
配置core-site.xml,主要设置NameNode运行的主机信息。

<configuration>
    <property>
		<name>fs.defaultFS</name>
		<value>hdfs://longsl:8080/</value>
    </property>
<!-- 指定hadoop运行时产生文件的存储目录 -->
	<property>
		<name>hadoop.tmp.dir</name>
		<value>/home/elon/hadoop/tmp</value>
	</property>
</configuration>
<!-- 配置hdfs-site.xml,需要设置的dfs.replication的参数,主要用于设置集群中副本的个数,由于这个是伪分布模式,因此副本数设置为1即可。 -->
<configuration>
    <property>
		<name>dfs.replication</name>
		<value>1</value>
    </property>
</configuration>

配置mapred-site.xml,首先设置了日志聚合相关的参数配置,我们可以理解为将集群环境中原本存储于系统tmp目录下的作业运行日志作了归档处理,并存储在了HDFS上。接下来,对集群中在各个容器中作业的运行环境做了相应的配置,具体是设置了每个作业在map阶段和reduce阶段能够被分配的最大运行内存,另外设置了其可以使用的java虚拟机的最大内存资源数。最后对map阶段和reduce阶段能够被使用的CPU核数做了相应配置。

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
<!-- log aggreation -->
	<property>
		<name>mapreduce.jobhistory.address</name>
		<value>localhost:10020</value>
	</property>
	<property>
		<name>mapreduce.jobhistory.webapp.address</name>
		<value>localhost:19888</value>
	</property>
	<property>
		<name>mapreduce.jobhistory.intermediate-done-dir</name>
		<value>/mr-history/done_intermediate</value>
	</property>
	<property>
		<name>mapreduce.jobhistory.done-dir</name>
		<value>/mr-history/done</value>
	</property>
<!-- configure RAM for a Container -->
	<property>
		<name>mapreduce.map.memory.mb</name>
		<value>2048</value>
	</property>
	<property>
		<name>mapreduce.reduce.memory.mb</name>
		<value>2048</value>
	</property>
	<property>
		<name>mapreduce.map.java.opts</name>
		<value>-Xmx1024m</value>
	</property>
	<property>
		<name>mapreduce.reduce.java.opts</name>
		<value>-Xmx1024m</value>
	</property>
<!-- configure cpu core on map and reduce -->
	<property>
		<name>mapreduce.map.cpu.vcores</name>
		<value>4</value>
	</property>
	<property>
		<name>mapreduce.reduce.cpu.vcores</name>
		<value>4</value>
	</property>
</configuration>

配置yarn-site.xml,主要需配置RM运行的主机,在伪分布式环境中就是指本地主机,另外NM节点上运行的服务是mapreduce_shuffle,这个选项是必须配置的,因为我们需要处理的就是mapreduce程序。最后开启了yarn上的日志聚合功能,并将各个容器上产生的日志存储在HDFS上。

<configuration>
	<property>
		<name>yarn.resourcemanager.hostname</name>
		<value>longsl</value>
</property>
	<property>
		<name>yarn.nodemanager.aux-services</name>
		<value>mapreduce_shuffle</value>
</property>
<!-- log aggreation -->
	<property>
		<name>yarn.log-aggregation-enable</name>
		<value>true</value>
	</property>
	<property>
		<name>yarn.nodemanager.remote-app-log-dir</name>
		<value>/user/container/logs</value>
	</property>
</configuration>

6)环境设置
需要设置Hadoop系统的Java安装的位置,通过在etc/hadoop/hadoop-env.sh文件中设置JAVA_HOME项。
export JAVA_HOME=/opt/jdk1.8

7)格式化HDFS文件系统
在能够使用之前,全新的HDFS安装需要进行格式化。通过创建存储目录和初始版本的namenode持久数据结构,格式化进程将创建一个空的文件系统。格式化HDFS是一个快速操作,以hdfs用户身份运行命令:hadoop namenode -format

4.1.2 启动和停止Hadoop集群

Hadoop自带脚本,可以运行脚本命令,启动或者停止如hdfs、yarn或者日志聚合等服务进程。为了使用这些脚本,需要告诉Hadoop集群中有哪些机器。文件slaves用于此目的,该文件包含了机器主机名或IP地址的列表,每行代表一个机器信息。文件slaves列举了可以运行datanode和节点管理器的机器。

1)启动HDFS守护进程
以elon用户身份运行命令start-dfs.sh,可以启动HDFS守护进程。默认情况下,该命令从core-site.xml配置项fs.defaultFS中找到namenode的主机名。更具体一些,start-dfs.sh脚本所做的事情如下:
1)在每台机器上启动一个namenode。
2)在slaves文件列举的每台机器上启动一个datanode
3)在每台机器上启动一个辅助namenode。

2)启动YARN守护进程
YARN守护进程以相同的方式启动,通过以yarn用户身份在托管资源管理器的机器上运行命令:start-yarn.sh。默认情况下,资源管理器总是和start-yarn.sh脚本运行在同一机器上。脚本明确完成以下事情。
1)在本地机器上启动一个资源管理器。
2)在slaves文件列举的每台机器上启动一个节点管理器。

同样,还提供了stop-dfs.sh和stop-yarn.sh脚本用于停止由相应的启动脚本启动的守护进程。下面是在我的集群环境中开启集群环境的实例:

elon@longsl:~$ start-dfs.sh && start-yarn.sh
Starting namenodes on [longsl]
longsl: starting namenode, logging to /home/elon/hadoop/logs/hadoop-elon-namenode-longsl.out
localhost: starting datanode, logging to /home/elon/hadoop/logs/hadoop-elon-datanode-longsl.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /home/elon/hadoop/logs/hadoop-elon-secondarynamenode-longsl.out
starting yarn daemons
starting resourcemanager, logging to /home/elon/hadoop/logs/yarn-elon-resourcemanager-longsl.out
localhost: starting nodemanager, logging to /home/elon/hadoop/logs/yarn-elon-nodemanager-longsl.out

4.2 伪分布式环境的MapReduce作业构建

图4-2 伪分布式的MapReduce任务流程

伪分布式的MapReduce任务执行图如图4-2所示,MapReduce框架中共分为Mapper阶段和Reducer阶段,本节会主要研究Mapper和Reducer的流程构造。在伪分布式环境中,Map任务和Reduce任务是串行执行下去的,其中先执行完所有Map任务再执行Reduce任务,另外Map任务和Reduce任务都会在本地执行器中执行。

4.2.1 Mapper流程构造

在Mapper阶段中,重点关注单样本的基因测序流程,从而得到变异检测的中间gVCF文件,基因测序Mapper阶段的流程如图4-3所示。

图4-3 基因测序Mapper阶段的流程图

在上图中,主要是对Mapper阶段从输入sample.txt文本文件开始,一直到输出端得到output输出文件并把得到的gVCF文件上传到HDFS上的展示。其中在输入的文本文件中保存的是本次测序中样本组中各样本的名称信息,作为测序对象传入模版文件中,在模版引擎的调用下生成测序可执行脚本,然后在Shell脚本执行引擎中调用该脚本,以脚本命令的模式执行基因测序分析流程。
在脚本执行过程中,需要访问HDFS系统,获取该测序样本的数据文件,最后生成gVCF文件并上传至HDFS上存储。另外对于Mapper的输入端的输出文本,设计是将key设置为同一个,这样在reduce程序调用时,可以在同一个reduce程序中读取到,这样样本就可以在同一个测序分析流程中被调用,形成最终的VCF文件。

4.2.2 Reducer流程构造

在Reduer阶段,主要侧重的是基因测序的单样本变异检测结果的合并,得到最终的VCF文件的过程,基因测序Reduce阶段流程图如图4-4所示。

在该阶段的分析流程中,主要是将上一步Mapper产生的输出文件作为此阶段的输入文件,而reduce程序对于键相同的元素和经过shuffle混洗之后在同一个reduce对象中处理,这样就可以将这一组样本生成的gVCF文件统一传递到reduce程序的shell分析流程脚本中处理,再调用FreeMarker模板引擎来生成合并gVCF文件的脚本文件,最后调用shell脚本执行引擎来调用该脚本文件,得到该组样本统一的变异检测文件。

图4-4 基因测序Reduce阶段流程图

同样,在最后需要将得到的VCF文件上传至HDFS上存储并同意管理,这样就完成了基因测序的完整流程,从单个样本的测序分析得到变异位点信息,到最后合并各个样本的变异位点信息得到最终能反映该生物物种的全基因组变异位点信息集合。
##4.3 基于Hadoop分布式环境搭建
在对Hadoop平台的分布式环境的搭建过程,其中的Java环境配置、用户创建、Hadoop二进制包的安装等步骤都和伪分布式的搭建方式一致,主要的不同之处在于其对Hadoop配置文件的配置不同。
###4.3.1 Hadoop分布式架构
在我的Hadoop分布式环境配置中,我利用了三台虚拟机来模拟真实主机,三台主机的主机名分别是node1、node2和node3,其中一台作为Master主机,另外两台作为slaves从节点。
Hadoop完全分布式架构图如图4-5所示,在该图中,主要通过配置集群中各个机器的Java环境变量和SSH免密登录,并在各个机器上配置Hadoop可执行包和修改Hadoop配置文件,搭建Hadoop分布式集群环境。

图4-5 Hadoop完全分布式架构图

4.3.2 Hadoop完全分布式配置

首先在node1上进行下面各项的配置,最后再将各个配置项同步复制到node2和node3节点上。

scp /home/node1/hadoop/etc/hadoop/ root@node2:/home/node2/hadoop/etc/Hadoop
scp /home/node1/hadoop/etc/hadoop/ root@node3:/home/node2/hadoop/etc/hadoop

(1) 配置slaves文件
$HADOOP_HOME/etc/Hadoop/slaves文件是用来配置Hadoop集群中的从节点的主机,将集群中所有的从节点都在slaves中进行记录,通常这些从节点就是承担计算任务的节点。在我的完全分布式环境中,我配置了两个从节点,一个主节点。因此需将从节点的主机名node2和node3写入slaves文件中。进行如下配置:

node2
node3

(2) 配置core-site.xml文件
在对core-site.xml文件进行配置时,指定node1为Master主节点,因此NameNode服务就指定在node1主机上。进行如下配置:

<configuration>
	<property>
	    <name>fs.defaultFS</name>
	    <value>hdfs://node1:8080</value>
	</property>
	<property>
		<name>hadoop.tmp.dir</name>
		<value>/root/hadoop/tmp</value>
	</property>
</configuration>

(3) 配置hdfs-site.xml文件
在对hdfs-site.xml文件进行配置时,主要和伪分布式不同的地方在于可以指定多个副本数了,其副本数的多少代表这个文件系统的可靠程度,副本数越高越可靠,但为了兼顾存储资源和高可靠性也不能设置太高,官方默认的副本数是3。另外我们可以对namenode和datanode的存储地址进行自定义,最后指定NameNode的辅助进程的运行主机。具体配置如下:

<configuration>
	<property>
	    <name>dfs.replication</name>
	    <value>3</value>
	</property>
	<property>
	    <name>dfs.namenode.name.dir</name>
    	<value>/root/hadoop/tmp/name</value>
	</property>
	<property>
    	<name>dfs.datanode.data.dir</name>
    	<value>/root/hadoop/tmp/data</value>
	</property>
	<property>
	    <name>dfs.namenode.secondary.http-address</name>
        <value>node1:8081</value>
	</property>
</configuration>

(4) 配置mapred-site.xml文件
在mapred-site.xml文件中,主要是与MapReduce任务的相关配置。其中要配置MapReduce使用的资源调度框架,这里默认是yarn。另外还要对map任务和reduce任务执行时的可用资源和java虚拟机的资源进行配置。具体配置如下:

<configuration>
	<property>
		<name>mapreduce.framework.name</name>
		<value>yarn</value>
	</property>
	<property>
		<name>mapreduce.map.memory.mb</name>
		<value>1600</value>
	</property>
	<property>
		<name>mapreduce.reduce.memory.mb</name>
		<value>1600</value>
	</property>
	<property>
        <name>mapreduce.map.java.opts</name>
		<value>-Xmx1300m</value>
	</property>
	<property>
		<name>mapreduce.reduce.java.opts</name>
		<value>-Xmx1300m</value>
	</property>
</configuration>

(5) 配置yarn-site.xml文件
在yarn-site.xml文件中,可以对resourcemanager运行的主机进行指定,这里我将其放在Master节点node1上运行,用于对集群中的作业进行任务调度和资源分配。由于我所创建的三个虚拟机上在我的单个真实主机上的,因此可用资源是非常紧张的,这要求我不同直接使用默认配置,而必须对yarn上的资源进行更为细致化的重新分配。

首先需要用户配置每个节点上可用的物理内存资源和CPU核数,因为一个node计算节点除了跑相关作业外,还可能运行着其他应用和服务等。下一步需要对yarn中可调度的单个容器资源进行配置,如配置调度的最小分配内存和最大分配内存,其中最小分配内存决定着该节点上最多可部署的容器个数,可通过yarn可用的物理内存除以每个容器最小可分配的内存来得到容器个数,同理CPU核数也是这样来指定。

接着还需要定义每个Map和Reduce任务需要的最大内存量。由于每个Map和每个Reduce都将在单独的Container中运行,因此这些最大内存设置应至少等于或大于YARN最小Container容量分配。最后,每个Map和Reduce任务的虚拟内存(物理+分页内存)上限由每个允许YARN容器的虚拟内存比决定,默认值为2.1。具体的配置细节如下:

<configuration>
	<property>
		<name>yarn.resourcemanager.hostname</name>
    	<value>node1</value>
	</property>
	<property>
		<name>yarn.nodemanager.aux-services</name>
		<value>mapreduce_shuffle</value>
	</property>
	<property>
		<name>yarn.nodemanager.resource.memory-mb</name>
		<value>2500</value>
	</property>
	<property>
        <name>yarn.nodemanager.resource.vcores</name>
        <value>4</value>
    </property>
	<property>
		 <name>yarn.scheduler.minimum-allocation-mb</name>
		 <value>2000</value>
	</property>
	<property>
		<name>yarn.scheduler.maximum-allocation-mb</name>
		<value>2500</value>
	</property>
	<property>
        <name>yarn.scheduler.minimum-allocation-vcores</name>
        <value>1</value>
    </property>
	<property>
        <name>yarn.scheduler.maximum-allocation-vcores</name>
        <value>4</value>
    </property>
    <property>
        <name>yarn.scheduler.increment-allocation-mb</name>
        <value>100</value>
    </property>
    <property>
        <name>yarn.scheduler.increment-allocation-vcores</name>
        <value>1</value>
    </property>
    <property>
        <name>yarn.app.mapreduce.am.resource.mb</name>
        <value>300</value>
    </property>
	<property>
		<name>yarn.nodemanager.vmem-pmem-ratio</name>
		<value>4</value>
	</property>
	<property>
		<name>yarn.nodemanager.vmem-check-enabled</name>
		<value>true</value>
	</property>
</configuration>

(6) SSH免密登陆配置
在配置SSH免密登陆时,主要对针对Master能免密登陆到各个slave节点上,保证NameNode和ResourceManager服务能在集群的各个节点上进行便携访问。在我的分布式环境中,由于NN和RM都是运行在主节点node1上,因此我需要配置node1到node2和node3的免密登陆。具体操作:在node1上使用命令“ssh-keygen –t rsa”生成node1的密钥对,并将node1的公钥分别发送到node2和node3的~/.ssh/authorized文件中,即可实现SSH免密登陆。

4.3.3 启动和停止Hadoop集群

完全分布式和伪分布式一样,通过start-dfs.shstart-yarn.sh和stop-dfs.sh、stop-yarn.sh脚本,启动或者停止如hdfs、yarn或者日志聚合等服务进程。下面是在我的完全分布式集群中的启动实例:

root@node1:~# start-dfs.sh && start-yarn.sh
Starting namenodes on [node1]
node1: starting namenode, logging to /root/hadoop/logs/hadoop-root-namenode-node1.out
node2: starting datanode, logging to /root/hadoop/logs/hadoop-root-datanode-node2.out
node3: starting datanode, logging to /root/hadoop/logs/hadoop-root-datanode-node3.out
node1: starting datanode, logging to /root/hadoop/logs/hadoop-root-datanode-node1.out
Starting secondary namenodes [node1]
node1: starting secondarynamenode, logging to /root/hadoop/logs/hadoop-root-secondarynamenode-node1.out
starting yarn daemons
starting resourcemanager, logging to /root/hadoop/logs/yarn-root-resourcemanager-node1.out
node3: starting nodemanager, logging to /root/hadoop/logs/yarn-root-nodemanager-node3.out
node2: starting nodemanager, logging to /root/hadoop/logs/yarn-root-nodemanager-node2.out
node1: starting nodemanager, logging to /root/hadoop/logs/yarn-root-nodemanager-node1.out

从上面打印的信息可以看出,NameNode是运行在node1主节点上的,在集群中的三个节点都作为计算节点运行这DataNode进程和NodeManager进程。

4.4 分布式环境下MapReduce作业构建

图4-6 分布式环境下的MapReduce的流程

在分布式环境中,与伪分布式不同的地方在于,多个slave节点可以提供多个container容器供MapReduce任务来运行,分布式环境下的MapReduce的流程构造如图4-6所示。

从图中可以看出,在完全分布式环境中总共有多个计算节点,其中包含多个container容器,而我们提交的Map作业和Reduce作业就是在多个容器中分别进行运算,最后再将Map作业运行的结果通过一个Reduce作业进行规约计算。整个MapReduce运行流程,由原来的伪分布式中串行运算Map作业和Reduce作业,到现在分布式环境中并行运算Map作业,最后将Map的中间结果用一个Reduce作业来处理。

4.5 Shell脚本执行引擎的构建

在得到最终的执行脚本后,还需要通过特定的方法来调用执行该脚本文件。通过编写ShellScriptUtil工具类,执行指定的脚本并打印执行过程中出现的日志,并存放在指定的路径下,下面是shell脚本执行工具类的代码:

/**
 * shell脚本工具包
* 用于将一个指定的可执行脚本进行执行的Java程序
*/
public class ShellScriptUtil {
    // 日志记录
    private static Logger theLogger = Logger.getLogger(TemplateEngine.class.getName());
    /**
     * 调用Shell脚本执行的方法
     * @param paths 指定多个路径参数
     *              第一个指定的是shell模版
     *              第二个参数指定的是脚本执行结果存放路径
     *              第三个参数指定的是执行脚本中日志存放路径,是可选参数,未给出此参数则默认为无日志输出
     */
    public static void callProcess(String... paths) {
        File outputFile;
        File logFile;
        Process process;
        String scriptPath = paths[0];
        String chmod = "chmod u+x " + scriptPath;
        try {
            // 为shell脚本增加可执行权限
            Runtime.getRuntime().exec(chmod).waitFor();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("执行脚本:" + scriptPath);
        ProcessBuilder pb = new ProcessBuilder("./" + scriptPath);
        pb.inheritIO();

        // 指定shell脚本执行的结果输出路径和执行时日志文件的输出路径
        if (paths.length == 3) {
            outputFile = new File(paths[1]);
            pb.redirectOutput(outputFile);
            logFile = new File(paths[2]);
            pb.redirectError(logFile);
        }
        // 指定shell脚本执行的日志输出路径
        if (paths.length == 2) {
            logFile = new File(paths[1]);
            if (logFile.exists()) {
                logFile.delete();
            }
            pb.redirectError(logFile);
        }
        try {
            process = pb.start();
            process.waitFor();
        } catch (IOException e) {
            theLogger.error("发生I/O错误...");
        } catch (InterruptedException e) {
            theLogger.error("当前线程在等待时被另一个线程中断...");
        }
    }
}

4.6 MapReduce作业编写与整体调度

在MapReduce计算框架中,通过Mapper和Reducer来整体调度整个WGS分析流程,这里需要自定义map()函数和reduce()函数。其中map()函数是用来传入要分析的样本名称,并通过样本名称对不同的样本分多个map任务来并行执行,从而调用模板类,并执行最终的WGS分析脚本。下面是wgsMapper类的代码:

/*
    编写全基因组测序的Mapper类
 */
public class wgsMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static String LOG_DIRECTORY = "./wgs-logs";
    private static String SCRIPT_DIRECTORY = "./wgs-scripts";

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String sample_name = value.toString();
        wgs(sample_name);
        context.write(new Text("1"),new Text(sample_name));
    }

    private static void wgs(String sampleName) {

        HashMap<String, String> templateMap = new HashMap<>();
        templateMap.put("sample_name", sampleName);

        String template = "wgsMapper.template";
        String scriptPath = SCRIPT_DIRECTORY + "/wgs_mapper_" + templateMap.get("sample_name") + ".sh";
        String logPath = LOG_DIRECTORY + "/wgs_mapper_" + templateMap.get("sample_name") + ".log";

        // 从模板创建具体脚本
        File scriptFile = TemplateEngine.createDynamicContentAsFile(template, templateMap, scriptPath);

        if (scriptFile != null) {
            ShellScriptUtil.callProcess(scriptPath, logPath);
        }
    }
}

而在reduce()函数中,通过传递的样本名称到Reducer脚本模版中,对指定的gvcf文件作合并处理,从而得到最终的VCF文件。下面是wgsReducer类的代码:

/*
    编写全基因组测序的Reducer类
 */
public class wgsReducer extends Reducer<Text, Text, Text, Text> {

    private static String LOG_DIRECTORY = "./wgs-logs";
    private static String SCRIPT_DIRECTORY = "./wgs-scripts";

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        HashMap<String, String> templateMap = new HashMap<>();
        Iterator<Text> value = values.iterator();
        int count = 0;
        String str = "";
        while (value.hasNext()) {
            str = value.next().toString();
            templateMap.put("sample_name" + (++count), str);
        }
        mergeGVCF(templateMap);
        context.write(new Text("1"), new Text(str));
    }

    private static void mergeGVCF(HashMap<String, String> templateMap) {

        String template = "wgsReducer.template";
        String scriptPath = SCRIPT_DIRECTORY + "/wgs_reducer_" + ".sh";
        String logPath = LOG_DIRECTORY + "/wgs_reducer_" + ".log";

        // 从模板创建具体脚本
        File scriptFile = TemplateEngine.createDynamicContentAsFile(template, templateMap, scriptPath);

        if (scriptFile != null) {
            ShellScriptUtil.callProcess(scriptPath, logPath);
        }
    }
}

4.7 本章小结

在本章中,主要介绍了Hadoop伪分布式环境及分布式环境进行搭建,还对WGS分析流程与MapReduce计算框架如何结合起来,其中就涉及到如何分多个map任务并行执行分析流程,对于WGS测序过程在linux环境中运行的情况,引入FreeMarker第三方库,通过编写脚本模板,并结合map和reduce作业,使得不同的map或者reduce任务在执行过程中,以不同的样本数据来执行不同的WGS脚本,达到高效并行化的全基因组测序的目的,最后再通过调用Driver驱动类执行整个MapReduce程序。

下一章主要对构建的测序平台做一个多样本数据的测试,并进行优化和系统扩展。


  1. Tom White. Hadoop权威指南[M]. 王海,华东,刘喻,吕粤海译. 清华大学出版社 第四版 2017.

隐藏边栏