最近在微信群里看到大家在讨论Redis,我这里有个集群项目,
EhCachePlugin + RedisPlugin + ActiveRecordPlugin
小组合拳来一下,不多说,上马!
发布订阅Cache事件:
import com.alibaba.fastjson.JSONObject;
import com.jfinal.kit.JsonKit;
import com.jfinal.kit.StrKit;
import com.jfinal.log.Log;
import com.jfinal.plugin.IPlugin;
import com.jfinal.plugin.redis.Cache;
import com.jfinal.plugin.redis.Redis;
import redis.clients.jedis.JedisPubSub;
import java.util.Objects;
/**
* 订阅发布Cache事件
* @author dfz 2021-07-31 15:46:29
*/
public class RedisCachePubSub implements IPlugin {
private static final Log log = Log.getLog(RedisCachePubSub.class);
private final String channel = "JF_RedisCachePubSub";
private final String id;
private final String redisCacheName;
private JedisPubSub sub;
public RedisCachePubSub(String redisCacheName) {
this.redisCacheName = redisCacheName;
this.id = StrKit.getRandomUUID();
}
public Cache getRedis(){
return Redis.use(redisCacheName);
}
/**
* 发布 删除 公告
*/
public Long remove(String cacheName, Object key) {
return getRedis().publish(channel, by("remove", cacheName, key, null).toJson());
}
/**
* 发布 删除全部 公告
*/
public Long removeAll(String cacheName) {
return getRedis().publish(channel, by("removeAll", cacheName, null, null).toJson());
}
/**
* 发布 put 公告
*/
public Long put(String cacheName, Object key, Object value) {
return getRedis().publish(channel, by("put", cacheName, key, value).toJson());
}
@Override
public boolean start() {
//订阅
sub = getRedis().subscribeThread(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
log.debug(message);
Msg msg = parse(message);
if (Objects.nonNull(msg) && notMe(msg)){
if ("put".equals(msg.getEvent())){
CaffeineKit.put(msg.getCacheName(), msg.getKey(), msg.getValue());
}else //
if ("remove".equals(msg.getEvent())){
CaffeineKit.remove(msg.getCacheName(), msg.getKey());
}else //
if ("removeAll".equals(msg.getEvent())){
CaffeineKit.removeAll(msg.getCacheName());
}
}
}
}, channel);
return true;
}
protected Msg by(String event, String cacheName, Object key, Object value) {
Msg msg = new Msg();
msg.setMsgId(id);
msg.setEvent(event);
msg.setCacheName(cacheName);
msg.setKey(key);
msg.setValue(value);
return msg;
}
protected Msg parse(String json){
return JSONObject.parseObject(json, RedisCachePubSub.Msg.class);
}
protected boolean notMe(Msg msg){
return ! id.equals(msg.getMsgId());
}
@Override
public boolean stop() {
sub.unsubscribe();
return true;
}
@Override
public String toString() {
return "RedisCachePubSub{" +
"channel='" + channel + '\'' +
", id='" + id + '\'' +
", redisCacheName='" + redisCacheName + '\'' +
", sub=" + sub +
'}';
}
public static class Msg{
private String msgId;
private String event;
private String cacheName;
private Object key;
private Object value;
public String toJson(){
return JsonKit.toJson(this);
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public String getEvent() {
return event;
}
public void setEvent(String event) {
this.event = event;
}
public String getCacheName() {
return cacheName;
}
public void setCacheName(String cacheName) {
this.cacheName = cacheName;
}
public Object getKey() {
return key;
}
public void setKey(Object key) {
this.key = key;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
}
@Override
public String toString() {
return "Msg{" +
"msgId='" + msgId + '\'' +
", event='" + event + '\'' +
", cacheName='" + cacheName + '\'' +
", key=" + key +
", value=" + value +
'}';
}
}
}上面的EhCachePlugin的CacheKit缓存操作工具类,被我临时更换为CaffeineKit了,据说这玩意更牛。
自用可换回CacheKit
https://jfinal.com/share/2495 (附CaffeineKit代码)
DB数据库操作使用的ICache,我项目是租户模式,这里代码简洁抽出来一下:
import com.jfinal.plugin.activerecord.cache.ICache;
import com.jfinal.plugin.redis.Cache;
import java.util.Set;
/** 根据站点的资源K configName, 把所有的资源都隔离开
*
* ActiveRecordPlugin arp = new ActiveRecordPlugin(druidPlugin);
* //注意设置cache
* arp.setCache(new HandleEhCache(null));
* @author dufuzhong
* 2017年5月19日 下午3:11:01
*/
public class HandleEhCache implements ICache {
private final RedisCachePubSub redisPS;
public HandleEhCache(RedisCachePubSub redisPS) {
this.redisPS = redisPS;
}
private String keyHandle(Object key) {
return key.toString();
}
private Cache getRedis() {
return redisPS.getRedis();
}
@Override
@SuppressWarnings("unchecked")
public <T>T get(String cacheName, Object key) {
key = keyHandle(key);
Object value = CaffeineKit.get(cacheName, key);
if (redisPS != null && value == null) {
value = getRedis().get(cacheName + key);
if(value != null){
CaffeineKit.put(cacheName, key, value);
}
}
return (T) value;
}
@Override
public void put(String cacheName, Object key, Object value) {
key = keyHandle(key);
CaffeineKit.put(cacheName, key, value);
if (redisPS != null) {
getRedis().set(cacheName + key, value);
//发布订阅 put消息
//redisPS.put(cacheName, key, value);
//发布订阅 删除消息 (不使用put,使用删除,当取值时 get 会做Redis的get二次加载)
redisPS.remove(cacheName, key);
}
}
@Override
public void remove(String cacheName, Object key) {
key = keyHandle(key);
CaffeineKit.remove(cacheName, key);
if (redisPS != null){
getRedis().del(cacheName + key);
//发布订阅 删除消息
redisPS.remove(cacheName, key);
}
}
@Override
public void removeAll(String cacheName) {
if (redisPS != null){
Set<String> keys = getRedis().keys(cacheName + "*");
if(! keys.isEmpty()){
Object[] array = keys.toArray();
getRedis().del(array);
//发布订阅 删除全部消息
redisPS.removeAll(cacheName);
}
}
CaffeineKit.removeAll(cacheName);
}
@Override
public String toString() {
return "HandleEhCache{" +
"redisPS=" + redisPS +
'}';
}
}OK再来个测试类,大家都喜欢看在线演示嘛:)
import com.jfinal.admin.common.AppConfig;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.DbKit;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.druid.DruidPlugin;
import com.jfinal.plugin.ehcache.EhCachePlugin;
import com.jfinal.plugin.redis.RedisPlugin;
import org.junit.Test;
import java.util.List;
import java.util.Scanner;
public class HandleEhCacheTest {
HandleEhCache cache;
@Test public void a() {
run(() -> {
Scanner input = new Scanner(System.in);
String cacheName = "account";
List<Record> list = Db.findByCache(cacheName, "all", "SELECT * FROM account ");
System.out.println(list);
cache.put(cacheName, "a", "v");
cache.put(cacheName, "b", "v");
Object v = cache.get(cacheName, "a");
System.out.println("a=" + v);
v = cache.get(cacheName, "b");
System.out.println("b=" + v);
/* IDEA控制台不能输入的话,则需要修改配置重启IDEA即可
* 第一步:工具类中 (帮助)help -- (编辑自定义VM选项)Edit Constom VM Options... 第二步: 打开配置,添加-Deditable.java.test.console=true
* 重启IDEA
* */
System.out.println("等待输入1回车");
input.next();
v = cache.get(cacheName, "a");
System.out.println("a=" + v);
v = cache.get(cacheName, "b");
System.out.println("b=" + v);
System.out.println("等待输入3回车");
input.next();
v = cache.get(cacheName, "a");
System.out.println("a=" + v);
v = cache.get(cacheName, "b");
System.out.println("b=" + v);
});
}
@Test public void b(){
run(() -> {
Scanner input = new Scanner(System.in);
String cacheName = "account";
List<Record> list = cache.get(cacheName, "all");;
System.out.println(list);
Object v = cache.get(cacheName, "a");
System.out.println("a=" + v);
v = cache.get(cacheName, "b");
System.out.println("b=" + v);
cache.remove(cacheName, "a");
System.out.println("等待输入2回车");
input.next();
cache.removeAll(cacheName);
});
}
private void run(Runnable runnable) {
String configName = DbKit.MAIN_CONFIG_NAME;
EhCachePlugin ehCachePlugin = new EhCachePlugin();
RedisPlugin redisPlugin = getRedisPlugin(configName);
RedisCachePubSub redisPS = new RedisCachePubSub(configName);
cache = new HandleEhCache(redisPS);
DruidPlugin druidPlugin = AppConfig.getDruidPlugin();
ActiveRecordPlugin arp = new ActiveRecordPlugin(druidPlugin);
//注意设置cache
arp.setCache(cache);
ehCachePlugin.start();
redisPlugin.start();
redisPS.start();
druidPlugin.start();
arp.start();
try {
runnable.run();
}finally {
druidPlugin.stop();
redisPS.stop();
redisPlugin.stop();
ehCachePlugin.stop();
}
}
private RedisPlugin getRedisPlugin(String cacheName) {
return new RedisPlugin(cacheName, //改成自己的
"xxx.xxx.xxx.xxx", 6379, "*****");
}
}运行结果:
运行a方法,启动一个jvm
再运行b方法,再起一个jvm

可以看到 Db.findByCache( 把集合存入了 缓存 ,没有看过源码的社友们,我把源码搬过来,大家回忆一下:
然后因为我们设置的 HandleEhCache implements ICache 。
put会先放入内存缓存,再放入Redis数据库了。
b方法取值时(新jvm)get 会先从内存取值,没有时会去Redis里面取,并且放入内存缓存,方便下次快速取值。
好执行剩下的演示:


可以看到 cache.remove 和 cache.removeAll 都是可以远程删除另一台jvm的缓存。
但是没有做定期清理和同步清理二级缓存等配置了,这里相当于永久存储了。
如果有很多定制业务,也推荐大家直接使用 j2cache了
好啦,分享到这里就结束了。
西安的小伙伴们要注意安全哈,最近疫情闹得有点凶,居家办公第一天。。。
能写点什么就写点什么吧。。。
点个赞呗~