大数据时代,怎么能少了 Hadoop 和 HBase?还是老样子,我把公司 Spring 版本项目中的相关代码抽取出来,做成 JFinal 插件分享给大家。
插件很粗糙,大家可以自己修改。
这只是一个很简单的封装,大部分都是原生操作;配置也是最简配置,大家可以在 Configuration 上通过 set 方法自行扩展其他属性。
新版本的 hbase-client 内部已经实现了连接池,所以 Connection 只需要在项目启动时创建一次,在项目关闭时调用 close() 即可。
首先加入依赖
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.3</version> </dependency>
添加HbasePlugin
package jfinal.plugin.hbase; import com.jfinal.plugin.IPlugin; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * jfinal hbase插件 * * @author 孙宇 */ public class HbasePlugin implements IPlugin { private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass()); private String quorum; private String znode = "/hbase"; private String encoding = "UTF-8"; public HbasePlugin(String quorum) { this.quorum = quorum; } @Override public boolean start() { Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", quorum); config.set("hbase.zookeeper.znode.parent", znode); config.set("hbase.encoding", encoding); try { Hbase.connection = ConnectionFactory.createConnection(config); } catch (IOException e) { e.printStackTrace(); } Hbase.aggregationClient = new AggregationClient(config); return true; } @Override public boolean stop() { if (!Hbase.connection.isClosed()) { try { Hbase.connection.close(); } catch (IOException e) { e.printStackTrace(); } } try { Hbase.aggregationClient.close(); } catch (IOException e) { e.printStackTrace(); } return true; } }
添加Hbase工具类
package jfinal.plugin.hbase; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; /** * hbase原生java操作工具类 * * @author 孙宇 */ public class Hbase { private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass()); private static final String aggregateImplementationCoprocessor = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation"; public static Connection connection; public static AggregationClient aggregationClient; public static LongColumnInterpreter longColumnInterpreter = new LongColumnInterpreter(); private static Admin getAdmin(Connection connection) { try { return connection.getAdmin(); } catch (IOException e) { e.printStackTrace(); } return null; } public static void createTable(String tableName, String[] familyNames) { Admin admin = getAdmin(connection); try { HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); hTableDescriptor.addCoprocessor(aggregateImplementationCoprocessor); for (String familyName : familyNames) { HColumnDescriptor family = new HColumnDescriptor(familyName); hTableDescriptor.addFamily(family); } 
admin.createTable(hTableDescriptor); } catch (IOException e) { e.printStackTrace(); } finally { try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void addCoprocessor(String tableName, String className) { Admin admin = getAdmin(connection); try { admin.disableTable(TableName.valueOf(tableName)); HTableDescriptor hTableDescriptor = admin.getTableDescriptor(TableName.valueOf(tableName)); hTableDescriptor.addCoprocessor(className); admin.modifyTable(TableName.valueOf(tableName), hTableDescriptor); } catch (IOException e) { e.printStackTrace(); } finally { try { admin.enableTable(TableName.valueOf(tableName)); } catch (IOException e) { e.printStackTrace(); } try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void deleteTable(String tableName) { Admin admin = getAdmin(connection); try { admin.disableTable(TableName.valueOf(tableName)); admin.deleteTable(TableName.valueOf(tableName)); } catch (IOException e) { e.printStackTrace(); } finally { try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } } public static HTableDescriptor tableDescriptor(String tableName) { Admin admin = getAdmin(connection); HTableDescriptor desc = null; Table table = null; try { table = connection.getTable(TableName.valueOf(tableName)); desc = table.getTableDescriptor(); } catch (IOException e) { e.printStackTrace(); } finally { try { table.close(); } catch (IOException e) { e.printStackTrace(); } try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } return desc; } public static void put(String tableName, Put put) { put(tableName, Arrays.asList(put)); } public static void put(String tableName, List<Put> puts) { Table table = null; try { table = connection.getTable(TableName.valueOf(tableName)); table.put(puts); } catch (IOException e) { e.printStackTrace(); } finally { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } public static List<Result> scan(String 
tableName) { return scan(tableName, new Scan()); } public static List<Result> scan(String tableName, Integer pageSize) { return scan(tableName, new Scan(), pageSize); } public static List<Result> scan(String tableName, Scan scan) { return scan(tableName, scan, null); } public static List<Result> scan(String tableName, Scan scan, Integer pageSize) { List<Result> results = new ArrayList<>(); ResultScanner resultScanner = null; Table table = null; try { table = connection.getTable(TableName.valueOf(tableName)); resultScanner = table.getScanner(scan); if (pageSize != null) { Result[] rs = resultScanner.next(pageSize); for (Result result : rs) { results.add(result); } } else { for (Result result : resultScanner) { results.add(result); } } } catch (IOException e) { logger.error(ExceptionUtils.getStackTrace(e)); } finally { if (resultScanner != null) { resultScanner.close(); } try { table.close(); } catch (IOException e) { logger.error(ExceptionUtils.getStackTrace(e)); } } return results; } public static void delete(String tableName, Delete del) { Table table = null; try { table = connection.getTable(TableName.valueOf(tableName)); table.delete(del); } catch (IOException e) { e.printStackTrace(); } finally { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } public static Result get(String tableName, Get g) { Result[] result = get(tableName, Arrays.asList(g)); if (result != null && result.length > 0) { return result[0]; } return null; } public static Result[] get(String tableName, List<Get> gets) { Table table = null; try { table = connection.getTable(TableName.valueOf(tableName)); return table.get(gets); } catch (IOException e) { e.printStackTrace(); } finally { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } return null; } public static long rowCount(String tableName, Scan scan) { try { return aggregationClient.rowCount(TableName.valueOf(tableName), longColumnInterpreter, scan); } catch (Throwable e) { e.printStackTrace(); } 
return 0; } }
再来一个测试类吧,都是原生操作,很容易理解,就不写注释了,英文也都很简单。
package test.jfinal.plugin.hbase; import jfinal.plugin.hbase.Hbase; import jfinal.plugin.hbase.HbasePlugin; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import java.util.List; /** * @author 孙宇 */ public class TestHbase { private static HbasePlugin p; @BeforeClass public static void beforeClass() { // p = new HbasePlugin("127.0.0.1:2181"); p = new HbasePlugin("node67:2181,node68:2181,node69:2181"); p.start(); } @AfterClass public static void afterClass() { p.stop(); } @Test public void t1() { Hbase.createTable("sunyutable", new String[]{"f1", "f2"}); } @Test @Ignore public void t2() { Hbase.deleteTable("sunyutable"); } @Test public void t3() { HTableDescriptor desc = Hbase.tableDescriptor("sunyutable"); System.out.println(desc); } @Test public void t4() { Put put = new Put(Bytes.toBytes("row1")); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("f1列族q1列的值111")); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q2"), Bytes.toBytes("f1列族q2列的值111")); Hbase.put("sunyutable", put); } @Test public void t5() { List<Result> results = Hbase.scan("sunyutable"); for (Result result : results) { byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1")); System.out.println(Bytes.toString(value)); } } @Test public void t7() { Delete del = new Delete(Bytes.toBytes("row1")); Hbase.delete("sunyutable", del); } @Test public void t8() { Get g = new Get(Bytes.toBytes("row1")); Result result = Hbase.get("sunyutable", g); byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1")); System.out.println(Bytes.toString(value)); } @Test @Ignore public void t9() { 
Hbase.addCoprocessor("sunyutable", "org.apache.hadoop.hbase.coprocessor.AggregateImplementation"); } @Test public void t10() { System.out.println(Hbase.rowCount("sunyutable", new Scan())); } }
更多的API调用与操作,请看这里
https://hbase.apache.org/book.html
源码的话,请看这里
https://git.oschina.net/sypro/jfinalplugin.git
spring版本的话,请看这里
https://git.oschina.net/sypro/demo.git