jFinal自带的redis插件只支持单机模式,不支持集群,所以我写了一个支持集群模式的插件。
这个插件不但支持集群模式,还补充了集群模式下JedisCluster本身没有提供的keys、flushDB、dbSize等方法。
注意:这个插件依赖阿里的fastjson。
集群模式与单机模式是完全不同的东西:单机模式是先创建JedisPool,然后调用它的getResource()方法来获得Jedis对象,再使用Jedis来操作redis。
集群使用的是JedisCluster,它本身就实现了pool功能,所以不需要自己关闭连接,但是创建的方式与单机完全不一样,它需要构造一个
Set<HostAndPort> jedisClusterNodes
来告诉JedisCluster集群中有哪些服务器节点,而且集群模式是不支持密码的。
首先定义插件类,用于初始化集群模式的JedisCluster对象:
package test.sunyu.tools.redis;
import com.alibaba.fastjson.JSON;
import com.jfinal.plugin.IPlugin;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * jFinal plugin that builds a {@link JedisCluster} from a JSON node list and registers it
 * with {@link RedisClusterTools}. It exists because jFinal's bundled redis plugin does not
 * support Redis cluster mode.
 *
 * <p>The node list is a JSON array in which every element carries a {@code host} and a
 * {@code port} attribute, for example
 * {@code [{host:'192.168.11.81',port:'7001'},{host:'192.168.11.82',port:'7004'}]}.
 * The port may be written either as a string or as a number.
 *
 * @author 孙宇
 */
public class RedisClusterPlugin implements IPlugin {
    /** Passed to {@code JedisPoolConfig.setMaxTotal}. */
    private Integer maxTotal = 1000;
    /** Passed to {@code JedisPoolConfig.setMaxIdle}. */
    private Integer maxIdle = 200;
    /** Passed to {@code JedisPoolConfig.setMaxWaitMillis}. */
    private Long maxWaitMillis = 2000L;
    /** Passed to {@code JedisPoolConfig.setTestOnBorrow}. */
    private Boolean testOnBorrow = true;
    /** JSON array describing the cluster nodes. */
    private String cluster;
    /** Name under which the client is registered in {@link RedisClusterTools}. */
    private String defaultName = "main";
    /** Client created by {@link #start()}; {@code null} while the plugin is not started. */
    private JedisCluster jedisCluster;

    /**
     * @param cluster JSON array of nodes, each with {@code host} and {@code port}
     */
    public RedisClusterPlugin(String cluster) {
        this.cluster = cluster;
    }

    /**
     * Creates the cluster client and registers it under {@link #getDefaultName()}.
     *
     * @return {@code false} when no node list was configured, {@code true} otherwise
     *         (including when the node list is empty, in which case nothing is registered)
     * @throws IllegalArgumentException if a node lacks {@code host} or {@code port}, or the
     *                                  port is not a valid integer
     */
    @Override
    public boolean start() {
        if (cluster == null) {
            return false;
        }
        if (jedisCluster != null) {
            // Already started: creating a second client would only leak it, because the
            // registry keeps the first client registered under a given name.
            return true;
        }

        List<Map> clusterList = JSON.parseArray(cluster, Map.class);
        if (clusterList == null || clusterList.isEmpty()) {
            return true;
        }

        Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
        for (Map<?, ?> node : clusterList) {
            jedisClusterNodes.add(toHostAndPort(node));
        }

        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(maxTotal);
        jedisPoolConfig.setMaxIdle(maxIdle);
        jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
        jedisPoolConfig.setTestOnBorrow(testOnBorrow);

        jedisCluster = new JedisCluster(jedisClusterNodes, jedisPoolConfig);
        RedisClusterTools.addJedisCluster(defaultName, jedisCluster);
        return true;
    }

    /**
     * Closes the client created by {@link #start()}, releasing its connection pools.
     *
     * <p>NOTE(review): the closed client stays in the {@link RedisClusterTools} registry,
     * so it must not be used after this call.
     *
     * @return always {@code true}
     * @throws IllegalStateException if closing the client fails
     */
    @Override
    public boolean stop() {
        JedisCluster started = jedisCluster;
        jedisCluster = null;
        if (started != null) {
            try {
                started.close();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close JedisCluster '" + defaultName + "'", e);
            }
        }
        return true;
    }

