HBase API 简单封装
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>欢迎转载,转载请注明出处-VirgoArt,www.cnblogs.com
一、Web项目连接配置(这里可以优化成连接池;构造函数中同样会触发初始化,是为了满足JUnit测试需求)
package com.pj.util; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; public class HBaseConnectionUtil implements ApplicationListener<ContextRefreshedEvent> { public static Connection connection; private static boolean isCreate = false; public HBaseConnectionUtil() { if (!isCreate) { this.onApplicationEvent(null); } } /** 这里结合Spring配置,实现初始化完成后的消息事件触发 */ @Override public void onApplicationEvent(ContextRefreshedEvent arg0) { if (!isCreate) { Configuration conf = HBaseConfiguration.create(); conf.set(SystemConfigUtil.getInstance().getProValue("HBASE_ZK_PATH"), SystemConfigUtil.getInstance().getProValue("HBASE_ZK_IP")); try { connection = ConnectionFactory.createConnection(conf); } catch (IOException e) { e.printStackTrace(); } isCreate = true; } } }
View Code
二、常用的表级、列级操作
package com.pj.util; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.LoggerFactory; import com.pj.entity.ResultBean; import ch.qos.logback.classic.Logger; public class HBaseUtil { public final static Logger log = (Logger) LoggerFactory.getLogger(HBaseUtil.class); /** 封装Table * * @deprecated 建议使用getTablePlus */ @Deprecated public static Table getTable(String tableName) { try { return HBaseConnectionUtil.connection.getTable(TableName.valueOf(tableName)); } catch (IOException e) { log.info(tableName + "表对象创建失败."); e.printStackTrace(); return null; } } public static ResultBean<Table> getTablePlus(String tableName) { ResultBean<Table> res = null; try { Table table = HBaseConnectionUtil.connection.getTable(TableName.valueOf(tableName)); res = new ResultBean<Table>(table); } catch (IOException e) { log.info(tableName + "表对象创建失败."); res = new ResultBean<>(e); } return res; } /** 通过Get进行单条查询,返回为单条结果 * * 
@param table * @param rowKey * @param clazz * @return */ @SuppressWarnings({ "deprecation", "unchecked" }) public static <T> ResultBean<T> getDataFBean(Table table, String rowKey, Class<?> clazz) { Get get = new Get(Bytes.toBytes(rowKey)); Result result = null; try { result = table.get(get); } catch (IOException e) { return new ResultBean<T>(e); } T t = null; try { t = (T) clazz.newInstance(); } catch (InstantiationException e) { return new ResultBean<T>(e); } catch (IllegalAccessException e) { return new ResultBean<T>(e); } Method[] methods = clazz.getMethods(); try { if (null == result) { return new ResultBean<T>(true, "Get Null result"); } for (KeyValue keyValue : result.list()) { String row = Bytes.toString(result.getRow()); String k = Bytes.toString(keyValue.getQualifier()); String v = Bytes.toString(keyValue.getValue()); for (Method method : methods) { // 如果Bean中存在rowKey字段,用Key进行赋值 if (method.getName().equalsIgnoreCase("setRowKey")) { try { method.invoke(t, row); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } if (method.getName().equalsIgnoreCase("set" + k)) { try { method.invoke(t, v); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } } } catch (Exception e) { return new ResultBean<T>(e); } return new ResultBean<T>(t); } /** 封装结果集 */ @Deprecated public static ResultScanner getResultScanner(Table table, Scan scan) { try { return table.getScanner(scan); } catch (IOException e) { log.info("扫描" + table.getName() + "失败."); e.printStackTrace(); return null; } } /** get方式查询数据 * * @param table * @param getList * @param clazz * @return */ public static List<?> getResultScannerPlus(Table table, List<Get> getList, Class<?> clazz) { List<Object> list = new ArrayList<Object>(); Result[] results; try { 
results = table.get(getList); for (Result result : results) { Object instance = clazz.newInstance(); for (Cell kv : result.rawCells()) { Field field = clazz.getDeclaredField(new String(CellUtil.cloneQualifier(kv))); field.setAccessible(true); field.set(instance, new String(CellUtil.cloneValue(kv))); } list.add(instance); } } catch (Exception e) { e.printStackTrace(); } return list; } public static ResultBean<ResultScanner> getResultScannerPlus(Table table, Scan scan) { try { //scan.setCaching(1000); return new ResultBean<ResultScanner>(table.getScanner(scan)); } catch (IOException e) { log.info("扫描" + table.getName() + "失败."); e.printStackTrace(); return new ResultBean<>(e); } } /** 封装结果数据Map */ @SuppressWarnings({ "unchecked", "deprecation" }) public static <T> Map<String, T> getResultObjectMap(ResultScanner resultScanner, Class<?> clazz) { Map<String, T> resultMap = new HashMap<String, T>(); Method[] methods = clazz.getMethods(); for (Result res : resultScanner) { String key = Bytes.toString(res.getRow()); T t = null; try { t = (T) clazz.newInstance(); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } for (KeyValue keyValue : res.list()) { String k = Bytes.toString(keyValue.getQualifier()); String v = Bytes.toString(keyValue.getValue()); for (Method method : methods) { // 如果Bean中存在rowKey字段,用Key进行赋值 if (method.getName().equalsIgnoreCase("setRowKey")) { try { method.invoke(t, key); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } if (method.getName().equalsIgnoreCase("set" + k)) { try { method.invoke(t, v); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } } resultMap.put(key, t); } return resultMap; } /** 封装结果数据List */ 
@SuppressWarnings({ "unchecked", "deprecation" }) public static <T> List<T> getResultObjectList_bck(ResultScanner resultScanner, Class<?> clazz) { List<T> resultList = new ArrayList<>(); Method[] methods = clazz.getMethods(); for (Result res : resultScanner) { String key = Bytes.toString(res.getRow()); T t = null; try { t = (T) clazz.newInstance(); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } for (int i = 0; i < res.list().size(); i++) { KeyValue keyValue = res.list().get(i); String k = Bytes.toString(keyValue.getQualifier()); String v = Bytes.toString(keyValue.getValue()); for (int j = 0; j < methods.length; i++) { Method method = methods[i]; if (method.getName().equalsIgnoreCase("setRowKey")) { try { method.invoke(t, key); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } if (method.getName().equalsIgnoreCase("set" + k)) { try { method.invoke(t, v); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } } /* * for (KeyValue keyValue : res.list()) { String k = * Bytes.toString(keyValue.getQualifier()); String v = * Bytes.toString(keyValue.getValue()); for (Method method : methods) { // * 如果Bean中存在rowKey字段,用Key进行赋值 if * (method.getName().equalsIgnoreCase("setRowKey")) { try { method.invoke(t, * key); } catch (IllegalAccessException e) { e.printStackTrace(); } catch * (IllegalArgumentException e) { e.printStackTrace(); } catch * (InvocationTargetException e) { e.printStackTrace(); } } if * (method.getName().equalsIgnoreCase("set" + k)) { try { method.invoke(t, v); } * catch (IllegalAccessException e) { e.printStackTrace(); } catch * (IllegalArgumentException e) { e.printStackTrace(); } catch * (InvocationTargetException e) { 
e.printStackTrace(); } } } } */ resultList.add(t); } return resultList; } @SuppressWarnings({ "deprecation", "unchecked" }) public static <T> List<T> getResultObjectList(ResultScanner resultScanner, Class<?> clazz) { List<T> resultList = new ArrayList<>(); // Method[] methods = clazz.getMethods(); for (Result res : resultScanner) { String key = Bytes.toString(res.getRow()); T t = null; try { t = (T) clazz.newInstance(); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } for (int i = 0; i < res.list().size(); i++) { KeyValue keyValue = res.list().get(i); String k = Bytes.toString(keyValue.getQualifier()); String v = Bytes.toString(keyValue.getValue()); Field field = null; try { if (!k.equals("serialVersionUID")) { if (i == 1) { field = clazz.getDeclaredField("rowKey"); field.setAccessible(true); field.set(t, key); } field = clazz.getDeclaredField(k); field.setAccessible(true); field.set(t, v); } } catch (IllegalArgumentException | NoSuchFieldException | SecurityException | IllegalAccessException e) { e.printStackTrace(); } } resultList.add(t); } return resultList; } /** 封装Put get */ public static Put getPut(String rowKye) { Put put = new Put(Bytes.toBytes(rowKye)); return put; } /** 封装Put set * * @throws IOException */ @Deprecated public static void setPut(Table table, Put put) throws IOException { table.put(put); } public static ResultBean<String> setPutPlus(Table table, Put put) { try { table.put(put); return new ResultBean<String>("Put Success"); } catch (IOException e) { e.printStackTrace(); return new ResultBean<String>(e); } } /** 封装Put add */ @SuppressWarnings("deprecation") public static Put putAdd(Put put, String colunmFamily, String colunmName, String value) { put.add(Bytes.toBytes(colunmFamily), Bytes.toBytes(colunmName), Bytes.toBytes(value)); return put; } /** 封装Delete,ROWKEY delete */ @Deprecated public static void deleteRowByRowKey(Table table, String rowKey) { Delete delete = new 
Delete(Bytes.toBytes(rowKey)); try { table.delete(delete); } catch (IOException e) { log.info(rowKey + "行删除失败."); e.printStackTrace(); } } public static ResultBean<String> deleteRowByRowKeyPlus(Table table, String rowKey) { Delete delete = new Delete(Bytes.toBytes(rowKey)); try { table.delete(delete); return new ResultBean<String>("Delete Success"); } catch (IOException e) { log.info(rowKey + "行删除失败."); return new ResultBean<String>(e); } } /** Close:需要调用时注意序列,从小到大进行排列 */ public static void close(Object... objects) { for (Object o : objects) { if (null != o) { if (o instanceof ResultScanner) { ((ResultScanner) o).close(); } else if (o instanceof Scan) { } else if (o instanceof Table) { try { ((Table) o).close(); } catch (IOException e) { e.printStackTrace(); } } else if (o instanceof Connection) { try { ((Connection) o).close(); } catch (IOException e) { e.printStackTrace(); } } else { throw new RuntimeException("未知对象:" + o.getClass().getName()); } } } } /** 列过滤器:SingleColumnValueFilter<br> * 用于在进行条件查询时,通过过滤参数条件自动生成过滤器(单个)<br> * compareOp: * <code>LESS 小于,LESS_OR_EQUAL 小于等于,EQUAL 等于,NOT_EQUAL 不等于,GREATER_OR_EQUAL 大于等于,GREATER 大于</code> */ public static Filter setSingleColumnValueFilter(String colunmFilterString, String splitString, String compareOp) { String[] sp = colunmFilterString.split(splitString); return new SingleColumnValueFilter(Bytes.toBytes(sp[0]), Bytes.toBytes(sp[1]), CompareOp.valueOf(compareOp), Bytes.toBytes(sp[2].toString())); } /** 对列过滤器设计过滤条件:SingleColumnValueFilter<br> * 用于在进行条件查询时,通过对象合成过滤条件 */ public static List<String> setSingleColumnValueFilterParam(String colunmFamily, Object object, String splitString) { List<String> params = new ArrayList<>(); for (Field field : object.getClass().getDeclaredFields()) { Object value = ReflectUtil.getFiledValueByName(field.getName(), object); if (null != value && !"".equals(value.toString())) { params.add(colunmFamily + splitString + field.getName() + splitString + value.toString()); } } return params; } 
/** 对HTable求Count(在有查询条件时,表现有问题,初步判断是由于FirstKeyOnlyFilter与其他过滤器不兼容) */ public static long getCountByFilter(String tableName, FilterList fl) { long count = 0; Scan scan = new Scan(); scan.setFilter(fl); ResultScanner rs = null; try { rs = HBaseUtil.getTable(tableName).getScanner(scan); Iterator<Result> it = rs.iterator(); while (it.hasNext()) { it.next(); count += 1; } } catch (IOException e) { e.printStackTrace(); } close(rs); return count; } /** 做全表扫描时,根据page对象进行分页(可以通过FilterList传入过滤条件) */ public static <T> List<T> scanTableByPage(String tableName, Page page, Class<?> clazz, FilterList fl) { List<T> rsList = null; Scan scan = new Scan();// 全表扫描Scan Filter pageFilter = null;// 分页过滤器 ResultScanner rs = null; page.setTotalRecord((int) getCountByFilter(tableName, fl));// 设置page对象中总记录数 int pageNow = page.getPageNo();// 当前页 int pageSize = page.getPageSize();// 页大小 int recordSize = page.getTotalPage();// 总条数 int begin = 0;// 内容显示时开始的行号 if (pageNow == 1) { // 首页分页处理,直接返回页大小值 pageFilter = new PageFilter(pageSize); fl.addFilter(pageFilter); scan.setFilter(fl); rs = HBaseUtil.getResultScanner(HBaseUtil.getTable(tableName), scan); rsList = HBaseUtil.getResultObjectList(rs, clazz); } else { begin = (pageNow - 1) * pageSize + 1; if (begin > recordSize) { // 如果是尾页,修改开始扫描行号 begin = (pageNow - 1) * pageSize + 1; } pageFilter = new PageFilter(begin);// 一次性扫描前n页+1的数据 fl.addFilter(pageFilter); scan.setFilter(fl); rs = HBaseUtil.getResultScanner(HBaseUtil.getTable(tableName), scan); Iterator<Result> it = rs.iterator(); int b = 0; byte[] startRow = null; while (it.hasNext()) { b++; if (b == begin) { startRow = it.next().getRow();// 通过迭代获取当前显示页首数据RowKey } it.next(); } pageFilter = new PageFilter(pageSize);// 重置分页过滤器为页面数据大小 fl.addFilter(pageFilter); scan.setFilter(fl); scan.setStartRow(startRow);// 设置起始扫描RowKey rs = HBaseUtil.getResultScanner(HBaseUtil.getTable(tableName), scan); rsList = HBaseUtil.getResultObjectList(rs, clazz); } // page.setTotalRecord(rsList.size());// // 
由于getCountByFilter在条件环境下有问题,重新进行值的设定 close(rs); return rsList; } /** 从KeyValue类型中,getKeyString方法获取真实的RowKey */ @SuppressWarnings({ "deprecation", "static-access" }) public static String getRealRowKey(KeyValue kv) { int rowLength = Bytes.toShort(kv.getBuffer(), kv.getOffset() + kv.ROW_OFFSET); String rowKey = Bytes.toStringBinary(kv.getBuffer(), kv.getOffset() + kv.ROW_OFFSET + Bytes.SIZEOF_SHORT, rowLength); return rowKey; } public static Put putAddByBean(Put put, Object obj) { for (Field field : obj.getClass().getDeclaredFields()) { String firstLetter = field.getName().substring(0, 1).toUpperCase(); String getter = "get" + firstLetter + field.getName().substring(1); Object value = null; try { // 构造的GET方法,如果没有,抛异常NoSuchMethodException Method getMethod = obj.getClass().getMethod(getter, new Class[] {}); // 代理obj的GET方法,获取返回值 value = getMethod.invoke(obj, new Object[] {}); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (SecurityException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } if (null == value || value.equals("") || value.equals("null")) { } else { HBaseUtil.putAdd(put, "cf1", field.getName(), value.toString()); } } return put; } }
View Code
三、简单的HDFS文件存储(为了支持HBase中的文件索引)
package com.pj.util; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; /** 操作分布式集群HDFS工具类 * * @author ChenWen */ public class HDFSOptionUtil { /** 将本地文件上传到HDFS(不清楚为什么本地文件复制到HDFS,API会将本地文件删除掉):方法将true改成false * * @param localPathStr * @param hdfsPathStr * @throws IOException */ public static void localfileUpload(String localPathStr, String hdfsPathStr) throws IOException { Configuration conf = new Configuration(); conf.addResource(new Path("hdfs-site.xml")); conf.addResource(new Path("core-site.xml")); FileSystem fs = FileSystem.get(conf); Path localPath = new Path(localPathStr); Path hdfsPath = new Path(hdfsPathStr); fs.copyFromLocalFile(true, localPath, hdfsPath); } /** HDFS文件下载 * * @param hdfsPathStr * @param localPathStr * @throws IOException */ public static void fsFileDownload(String hdfsPathStr, String localPathStr) throws IOException { Configuration conf = new Configuration(); conf.addResource(new Path("hdfs-site.xml")); conf.addResource(new Path("core-site.xml")); FileSystem fs = FileSystem.get(conf); FSDataInputStream in = null; FileOutputStream out = null; try { File aimFilePath = new File(localPathStr); if (!aimFilePath.exists()) { aimFilePath.mkdirs(); } Path srcPath = new Path(hdfsPathStr); in = fs.open(srcPath); File aimFile = new File(aimFilePath, srcPath.getName()); if (!aimFile.exists()) { aimFile.createNewFile(); } out = new FileOutputStream(aimFile); IOUtils.copyBytes(in, out, 4096, false); } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); } } }
View Code
四、补充
1.使用的SystemConfigUtil是用来获取系统配置文件键值对的工具类。
2.ResultBean用来自定义返回值,具有异常栈存储、消息、值存储。