Jfinal Redis读写分离的实现

Redis是个好的缓冲服务器,但它的集群却实现的不怎么样(一家之言,没有仔细研究,只读了下它的官方文档)。只有两种方法:

  1. 主从复制,从服务器只读

  2. 集群方式,但却是把数据分片放到不同服务器上。该方式对物理位置在一起的服务器而言很好用,但如果业务上的需要,集群分布在天南地北呢?


为此,我的业务规划为,缓冲写时指到到Redis Master上,通过Redis的主从复制,将数据复制到远程Redis Slave上,而远程的Jfianl就近读取Redis Slave。

实现用到两个类,请大神指正:

package com.ext.jfinal;

import com.jfinal.plugin.redis.Cache;
import com.jfinal.plugin.redis.IKeyNamingPolicy;
import com.jfinal.plugin.redis.serializer.ISerializer;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class xCache extends Cache {
    protected JedisPool jedisPool2;
    protected final ThreadLocal<Jedis> threadLocalJedis2 = new ThreadLocal<Jedis>();

    public xCache(String name, JedisPool jedisPool,JedisPool jedisPool2, ISerializer serializer, IKeyNamingPolicy keyNamingPolicy) {
        this.name = name;
        this.serializer = serializer;
        this.keyNamingPolicy = keyNamingPolicy;
        this.jedisPool = jedisPool;
        this.jedisPool2 = jedisPool2;
    }

    public Jedis getJedis2() {
        Jedis jedis = threadLocalJedis2.get();
        return jedis != null ? jedis : jedisPool2.getResource();
    }

    @Override
    public <T> T get(Object key) {
        Jedis jedis = getJedis2();
        try {
            return (T)valueFromBytes(jedis.get(keyToBytes(key)));
        }
        finally {close(jedis);}
    }

}

2. xRedisPlugin,直接抄了波总的原插件,改了点

/**
 * Copyright (c) 2011-2016, James Zhan 詹波 (jfinal@126.com).
 * <p/>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.ext.jfinal;

import com.jfinal.kit.StrKit;
import com.jfinal.plugin.IPlugin;
import com.jfinal.plugin.redis.Cache;
import com.jfinal.plugin.redis.IKeyNamingPolicy;
import com.jfinal.plugin.redis.Redis;
import com.jfinal.plugin.redis.serializer.FstSerializer;
import com.jfinal.plugin.redis.serializer.ISerializer;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;

/**
 * xRedisPlugin.
 * xRedisPlugin 支持多个读写分离
 */
public class xRedisPlugin implements IPlugin {

    private String cacheName;
    private String[] master; //主服务器连接参数
    private String[] slave;  //从服务器连接参数,只读
    
    private ISerializer serializer = null;
    private IKeyNamingPolicy keyNamingPolicy = null;
    private JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

    public xRedisPlugin(String cacheName, String[] redisServer) {
        if (StrKit.isBlank(cacheName))
            throw new IllegalArgumentException("cacheName can not be blank.");
        this.cacheName = cacheName.trim();
        this.master = redisServer[0].split(",");
        if (redisServer.length==1){
            this.slave = this.master;
        } else {
            this.slave = redisServer[1].split(",");
        }
    }