    /** Converts one parsed JSON node into a {@link HostAndPort}. */
    private static HostAndPort toHostAndPort(Map<?, ?> node) {
        Object host = node.get("host");
        Object port = node.get("port");
        if (host == null || port == null) {
            throw new IllegalArgumentException("Cluster node needs both host and port: " + node);
        }
        // String.valueOf accepts a port parsed as a number as well as one parsed as a string.
        return new HostAndPort(String.valueOf(host), Integer.parseInt(String.valueOf(port)));
    }

    public Integer getMaxTotal() {
        return maxTotal;
    }

    public void setMaxTotal(Integer maxTotal) {
        this.maxTotal = maxTotal;
    }

    public Integer getMaxIdle() {
        return maxIdle;
    }

    public void setMaxIdle(Integer maxIdle) {
        this.maxIdle = maxIdle;
    }

    public Long getMaxWaitMillis() {
        return maxWaitMillis;
    }

    public void setMaxWaitMillis(Long maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }

    public Boolean getTestOnBorrow() {
        return testOnBorrow;
    }

    public void setTestOnBorrow(Boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }

    public String getCluster() {
        return cluster;
    }

    public void setCluster(String cluster) {
        this.cluster = cluster;
    }

    public String getDefaultName() {
        return defaultName;
    }

