Kafka的安装
安装zookeeper
默认端口:2181
默认安装位置:/usr/local/Cellar/zookeeper
配置文件位置:/usr/local/etc/zookeeper
日志文件位置:/usr/local/var/log/zookeeper/zookeeper.log
启动zookeeper
1
| nohup zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
|
安装Kafka
默认端口:9092
默认安装位置:/usr/local/Cellar/kafka
配置文件位置:/usr/local/etc/kafka
日志文件位置:/usr/local/var/lib/kafka-logs
启动kafka
nohup kafka-server-start /usr/local/etc/kafka/server.properties &
订阅发布Demo
创建一个Topic
1
| kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
|
查看创建的Topic
1
| kafka-topics --list --zookeeper localhost:2181
|
生产者生产消息
1
| kafka-console-producer --broker-list localhost:9092 --topic test
|
消费者消费消息
1
| kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test1 --from-beginning
|
–from-beginning: 将从第一个消息开始接收
SpringBoot集成Kafka
源码地址:https://gitee.com/IBLiplus/kafka-demo.git
项目启动前按照上述安装启动步骤,在本地启动kafka.
创建Maven项目,引入一下依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>pring-kafka</artifactId> <version>2.5.7.RELEASE</version> </dependency>
|
添加如下配置,端口号可以自己定
配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| server.port=9010 spring.kafka.bootstrap-servers= 127.0.0.1:9092
spring.kafka.producer.retries= 0
spring.kafka.producer.batch-size= 16384
spring.kafka.producer.buffer-memory= 33554432
spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks= 1
spring.kafka.consumer.auto-commit-interval= 1S
spring.kafka.consumer.auto-offset-reset= earliest
spring.kafka.consumer.enable-auto-commit= false
spring.kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer= org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency= 5
spring.kafka.listener.ack-mode= manual_immediate spring.kafka.listener.missing-topics-fatal= false
|
生产者生产消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Component public class ProductDemo { Logger log = LoggerFactory.getLogger(ProductDemo.class);
@Resource private KafkaTemplate<String, Object> kafkaTemplate;
public static final String TOPIC_TEST = "topic.test"; public static final String TOPIC_GROUP1 = "topic.group1"; public static final String TOPIC_GROUP2 = "topic.group2";
public void send(String obj) { log.info("准备发送消息为:{}", obj); ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable throwable) { log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage()); } @Override public void onSuccess(SendResult<String, Object> stringObjectSendResult) { log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString()); } }); } }
|
消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Component public class ConsumerDemo { Logger log = LoggerFactory.getLogger(ConsumerDemo.class);
@KafkaListener(topics = ProductDemo.TOPIC_TEST, groupId = ProductDemo.TOPIC_GROUP1) public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { Object msg = message.get(); log.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg); ack.acknowledge(); } }
@KafkaListener(topics = ProductDemo.TOPIC_TEST, groupId = ProductDemo.TOPIC_GROUP2) public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { Object msg = message.get(); log.info("topic_test1 消费了: Topic:" + topic + ",Message:" + msg); ack.acknowledge(); } } }
|
测试接口:
1 2 3 4 5 6 7 8 9
| @Resource private ProductDemo productDemo;
@GetMapping("/kafka/test") public void testKafka(){ logger.info("start test"); productDemo.send("hello kafka"); logger.info("end test"); }
|
山脚太拥挤 我们更高处见。