    public boolean start() {
        JedisPool jedisPool=null,jedisPool2=null;
        // 设置主服务器
        if(master.length==1){   //只传了IP
            jedisPool = new JedisPool(jedisPoolConfig, master[0]);      
        } else if(master.length==2) {  //传了IP和Port
            jedisPool = new JedisPool(jedisPoolConfig, master[0], Integer.parseInt(master[1]));
        } else if(master.length==3) {  //传了IP,Port,timeout
            jedisPool = new JedisPool(jedisPoolConfig, master[0], Integer.parseInt(master[1]),  Integer.parseInt(master[2]));
        } else if(master.length==4) {  //传了IP,Port,timeout, password
            jedisPool = new JedisPool(jedisPoolConfig, master[0], Integer.parseInt(master[1]),  Integer.parseInt(master[2]),master[3]);
        } else if(master.length==5) {  //传了IP,Port,timeout, password,database
            jedisPool = new JedisPool(jedisPoolConfig, master[0], Integer.parseInt(master[1]),  Integer.parseInt(master[2]),master[3],Integer.parseInt(master[4]));
        } else if(master.length==6) {   //传了IP,Port,timeout, password,database,clientName
            jedisPool = new JedisPool(jedisPoolConfig, master[0], Integer.parseInt(master[1]),  Integer.parseInt(master[2]),master[3],Integer.parseInt(master[4]),master[5]);
        }
        // 设置从服务器
        if(slave.length==1){   //只传了IP
            jedisPool2 = new JedisPool(jedisPoolConfig, slave[0]);
        } else if(slave.length==2) {  //传了IP和Port
            jedisPool2 = new JedisPool(jedisPoolConfig, slave[0], Integer.parseInt(slave[1]));
        } else if(slave.length==3) {  //传了IP,Port,timeout
            jedisPool2 = new JedisPool(jedisPoolConfig, slave[0], Integer.parseInt(slave[1]),  Integer.parseInt(slave[2]));
        } else if(slave.length==4) {  //传了IP,Port,timeout, password
            jedisPool2 = new JedisPool(jedisPoolConfig, slave[0], Integer.parseInt(slave[1]),  Integer.parseInt(slave[2]),slave[3]);
        } else if(slave.length==5) {  //传了IP,Port,timeout, password,database
            jedisPool2 = new JedisPool(jedisPoolConfig, slave[0], Integer.parseInt(slave[1]),  Integer.parseInt(slave[2]),slave[3],Integer.parseInt(slave[4]));
        } else if(slave.length==6) {   //传了IP,Port,timeout, password,database,clientName
            jedisPool2 = new JedisPool(jedisPoolConfig, slave[0], Integer.parseInt(slave[1]),  Integer.parseInt(slave[2]),slave[3],Integer.parseInt(slave[4]),slave[5]);
        }
        
        if (serializer == null)
            serializer = FstSerializer.me;
        if (keyNamingPolicy == null)
            keyNamingPolicy = IKeyNamingPolicy.defaultKeyNamingPolicy;

        xCache cache = new xCache(cacheName, jedisPool,jedisPool2,serializer, keyNamingPolicy);
        Redis.addCache(cache);
        return true;
    }

    public boolean stop() {
//        Cache cache = Redis.removeCache(cacheName);
//        if (cache == Redis.mainCache)
//            Redis.mainCache = null;
//        cache.jedisPool.destroy();
        return true;
    }

    /**
     * 当RedisPlugin 提供的设置属性仍然无法满足需求时,通过此方法获取到
     * JedisPoolConfig 对象,可对 redis 进行更加细致的配置
     * <pre>
     * 例如:
     * redisPlugin.getJedisPoolConfig().setMaxTotal(100);
     * </pre>
     */
    public JedisPoolConfig getJedisPoolConfig() {
        return jedisPoolConfig;
    }

    // ---------

    public void setSerializer(ISerializer serializer) {
        this.serializer = serializer;
    }

    public void setKeyNamingPolicy(IKeyNamingPolicy keyNamingPolicy) {
        this.keyNamingPolicy = keyNamingPolicy;
    }

    // ---------

    public void setTestWhileIdle(boolean testWhileIdle) {
        jedisPoolConfig.setTestWhileIdle(testWhileIdle);
    }

    public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
        jedisPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
    }

    public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
        jedisPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
    }

    public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
        jedisPoolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
    }
}

3. AppConfig中启动插件

//配置redis缓存, 要注意生产环境的配置, 服务器参数为:ip,端口,超时,密码,数据库编号,客户端
//String redisServer = PropKit.get("redisServer", "127.0.0.1");
xRedisPlugin redis = new xRedisPlugin("Redis",new String[]{"127.0.0.1,6379","127.0.0.1,6380"});
redis.setSerializer(JdkSerializer.me);    // 设置Redis序列化方式
JedisPoolConfig jedisPoolConfig = redis.getJedisPoolConfig();
jedisPoolConfig.setMaxTotal(1000);
jedisPoolConfig.setMaxIdle(50);
jedisPoolConfig.setMinIdle(20);
jedisPoolConfig.setMaxWaitMillis(10*1000);
me.add(redis);

还需要什么改进,可能产生什么Bug,请各位指正


评论区

JFinal

2017-08-16 16:11

redis 的集群用法没有经验,帮不到你,可以去群里问问