jFinal Kafka插件

前阵子发了一个ActiveMQ的插件,我们项目还使用了Kafka,所以抽个时间把spring版的转换了jFinal plugin出来给大家分享。
至于kafka是个什么东西,我这里就不描述了,节约jfinal的数据库^_^,你能看到这篇分享,说明你对kafka有一定了解。

首先定义一个消费者模版KafkaConsumerTemplate

package jfinal.plugin.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
 * 消费者模版
 *
 * @author 孙宇
 */
public class KafkaConsumerTemplate {
    private boolean runner = true;
    public KafkaConsumerTemplate(String servers,
                                 String keyDeserializer,
                                 String valueDeserializer,
                                 String group,
                                 String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("key.deserializer", keyDeserializer);
        props.put("value.deserializer", valueDeserializer);
        props.put("group.id", group);//不同ID 可以同时订阅消息
        /*
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");*/
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));//订阅TOPIC
        try {
            while (runner) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        //可以自定义Handler,处理对应的TOPIC消息(partitionRecords.key())
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    /*consumer.commitSync();//同步*/
                }
            }
        } finally {
            consumer.close();
        }
    }
    public void setRunner(boolean runner) {
        this.runner = runner;
    }
}




编写Kafka工具类

package jfinal.plugin.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
/**
 * kafka工具类
 *
 * @author 孙宇
 */
public class Kafka {
    public static final String defaultName = "main";
    private static final Logger logger = LoggerFactory.getLogger(Kafka.class);
    public static final ConcurrentHashMap<String, Producer> producerMap = new ConcurrentHashMap<>();
    public static final ConcurrentHashMap<String, KafkaConsumerTemplate> consumerMap = new ConcurrentHashMap<>();
    public static void addProducer(String name,
                                   String servers,
                                   String keySerializer,
                                   String valueSerializer) {
        logger.info("添加生产者:{}", name);
        if (producerMap.containsKey(name)) {
            logger.error("{}已存在!", name);
        } else {
            Properties props = new Properties();
            props.put("bootstrap.servers", servers);
            props.put("key.serializer", keySerializer);
            props.put("value.serializer", valueSerializer);
            /*props.put("acks", "all");
            props.put("retries ", 1);
            props.put("buffer.memory", 33554432);
            */
            Producer<String, String> producer = new KafkaProducer<>(props);
            producerMap.put(name, producer);
        }
    }
    public Producer getProducer(String name) {
        return producerMap.get(name);
    }
    public static Future send(String producerName,
                              ProducerRecord record) {
        return producerMap.get(producerName).send(record);
    }
    public static void addConsumer(String name,
                                   KafkaConsumerTemplate consumer) {
        logger.info("添加消费者:{}", name);
        if (consumerMap.containsKey(name)) {
            logger.error("{}已存在!", name);
        } else {
            consumerMap.put(name, consumer);
        }
    }
    public KafkaConsumerTemplate getConsumer(String name) {
        return consumerMap.get(name);
    }
}




编写插件KafkaPlugin

package jfinal.plugin.kafka;
import com.jfinal.plugin.IPlugin;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
 * jFinal kafka插件
 *
 * @author 孙宇
 */
public class KafkaPlugin implements IPlugin {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    public KafkaPlugin(String name,
                       String servers,
                       String keySerializer,
                       String valueSerializer) {
        Kafka.addProducer(name, servers, keySerializer, valueSerializer);
    }
    @Override
    public boolean start() {
        return true;
    }
    @Override
    public boolean stop() {
        logger.info("销毁所有生产者和消费者开始");
        for (Map.Entry<String, Producer> entry : Kafka.producerMap.entrySet()) {
            entry.getValue().close();
        }
        for (Map.Entry<String, KafkaConsumerTemplate> entry : Kafka.consumerMap.entrySet()) {
            entry.getValue().setRunner(false);
        }
        logger.info("销毁所有生产者和消费者结束");
        return true;
    }
}





在使用的时候,首先

//添加生产者
KafkaPlugin p = new KafkaPlugin(Kafka.defaultName, "127.0.0.1:9092", "org.apache.kafka.common.serialization.StringSerializer", "org.apache.kafka.common.serialization.StringSerializer");
p.start();




有了生产者,也要有消费者

//添加消费者
KafkaConsumerTemplate consumer = new KafkaConsumerTemplate("127.0.0.1:9092", "org.apache.kafka.common.serialization.StringDeserializer", "org.apache.kafka.common.serialization.StringDeserializer", "test.group", "test.topic");
Kafka.addConsumer(Kafka.defaultName, consumer);




发送消息时,调用

Kafka.send(Kafka.defaultName, new ProducerRecord<String, String>("test.topic", "keykey", "msgmsg中文消息啊"));



