欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java/Web调用Hadoop进行MapReduce示例代码

程序员文章站 2023-12-13 19:56:46
hadoop环境搭建详见此文章。 我们已经知道hadoop能够通过hadoop jar ***.jar input output的形式通过命令行来调用,那么如何将其封...

hadoop环境搭建详见此文章。

我们已经知道hadoop能够通过hadoop jar ***.jar input output的形式通过命令行来调用,那么如何将其封装成一个服务,让java/web来调用它?使得用户可以用方便的方式上传文件到hadoop并进行处理,获得结果。首先,***.jar是一个hadoop任务类的封装,我们可以在没有jar的情况下运行该类的main方法,将必要的参数传递给它。input 和output则将用户上传的文件使用hadoop的javaapi put到hadoop的文件系统中。然后再通过hadoop的javaapi 从文件系统中取得结果文件。

搭建javaweb工程。本文使用spring、springmvc、mybatis框架, 当然,这不是重点,就算没有使用任何框架也能实现。

项目框架如下:

Java/Web调用Hadoop进行MapReduce示例代码

项目中使用到的jar包如下:

Java/Web调用Hadoop进行MapReduce示例代码Java/Web调用Hadoop进行MapReduce示例代码

在spring的配置文件中,加入

<bean id="multipartresolver" class="org.springframework.web.multipart.commons.commonsmultipartresolver"> 
   <property name="defaultencoding" value="utf-8" /> 
   <property name="maxuploadsize" value="10485760000" /> 
   <property name="maxinmemorysize" value="40960" /> 
</bean> 

使得项目支持文件上传。

新建一个login.jsp 点击登录后进入user/login

Java/Web调用Hadoop进行MapReduce示例代码

user/login中处理登录,登录成功后,【在hadoop文件系统中创建用户文件夹】,然后跳转到console.jsp

package com.chenjie.controller; 
 
import java.io.ioexception; 
  
import javax.annotation.resource; 
 
import javax.servlet.http.httpservletrequest; 
 
import javax.servlet.http.httpservletresponse; 
 
import org.apache.hadoop.conf.configuration; 
 
import org.apache.hadoop.fs.filesystem; 
 
import org.apache.hadoop.fs.path; 
 
import org.springframework.stereotype.controller; 
 
import org.springframework.web.bind.annotation.requestmapping; 

import com.chenjie.pojo.jsonresult; 
 
import com.chenjie.pojo.user; 
 
import com.chenjie.service.userservice; 
 
import com.chenjie.util.appconfig; 
 
import com.google.gson.gson; 
/** 
 
 * 用户请求控制器 
 
 * 
 
 * @author chen 
 
 * 
 
 */ 
 
@controller 
 
// 声明当前类为控制器 
 
@requestmapping("/user") 
 
// 声明当前类的路径 
 
public class usercontroller { 
 
  @resource(name = "userservice") 
 
  private userservice userservice;// 由spring容器注入一个userservice实例 
  /** 
 
   * 登录 
 
   * 
 
   * @param user 
 
   *      用户 
 
   * @param request 
 
   * @param response 
 
   * @throws ioexception 
 
   */ 
 
  @requestmapping("/login") 
 
  // 声明当前方法的路径 
 
  public string login(user user, httpservletrequest request, 
 
      httpservletresponse response) throws ioexception { 
 
    response.setcontenttype("application/json");// 设置响应内容格式为json 
 
    user result = userservice.login(user);// 调用userservice的登录方法 
 
    request.getsession().setattribute("user", result); 
 
    if (result != null) { 
 
      createhadoopfsfolder(result); 
 
      return "console"; 
 
    } 
 
    return "login"; 
 
  } 
 
  public void createhadoopfsfolder(user user) throws ioexception { 
 
    configuration conf = new configuration(); 
 
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml")); 
 
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml")); 
 
 
 
    filesystem filesystem = filesystem.get(conf); 
 
    system.out.println(filesystem.geturi()); 
 
 
 
    path file = new path("/user/" + user.getu_username()); 
 
    if (filesystem.exists(file)) { 
 
      system.out.println("haddop hdfs user foler exists."); 
 
      filesystem.delete(file, true); 
 
      system.out.println("haddop hdfs user foler delete success."); 
 
    } 
 
    filesystem.mkdirs(file); 
 
    system.out.println("haddop hdfs user foler creat success."); 
 
  } 
} 

console.jsp中进行文件上传和任务提交、

Java/Web调用Hadoop进行MapReduce示例代码

文件上传和任务提交:

package com.chenjie.controller; 
 
import java.io.file; 
import java.io.ioexception; 
import java.net.inetsocketaddress; 
import java.net.uri; 
import java.util.arraylist; 
import java.util.iterator; 
import java.util.list; 
 
