
Reading Data from Hive and Writing It to HBase


Approach 1: MapReduce (bulk load via HFiles)

1. First, export the Hive data as text files using a fixed field delimiter and place them on HDFS.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LngLatOrder {
	public static void main(String[] args) {
		String table = args[0];
		String date = PersonUtils.getDate();
		String path = "/test/"+date+"/lnglat_order";
		String warehouseLocation = "/user/hive/warehouse";
		SparkSession spark = SparkSession
				.builder()
				.appName("LongitudeAndLatitude")
				.config("spark.sql.warehouse.dir", warehouseLocation)
				.enableHiveSupport()
				.getOrCreate();
		// Field delimiter: the control characters \004\001, quoted so that they end up
		// as a string literal inside the generated HiveQL concat(...).
		String split = "'\004\001'";
		// Selected columns, in order: name mobile buyerid platform addr lng lat
		String sql = "select concat(name," + split + ",mobile," + split + ",buyerid," + split
				+ ",platform," + split + ",addr," + split + ",lng," + split
				+ ",lat) from " + table;
		Dataset<Row> ds = spark.sql(sql);
		ds.write().text(path);
		spark.close();
	}
}

2. Read the text files with a MapReduce job and generate HFiles.

public class test_demo {
    static Logger logger = LoggerFactory.getLogger(test_demo.class);

    public static class BulkLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    	@Override
    	public void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
            throws IOException, InterruptedException {
            String column = context.getConfiguration().get("column");
            String familys = context.getConfiguration().get("columnFamily");
            String rowkeyColumn = context.getConfiguration().get("rowkeyColumn");
            String rowkeyColumnSplit = context.getConfiguration()
                                              .get("rowkeyColumnSplit");
            String splitSymbol = context.getConfiguration().get("splitSymbol");

            if (BulkLoadXMl.isEmpty(rowkeyColumnSplit)) {
                rowkeyColumnSplit = "|";
            }

            String[] f = familys.split("\\" + rowkeyColumnSplit);
            String[] columF = column.split("\\" + rowkeyColumnSplit);

            if (f.length != columF.length) {
                System.err.println("The number of column families does not match the number of column groups defined");
            }

            String[] line = value.toString().split(splitSymbol);
            boolean checkColumnNumber = BulkLoadXMl.checkColumnNumber(columF,
                    line, ",");

            if (checkColumnNumber) {
                HashMap<String, String> mapF = new HashMap<String, String>(16);

                // Map each column family to its comma-separated group of columns.
                for (int i = 0; i < f.length; i++) {
                    mapF.put(f[i], columF[i]);
                }

                Object[] colums = BulkLoadXMl.columnNames(column.split("\\" +
                            rowkeyColumnSplit));
                Map<String, String> map = new HashMap<String, String>(16);

                for (int i = 0; i < colums.length; i++) {
                    String str = (String) colums[i];
                    map.put(str, line[i]);
                }
                Set<Map.Entry<String, String>> columFamilyAndCloums = mapF.entrySet();
               
                for (Map.Entry<String, String> cfc : columFamilyAndCloums) {
                    String family = (String) cfc.getKey();
                    String[] cols = ((String) cfc.getValue()).split(",");
                    Map<String, String> rowOneColumns = new HashMap<String, String>(16);

                    for (int i = 0; i < cols.length; i++) {
                        rowOneColumns.put(cols[i],
                            (String) map.get(cols[i]));
                    }
                    String crc32 = OtherUtils.getEducateCrc32(map);
                    String hkey = OtherUtils.getRowKeyByReversalMobile(rowkeyColumn, map);
                    BulkLoadXMl.contextWrite(rowOneColumns, hkey+crc32, family,context);
                }
            } else {
                System.out.println("The number of columns defined does not match the number of fields in the line");
            }
        }
    }
    public static void bulkLoadApp(BulkLoads b) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String inputPath = b.getInPath();
        String outputPath = b.getOutPath();
        conf = setConf(conf,b);
        org.apache.hadoop.hbase.client.Connection connection=ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(b.getTable()));
        HbaseUtil.createTable(b.getTable(), b.getColumnFamily()
        		.split("\\"+b.getRowkeyColumnSplit()), Integer.parseInt(b.getNumRegions()));
        try {
            Job job = Job.getInstance(conf, "education label");
            job.setJarByClass(test_demo.class);
            job.setMapperClass(BulkLoadMap.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            job.setSpeculativeExecution(false);
            job.setReduceSpeculativeExecution(false);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);

            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, new Path(outputPath));

            //hTable = new HTable(conf, b.getTable());

            hTable.setWriteBufferSize(6291456L);
            hTable.setAutoFlushTo(false);
            RegionLocator regionLocator=new HRegionLocator(TableName.valueOf(b.getTable()), (ClusterConnection) connection);
            HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator);

            if (job.waitForCompletion(true)) {
                System.out.println(
                    "================ BulkLoad starting ================");

                FsShell shell = new FsShell(conf);

                try {
                    shell.run(new String[] {
                            "-chmod", "-R", "777", b.getOutPath()
                        });
                } catch (Exception e) {
                    logger.error("Couldn't change the file permissions ", e);
                    throw new IOException(e);
                }

            } else {
            	EducationLabel.logger.error("loading failed.");
                System.exit(1);
            }
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } finally {
            if (hTable != null) {
                hTable.close();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        ArrayList<BulkLoads> bi = BulkLoadXMl.getBulkLoadInfo();

        for (BulkLoads b : bi) {
            bulkLoadApp(b);
        }
    }
   
}
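The job above only generates the HFiles (and fixes their permissions); the step that actually moves them into the table is not shown. A minimal sketch of that final step with LoadIncrementalHFiles, assuming the conf, connection, hTable, outputPath and b variables from bulkLoadApp and an HBase 1.x client; it would go inside the if (job.waitForCompletion(true)) branch, right after the chmod:

// Sketch: after the MR job has written the HFiles, move them into the table's regions.
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
try (Admin admin = connection.getAdmin();
     RegionLocator locator = connection.getRegionLocator(TableName.valueOf(b.getTable()))) {
    loader.doBulkLoad(new Path(outputPath), admin, hTable, locator);
}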

Reading the configuration file:

public class BulkLoadXMl {
	public static ArrayList<BulkLoads> getBulkLoadInfo() {
		try {
			DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
			DocumentBuilder db = factory.newDocumentBuilder();
			Document doc = db.parse("bulkloads.xml");
			NodeList nodes = doc.getChildNodes();
			Node root = nodes.item(0);
			NodeList bulkloads = root.getChildNodes();
			ArrayList<BulkLoads> list = new ArrayList<BulkLoads>();
			for (int i = 0; i < bulkloads.getLength(); i++) {
				Node bulkload = bulkloads.item(i);
				// Skip whitespace/text nodes between <bulkload> elements.
				if (bulkload.getNodeType() != Node.ELEMENT_NODE) {
					continue;
				}
				// One BulkLoads bean per <bulkload> element.
				BulkLoads b = new BulkLoads();
				NodeList texts = bulkload.getChildNodes();
				for (int j = 0; j < texts.getLength(); j++) {
					Node text = texts.item(j);
					if (text.getNodeName().equals("splitSymbol")) {
						b.setSplitSymbol(text.getTextContent());
					} else if (text.getNodeName().equals("inPath")) {
						b.setInPath(text.getTextContent());
					} else if (text.getNodeName().equals("outPath")) {
						b.setOutPath(text.getTextContent());
					} else if (text.getNodeName().equals("table")) {
						b.setTable(text.getTextContent());
					} else if (text.getNodeName().equals("numRegions")) {
						b.setNumRegions(text.getTextContent());
					} else if (text.getNodeName().equals("columnFamily")) {
						b.setColumnFamily(text.getTextContent());
					} else if (text.getNodeName().equals("rowkeyColumn")) {
						b.setRowkeyColumn(text.getTextContent());
					}else if (text.getNodeName().equals("rowkeyColumnSplit")) {
						b.setRowkeyColumnSplit(text.getTextContent());
					}else if (text.getNodeName().equals("column")) {
						b.setColumn(text.getTextContent());
					}
				}
				list.add(b);
			}
			return list;
		} catch (ParserConfigurationException e) {
			e.printStackTrace();
		} catch (SAXException se) {
			se.printStackTrace();
		} catch (IOException ie) {
			ie.printStackTrace();
		}
		return null;
	}
	/**
	 * Returns true if the string is null or empty.
	 */
	public static boolean isEmpty(String s){
		return s == null || s.isEmpty();
	}
	/**
	 * Collect all column names from the per-family, comma-separated column groups.
	 */
	public static Object[] columnNames(String[] array){
		ArrayList<String> list = new ArrayList<String>();
		for(int i=0;i<array.length;i++){
			String[] co = array[i].split(",");
			for (String s : co) {
				list.add(s);
			}
		}
		return list.toArray();
	}
	/**
	 * Build a rowkey by joining the values of the rowkey columns.
	 * @param rowkeyColumn      comma-separated list of rowkey column names
	 * @param rowkeyColumnSplit separator placed between the values
	 * @param map               column name -> value for the current line
	 */
	public static String getRowKey(String rowkeyColumn,String rowkeyColumnSplit,Map<String,String> map){
		String[] split = rowkeyColumn.split(",");
		StringBuilder sb = new StringBuilder ();
		for (String str : split) {
			sb.append(map.get(str));
			sb.append(rowkeyColumnSplit);
		}
		String string = sb.toString();
		String hkey = string.substring(0, string.lastIndexOf(rowkeyColumnSplit));
		return hkey;
	}
	/**
	 * Write one line of data to the mapper context as HBase Puts.
	 * @param map     column name -> value
	 * @param hkey    rowkey
	 * @param family  column family
	 * @param context mapper context
	 */
	@SuppressWarnings("unchecked")
	public static void contextWrite(Map<String,String> map,String hkey,String family,org.apache.hadoop.mapreduce.Mapper.Context context){
		try {
			Set<Entry<String,String>> entrySet = map.entrySet();
			for (Entry<String, String> entry : entrySet) {
				String col = entry.getKey(); 
				String hvalue = entry.getValue();  
				final byte[] rowKey = Bytes.toBytes(hkey);  
				final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);  
				Put HPut = new Put(rowKey);  
				// Disabling the WAL is not recommended, even though it does improve write
				// performance: HBase persists every mutation to the WAL before writing it,
				// so that data not yet flushed can be recovered after a failure.
				//HPut.setWriteToWAL(false);
				byte[] cell = Bytes.toBytes(hvalue);  
				HPut.add(Bytes.toBytes(family), Bytes.toBytes(col), cell);  
				context.write(HKey, HPut);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	// Note the inverted semantics: returns false when the two strings are equal, true when they differ.
	public static boolean isEq(String name1,String name2){
		return !name1.equals(name2);
	}
	public static String split(String[] linesArray){
		StringBuffer sb = new StringBuffer();
		for (String str : linesArray) {
			sb.append(str);
			sb.append("\t");
		}
		return sb.toString()+"\r\n";
	}
	/**
	 * State for accumulating consecutive lines that share the same leading key
	 * (e.g. the same mobile number).
	 */
	private static String name = null;
	private static StringBuffer buffer = new StringBuffer();
	private static StringBuffer outBuffer = new StringBuffer();
	private static boolean flag = true;
	public static HashMap<String, String>  saveSameMobileAndDistrinct(Text value,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
		HashMap<String, String> map = new HashMap<String, String>();
		String[] linesArray = value.toString().split("\t");
		if(name != null){
			if(!isEq(name,linesArray[0])){
				outBufferAppend(value);	 
			}else{
				flag = false;
				judgeMoblie(outBuffer.toString(),context);
				clear(); flag= true;
				append(linesArray[0],value);   
			}
		}else{
			// First record for this key: initialize the buffers.
			append(linesArray[0],value);
		}
		map.put("flag", flag+"");
		map.put("value", outBuffer.toString());
		return  map;
	}
	
	public static void append(String name,Text value){
		bufferAppend(name);
		outBufferAppend(value);
	}
	public static void  bufferAppend(String ba){
		buffer.append(ba);
		name=ba;
	}
	public static void clear(){
		buffer.delete(0, buffer.length());
		outBuffer.delete(0, outBuffer.length());
	}
	public static void outBufferAppend(Text value){
		outBuffer.append(value);
		outBuffer.append("\r\n");
	}
	/**
	 * Write a single contact to HBase.
	 * @throws InterruptedException 
	 * @throws IOException 
	 */
	public static void savePerson(String person,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
		String column = context.getConfiguration().get("column");
		String familys = context.getConfiguration().get("columnFamily");
		String tableName = context.getConfiguration().get("table");
		String[] values = person.split("\t");
		String[] colums = column.split(",");
		String rz = HbaseUtil.repairZero("0");
		String hkey = colums[0]+"_"+rz;
		HashMap<String, String> map = new HashMap<String,String>();
		for(int i=0;i<colums.length;i++){
			if(colums.length != values.length){
				 continue;
			}
			map.put(colums[i]+"_"+rz, values[i]);
		}
		Set<Entry<String,String>> es = map.entrySet();
		for (Entry<String, String> entry : es) {
			String col = entry.getKey(); 
			String hvalue = entry.getValue();  
			final byte[] rowKey = Bytes.toBytes(map.get(hkey));  
			final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);  
			Put HPut = new Put(rowKey);  
			byte[] cell = Bytes.toBytes(hvalue);  
			HPut.add(Bytes.toBytes(familys), Bytes.toBytes(col), cell);  
			context.write(HKey, HPut);
		}
		HbaseUtil.insert(tableName, map.get(colums[0]+"_"+rz), familys, "total", 0+1+"");
	}
	/**
	 * Write multiple associated contacts to HBase.
	 * @throws InterruptedException 
	 * @throws IOException 
	 */
	public static void saveAssociatePersons(String persons,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
		String column = context.getConfiguration().get("column");
		String familys = context.getConfiguration().get("columnFamily");
		String tableName = context.getConfiguration().get("table");
		
		String[] person = persons.split("\r\n");
		int j;
		for(j=0;j<person.length;j++){
			String rz = HbaseUtil.repairZero(j+"");
			String[] values = person[j].split("\t");
			String[] colums = column.split(",");
			String hkey = values[0];
			HashMap<String, String> map = new HashMap<String,String>();
			for(int i=0;i<colums.length;i++){
				if(colums.length != values.length){
					 continue;
				}
				map.put(colums[i]+"_"+rz, values[i]);
			}
			putHbase(map,hkey,familys,context);
			HbaseUtil.insert(tableName, map.get(colums[0]+"_"+rz), familys, "total", j+1+"");
		}
	}
	
	/**
	 * Write the given columns to HBase as Puts through the mapper context.
	 */
	public static void putHbase(Map<String, String> map,String hkey ,String familys,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
		Set<Entry<String,String>> es = map.entrySet();
		for (Entry<String, String> entry : es) {
			String col = entry.getKey(); 
			String hvalue = entry.getValue();  
			final byte[] rowKey = Bytes.toBytes(hkey);  
			final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);  
			Put HPut = new Put(rowKey);  
			byte[] cell = Bytes.toBytes(hvalue);  
			HPut.add(Bytes.toBytes(familys), Bytes.toBytes(col), cell);  
			context.write(HKey, HPut);
		}
	}
	/**
	 * Decide whether the buffered block holds one contact or several, and save accordingly.
	 * @throws InterruptedException 
	 * @throws IOException 
	 */
	public static void judgeMoblie(String mobile,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
		try {
			int length = mobile.toString().split("\r\n").length;
			if(length > 1){
				// multiple associated contacts
				saveAssociatePersons(mobile.toString(),context);
			}else{
				// only one contact
				savePerson(mobile.toString(),context);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	// Flush the buffered data for the last mobile number.
	public static void lastDate(String lastData,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
		if(lastData != null && !lastData.isEmpty()){
			judgeMoblie(lastData,context);
		}
	}
	
	// Reverse a string.
	public static String reverse(String str){  
        return new StringBuilder(str).reverse().toString();  
    }
	
	public static boolean checkColumnNumber(String[] XmlColumnFamilys, String[] line, String splitSymbol){
	    int xmlColumNumber = 0;
	    for (String XmlColumnFamily : XmlColumnFamilys){
	      String[] xmlColums = XmlColumnFamily.split(splitSymbol);
	      xmlColumNumber += xmlColums.length;
	    }
	    if (xmlColumNumber == line.length){
	    	return true;
	    }else {
	    	return false;
	    }
	  }
	
	// rowkey = reversed mobile + "|" + crc32 of the row's columns
	public static String rowkey(String hkey,Map<String,String> map){  
		return reverse(hkey)+"|" + Person2HbaseUtil.getCrc32(map);
    }
  }
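For reference, a hypothetical bulkloads.xml matching the element names parsed by getBulkLoadInfo above. Every value is a placeholder: splitSymbol must match (as a regex) the delimiter used when exporting from Hive in step 1, and the paths, table, families and columns depend on your own data.

<?xml version="1.0" encoding="UTF-8"?>
<bulkloads>
	<bulkload>
		<splitSymbol>SPLIT_REGEX_PLACEHOLDER</splitSymbol>
		<inPath>/test/20220714/lnglat_order</inPath>
		<outPath>/test/20220714/hfiles</outPath>
		<table>lnglat_order</table>
		<numRegions>10</numRegions>
		<columnFamily>info</columnFamily>
		<column>name,mobile,buyerid,platform,addr,lng,lat</column>
		<rowkeyColumn>mobile</rowkeyColumn>
		<rowkeyColumnSplit>|</rowkeyColumnSplit>
	</bulkload>
</bulkloads>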

 

Approach 2: There are many ways to move data from Hive or other OLTP sources into HBase. For small data volumes, you can simply read the rows from Hive and put them into HBase directly, as sketched below.
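As a hedged illustration of that direct-put path (a sketch, not the author's code): each partition is written with the plain HBase client. The ZooKeeper quorum, batch size, table name and the assumption of "rowkey"/"orders" string columns are placeholders borrowed from the full job that follows, which writes through TableOutputFormat instead.

// Sketch: small-volume direct puts from a Hive query.
Dataset<Row> df = spark.sql(
        "select reverse(cast(mobile as string)) as rowkey, cast(v as string) as orders from test.test");
df.javaRDD().foreachPartition(rows -> {
	org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
	hconf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");            // placeholder quorum
	hconf.set("hbase.zookeeper.property.clientPort", "2181");
	try (Connection conn = ConnectionFactory.createConnection(hconf);
	     Table table = conn.getTable(TableName.valueOf("half_year_orders"))) {
		List<Put> batch = new ArrayList<>();
		while (rows.hasNext()) {
			Row row = rows.next();
			Put put = new Put(Bytes.toBytes(row.<String>getAs("rowkey")));
			put.addColumn(Bytes.toBytes("orders"), Bytes.toBytes("orders"),
					Bytes.toBytes(row.<String>getAs("orders")));
			batch.add(put);
			if (batch.size() >= 1000) {    // flush in small batches
				table.put(batch);
				batch.clear();
			}
		}
		if (!batch.isEmpty()) {
			table.put(batch);
		}
	}
});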

package com.hdshu.spark.npm.hbase.gongan;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import com.hdshu.spark.utils.HBaseJobUtils;
import com.hdshu.spark.utils.SparkJobUtils;
import com.hdshu.spark.utils.property.PropertyUtil;

import scala.Tuple2;



public class Test_Demo_Spark {
	public static void main(String[] args) throws IOException {
		
	    SparkSession spark = SparkSession
	      .builder()
	      .appName("Test_Demo_Spark ")
	      .config("spark.sql.warehouse.dir", SparkJobUtils.warehouseLocation)
	      .config("spark.sql.shuffle.partitions", SparkJobUtils.npmod_spark_sql_shuffle_partitions)
	      .config("spark.storage.memoryFraction","0.3")
	      .config("spark.shuffle.memoryFraction","0.6")
	      .config("spark.default.parallelism","200")
	      .enableHiveSupport()
	      .getOrCreate();
	    
	  
	    String npmod_sql1 = " select reverse(cast(mobile as string)) as rowkey,cast(v as string) as orders from test.test  " ;
	    Dataset<Row> npmod_DF1 = spark.sql(npmod_sql1);
	    npmod_DF1.show();
		JavaRDD<Row> javaRDD = npmod_DF1.javaRDD();
		JavaPairRDD<ImmutableBytesWritable, Put> mapToPair2 = javaRDD.mapToPair(new PairFunction<Row, ImmutableBytesWritable, Put>() {

			private static final long serialVersionUID = -6175819829004359177L;

			@Override
			public Tuple2<ImmutableBytesWritable, Put> call(Row row)
					throws Exception {
				
				String rowKey = row.getAs("rowkey");
				
				String orders = row.getAs("orders");
				
				Put put = new Put(Bytes.toBytes(rowKey));
	            
	            // Populate the "orders" column family / "orders" qualifier with the order data.
				put.addColumn(Bytes.toBytes("orders"), Bytes.toBytes("orders"), Bytes.toBytes(orders));
				return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
			}
		});

		
		String hbase_zookeeper_quorum = HBaseJobUtils.hbase_BD_zookeeper_quorum;
		String output_table = PropertyUtil.getString("inc.hbase.PDS.order","inc.properties");

		Configuration conf = new Configuration();
		conf.set("hbase.zookeeper.quorum", hbase_zookeeper_quorum);
		conf.set("hbase.zookeeper.property.clientPort", "2181");
		// Target table; output_table (read from inc.properties above) could be used here
		// instead of the hard-coded name.
		conf.set(TableOutputFormat.OUTPUT_TABLE, "half_year_orders");
		
		Job job = Job.getInstance(conf);
		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Result.class);
		job.setOutputFormatClass(TableOutputFormat.class);
		
		mapToPair2.saveAsNewAPIHadoopDataset(job.getConfiguration());
	    
		spark.close();
		
	}

}
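TableOutputFormat requires the target table (and its column family) to already exist. A minimal, hedged sketch of pre-creating it with the HBase 1.x admin API; the quorum is a placeholder, the table and family names are taken from the job above:

// Sketch: create the target table with its "orders" column family if it is missing.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");   // placeholder quorum
try (Connection conn = ConnectionFactory.createConnection(conf);
     Admin admin = conn.getAdmin()) {
	TableName tn = TableName.valueOf("half_year_orders");
	if (!admin.tableExists(tn)) {
		HTableDescriptor desc = new HTableDescriptor(tn);
		desc.addFamily(new HColumnDescriptor("orders"));
		admin.createTable(desc);
	}
}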

Finally, we can write the query API.

Initializing the connection pool:

	private static Connection hTablePool;

	private static final String ZOOKEEPER = "hbase.zookeeper.quorum";

	// conf, host and port are class-level fields declared elsewhere in the class;
	// host/port point to the ZooKeeper quorum.
	public synchronized static Connection gethTablePool() {
		if (hTablePool == null) {
			conf = new Configuration();
			conf.set(ZOOKEEPER, host);
			conf.set("hbase.client.retries.number", "3");
			conf.set("hbase.rpc.timeout", "600000");
			conf.set("hbase.client.operation.timeout", "1200000");
			conf.set("hbase.client.scanner.timeout.period", "600000");
			conf.set("hbase.zookeeper.property.clientPort", port);

			try {
				hTablePool = ConnectionFactory.createConnection(conf);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return hTablePool;
	}
	// Rowkeys were written as the reversed mobile number, so reverse the lookup key as well.
	public String queryTestDemo(String tableName,String mobile) throws Exception{
		mobile = StringUtils.reverse(mobile);
		String orders = getMobile(HbaseApi.getOneRow(tableName, mobile));
		return orders;
	}


	public static Result getOneRow(String tableName, String rowKey) throws IOException {
		Result rsResult = null;
		Table table = null;
		try {
			table = hTablePool.getTable(TableName.valueOf(tableName));
			Get get = new Get(rowKey.getBytes());
			rsResult = table.get(get);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			close(table);
		}
		return rsResult;
	}
	public static String getMobile(Result r){
		// Return the value of the first cell (the "orders" column written by the Spark job).
		Cell[] rc = r.rawCells();
		if (rc.length > 0) {
			return new String(CellUtil.cloneValue(rc[0]));
		}
		return null;
	}
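getMobile above simply returns the value of the first cell. Since the column family and qualifier are known here ("orders"/"orders" in the Spark job), a slightly more robust variant, offered only as a sketch, reads the value explicitly:

// Sketch: read the orders value by family/qualifier instead of relying on cell order.
public static String getOrders(Result r) {
	byte[] value = r.getValue(Bytes.toBytes("orders"), Bytes.toBytes("orders"));
	return value == null ? null : new String(value);
}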

Writing to HBase with Spark

When using Spark we often need to land data in HBase. Writing record by record through the plain Java client API is slow; fortunately Spark provides an interface for bulk writes. So what advantages does bulk writing have over ordinary writes?

  • BulkLoad does not write to the WAL and does not trigger flushes or splits.
  • Inserting data through a large number of Put calls can cause heavy GC activity, which hurts performance and in severe cases can even affect the stability of HBase nodes; bulk writing avoids this concern.
  • There is no per-record API-call overhead during the process.

A Scala example that writes an RDD of Puts through TableOutputFormat:
    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.hadoop.conf.Configuration
    
    /**
      * Created by shaonian
      */
    object HBaseBulk {
    
      def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Bulk")
      val sc = new SparkContext(sparkConf)
    
      val conf = new Configuration()
      conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf.set(TableOutputFormat.OUTPUT_TABLE, "bulktest")
      val job = Job.getInstance(conf)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Result])
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    
      val init = sc.makeRDD(Array("1,james,32", "2,lebron,30", "3,harden,28"))
      val rdd = init.map(_.split(",")).map(arr => {
        val put = new Put(Bytes.toBytes(arr(0)))
        put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
        put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
        (new ImmutableBytesWritable, put)
      })
      rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
      sc.stop()
      }
    }

     

 

 

 

 
