package com.cma.hbase.test;
import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; import java.util.ArrayList; import java.util.List; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import com.cma.hbase.Constants; import com.cma.hbase.entity.DataModel; import com.cma.hbase.tools.HbaseUtils; public class HbaseTest { public static void main(String[] args) { 创建一张表 createTable("hello_baby",new String[]{"code","ws","wd","t","ps","rh","vis","r"}); 写入一条数据 writeRecord("hello_baby","row1","code","","code"); writeRecord("hello_baby","row1","ws","","ws"); writeRecord("hello_baby","row1","wd","","wd"); writeRecord("hello_baby","row1","t","","t"); writeRecord("hello_baby","row1","ps","","ps"); writeRecord("hello_baby","row1","rh","","rh"); writeRecord("hello_baby","row1","vis","","vis"); writeRecord("hello_baby","row1","r","","r"); 写入一组数据 writeRecordList("hello_baby"); //查询出一条数据 getRecord("hello_baby","row1"); 删除一行 deleteRecord("hello_baby","row1"); 删除一张表 dropTable("hello_baby"); 清空一张表 clearTable("hello_baby"); } /** * 清空一张表 * @param string */ private static void clearTable(String tableName) { Configuration cfg = HbaseUtils.getCfg(); try { HBaseAdmin admin = new HBaseAdmin(cfg); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ System.out.println("tableName: "+tableName+" drop over!"); } } /** * 写入一组数据 * @param tableName */ private static void writeRecordList(String tableName) { Long start = System.currentTimeMillis(); Configuration cfg = HbaseUtils.getCfg(); try { HTable htable = new HTable(cfg,tableName); List<Put> puts = getPuts(); System.out.println(puts.size()); htable.put(puts); } catch (IOException e) { e.printStackTrace(); }finally{ System.out.println("tableName:"+tableName+" write over!"); } Long end = System.currentTimeMillis(); System.out.println("cost time: "+(end -start)); } private static List<Put> getPuts() { List<Put> putList = new ArrayList<Put>(); List<Put> putRecord = null; try { List<String> lines = FileUtils.readLines(new File("/home/guest/data/201307310800.csv")); for(int i = 1 ;i < lines.size();i++){ putRecord = getPutsByLine(lines.get(i)); putList.addAll(putRecord); } } catch (IOException e) { e.printStackTrace(); } return putList; } /** * 获得一组Put * @param line * @return */ private static List<Put> getPutsByLine(String line) { List<Put> puts = new ArrayList<Put>(); Put put = null; if(StringUtils.isNotBlank(line)){ String[] columns = line.split(","); String[] families = Constants.FAMILIES; String rowKey = "201307310800"+columns[0]; for(int i = 0;i < columns.length;i++){ String family = families[i]; String qualifier = ""; String value = columns[i]; put = getPut(rowKey,family,qualifier,value); puts.add(put); } } return puts; } /** * 组装一个Put * @param rowKey * @param family * @param qualifier * @param value * @return */ private static Put getPut(String rowKey, String family, String qualifier, String value) { Put put = new Put(Bytes.toBytes(rowKey)); put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value)); return put; } /** * 查询出一条数据 * @param tableName * @param rowKey */ private static void getRecord(String tableName, String rowKey) { Configuration cfg = HbaseUtils.getCfg(); try { HTable htable = new HTable(cfg,tableName); Get get = new Get(Bytes.toBytes(rowKey)); Result rs = htable.get(get); for(KeyValue kv : rs.raw()){ System.out.print(new String(kv.getRow())+" *** "); System.out.print(new String(kv.getFamily())+" *** "); System.out.print(new String(kv.getQualifier())+" *** "); System.out.print(new String(kv.getValue())); System.out.println(); } } catch (IOException e) { e.printStackTrace(); } } /** * 删除一张表 * @param tableName */ private static void dropTable(String tableName) { Configuration cfg = HbaseUtils.getCfg(); try { HBaseAdmin admin = new HBaseAdmin(cfg); admin.disableTable(tableName); admin.deleteTable(tableName); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ System.out.println("tableName: "+tableName+" drop over!"); } } /** * 删除一行 * @param tableName * @param rowKey */ private static void deleteRecord(String tableName, String rowKey) { Configuration cfg = HbaseUtils.getCfg(); try { HTable htable = new HTable(cfg,tableName); Delete del = new Delete(Bytes.toBytes(rowKey)); htable.delete(del); } catch (IOException e) { e.printStackTrace(); }finally{ System.out.println("rowKey: "+rowKey+" delete over!"); } } /** * 存储一列 * @param tableName * @param rowKey * @param family * @param qualifier * @param value */ private static void writeRecord(String tableName,String rowKey,String family,String qualifier,String value) { Configuration cfg = HbaseUtils.getCfg(); try { HTable htable = new HTable(cfg,tableName); Put put = new Put(Bytes.toBytes(rowKey)); put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value)); htable.put(put); } catch (IOException e) { e.printStackTrace(); }finally{ System.out.println("family: "+family+" put over!"); } } /** * 创建一张表 * @param tableName * @param families */ private static void createTable(String tableName,String[] families) { HBaseAdmin admin = null; try { Configuration cfg = HbaseUtils.getCfg(); System.out.println(cfg); admin = new HBaseAdmin(cfg); if(admin.tableExists(tableName)){ System.out.println("表:"+tableName+" 已经存在!"); }else{ HTableDescriptor tableDesc = new HTableDescriptor(tableName); for(String column : families){ tableDesc.addFamily(new HColumnDescriptor(column)); } admin.createTable(tableDesc); } } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ System.out.println("over!"); } } }