当然,如果你需要测试,即时看到消息,那么还可以

Future f = Kafka.send(Kafka.defaultName, new ProducerRecord<String, String>("test.topic", "keykey", "msgmsg中文消息啊" + new Date()));
f.get();




项目结束的时候调用

p.stop();




我们来编写个测试吧
首先我们定义消费者类

package test.jfinal.plugin.kafka;
import jfinal.plugin.kafka.Kafka;
import jfinal.plugin.kafka.KafkaConsumerTemplate;
import java.util.concurrent.ExecutionException;
/**
 * kafka测试类
 *
 * @author 孙宇
 */
public class TestKafkaConsumer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //添加消费者
        KafkaConsumerTemplate consumer = new KafkaConsumerTemplate("127.0.0.1:9092", "org.apache.kafka.common.serialization.StringDeserializer", "org.apache.kafka.common.serialization.StringDeserializer", "test.group", "test.topic");
        Kafka.addConsumer(Kafka.defaultName, consumer);
    }
}




然后定义生产者和发送消息的类

package test.jfinal.plugin.kafka;
import jfinal.plugin.kafka.Kafka;
import jfinal.plugin.kafka.KafkaPlugin;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * kafka测试类
 *
 * @author 孙宇
 */
public class TestKafkaProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //添加生产者
        KafkaPlugin p = new KafkaPlugin(Kafka.defaultName, "127.0.0.1:9092", "org.apache.kafka.common.serialization.StringSerializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.start();
        for (int i = 0; i < 10; i++) {
            TimeUnit.SECONDS.sleep(1);
            //模拟发送消息
            Future f = Kafka.send(Kafka.defaultName, new ProducerRecord<String, String>("test.topic", "keykey", "msgmsg中文消息啊" + new Date()));
            f.get();
        }
        p.stop();
    }
}




然后就可以进行测试了,
注意::注意::注意::
启动是有顺序的,由于测试类我没有使用线程,所以你要先启动
TestKafkaConsumer
等TestKafkaConsumer启动完毕后,再启动TestKafkaProducer
然后切换到TestKafkaConsumer的Console,就可以看到生产者发送的消息了
一定要切换控制台哦。

只是在测试的时候有启动顺序, 你真正项目使用的时候,生产者和消费者初始化时没顺序要求的。

老样子,源码如下:
jfinal版的

https://git.oschina.net/sypro/jfinalplugin.git



spring版的

https://git.oschina.net/sypro/demo.git


要求jFinal支持markdown!!!这个编辑器太难用了,文章太长,调格式的时候,上下滑动,把编辑器的工具条固定置顶也行啊。。。。。

评论区

JFinal

2016-09-29 11:50

第一个 jfinal Kafka 分享,很有价值,超赞

markdown 逐步添上,现在没有时间,试试这个编辑器的全屏编辑功能,上下滑动就大大减少了

Dreamlu

2016-09-29 18:25

羡慕你们这些用高大上技术的公司(。˘•ε•˘。)

sphsyv

2016-09-29 18:29

@Dreamlu 哈,这些不用不行啊,大数据缺不了这些

JFinal

2016-09-29 23:12

@sphsyv 编辑器的工具条固定置顶已配置好,需要按 ctrl + F5 强制刷新缓存才能生效,有问题多反馈,感谢支持

IvyHelen

2016-10-09 17:13

Kafka的插件两周前就开始想写了,之前SpringMVC中使用了,但是最近太忙没有时间,感谢大神的分享,省去了很多时间。 学习学习!

陈佳霖

2018-03-15 12:06

@JFinal git目录是改了么

zy依然范特西

2018-03-20 10:25

这个Jfinal+kafka目录是改了吗 找不到呀

zy依然范特西

2018-03-20 10:33

@JFinal 这个目录改了吗 找不到

JFinal

2018-03-20 11:41

zy依然范特西

2018-03-20 16:59

Jfinal kafka 我打开目录好像不在了的

Psbye

2018-03-23 17:58

咨询一个问题就 消费者异常停止,然后启动之后就接收不到消息了,这个楼主有遇到吗?生产者一直在生产数据。只有重启生产者才会有数据接收成功

hotsmile

2018-05-29 20:01

不错,也要搞大数据了

liliddd

2018-09-26 14:43

@JFinal 代码目录在哪里呀 ,找不到了

凉月

2020-01-06 14:26

大佬都是那么优秀,16年就开始接触kafka了,而我去年才知道。不知道这个插件对于新的版本还有用吗?

weijl

2020-05-23 10:44

git 找不到了呀

caicai

2021-06-02 12:02

404了

苏三的歌

2022-04-19 16:55

git地址404了

热门分享

扫码入社