jFinal自带的redis插件只支持单机模式,不支持集群,我写了个支持集群模式的插件
这个插件不但支持集群模式,还补上了集群模式下JedisCluster缺少的keys、flushDB、dbSize等方法。
注意:这个插件依赖阿里的fastjson。
集群模式与单机模式是完全不同的东西:单机模式是先创建JedisPool,然后调用它的getResource()方法来获得Jedis对象,再使用Jedis来操作redis。
集群使用的是JedisCluster,它本身就实现了连接池功能,所以不需要自己关闭连接;但是它的创建方式与单机完全不一样,需要先构造
Set<HostAndPort> jedisClusterNodes
来告诉Jedis集群由哪些服务器节点组成,而且集群模式是不支持密码的。
首先定义插件类,用于初始化集群模式的JedisCluster对象
package test.sunyu.tools.redis; import com.alibaba.fastjson.JSON; import com.jfinal.plugin.IPlugin; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPoolConfig; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; /** * jFinal的redis插件不支持redis集群模式,我自己扩展一个吧 * * @author 孙宇 */ public class RedisClusterPlugin implements IPlugin { private Integer maxTotal = 1000; private Integer maxIdle = 200; private Long maxWaitMillis = 2000L; private Boolean testOnBorrow = true; private String cluster; private String defaultName = "main"; public RedisClusterPlugin(String cluster) { this.cluster = cluster; } @Override public boolean start() { if (cluster != null) { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(maxTotal); jedisPoolConfig.setMaxIdle(maxIdle); jedisPoolConfig.setMaxWaitMillis(maxWaitMillis); jedisPoolConfig.setTestOnBorrow(testOnBorrow); Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>(); List<Map> clusterList = JSON.parseArray(cluster, Map.class); if (clusterList != null && clusterList.size() > 0) { for (Map<String, String> c : clusterList) { jedisClusterNodes.add(new HostAndPort(c.get("host"), Integer.parseInt(c.get("port")))); } JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, jedisPoolConfig); RedisClusterTools.addJedisCluster(defaultName, jedisCluster); } return true; } return false; } @Override public boolean stop() { return true; } public Integer getMaxTotal() { return maxTotal; } public void setMaxTotal(Integer maxTotal) { this.maxTotal = maxTotal; } public Integer getMaxIdle() { return maxIdle; } public void setMaxIdle(Integer maxIdle) { this.maxIdle = maxIdle; } public Long getMaxWaitMillis() { return maxWaitMillis; } public void setMaxWaitMillis(Long maxWaitMillis) { this.maxWaitMillis = maxWaitMillis; } public Boolean getTestOnBorrow() { return testOnBorrow; } public void 
setTestOnBorrow(Boolean testOnBorrow) { this.testOnBorrow = testOnBorrow; } public String getCluster() { return cluster; } public void setCluster(String cluster) { this.cluster = cluster; } public String getDefaultName() { return defaultName; } public void setDefaultName(String defaultName) { this.defaultName = defaultName; } }
然后编写集群模式的工具类,其实这里我也可以将所有的方法写在这个类中,比如get、put等等,就像jFinal的Cache那样,但是我嫌麻烦,所以就没那样写,只写了keys、flushDB、dbSize这三个方法的直接调用。
package test.sunyu.tools.redis; import org.nutz.lang.Lang; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPool; import sunyu.tools.redis.JedisClusterCallback; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * redis集群工具类 * * @author 孙宇 */ public class RedisClusterTools { private static String defaultName = "main"; private static final ConcurrentHashMap<String, JedisCluster> jedisClusterMap = new ConcurrentHashMap<>(); public static void addJedisCluster(String jedisClusterName, JedisCluster jedisCluster) { if (jedisCluster != null && jedisClusterName != null && !jedisClusterMap.containsKey(jedisClusterName)) { jedisClusterMap.put(jedisClusterName, jedisCluster); } } /** * 获得一个JedisCluster * * @param jedisClusterName * * @return */ public static JedisCluster getJedisCluster(String jedisClusterName) { return jedisClusterMap.get(jedisClusterName); } /** * 执行集群指令 * * @param jedisClusterName * @param action * @param <T> * * @return */ public static <T> T clusterExecute(String jedisClusterName, JedisClusterCallback<T> action) { JedisCluster jedis = getJedisCluster(jedisClusterName); try { return action.doInJedisCluster(jedis); } catch (Throwable throwable) { throw Lang.wrapThrow(throwable); } } /** * 执行集群指令 * * @param action * @param <T> * * @return */ public static <T> T clusterExecute(JedisClusterCallback<T> action) { JedisCluster jedis = getJedisCluster(defaultName); try { return action.doInJedisCluster(jedis); } catch (Throwable throwable) { throw Lang.wrapThrow(throwable); } } /** * 由于JedisCluster没有实现keys操作,这里自己实现以下 * * @param pattern * * @return */ public static Set<String> clusterKeys(String jedisClusterName, String pattern) { Set<String> keys = new HashSet<>(); Map<String, JedisPool> clusterNodes = getJedisCluster(jedisClusterName).getClusterNodes(); return clusterKeys(pattern, keys, clusterNodes); } private static Set<String> 
clusterKeys(String pattern, Set<String> keys, Map<String, JedisPool> clusterNodes) { for (String k : clusterNodes.keySet()) { JedisPool jp = clusterNodes.get(k); Jedis connection = jp.getResource(); try { keys.addAll(connection.keys(pattern)); } catch (Exception e) { e.printStackTrace(); } finally { connection.close(); } } return keys; } /** * 由于JedisCluster没有实现keys操作,这里自己实现以下 * * @param pattern * * @return */ public static Set<String> clusterKeys(String pattern) { Set<String> keys = new HashSet<>(); Map<String, JedisPool> clusterNodes = getJedisCluster(defaultName).getClusterNodes(); return clusterKeys(pattern, keys, clusterNodes); } public static Set<byte[]> clusterKeys(String jedisClusterName, byte[] pattern) { Set<byte[]> keys = new HashSet<>(); Map<String, JedisPool> clusterNodes = getJedisCluster(jedisClusterName).getClusterNodes(); return clusterKeys(pattern, keys, clusterNodes); } private static Set<byte[]> clusterKeys(byte[] pattern, Set<byte[]> keys, Map<String, JedisPool> clusterNodes) { for (Object k : clusterNodes.keySet()) { JedisPool jp = clusterNodes.get(k); Jedis connection = jp.getResource(); try { keys.addAll(connection.keys(pattern)); } catch (Exception e) { e.printStackTrace(); } finally { connection.close(); } } return keys; } public static Set<byte[]> clusterKeys(byte[] pattern) { Set<byte[]> keys = new HashSet<>(); Map<String, JedisPool> clusterNodes = getJedisCluster(defaultName).getClusterNodes(); return clusterKeys(pattern, keys, clusterNodes); } public static void clusterFlushDB(String jedisClusterName) { Map<String, JedisPool> clusterNodes = getJedisCluster(jedisClusterName).getClusterNodes(); clusterFlushDb(clusterNodes); } private static void clusterFlushDb(Map<String, JedisPool> clusterNodes) { for (Object k : clusterNodes.keySet()) { JedisPool jp = clusterNodes.get(k); Jedis connection = jp.getResource(); try { connection.flushDB(); } catch (Exception e) { e.printStackTrace(); } finally { connection.close(); } } } public static void 
clusterFlushDB() { Map<String, JedisPool> clusterNodes = getJedisCluster(defaultName).getClusterNodes(); clusterFlushDb(clusterNodes); } public static Long clusterDbSize(String jedisClusterName) { Long total = 0L; Map<String, JedisPool> clusterNodes = getJedisCluster(jedisClusterName).getClusterNodes(); return clusterDbSize(total, clusterNodes); } private static Long clusterDbSize(Long total, Map<String, JedisPool> clusterNodes) { for (Object k : clusterNodes.keySet()) { JedisPool jp = clusterNodes.get(k); Jedis connection = jp.getResource(); try { total += connection.dbSize(); } catch (Exception e) { e.printStackTrace(); } finally { connection.close(); } } return total; } public static Long clusterDbSize() { Long total = 0L; Map<String, JedisPool> clusterNodes = getJedisCluster(defaultName).getClusterNodes(); return clusterDbSize(total, clusterNodes); } }
因为上面工具类没有将所有jedis集群模式的方法全都写出来,所以我使用了一个回调,将方法的调用交给了程序员,偷懒的做法。
package sunyu.tools.redis;

