jFinal Hadoop Hbase插件

大数据的时代,没有hadoop hbase怎么可以,还是老样子,从我公司的spring版本的项目中抽取出来,做成jfinal插件分享给大家。
插件很粗糙,大家可以自己修改

一个很简单的封装,大部分都是原生操作,配置是最简配置,大家可以自己扩展Configuration里面的set属性

最新版本的hbase-client已经实现了内部连接池,所以connection只需要在项目启动的时候获得,项目关闭的时候close即可。


首先加入依赖

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.3</version>
</dependency>



添加HbasePlugin

package jfinal.plugin.hbase;
import com.jfinal.plugin.IPlugin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * jfinal hbase插件
 *
 * @author 孙宇
 */
public class HbasePlugin implements IPlugin {
    private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass());
    private String quorum;
    private String znode = "/hbase";
    private String encoding = "UTF-8";
    public HbasePlugin(String quorum) {
        this.quorum = quorum;
    }
    @Override
    public boolean start() {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", quorum);
        config.set("hbase.zookeeper.znode.parent", znode);
        config.set("hbase.encoding", encoding);
        try {
            Hbase.connection = ConnectionFactory.createConnection(config);
        } catch (IOException e) {
            e.printStackTrace();
        }
        Hbase.aggregationClient = new AggregationClient(config);
        return true;
    }
    @Override
    public boolean stop() {
        if (!Hbase.connection.isClosed()) {
            try {
                Hbase.connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            Hbase.aggregationClient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return true;
    }
}




添加Hbase工具类

package jfinal.plugin.hbase;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * hbase原生java操作工具类
 *
 * @author 孙宇
 */
public class Hbase {

    private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass());
    private static final String aggregateImplementationCoprocessor = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
    public static Connection connection;
    public static AggregationClient aggregationClient;
    public static LongColumnInterpreter longColumnInterpreter = new LongColumnInterpreter();


    private static Admin getAdmin(Connection connection) {
        try {
            return connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void createTable(String tableName,
                                   String[] familyNames) {
        Admin admin = getAdmin(connection);
        try {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            hTableDescriptor.addCoprocessor(aggregateImplementationCoprocessor);
            for (String familyName : familyNames) {
                HColumnDescriptor family = new HColumnDescriptor(familyName);
                hTableDescriptor.addFamily(family);
            }
            admin.createTable(hTableDescriptor);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void addCoprocessor(String tableName,
                                      String className) {
        Admin admin = getAdmin(connection);
        try {
            admin.disableTable(TableName.valueOf(tableName));
            HTableDescriptor hTableDescriptor = admin.getTableDescriptor(TableName.valueOf(tableName));
            hTableDescriptor.addCoprocessor(className);
            admin.modifyTable(TableName.valueOf(tableName), hTableDescriptor);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                admin.enableTable(TableName.valueOf(tableName));
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void deleteTable(String tableName) {
        Admin admin = getAdmin(connection);
        try {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static HTableDescriptor tableDescriptor(String tableName) {
        Admin admin = getAdmin(connection);
        HTableDescriptor desc = null;
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            desc = table.getTableDescriptor();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return desc;
    }

    public static void put(String tableName,
                           Put put) {
        put(tableName, Arrays.asList(put));
    }

    public static void put(String tableName,
                           List<Put> puts) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            table.put(puts);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static List<Result> scan(String tableName) {
        return scan(tableName, new Scan());
    }

    public static List<Result> scan(String tableName,
                                    Integer pageSize) {
        return scan(tableName, new Scan(), pageSize);
    }

    public static List<Result> scan(String tableName,
                                    Scan scan) {
        return scan(tableName, scan, null);
    }

    public static List<Result> scan(String tableName,
                                    Scan scan,
                                    Integer pageSize) {
        List<Result> results = new ArrayList<>();
        ResultScanner resultScanner = null;
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            resultScanner = table.getScanner(scan);
            if (pageSize != null) {
                Result[] rs = resultScanner.next(pageSize);
                for (Result result : rs) {
                    results.add(result);
                }
            } else {
                for (Result result : resultScanner) {
                    results.add(result);
                }
            }
        } catch (IOException e) {
            logger.error(ExceptionUtils.getStackTrace(e));
        } finally {
            if (resultScanner != null) {
                resultScanner.close();
            }
            try {
                table.close();
            } catch (IOException e) {
                logger.error(ExceptionUtils.getStackTrace(e));
            }
        }
        return results;
    }

    public static void delete(String tableName,
                              Delete del) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            table.delete(del);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static Result get(String tableName,
                             Get g) {
        Result[] result = get(tableName, Arrays.asList(g));
        if (result != null && result.length > 0) {
            return result[0];
        }
        return null;
    }

    public static Result[] get(String tableName,
                               List<Get> gets) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            return table.get(gets);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

    public static long rowCount(String tableName,
                                Scan scan) {
        try {
            return aggregationClient.rowCount(TableName.valueOf(tableName), longColumnInterpreter, scan);
        } catch (Throwable e) {
            e.printStackTrace();
        }
        return 0;
    }

}




再来一个测试类吧,都是原生操作,很容易,就不写注释了,英文都很简单

package test.jfinal.plugin.hbase;
import jfinal.plugin.hbase.Hbase;
import jfinal.plugin.hbase.HbasePlugin;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
/**
 * @author 孙宇
 */
public class TestHbase {
    private static HbasePlugin p;
    @BeforeClass
    public static void beforeClass() {
        // p = new HbasePlugin("127.0.0.1:2181");
        p = new HbasePlugin("node67:2181,node68:2181,node69:2181");
        p.start();
    }
    @AfterClass
    public static void afterClass() {
        p.stop();
    }
    @Test    public void t1() {
        Hbase.createTable("sunyutable", new String[]{"f1", "f2"});
    }
    @Test    @Ignore    public void t2() {
        Hbase.deleteTable("sunyutable");
    }
    @Test    public void t3() {
        HTableDescriptor desc = Hbase.tableDescriptor("sunyutable");
        System.out.println(desc);
    }
    @Test    public void t4() {
        Put put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("f1列族q1列的值111"));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q2"), Bytes.toBytes("f1列族q2列的值111"));
        Hbase.put("sunyutable", put);
    }
    @Test    public void t5() {
        List<Result> results = Hbase.scan("sunyutable");
        for (Result result : results) {
            byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
            System.out.println(Bytes.toString(value));
        }
    }
    @Test    public void t7() {
        Delete del = new Delete(Bytes.toBytes("row1"));
        Hbase.delete("sunyutable", del);
    }
    @Test    public void t8() {
        Get g = new Get(Bytes.toBytes("row1"));
        Result result = Hbase.get("sunyutable", g);
        byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
        System.out.println(Bytes.toString(value));
    }
    @Test    @Ignore    public void t9() {
        Hbase.addCoprocessor("sunyutable", "org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
    }
    @Test    public void t10() {
        System.out.println(Hbase.rowCount("sunyutable", new Scan()));
    }
}




更多的API调用与操作,请看这里

https://hbase.apache.org/book.html


源码的话,请看这里

https://git.oschina.net/sypro/jfinalplugin.git


spring版本的话,请看这里

https://git.oschina.net/sypro/demo.git


评论区

JFinal

2016-09-20 12:02

第一次分享 Hadoop Hbase 这样的插件,大数据时代必然的需求,超赞

sphsyv

2016-09-20 12:10

@JFinal 我的项目使用了activemq/hbase/kafka/mongodb/redis/solr/quartz等技术,以后逐渐分享出来吧,因为我都是用spring方式写的,有空就转几个出来。我现在在做车联网行业,都是大数据,有问题大家可以共同探讨哦。

JFinal

2016-09-20 12:31

@sphsyv 还请多多分享,这样社区的文化与氛围就慢慢形成了,社区还有几个重要功能上线以后,站长也会每天发布分享出来

IvyHelen

2016-09-21 16:07

@sphsyv 期待分享,这几个都是大数据行业比较典型的技术选型,期待学习。

LeoLian

2016-09-27 10:49

@sphsyv 同行啊,我也在车联网行业,做OBD的

sphsyv

2016-09-27 11:59

@LeoLian 哈,我以前公司也有OBD,现在公司做新能源

LeoLian

2016-09-30 15:57

@sphsyv 你不会是思建的吧

erzheyao

2018-11-28 21:24

热门分享

扫码入社