最近在微信群里看到大家在讨论Redis,我这里有个集群项目,
EhCachePlugin + RedisPlugin + ActiveRecordPlugin
小组合拳来一下,不多说,上马!
发布订阅Cache事件:
import com.alibaba.fastjson.JSONObject; import com.jfinal.kit.JsonKit; import com.jfinal.kit.StrKit; import com.jfinal.log.Log; import com.jfinal.plugin.IPlugin; import com.jfinal.plugin.redis.Cache; import com.jfinal.plugin.redis.Redis; import redis.clients.jedis.JedisPubSub; import java.util.Objects; /** * 订阅发布Cache事件 * @author dfz 2021-07-31 15:46:29 */ public class RedisCachePubSub implements IPlugin { private static final Log log = Log.getLog(RedisCachePubSub.class); private final String channel = "JF_RedisCachePubSub"; private final String id; private final String redisCacheName; private JedisPubSub sub; public RedisCachePubSub(String redisCacheName) { this.redisCacheName = redisCacheName; this.id = StrKit.getRandomUUID(); } public Cache getRedis(){ return Redis.use(redisCacheName); } /** * 发布 删除 公告 */ public Long remove(String cacheName, Object key) { return getRedis().publish(channel, by("remove", cacheName, key, null).toJson()); } /** * 发布 删除全部 公告 */ public Long removeAll(String cacheName) { return getRedis().publish(channel, by("removeAll", cacheName, null, null).toJson()); } /** * 发布 put 公告 */ public Long put(String cacheName, Object key, Object value) { return getRedis().publish(channel, by("put", cacheName, key, value).toJson()); } @Override public boolean start() { //订阅 sub = getRedis().subscribeThread(new JedisPubSub() { @Override public void onMessage(String channel, String message) { log.debug(message); Msg msg = parse(message); if (Objects.nonNull(msg) && notMe(msg)){ if ("put".equals(msg.getEvent())){ CaffeineKit.put(msg.getCacheName(), msg.getKey(), msg.getValue()); }else // if ("remove".equals(msg.getEvent())){ CaffeineKit.remove(msg.getCacheName(), msg.getKey()); }else // if ("removeAll".equals(msg.getEvent())){ CaffeineKit.removeAll(msg.getCacheName()); } } } }, channel); return true; } protected Msg by(String event, String cacheName, Object key, Object value) { Msg msg = new Msg(); msg.setMsgId(id); msg.setEvent(event); msg.setCacheName(cacheName); msg.setKey(key); msg.setValue(value); return msg; } protected Msg parse(String json){ return JSONObject.parseObject(json, RedisCachePubSub.Msg.class); } protected boolean notMe(Msg msg){ return ! id.equals(msg.getMsgId()); } @Override public boolean stop() { sub.unsubscribe(); return true; } @Override public String toString() { return "RedisCachePubSub{" + "channel='" + channel + '\'' + ", id='" + id + '\'' + ", redisCacheName='" + redisCacheName + '\'' + ", sub=" + sub + '}'; } public static class Msg{ private String msgId; private String event; private String cacheName; private Object key; private Object value; public String toJson(){ return JsonKit.toJson(this); } public String getMsgId() { return msgId; } public void setMsgId(String msgId) { this.msgId = msgId; } public String getEvent() { return event; } public void setEvent(String event) { this.event = event; } public String getCacheName() { return cacheName; } public void setCacheName(String cacheName) { this.cacheName = cacheName; } public Object getKey() { return key; } public void setKey(Object key) { this.key = key; } public Object getValue() { return value; } public void setValue(Object value) { this.value = value; } @Override public String toString() { return "Msg{" + "msgId='" + msgId + '\'' + ", event='" + event + '\'' + ", cacheName='" + cacheName + '\'' + ", key=" + key + ", value=" + value + '}'; } } }
上面的EhCachePlugin的CacheKit缓存操作工具类,被我临时更换为CaffeineKit了,据说这玩意更牛。
自用可换回CacheKit
https://jfinal.com/share/2495 (附CaffeineKit代码)
DB数据库操作使用的ICache,我项目是租户模式,这里代码简洁抽出来一下:
import com.jfinal.plugin.activerecord.cache.ICache; import com.jfinal.plugin.redis.Cache; import java.util.Set; /** 根据站点的资源K configName, 把所有的资源都隔离开 * * ActiveRecordPlugin arp = new ActiveRecordPlugin(druidPlugin); * //注意设置cache * arp.setCache(new HandleEhCache(null)); * @author dufuzhong * 2017年5月19日 下午3:11:01 */ public class HandleEhCache implements ICache { private final RedisCachePubSub redisPS; public HandleEhCache(RedisCachePubSub redisPS) { this.redisPS = redisPS; } private String keyHandle(Object key) { return key.toString(); } private Cache getRedis() { return redisPS.getRedis(); } @Override @SuppressWarnings("unchecked") public <T>T get(String cacheName, Object key) { key = keyHandle(key); Object value = CaffeineKit.get(cacheName, key); if (redisPS != null && value == null) { value = getRedis().get(cacheName + key); if(value != null){ CaffeineKit.put(cacheName, key, value); } } return (T) value; } @Override public void put(String cacheName, Object key, Object value) { key = keyHandle(key); CaffeineKit.put(cacheName, key, value); if (redisPS != null) { getRedis().set(cacheName + key, value); //发布订阅 put消息 //redisPS.put(cacheName, key, value); //发布订阅 删除消息 (不使用put,使用删除,当取值时 get 会做Redis的get二次加载) redisPS.remove(cacheName, key); } } @Override public void remove(String cacheName, Object key) { key = keyHandle(key); CaffeineKit.remove(cacheName, key); if (redisPS != null){ getRedis().del(cacheName + key); //发布订阅 删除消息 redisPS.remove(cacheName, key); } } @Override public void removeAll(String cacheName) { if (redisPS != null){ Set<String> keys = getRedis().keys(cacheName + "*"); if(! keys.isEmpty()){ Object[] array = keys.toArray(); getRedis().del(array); //发布订阅 删除全部消息 redisPS.removeAll(cacheName); } } CaffeineKit.removeAll(cacheName); } @Override public String toString() { return "HandleEhCache{" + "redisPS=" + redisPS + '}'; } }
OK再来个测试类,大家都喜欢看在线演示嘛:)
import com.jfinal.admin.common.AppConfig; import com.jfinal.plugin.activerecord.ActiveRecordPlugin; import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.DbKit; import com.jfinal.plugin.activerecord.Record; import com.jfinal.plugin.druid.DruidPlugin; import com.jfinal.plugin.ehcache.EhCachePlugin; import com.jfinal.plugin.redis.RedisPlugin; import org.junit.Test; import java.util.List; import java.util.Scanner; public class HandleEhCacheTest { HandleEhCache cache; @Test public void a() { run(() -> { Scanner input = new Scanner(System.in); String cacheName = "account"; List<Record> list = Db.findByCache(cacheName, "all", "SELECT * FROM account "); System.out.println(list); cache.put(cacheName, "a", "v"); cache.put(cacheName, "b", "v"); Object v = cache.get(cacheName, "a"); System.out.println("a=" + v); v = cache.get(cacheName, "b"); System.out.println("b=" + v); /* IDEA控制台不能输入的话,则需要修改配置重启IDEA即可 * 第一步:工具类中 (帮助)help -- (编辑自定义VM选项)Edit Constom VM Options... 第二步: 打开配置,添加-Deditable.java.test.console=true * 重启IDEA * */ System.out.println("等待输入1回车"); input.next(); v = cache.get(cacheName, "a"); System.out.println("a=" + v); v = cache.get(cacheName, "b"); System.out.println("b=" + v); System.out.println("等待输入3回车"); input.next(); v = cache.get(cacheName, "a"); System.out.println("a=" + v); v = cache.get(cacheName, "b"); System.out.println("b=" + v); }); } @Test public void b(){ run(() -> { Scanner input = new Scanner(System.in); String cacheName = "account"; List<Record> list = cache.get(cacheName, "all");; System.out.println(list); Object v = cache.get(cacheName, "a"); System.out.println("a=" + v); v = cache.get(cacheName, "b"); System.out.println("b=" + v); cache.remove(cacheName, "a"); System.out.println("等待输入2回车"); input.next(); cache.removeAll(cacheName); }); } private void run(Runnable runnable) { String configName = DbKit.MAIN_CONFIG_NAME; EhCachePlugin ehCachePlugin = new EhCachePlugin(); RedisPlugin redisPlugin = getRedisPlugin(configName); RedisCachePubSub redisPS = new RedisCachePubSub(configName); cache = new HandleEhCache(redisPS); DruidPlugin druidPlugin = AppConfig.getDruidPlugin(); ActiveRecordPlugin arp = new ActiveRecordPlugin(druidPlugin); //注意设置cache arp.setCache(cache); ehCachePlugin.start(); redisPlugin.start(); redisPS.start(); druidPlugin.start(); arp.start(); try { runnable.run(); }finally { druidPlugin.stop(); redisPS.stop(); redisPlugin.stop(); ehCachePlugin.stop(); } } private RedisPlugin getRedisPlugin(String cacheName) { return new RedisPlugin(cacheName, //改成自己的 "xxx.xxx.xxx.xxx", 6379, "*****"); } }
运行结果:
运行a方法,启动一个jvm
再运行b方法,再起一个jvm
可以看到 Db.findByCache( 把集合存入了 缓存 ,没有看过源码的社友们,我把源码搬过来,大家回忆一下:
然后因为我们设置的 HandleEhCache implements ICache 。
put会先放入内存缓存,再放入Redis数据库了。
b方法取值时(新jvm)get 会先从内存取值,没有时会去Redis里面取,并且放入内存缓存,方便下次快速取值。
好执行剩下的演示:
可以看到 cache.remove 和 cache.removeAll 都是可以远程删除另一台jvm的缓存。
但是没有做定期清理和同步清理二级缓存等配置了,这里相当于永久存储了。
如果有很多定制业务,也推荐大家直接使用 j2cache了
好啦,分享到这里就结束了。
西安的小伙伴们要注意安全哈,最近疫情闹得有点凶,居家办公第一天。。。
能写点什么就写点什么吧。。。
点个赞呗~