In the era of big data, you can hardly do without Hadoop and HBase. As usual, I extracted this from the Spring version of one of my company's projects and packaged it as a JFinal plugin to share with everyone.
The plugin is fairly rough, so feel free to modify it yourselves.
It is a very thin wrapper: most operations are native HBase calls, and the configuration is the bare minimum. You can extend it yourself by adding more set properties on the Configuration.
The latest hbase-client already implements an internal connection pool, so the Connection only needs to be obtained once at application startup and closed at application shutdown.
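In plain hbase-client terms that lifecycle looks like the sketch below, and since the Configuration object accepts any standard client property, extending the minimal setup is just a matter of adding more set calls. A standalone sketch (the quorum and the extra property values are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ConnectionLifecycleSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "node67:2181,node68:2181,node69:2181");
        // any other standard hbase-client property can be added the same way, e.g.:
        config.set("hbase.client.retries.number", "3"); // illustrative value
        config.set("hbase.rpc.timeout", "60000");       // illustrative value
        // one shared, internally pooled connection for the whole application
        Connection connection = ConnectionFactory.createConnection(config);
        try {
            // ... connection.getTable(...) / connection.getAdmin() ...
        } finally {
            connection.close(); // close once, at application shutdown
        }
    }
}

The plugin below wraps exactly this pattern behind JFinal's IPlugin start/stop lifecycle.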
To build the plugin, first add the dependency:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.3</version>
</dependency>
Then add HbasePlugin:
package jfinal.plugin.hbase;
import com.jfinal.plugin.IPlugin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* JFinal HBase plugin
*
* @author 孙宇
*/
public class HbasePlugin implements IPlugin {
private static final Logger logger = LoggerFactory.getLogger(HbasePlugin.class);
private String quorum;
private String znode = "/hbase";
private String encoding = "UTF-8";
public HbasePlugin(String quorum) {
this.quorum = quorum;
}
@Override
public boolean start() {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", quorum);
config.set("hbase.zookeeper.znode.parent", znode);
config.set("hbase.encoding", encoding);
try {
Hbase.connection = ConnectionFactory.createConnection(config);
} catch (IOException e) {
logger.error("Failed to create HBase connection", e);
return false;
}
Hbase.aggregationClient = new AggregationClient(config);
return true;
}
@Override
public boolean stop() {
if (Hbase.connection != null && !Hbase.connection.isClosed()) {
try {
Hbase.connection.close();
} catch (IOException e) {
logger.error("Failed to close HBase connection", e);
}
}
if (Hbase.aggregationClient != null) {
try {
Hbase.aggregationClient.close();
} catch (IOException e) {
logger.error("Failed to close AggregationClient", e);
}
}
return true;
}
}
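To use the plugin in a JFinal application, register it in your JFinalConfig so that start() and stop() are tied to the application lifecycle described above. A minimal sketch (DemoConfig and the quorum are placeholders, and the other configXxx methods that JFinalConfig requires are omitted for brevity):

import com.jfinal.config.JFinalConfig;
import com.jfinal.config.Plugins;
import jfinal.plugin.hbase.HbasePlugin;

public class DemoConfig extends JFinalConfig {
    @Override
    public void configPlugin(Plugins me) {
        // start() runs when the application starts, stop() when it shuts down
        me.add(new HbasePlugin("node67:2181,node68:2181,node69:2181"));
    }
    // configConstant, configRoute, configInterceptor, configHandler omitted for brevity
}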
Next, add the Hbase utility class:
package jfinal.plugin.hbase;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Utility class for native HBase Java operations
*
* @author 孙宇
*/
public class Hbase {
private static final Logger logger = LoggerFactory.getLogger(Hbase.class);
private static final String aggregateImplementationCoprocessor = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
public static Connection connection;
public static AggregationClient aggregationClient;
public static LongColumnInterpreter longColumnInterpreter = new LongColumnInterpreter();
private static Admin getAdmin(Connection connection) {
try {
return connection.getAdmin();
} catch (IOException e) {
// fail fast rather than returning null, which would lead to NullPointerExceptions in the callers
throw new RuntimeException("Failed to obtain HBase Admin", e);
}
}
public static void createTable(String tableName,
String[] familyNames) {
Admin admin = getAdmin(connection);
try {
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
hTableDescriptor.addCoprocessor(aggregateImplementationCoprocessor);
for (String familyName : familyNames) {
HColumnDescriptor family = new HColumnDescriptor(familyName);
hTableDescriptor.addFamily(family);
}
admin.createTable(hTableDescriptor);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void addCoprocessor(String tableName,
String className) {
Admin admin = getAdmin(connection);
try {
admin.disableTable(TableName.valueOf(tableName));
HTableDescriptor hTableDescriptor = admin.getTableDescriptor(TableName.valueOf(tableName));
hTableDescriptor.addCoprocessor(className);
admin.modifyTable(TableName.valueOf(tableName), hTableDescriptor);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
admin.enableTable(TableName.valueOf(tableName));
} catch (IOException e) {
e.printStackTrace();
}
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void deleteTable(String tableName) {
Admin admin = getAdmin(connection);
try {
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static HTableDescriptor tableDescriptor(String tableName) {
HTableDescriptor desc = null;
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
desc = table.getTableDescriptor();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return desc;
}
public static void put(String tableName,
Put put) {
put(tableName, Arrays.asList(put));
}
public static void put(String tableName,
List<Put> puts) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
table.put(puts);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static List<Result> scan(String tableName) {
return scan(tableName, new Scan());
}
public static List<Result> scan(String tableName,
Integer pageSize) {
return scan(tableName, new Scan(), pageSize);
}
public static List<Result> scan(String tableName,
Scan scan) {
return scan(tableName, scan, null);
}
public static List<Result> scan(String tableName,
Scan scan,
Integer pageSize) {
List<Result> results = new ArrayList<>();
ResultScanner resultScanner = null;
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
resultScanner = table.getScanner(scan);
if (pageSize != null) {
Result[] rs = resultScanner.next(pageSize);
for (Result result : rs) {
results.add(result);
}
} else {
for (Result result : resultScanner) {
results.add(result);
}
}
} catch (IOException e) {
logger.error(ExceptionUtils.getStackTrace(e));
} finally {
if (resultScanner != null) {
resultScanner.close();
}
if (table != null) {
try {
table.close();
} catch (IOException e) {
logger.error(ExceptionUtils.getStackTrace(e));
}
}
}
return results;
}
public static void delete(String tableName,
Delete del) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
table.delete(del);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static Result get(String tableName,
Get g) {
Result[] result = get(tableName, Arrays.asList(g));
if (result != null && result.length > 0) {
return result[0];
}
return null;
}
public static Result[] get(String tableName,
List<Get> gets) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
return table.get(gets);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return null;
}
public static long rowCount(String tableName,
Scan scan) {
try {
return aggregationClient.rowCount(TableName.valueOf(tableName), longColumnInterpreter, scan);
} catch (Throwable e) {
e.printStackTrace();
}
return 0;
}
}
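Once the plugin has started, any part of a JFinal application can call the static methods on Hbase directly, for example from a controller. A minimal sketch (UserController, the "user" table and the "f1" family are hypothetical and only for illustration):

import com.jfinal.core.Controller;
import jfinal.plugin.hbase.Hbase;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class UserController extends Controller {
    // write one cell keyed by the "id" request parameter
    public void save() {
        Put put = new Put(Bytes.toBytes(getPara("id")));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(getPara("name")));
        Hbase.put("user", put);
        renderText("saved");
    }
    // read the same cell back
    public void show() {
        Result result = Hbase.get("user", new Get(Bytes.toBytes(getPara("id"))));
        byte[] name = result == null ? null : result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        renderText(name == null ? "not found" : Bytes.toString(name));
    }
}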
Finally, here is a test class. Everything in it is a native HBase operation and easy to follow, so I did not add comments. Note that t10 (rowCount) only works when the AggregateImplementation coprocessor is available on the table: createTable above registers it automatically, and t9 shows how to add it to an existing table.
package test.jfinal.plugin.hbase;
import jfinal.plugin.hbase.Hbase;
import jfinal.plugin.hbase.HbasePlugin;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
/**
* @author 孙宇
*/
public class TestHbase {
private static HbasePlugin p;
@BeforeClass
public static void beforeClass() {
// p = new HbasePlugin("127.0.0.1:2181");
p = new HbasePlugin("node67:2181,node68:2181,node69:2181");
p.start();
}
@AfterClass
public static void afterClass() {
p.stop();
}
@Test public void t1() {
Hbase.createTable("sunyutable", new String[]{"f1", "f2"});
}
@Test @Ignore public void t2() {
Hbase.deleteTable("sunyutable");
}
@Test public void t3() {
HTableDescriptor desc = Hbase.tableDescriptor("sunyutable");
System.out.println(desc);
}
@Test public void t4() {
Put put = new Put(Bytes.toBytes("row1"));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("f1列族q1列的值111"));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q2"), Bytes.toBytes("f1列族q2列的值111"));
Hbase.put("sunyutable", put);
}
@Test public void t5() {
List<Result> results = Hbase.scan("sunyutable");
for (Result result : results) {
byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
System.out.println(Bytes.toString(value));
}
}
@Test public void t7() {
Delete del = new Delete(Bytes.toBytes("row1"));
Hbase.delete("sunyutable", del);
}
@Test public void t8() {
Get g = new Get(Bytes.toBytes("row1"));
Result result = Hbase.get("sunyutable", g);
byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
System.out.println(Bytes.toString(value));
}
@Test @Ignore public void t9() {
Hbase.addCoprocessor("sunyutable", "org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
}
@Test public void t10() {
System.out.println(Hbase.rowCount("sunyutable", new Scan()));
}
}
For more API calls and operations, see:
https://hbase.apache.org/book.html
For the source code, see:
https://git.oschina.net/sypro/jfinalplugin.git
For the Spring version, see:
https://git.oschina.net/sypro/demo.git