import redis.clients.jedis.JedisCluster;

/**
 * Callback that is handed a {@link JedisCluster} so the caller can issue any command on it
 * directly, instead of this library wrapping every single command.
 *
 * @param <T> type of the value the callback returns
 * @author 孙宇
 */
public interface JedisClusterCallback<T> {
    /**
     * Performs the caller's work against the cluster.
     *
     * @param jedis the cluster client to operate on
     * @return the result of the work
     * @throws Throwable anything the work fails with
     */
    T doInJedisCluster(JedisCluster jedis) throws Throwable;
}
测试类:能直接调用的只有keys、flushDB、dbSize这三个方法。如果想调用get/put/hget等其他方法,请传递回调方法,然后自己在回调里实现。当然,集群模式不需要自己close连接,并且集群模式内部实现了连接池,也不用用户去管理。
集群模式需要知道所有的服务器节点,所以为了初始化方便,我使用了json格式
// JSON array listing the cluster nodes; every entry provides a "host" and a "port" attribute.
String clusterJson = "[{host:'192.168.11.81',port:'7001'},{host:'192.168.11.81',port:'7002'},{host:'192.168.11.81',port:'7003'},{host:'192.168.11.81',port:'7007'},{host:'192.168.11.81',port:'7008'},{host:'192.168.11.82',port:'7004'},{host:'192.168.11.82',port:'7005'},{host:'192.168.11.82',port:'7006'},{host:'192.168.11.82',port:'7009'},{host:'192.168.11.82',port:'7010'}]";
就是个集合,每一项都是一个节点,必须有host和port属性
package test.sunyu.tools.redis; import com.alibaba.fastjson.JSON; import org.junit.Test; import redis.clients.jedis.JedisCluster; import sunyu.tools.redis.JedisClusterCallback; import java.util.Set; /** * @author 孙宇 */ public class TestRedisClusterTools { @Test public void t1() { String clusterJson = "[{host:'192.168.11.81',port:'7001'},{host:'192.168.11.81',port:'7002'},{host:'192.168.11.81',port:'7003'},{host:'192.168.11.81',port:'7007'},{host:'192.168.11.81',port:'7008'},{host:'192.168.11.82',port:'7004'},{host:'192.168.11.82',port:'7005'},{host:'192.168.11.82',port:'7006'},{host:'192.168.11.82',port:'7009'},{host:'192.168.11.82',port:'7010'}]"; RedisClusterPlugin p = new RedisClusterPlugin(clusterJson); p.start(); //集群模式本身是没有keys命令的,这里我自己实现了 Set<String> allKeys = RedisClusterTools.clusterKeys("*"); System.out.println(JSON.toJSONString(allKeys)); //大小 //RedisClusterTools.clusterDbSize(); //删除 //RedisClusterTools.clusterFlushDB(); //其他操作,其他操作全都是回调方式,因为我懒得写里面所有方法了,只要是jedis支持的方法,集群模式这个工具都支持 RedisClusterTools.clusterExecute(new JedisClusterCallback<Object>() { @Override public Object doInJedisCluster(JedisCluster jedis) throws Throwable { //jedis.get(""); //jedis.set("", ""); //jedis.del(""); //jedis.hget("", ""); //....不写了,自己看文档吧 return null; } }); } }