本站首页    管理页面    写新日志    退出


«August 2025»
12
3456789
10111213141516
17181920212223
24252627282930
31


公告
 本博客在此声明所有文章均为转摘,只做资料收集使用。

我的分类(专题)

日志更新

最新评论

留言板

链接

Blog信息
blog名称:
日志总数:1304
评论数量:2242
留言数量:5
访问次数:7590951
建立时间:2006年5月29日




[Apache(jakarta)]Hadoop Inside (3)
软件技术

lhwork 发表于 2006/12/13 15:30:23

之前的MapReduce Demo只能在一台机器上运行,现在是时候让它分布式运行了。在对MapReduce的运行流程和FileSystem进行了简单研究之后,现在尝试从配置着手,看看怎样让Hadoop在两台机器上面同时运行MapReduce。 首先看回这里       String tracker = conf.get("mapred.job.tracker", "local");       if ("local".equals(tracker)) {         this.jobSubmitClient = new LocalJobRunner(conf);       } else {         this.jobSubmitClient = (JobSubmissionProtocol)           RPC.getProxy(JobSubmissionProtocol.class,                        JobTracker.getAddress(conf), conf);       } 当tracker地址不为local,则tracker为Remote Client的 JobTracker 类,这里重点分析。 JobTracker有一个main函数,注释显示它仅仅用于调试,正常情况是作为DFS Namenode进程的一部分来运行。不过这里我们可以先从它着手开始分析。           tracker = new JobTracker(conf);   //构造 构造函数先获取一堆常量的值,然后清空'systemDir',接着启动RPC服务器。         InetSocketAddress addr = getAddress(conf);         this.localMachine = addr.getHostName();         this.port = addr.getPort();         this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);         this.interTrackerServer.start(); 启动TrackInfoServer:         this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);         this.infoServer = new JobTrackerInfoServer(this, infoPort);         this.infoServer.start(); TrackInfoServer 提供了通过HTTP方式获取JobTracker信息的方式,可以方便用于监测工作任务的进度。 启动三个守护线程:         new Thread(this.expireTrackers).start();  //Used to expire TaskTrackers that have gone down         new Thread(this.retireJobs).start();  //Used to remove old finished Jobs that have been around for too long         new Thread(this.initJobs).start();  //Used to init new jobs that have just been created 三个线程的用处已经注释,这里不作分析。下面开始分析 JobTracker.submitJob() 之前已经分析过 LocalJobRunner.submitJob(),它实例化内部类Job,在里面实现MapReduce流程。JobTracker就复杂一些,它实例化 JobInProgress,然后将这个Job提交到队列:         JobInProgress job = new JobInProgress(jobFile, this, this.conf);         synchronized (jobs) {             synchronized (jobsByArrival) {                 synchronized (jobInitQueue) {                     jobs.put(job.getProfile().getJobId(), job);                     jobsByArrival.add(job);                     jobInitQueue.add(job);                     jobInitQueue.notifyAll();                 }             }         } 此时RetireJobs线程开始处理超时和出错的Job,JobInitThread线程初始化工作任务: job.initTasks(); 开始分析 JobInProgress 在构造函数中,Tracker从发起端的DFS获取任务文件(xml和jar),然后保存到本地目录下面         JobConf default_job_conf = new JobConf(default_conf);         this.localJobFile = default_job_conf.getLocalFile(JobTracker.SUBDIR,             jobid + ".xml");         this.localJarFile = default_job_conf.getLocalFile(JobTracker.SUBDIR,             jobid + ".jar");         FileSystem fs = FileSystem.get(default_conf);         fs.copyToLocalFile(new File(jobFile), localJobFile);         conf = new JobConf(localJobFile);         this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url,                                       conf.getJobName());         String jarFile = conf.getJar();         if (jarFile != null) {           fs.copyToLocalFile(new File(jarFile), localJarFile);           conf.setJar(localJarFile.getCanonicalPath());         } 这里要注意jarFile,JobConf的构造函数:   public JobConf(Configuration conf, Class aClass) {     this(conf);     String jar = findContainingJar(aClass);     if (jar != null) {       setJar(jar);     }   } 如果 aClass 是在一个jar里面,那么setJar(jar);就会被执行,这个jar会被copy到 LocalJobRunner 或是 JobTracker 的工作目录下面。所以这里有一个原则: 将要执行的MapReduce操作的所有class打包到一个jar中,这样才能执行分布式的MapReduce计算。 再看 JobInProgress.initTasks() 先从Jar中加载InputFormat         String ifClassName = jd.get("mapred.input.format.class");         InputFormat inputFormat;         if (ifClassName != null && localJarFile != null) {           try {             ClassLoader loader =               new URLClassLoader(new URL[]{ localJarFile.toURL() });             Class inputFormatClass = loader.loadClass(ifClassName);             inputFormat = (InputFormat)inputFormatClass.newInstance();           } catch (Exception e) {             throw new IOException(e.toString());           }         } else {           inputFormat = jd.getInputFormat();         } 接下来对文件块的大小进行排序 创建对应的Map任务         this.numMapTasks = splits.length;         // create a map task for each split         this.maps = new TaskInProgress[numMapTasks];         for (int i = 0; i < numMapTasks; i++) {             maps = new TaskInProgress(jobFile, splits, jobtracker, conf, this);         } 创建Reduce任务         this.reduces = new TaskInProgress[numReduceTasks];         for (int i = 0; i < numReduceTasks; i++) {             reduces = new TaskInProgress(jobFile, maps, i, jobtracker, conf, this);         } 最后对于每Split的信息进行缓存,并且创建状态类         for (int i = 0; i < maps.length; i++) {             String hints[][] = fs.getFileCacheHints(splits.getFile(), splits.getStart(), splits.getLength());             cachedHints.put(maps.getTIPId(), hints);         }         this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING); 现在轮到 TaskInProgress,它将Job里面的Map和Reduce操作进行了封装,但是JobInProgress.initTasks()仅仅对task进行了初始化,并没有执行Task,经过一番跟踪,发现Task的执行,是由 TaskTracker 来处理。 TaskTracker,实现了TaskUmbilicalProtocol接口。在之前的文章中,LocalJobRunner的内部类Job也实现了这个接口,这里对比一下: 接口 JobSubmissionProtocol:   LocalJobRunner <---> JobTracker 接口 TaskUmbilicalProtocol:    LocalJobRunner.Job <---> TaskTracker 下面对TaskTracker进行分析,首先也是从main入口开始。 TaskTracker实现了Runnable,main实例化TaskTracker对象,然后执行run()方法。 在构造函数中,主要进行初始化         this.mapOutputFile = new MapOutputFile();         this.mapOutputFile.setConf(conf);         initialize(); initialize()里面,初始化一些变量值 ,然后初始化RPC服务器:         while (true) {             try {                 this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf);                 this.taskReportServer.start();                 break;             } catch (BindException e) {                 LOG.info("Could not open report server at " + this.taskReportPort + ", trying new port");                 this.taskReportPort++;             }                  }         while (true) {             try {                 this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks);                 this.mapOutputServer.start();                 break;             } catch (BindException e) {                 LOG.info("Could not open mapoutput server at " + this.mapOutputPort + ", trying new port");                 this.mapOutputPort++;             }         } mapOutputServer使用一个循环来尝试各个端口绑定。 最后一句         this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf); 这里有一个新的接口InterTrackerProtocol,是TaskTracker和中央JobTracker通讯用的协议。通过这个接口, TaskTracker可以用来执行JobTracker中的Task了。接下来分析TaskServer的主流程,run()函数。 run()中, 有两个while循环。在内部while循环里面,执行 offerService() 方法。它里面也是一个while循环,开始几段代码用于JobTracker的心跳监测。接下来,它通过协议接口调用JobTracker,获取Task并执行:             if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {                 Task t = jobClient.pollForNewTask(taskTrackerName);                 if (t != null) {                     TaskInProgress tip = new TaskInProgress(t, this.fConf);                     synchronized (this) {                       tasks.put(t.getTaskId(), tip);                       if (t.isMapTask()) {                           mapTotal++;                       } else {                           reduceTotal++;                       }                       runningTasks.put(t.getTaskId(), tip);                     }                     tip.launchTask();                 }             } tip.launchTask(); 开始执行这个Task,在方法内部:             this.runner = task.createRunner(TaskTracker.this);             this.runner.start(); Task 有两个子类 MapTask和ReduceTask,它们的createRunner()方法都会创建一个TaskRunner的子类,TaskRunner继承Thread,run()方法中:       String sep = System.getProperty("path.separator");       File workDir = new File(new File(t.getJobFile()).getParent(), "work");       workDir.mkdirs();                       StringBuffer classPath = new StringBuffer();       // start with same classpath as parent process       classPath.append(System.getProperty("java.class.path"));       classPath.append(sep);       JobConf job = new JobConf(t.getJobFile());       String jar = job.getJar();       if (jar != null) {                      // if jar exists, it into workDir         unJar(new File(jar), workDir);         File[] libs = new File(workDir, "lib").listFiles();         if (libs != null) {           for (int i = 0; i < libs.length; i++) {             classPath.append(sep);            // add libs from jar to classpath             classPath.append(libs);           }         }         classPath.append(sep);         classPath.append(new File(workDir, "classes"));         classPath.append(sep);         classPath.append(workDir);       } 获取工作目录,获取classpath。然后解压工作任务的jar包。       //  Build exec child jmv args.       Vector vargs = new Vector(8);       File jvm =                                  // use same jvm as parent         new File(new File(System.getProperty("java.home"), "bin"), "java");       vargs.add(jvm.toString());       String javaOpts = handleDeprecatedHeapSize(           job.get("mapred.child.java.opts", "-Xmx200m"),           job.get("mapred.child.heap.size"));       javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId());       int port = job.getInt("mapred.task.tracker.report.port", 50050) + 1;       javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));       String [] javaOptsSplit = javaOpts.split(" ");       for (int i = 0; i < javaOptsSplit.length; i++) {          vargs.add(javaOptsSplit);       }       // Add classpath.       vargs.add("-classpath");       vargs.add(classPath.toString());       // Add main class and its arguments       vargs.add(TaskTracker.Child.class.getName());  // main of Child       vargs.add(tracker.taskReportPort + "");        // pass umbilical port       vargs.add(t.getTaskId());                      // pass task identifier       // Run java       runChild((String[])vargs.toArray(new String[0]), workDir); 这里是构造启动Java进程的classpath和其它vm参数,最后在 runChild 中开一个子进程来执行这个Task。感觉够复杂的。 最后分析TaskTracker的内部类Child。它就是上面子进程执行的类。在main函数中           TaskUmbilicalProtocol umbilical =             (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,                                                 new InetSocketAddress(port), conf);                        Task task = umbilical.getTask(taskid);           JobConf job = new JobConf(task.getJobFile());           conf.addFinalResource(new File(task.getJobFile())); 可见该子进程也是通过RPC跟TaskTracker进行通讯。           startPinging(umbilical, taskid);        // start pinging parent 开一个进程,对TaskTracker进行心跳监测。               String workDir = job.getWorkingDirectory();               if (workDir != null) {                 FileSystem file_sys = FileSystem.get(job);                 file_sys.setWorkingDirectory(new File(workDir));               }               task.run(job, umbilical);           // run the task 这里才真正开始执行Task。 分析到此告一段落,下面开始构造一个分布式执行的环境。


阅读全文(1625) | 回复(0) | 编辑 | 精华
 



发表评论:
昵称:
密码:
主页:
标题:
验证码:  (不区分大小写,请仔细填写,输错需重写评论内容!)



站点首页 | 联系我们 | 博客注册 | 博客登陆

Sponsored By W3CHINA
W3CHINA Blog 0.8 Processed in 1.373 second(s), page refreshed 144761684 times.
《全国人大常委会关于维护互联网安全的决定》  《计算机信息网络国际联网安全保护管理办法》
苏ICP备05006046号