A while ago I shared an ActiveMQ plugin. Our project also uses Kafka, so I took some time to turn the Spring version into a jFinal plugin and share it with everyone.
As for what Kafka actually is, I won't describe it here (saving a bit of jFinal's database space ^_^); if you found this post, you presumably already know something about Kafka.
First, define a consumer template, KafkaConsumerTemplate:
package jfinal.plugin.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Consumer template
 *
 * @author 孙宇
 */
public class KafkaConsumerTemplate {

    // volatile so another thread (e.g. KafkaPlugin.stop()) can stop the loop
    private volatile boolean runner = true;
    private final KafkaConsumer<String, String> consumer;

    public KafkaConsumerTemplate(String servers, String keyDeserializer, String valueDeserializer, String group, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("key.deserializer", keyDeserializer);
        props.put("value.deserializer", valueDeserializer);
        props.put("group.id", group); // consumers in different groups each receive their own copy of the messages
        /*props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");*/
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic)); // subscribe to the topic
    }

    /**
     * Enters the poll loop; blocks the calling thread until setRunner(false)
     * is called from another thread.
     */
    public void run() {
        try {
            while (runner) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        // a custom handler could be plugged in here to process messages by topic/key (record.key())
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    /*consumer.commitSync(); // synchronous commit*/
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown: setRunner(false) wakes the consumer out of a blocking poll()
        } finally {
            consumer.close();
        }
    }

    public void setRunner(boolean runner) {
        this.runner = runner;
        if (!runner) {
            consumer.wakeup(); // interrupts a blocking poll() so the loop can exit and close the consumer
        }
    }

}
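The comment in the poll loop hints at plugging in a custom handler instead of the hard-coded System.out.println. A minimal sketch of what that could look like (this MessageHandler interface is hypothetical, not part of the plugin source):

package jfinal.plugin.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical callback interface: an instance could be passed to
// KafkaConsumerTemplate's constructor and invoked once per record in the
// poll loop, in place of the System.out.println above.
public interface MessageHandler {
    void handle(ConsumerRecord<String, String> record);
}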
Next, write the Kafka utility class:
package jfinal.plugin.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

/**
 * Kafka utility class
 *
 * @author 孙宇
 */
public class Kafka {

    public static final String defaultName = "main";
    private static final Logger logger = LoggerFactory.getLogger(Kafka.class);
    public static final ConcurrentHashMap<String, Producer<String, String>> producerMap = new ConcurrentHashMap<>();
    public static final ConcurrentHashMap<String, KafkaConsumerTemplate> consumerMap = new ConcurrentHashMap<>();

    public static void addProducer(String name, String servers, String keySerializer, String valueSerializer) {
        logger.info("Adding producer: {}", name);
        if (producerMap.containsKey(name)) {
            logger.error("Producer {} already exists!", name);
        } else {
            Properties props = new Properties();
            props.put("bootstrap.servers", servers);
            props.put("key.serializer", keySerializer);
            props.put("value.serializer", valueSerializer);
            /*props.put("acks", "all");
            props.put("retries", 1);
            props.put("buffer.memory", 33554432);*/
            Producer<String, String> producer = new KafkaProducer<>(props);
            producerMap.put(name, producer);
        }
    }

    public static Producer<String, String> getProducer(String name) {
        return producerMap.get(name);
    }

    public static Future<RecordMetadata> send(String producerName, ProducerRecord<String, String> record) {
        return producerMap.get(producerName).send(record);
    }

    public static void addConsumer(String name, KafkaConsumerTemplate consumer) {
        logger.info("Adding consumer: {}", name);
        if (consumerMap.containsKey(name)) {
            logger.error("Consumer {} already exists!", name);
        } else {
            consumerMap.put(name, consumer);
        }
    }

    public static KafkaConsumerTemplate getConsumer(String name) {
        return consumerMap.get(name);
    }

}
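Because producers live in a name-keyed map, an application can register several differently configured producers side by side and pick one by name when sending. A hypothetical example (the name "audit", the second port, and the topic are made up for illustration):

// register a second, independently named producer next to the default one
Kafka.addProducer("audit", "127.0.0.1:9093",
        "org.apache.kafka.common.serialization.StringSerializer",
        "org.apache.kafka.common.serialization.StringSerializer");

// messages are then routed by producer name
Kafka.send("audit", new ProducerRecord<String, String>("audit.topic", "key", "value"));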
Then write the plugin, KafkaPlugin:
package jfinal.plugin.kafka;

import com.jfinal.plugin.IPlugin;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * jFinal Kafka plugin
 *
 * @author 孙宇
 */
public class KafkaPlugin implements IPlugin {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    public KafkaPlugin(String name, String servers, String keySerializer, String valueSerializer) {
        Kafka.addProducer(name, servers, keySerializer, valueSerializer);
    }

    @Override
    public boolean start() {
        return true;
    }

    @Override
    public boolean stop() {
        logger.info("Closing all producers and consumers...");
        for (Map.Entry<String, Producer<String, String>> entry : Kafka.producerMap.entrySet()) {
            entry.getValue().close();
        }
        for (Map.Entry<String, KafkaConsumerTemplate> entry : Kafka.consumerMap.entrySet()) {
            entry.getValue().setRunner(false); // wakes the consumer's poll loop so it can close
        }
        logger.info("All producers and consumers closed");
        return true;
    }

}
To use the plugin, first add a producer:
// add a producer
KafkaPlugin p = new KafkaPlugin(Kafka.defaultName, "127.0.0.1:9092",
        "org.apache.kafka.common.serialization.StringSerializer",
        "org.apache.kafka.common.serialization.StringSerializer");
p.start();
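In a real jFinal application you would normally let the framework drive the plugin's lifecycle instead of calling start() and stop() yourself. A sketch, assuming a standard JFinalConfig subclass (the class name DemoConfig is made up; the other required config methods are omitted for brevity):

import com.jfinal.config.JFinalConfig;
import com.jfinal.config.Plugins;
import jfinal.plugin.kafka.Kafka;
import jfinal.plugin.kafka.KafkaPlugin;

public class DemoConfig extends JFinalConfig {

    @Override
    public void configPlugin(Plugins me) {
        // jFinal calls start() on startup and stop() on shutdown
        me.add(new KafkaPlugin(Kafka.defaultName, "127.0.0.1:9092",
                "org.apache.kafka.common.serialization.StringSerializer",
                "org.apache.kafka.common.serialization.StringSerializer"));
    }

    // configConstant, configRoute, configInterceptor, configHandler omitted

}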
With a producer in place, we also need a consumer:
// add a consumer
final KafkaConsumerTemplate consumer = new KafkaConsumerTemplate("127.0.0.1:9092",
        "org.apache.kafka.common.serialization.StringDeserializer",
        "org.apache.kafka.common.serialization.StringDeserializer",
        "test.group", "test.topic");
Kafka.addConsumer(Kafka.defaultName, consumer);
consumer.run(); // enters the poll loop and blocks the current thread
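Since run() blocks the calling thread in the poll loop, a real application would usually put the consumer on its own thread so initialization can continue. A minimal sketch (assumes consumer is declared final, as above):

// alternative to calling consumer.run() inline: give the poll loop its own thread
new Thread(new Runnable() {
    @Override
    public void run() {
        consumer.run();
    }
}, "kafka-consumer").start();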
To send a message, call:
Kafka.send(Kafka.defaultName, new ProducerRecord<String, String>("test.topic", "keykey", "msgmsg中文消息啊"));
Of course, if you are testing and want to see the message right away, you can also block on the returned Future:
Future<RecordMetadata> f = Kafka.send(Kafka.defaultName,
        new ProducerRecord<String, String>("test.topic", "keykey", "msgmsg中文消息啊" + new Date()));
f.get(); // blocks until the broker acknowledges the send
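get() also returns the record's metadata, which is handy for verifying where a test message landed (this relies on send(...) being typed to return Future<RecordMetadata>, as in the utility class above):

RecordMetadata meta = f.get(); // topic, partition and offset assigned by the broker
System.out.println("sent to " + meta.topic()
        + ", partition " + meta.partition()
        + ", offset " + meta.offset());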
When the project shuts down, call:
p.stop();
Now let's write a test. First, define the consumer class:
package test.jfinal.plugin.kafka;

import jfinal.plugin.kafka.Kafka;
import jfinal.plugin.kafka.KafkaConsumerTemplate;

/**
 * Kafka consumer test
 *
 * @author 孙宇
 */
public class TestKafkaConsumer {

    public static void main(String[] args) {
        // add a consumer
        KafkaConsumerTemplate consumer = new KafkaConsumerTemplate("127.0.0.1:9092",
                "org.apache.kafka.common.serialization.StringDeserializer",
                "org.apache.kafka.common.serialization.StringDeserializer",
                "test.group", "test.topic");
        Kafka.addConsumer(Kafka.defaultName, consumer);
        consumer.run(); // blocks here, printing messages as they arrive
    }

}
Then define the producer class that sends the messages:
package test.jfinal.plugin.kafka;

import jfinal.plugin.kafka.Kafka;
import jfinal.plugin.kafka.KafkaPlugin;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Kafka producer test
 *
 * @author 孙宇
 */
public class TestKafkaProducer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // add a producer
        KafkaPlugin p = new KafkaPlugin(Kafka.defaultName, "127.0.0.1:9092",
                "org.apache.kafka.common.serialization.StringSerializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        p.start();
        for (int i = 0; i < 10; i++) {
            TimeUnit.SECONDS.sleep(1);
            // send a test message once per second
            Future<RecordMetadata> f = Kafka.send(Kafka.defaultName,
                    new ProducerRecord<String, String>("test.topic", "keykey", "msgmsg中文消息啊" + new Date()));
            f.get(); // wait for the broker to acknowledge each send
        }
        p.stop();
    }

}
Now you can run the test.

NOTE, NOTE, NOTE:

The startup order matters here. Because the test classes run the consumer's poll loop on the main thread rather than a separate one, you must start TestKafkaConsumer first. Once TestKafkaConsumer is up, start TestKafkaProducer, then switch to TestKafkaConsumer's console to watch the messages the producer sends. Don't forget to switch consoles!

The startup order only matters for this test; in a real project, producers and consumers can be initialized in any order.
As usual, the source code is here:
jfinal版的
https://git.oschina.net/sypro/jfinalplugin.git
Spring version:
https://git.oschina.net/sypro/demo.git
Please, jFinal, support markdown!!! This editor is painful to use: with a long article you scroll up and down endlessly while fixing the formatting. Even just pinning the editor's toolbar to the top of the window would help....
Markdown support will be added bit by bit; there's no time right now. Try the editor's full-screen editing mode, it greatly cuts down the scrolling.