import javax.servlet.http.httpservletrequest; 
import javax.servlet.http.httpservletresponse; 
 
import org.apache.hadoop.conf.configuration; 
import org.apache.hadoop.fs.fsdatainputstream; 
import org.apache.hadoop.fs.filesystem; 
import org.apache.hadoop.fs.path; 
import org.apache.hadoop.mapred.jobclient; 
import org.apache.hadoop.mapred.jobconf; 
import org.apache.hadoop.mapred.jobid; 
import org.apache.hadoop.mapred.jobstatus; 
import org.apache.hadoop.mapred.runningjob; 
import org.springframework.stereotype.controller; 
import org.springframework.web.bind.annotation.requestmapping; 
import org.springframework.web.multipart.multipartfile; 
import org.springframework.web.multipart.multiparthttpservletrequest; 
import org.springframework.web.multipart.commons.commonsmultipartresolver; 
 
import com.chenjie.pojo.user; 
import com.chenjie.util.utils; 
 
@controller 
// 声明当前类为控制器 
@requestmapping("/hadoop") 
// 声明当前类的路径 
public class hadoopcontroller { 
 
  @requestmapping("/upload") 
  // 声明当前方法的路径 
  //文件上传 
  public string upload(httpservletrequest request, 
      httpservletresponse response) throws ioexception { 
    list<string> filelist = (list<string>) request.getsession() 
        .getattribute("filelist");//得到用户已上传文件列表 
    if (filelist == null) 
      filelist = new arraylist<string>();//如果文件列表为空,则新建 
    user user = (user) request.getsession().getattribute("user"); 
    if (user == null) 
      return "login";//如果用户未登录,则跳转登录页面 
    commonsmultipartresolver multipartresolver = new commonsmultipartresolver( 
        request.getsession().getservletcontext());//得到在spring配置文件中注入的文件上传组件 
    if (multipartresolver.ismultipart(request)) {//如果请求是文件请求 
      multiparthttpservletrequest multirequest = (multiparthttpservletrequest) request; 
 
      iterator<string> iter = multirequest.getfilenames();//得到文件名迭代器 
      while (iter.hasnext()) { 
        multipartfile file = multirequest.getfile((string) iter.next()); 
        if (file != null) { 
          string filename = file.getoriginalfilename(); 
          file folder = new file("/home/chenjie/cjhadooponline/" 
              + user.getu_username()); 
          if (!folder.exists()) { 
            folder.mkdir();//如果文件不目录存在,则在服务器本地创建 
          } 
          string path = "/home/chenjie/cjhadooponline/" 
              + user.getu_username() + "/" + filename; 
 
          file localfile = new file(path); 
 
          file.transferto(localfile);//将上传文件拷贝到服务器本地目录 
          // filelist.add(path); 
        } 
        handleuploadfiles(user, filelist);//处理上传文件 
      } 
 
    } 
    request.getsession().setattribute("filelist", filelist);//将上传文件列表保存在session中 
    return "console";//返回console.jsp继续上传文件 
  } 
 
  @requestmapping("/wordcount") 
  //调用hadoop进行mapreduce 
  public void wordcount(httpservletrequest request, 
      httpservletresponse response) { 
    system.out.println("进入controller wordcount "); 
    user user = (user) request.getsession().getattribute("user"); 
    system.out.println(user); 
    // if(user == null) 
    // return "login"; 
    wordcount c = new wordcount();//新建单词统计任务 
    string username = user.getu_username(); 
    string input = "hdfs://chenjie-virtual-machine:9000/user/" + username 
        + "/wordcountinput";//指定hadoop文件系统的输入文件夹 
    string output = "hdfs://chenjie-virtual-machine:9000/user/" + username 
        + "/wordcountoutput";//指定hadoop文件系统的输出文件夹 
    string reslt = output + "/part-r-00000";//默认输出文件 
    try { 
      thread.sleep(3*1000); 
      c.main(new string[] { input, output });//调用单词统计任务 
      configuration conf = new configuration();//新建hadoop配置 
      conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml"));//添加hadoop配置,找到hadoop部署信息 
      conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));//hadoop配置,找到文件系统 
 