    public void setDefaultName(String defaultName) {
        this.defaultName = defaultName;
    }
}
然后编写集群模式的工具类。其实这里我也可以将所有的方法都写在这个类中,比如get、set等等,就像jFinal的Cache那样,但是我嫌麻烦,所以就没那样写,只写了keys、flushDB、dbSize这三个方法的直接调用。
package test.sunyu.tools.redis;
import org.nutz.lang.Lang;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import sunyu.tools.redis.JedisClusterCallback;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Static registry of named {@link JedisCluster} clients, plus the cluster-wide
 * {@code keys}, {@code flushDB} and {@code dbSize} operations that {@link JedisCluster}
 * does not offer itself. Every other command is reached through
 * {@link #clusterExecute(JedisClusterCallback)}.
 *
 * <p>The cluster-wide operations visit every node reported by
 * {@link JedisCluster#getClusterNodes()} but run the command only on nodes whose
 * {@code INFO replication} output reports {@code role:master}. Replicas hold copies of
 * their master's keys, so including them would double-count {@code dbSize}, yield
 * duplicate keys, and make {@code flushDB} fail with a read-only error.
 *
 * <p>The operations are best-effort: a node that cannot be reached or answers with an
 * error is logged and skipped, and the remaining nodes are still processed.
 *
 * @author 孙宇
 */
public class RedisClusterTools {
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(RedisClusterTools.class.getName());
    /** Name used by the overloads that take no cluster name. */
    private static final String defaultName = "main";
    private static final ConcurrentHashMap<String, JedisCluster> jedisClusterMap = new ConcurrentHashMap<>();

    /**
     * Registers a cluster client under a name. The first registration for a name wins;
     * later ones are ignored, as are {@code null} arguments.
     *
     * @param jedisClusterName registry name
     * @param jedisCluster     client to register
     */
    public static void addJedisCluster(String jedisClusterName,
                                       JedisCluster jedisCluster) {
        if (jedisCluster != null && jedisClusterName != null) {
            // Atomic replacement for the former containsKey-then-put sequence.
            jedisClusterMap.putIfAbsent(jedisClusterName, jedisCluster);
        }
    }

    /**
     * Looks up a registered cluster client.
     *
     * @param jedisClusterName registry name
     * @return the client, or {@code null} if none is registered under that name
     */
    public static JedisCluster getJedisCluster(String jedisClusterName) {
        return jedisClusterMap.get(jedisClusterName);
    }

    /**
     * Runs a callback against the named cluster client.
     *
     * @param jedisClusterName registry name
     * @param action           callback receiving the client
     * @param <T>              callback result type
     * @return whatever the callback returns
     */
    public static <T> T clusterExecute(String jedisClusterName,
                                       JedisClusterCallback<T> action) {
        JedisCluster jedis = getJedisCluster(jedisClusterName);
        try {
            return action.doInJedisCluster(jedis);
        } catch (Throwable throwable) {
            // The callback may throw anything; rethrow through Lang.wrapThrow.
            throw Lang.wrapThrow(throwable);
        }
    }

    /**
     * Runs a callback against the default cluster client.
     *
     * @param action callback receiving the client
     * @param <T>    callback result type
     * @return whatever the callback returns
     */
    public static <T> T clusterExecute(JedisClusterCallback<T> action) {
        return clusterExecute(defaultName, action);
    }

    /**
     * Collects the keys matching a pattern from every master of the named cluster.
     *
     * @param jedisClusterName registry name
     * @param pattern          KEYS pattern
     * @return matching keys from all masters that answered
     * @throws IllegalStateException if no client is registered under that name
     */
    public static Set<String> clusterKeys(String jedisClusterName,
                                          String pattern) {
        Set<String> keys = new HashSet<>();
        for (Map.Entry<String, JedisPool> node : nodesOf(jedisClusterName).entrySet()) {
            try (Jedis connection = node.getValue().getResource()) {
                if (isMaster(connection)) {
                    keys.addAll(connection.keys(pattern));
                }
            } catch (Exception e) {
                logNodeFailure("KEYS", node.getKey(), e);
            }
        }
        return keys;
    }

    /**
     * Collects the keys matching a pattern from every master of the default cluster.
     *
     * @param pattern KEYS pattern
     * @return matching keys from all masters that answered
     */
    public static Set<String> clusterKeys(String pattern) {
        return clusterKeys(defaultName, pattern);
    }

    /**
     * Binary variant of {@link #clusterKeys(String, String)}.
     *
     * <p>The returned set compares arrays by identity, so it relies on replicas being
     * skipped to avoid holding the same key twice.
     *
     * @param jedisClusterName registry name
     * @param pattern          KEYS pattern
     * @return matching keys from all masters that answered
     * @throws IllegalStateException if no client is registered under that name
     */
    public static Set<byte[]> clusterKeys(String jedisClusterName,
                                          byte[] pattern) {
        Set<byte[]> keys = new HashSet<>();
        for (Map.Entry<String, JedisPool> node : nodesOf(jedisClusterName).entrySet()) {
            try (Jedis connection = node.getValue().getResource()) {
                if (isMaster(connection)) {
                    keys.addAll(connection.keys(pattern));
                }
            } catch (Exception e) {
                logNodeFailure("KEYS", node.getKey(), e);
            }
        }
        return keys;
    }

    /**
     * Binary variant of {@link #clusterKeys(String)}.
     *
     * @param pattern KEYS pattern
     * @return matching keys from all masters that answered
     */
    public static Set<byte[]> clusterKeys(byte[] pattern) {
        return clusterKeys(defaultName, pattern);
    }

    /**
     * Issues FLUSHDB on every master of the named cluster.
     *
     * @param jedisClusterName registry name
     * @throws IllegalStateException if no client is registered under that name
     */
    public static void clusterFlushDB(String jedisClusterName) {
        for (Map.Entry<String, JedisPool> node : nodesOf(jedisClusterName).entrySet()) {
            try (Jedis connection = node.getValue().getResource()) {
                if (isMaster(connection)) {
                    connection.flushDB();
                }
            } catch (Exception e) {
                logNodeFailure("FLUSHDB", node.getKey(), e);
            }
        }
    }

    /** Issues FLUSHDB on every master of the default cluster. */
    public static void clusterFlushDB() {
        clusterFlushDB(defaultName);
    }

    /**
     * Sums DBSIZE over every master of the named cluster.
     *
     * @param jedisClusterName registry name
     * @return total number of keys reported by the masters that answered
     * @throws IllegalStateException if no client is registered under that name
     */
    public static Long clusterDbSize(String jedisClusterName) {
        long total = 0L;
        for (Map.Entry<String, JedisPool> node : nodesOf(jedisClusterName).entrySet()) {
            try (Jedis connection = node.getValue().getResource()) {
                if (isMaster(connection)) {
                    total += connection.dbSize();
                }
            } catch (Exception e) {
                logNodeFailure("DBSIZE", node.getKey(), e);
            }
        }
        return total;
    }

    /**
     * Sums DBSIZE over every master of the default cluster.
     *
     * @return total number of keys reported by the masters that answered
     */
    public static Long clusterDbSize() {
        return clusterDbSize(defaultName);
    }

    /** Returns the node pools of a registered cluster, failing clearly for an unknown name. */
    private static Map<String, JedisPool> nodesOf(String jedisClusterName) {
        JedisCluster registered = getJedisCluster(jedisClusterName);
        if (registered == null) {
            throw new IllegalStateException("No JedisCluster registered under name: " + jedisClusterName);
        }
        return registered.getClusterNodes();
    }

    /** Tells whether the connected node reports itself as a master in INFO replication. */
    private static boolean isMaster(Jedis connection) {
        String replication = connection.info("replication");
        return replication != null && replication.contains("role:master");
    }

    /** Records a per-node failure without interrupting the scan of the remaining nodes. */
    private static void logNodeFailure(String command, String node, Exception cause) {
        LOG.log(java.util.logging.Level.WARNING, command + " failed on cluster node " + node, cause);
    }
}
因为上面的工具类没有将jedis集群模式的所有方法全都写出来,所以我使用了一个回调,将具体方法的调用交给了调用者,算是偷懒的做法。
package sunyu.tools.redis;
import redis.clients.jedis.JedisCluster;
/**
 * Callback that receives a {@link JedisCluster} and runs arbitrary commands on it.
 * Used instead of wrapping every JedisCluster method one by one.
 *
 * @param <T> type of the value the callback returns
 * @author 孙宇
 */
public interface JedisClusterCallback<T> {
/**
 * Performs the caller's work with the given cluster client.
 *
 * @param jedis the cluster client to operate on
 * @return the result of the work
 * @throws Throwable anything the work throws
 */
T doInJedisCluster(JedisCluster jedis) throws Throwable;
}
下面是测试类。能直接调用的只有keys、flushDB、dbSize这三个方法;如果想调用get/set/hget等其他方法,请传递回调方法,然后自己在里面实现。当然,集群模式是不需要自己close连接的,并且集群模式内部实现了连接池,也不用用户去管理了。
集群模式需要知道所有的服务器节点,所以为了初始化方便,我使用了json格式:
String clusterJson = "[{host:'192.168.11.81',port:'7001'},{host:'192.168.11.81',port:'7002'},{host:'192.168.11.81',port:'7003'},{host:'192.168.11.81',port:'7007'},{host:'192.168.11.81',port:'7008'},{host:'192.168.11.82',port:'7004'},{host:'192.168.11.82',port:'7005'},{host:'192.168.11.82',port:'7006'},{host:'192.168.11.82',port:'7009'},{host:'192.168.11.82',port:'7010'}]"; 它就是一个数组,每一项都是一个节点,必须有host和port属性。
package test.sunyu.tools.redis;
import com.alibaba.fastjson.JSON;
import org.junit.Test;
import redis.clients.jedis.JedisCluster;
import sunyu.tools.redis.JedisClusterCallback;
import java.util.Set;
/**
 * Usage demo for {@link RedisClusterPlugin} and {@link RedisClusterTools}. It needs the
 * Redis cluster nodes listed in the JSON below to be reachable.
 *
 * @author 孙宇
 */
public class TestRedisClusterTools {

    @Test
    public void t1() {
        // One entry per cluster node; each entry carries a host and a port.
        String nodesJson = "[{host:'192.168.11.81',port:'7001'},{host:'192.168.11.81',port:'7002'},{host:'192.168.11.81',port:'7003'},{host:'192.168.11.81',port:'7007'},{host:'192.168.11.81',port:'7008'},{host:'192.168.11.82',port:'7004'},{host:'192.168.11.82',port:'7005'},{host:'192.168.11.82',port:'7006'},{host:'192.168.11.82',port:'7009'},{host:'192.168.11.82',port:'7010'}]";
        RedisClusterPlugin plugin = new RedisClusterPlugin(nodesJson);
        plugin.start();

        // Cluster mode has no keys command of its own; the tools class implements one.
        Set<String> everyKey = RedisClusterTools.clusterKeys("*");
        String everyKeyAsJson = JSON.toJSONString(everyKey);
        System.out.println(everyKeyAsJson);

        // Key count:
        //RedisClusterTools.clusterDbSize();
        // Delete everything:
        //RedisClusterTools.clusterFlushDB();

        // All other operations go through a callback, since the tools class does not wrap
        // each method; the callback can call whatever the JedisCluster it receives offers.
        JedisClusterCallback<Object> otherCommands = new JedisClusterCallback<Object>() {
            @Override
            public Object doInJedisCluster(JedisCluster jedis) throws Throwable {
                //jedis.get("");
                //jedis.set("", "");
                //jedis.del("");
                //jedis.hget("", "");
                // ...and so on; see the Jedis documentation for the rest.
                return null;
            }
        };
        RedisClusterTools.clusterExecute(otherCommands);
    }
}