      filesystem filesystem = filesystem.get(conf);//得打文件系统 
      path file = new path(reslt);//找到输出结果文件 
      fsdatainputstream instream = filesystem.open(file);//打开 
      uri uri = file.touri();//得到输出文件路径 
      system.out.println(uri); 
      string data = null; 
      while ((data = instream.readline()) != null) { 
        //system.out.println(data); 
        response.getoutputstream().println(data);//讲结果文件写回用户网页 
      } 
//     inputstream in = filesystem.open(file); 
//     outputstream out = new fileoutputstream("result.txt"); 
//     ioutils.copybytes(in, out, 4096, true); 
      instream.close(); 
    } catch (exception e) { 
      system.err.println(e.getmessage()); 
    } 
  } 
 
  @requestmapping("/mapreducestates") 
  //得到mapreduce的状态 
  public void mapreduce(httpservletrequest request, 
      httpservletresponse response) { 
    float[] progress=new float[2]; 
    try { 
      configuration conf1=new configuration(); 
      conf1.set("mapred.job.tracker", utils.jobtracker); 
       
      jobstatus jobstatus = utils.getjobstatus(conf1); 
//     while(!jobstatus.isjobcomplete()){ 
//       progress = utils.getmapreduceprogess(jobstatus); 
//       response.getoutputstream().println("map:" + progress[0] + "reduce:" + progress[1]); 
//       thread.sleep(1000); 
//     } 
      jobconf jc = new jobconf(conf1); 
       
      jobclient jobclient = new jobclient(jc); 
      jobstatus[] jobsstatus = jobclient.getalljobs();  
      //这样就得到了一个jobstatus数组,随便取出一个元素取名叫jobstatus  
      jobstatus = jobsstatus[0];  
      jobid jobid = jobstatus.getjobid(); //通过jobstatus获取jobid  
      runningjob runningjob = jobclient.getjob(jobid); //通过jobid得到runningjob对象  
      runningjob.getjobstate();//可以获取作业状态,状态有五种,为jobstatus.failed 、jobstatus.killed、jobstatus.prep、jobstatus.running、jobstatus.succeeded  
      jobstatus.getusername();//可以获取运行作业的用户名。  
      runningjob.getjobname();//可以获取作业名。  
      jobstatus.getstarttime();//可以获取作业的开始时间,为utc毫秒数。  
      float map = runningjob.mapprogress();//可以获取map阶段完成的比例,0~1,  
      system.out.println("map=" + map); 
      float reduce = runningjob.reduceprogress();//可以获取reduce阶段完成的比例。 
      system.out.println("reduce="+reduce); 
      runningjob.getfailureinfo();//可以获取失败信息。  
      runningjob.getcounters();//可以获取作业相关的计数器,计数器的内容和作业监控页面上看到的计数器的值一样。  
       
       
    } catch (ioexception e) { 
      progress[0] = 0; 
      progress[1] = 0; 
    } 
   
    request.getsession().setattribute("map", progress[0]); 
    request.getsession().setattribute("reduce", progress[1]); 
  } 
   
  //处理文件上传 
  public void handleuploadfiles(user user, list<string> filelist) { 
    file folder = new file("/home/chenjie/cjhadooponline/" 
        + user.getu_username()); 
    if (!folder.exists()) 
      return; 
    if (folder.isdirectory()) { 
      file[] files = folder.listfiles(); 
      for (file file : files) { 
        system.out.println(file.getname()); 
        try { 
          putfiletohadoopfsfolder(user, file, filelist);//将单个文件上传到hadoop文件系统 
        } catch (ioexception e) { 
          system.err.println(e.getmessage()); 
        } 
      } 
    } 
  } 
 
  //将单个文件上传到hadoop文件系统 
  private void putfiletohadoopfsfolder(user user, file file, 
      list<string> filelist) throws ioexception { 
    configuration conf = new configuration(); 
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml")); 
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml")); 
 
    filesystem filesystem = filesystem.get(conf); 
    system.out.println(filesystem.geturi()); 
 
    path localfile = new path(file.getabsolutepath()); 
    path foler = new path("/user/" + user.getu_username() 
        + "/wordcountinput"); 
    if (!filesystem.exists(foler)) { 
      filesystem.mkdirs(foler); 
    } 
     
    path hadoopfile = new path("/user/" + user.getu_username() 
        + "/wordcountinput/" + file.getname()); 
//   if (filesystem.exists(hadoopfile)) { 
//     system.out.println("file exists."); 
//   } else { 
//     filesystem.mkdirs(hadoopfile); 
//   } 
    filesystem.copyfromlocalfile(true, true, localfile, hadoopfile); 
    filelist.add(hadoopfile.touri().tostring()); 
 
  } 
 
} 

启动hadoop:

Java/Web调用Hadoop进行MapReduce示例代码

运行结果:

可以在任意平台下,登录该项目地址,上传文件,得到结果。

Java/Web调用Hadoop进行MapReduce示例代码

Java/Web调用Hadoop进行MapReduce示例代码


Java/Web调用Hadoop进行MapReduce示例代码

Java/Web调用Hadoop进行MapReduce示例代码

Java/Web调用Hadoop进行MapReduce示例代码

Java/Web调用Hadoop进行MapReduce示例代码

运行成功。

源代码:https://github.com/tudoupaisimalingshu/cjhadooponline

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

上一篇:

下一篇: