java kafka 测试
1. 新建 Spring Boot 项目 : 链接
源码:
https://github.com/chaoren399/kafka-demo.git
2. pom.xml文件添加kafka依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
生产消息
新建类:
KafkaProducerDemo
package com.example.sprintbootdemo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;
/**
 * Console demo that publishes every line typed on standard input to the
 * Kafka topic {@code test}. Typing {@code exit} (or closing standard input)
 * ends the program.
 */
public class KafkaProducerDemo {

    /** Topic that every console line is published to. */
    private static final String TOPIC = "test";

    /** JVM system property that may override the broker address, e.g. {@code -Dkafka.bootstrap.servers=127.0.0.1:9092}. */
    private static final String BOOTSTRAP_PROPERTY = "kafka.bootstrap.servers";

    /** Broker address used when {@link #BOOTSTRAP_PROPERTY} is not set. */
    private static final String DEFAULT_BOOTSTRAP_SERVERS = "123.57.143.145:31002";

    /**
     * Reads lines from standard input and sends each one to Kafka until the
     * user types {@code exit} or standard input is exhausted.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              for the broker to acknowledge a record
     */
    public static void main(String[] args) throws InterruptedException {
        // try-with-resources guarantees the producer is closed (releasing its
        // buffers and network threads) on every exit path.
        try (Scanner sc = new Scanner(System.in);
             Producer<String, Object> producer = createProducer()) {
            while (true) {
                System.out.print("生产消息(输入exit 退出): ");
                // Stop cleanly on end-of-input instead of letting nextLine()
                // throw NoSuchElementException.
                if (!sc.hasNextLine()) {
                    break;
                }
                String mess = sc.nextLine();
                if ("exit".equals(mess)) {
                    break;
                }
                try {
                    // send() is asynchronous; waiting on the returned Future
                    // ensures "success" is only reported after the broker has
                    // acknowledged the record.
                    producer.send(new ProducerRecord<>(TOPIC, mess)).get();
                    System.out.println("send message success....");
                } catch (java.util.concurrent.ExecutionException e) {
                    System.err.println("send message failed: " + e.getCause());
                }
            }
        }
    }

    /**
     * Creates a producer configured by {@link #buildProducerProperties()}.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    public static Producer<String, Object> createProducer() {
        return new KafkaProducer<>(buildProducerProperties());
    }

    /**
     * Builds the producer configuration.
     *
     * @return the properties passed to the {@link KafkaProducer} constructor
     */
    public static Properties buildProducerProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers",
                System.getProperty(BOOTSTRAP_PROPERTY, DEFAULT_BOOTSTRAP_SERVERS));
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        // Keys and values are both serialized as strings.
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        return properties;
    }
}
消费消息
package com.example.sprintbootdemo;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Console demo that subscribes to the Kafka topic {@code test} and prints the
 * value of every record it receives. Runs until the process is terminated.
 */
public class KafkaConsumerDemo {

    /** Topic this demo subscribes to. */
    private static final String TOPIC = "test";

    /** JVM system property that may override the broker address, e.g. {@code -Dkafka.bootstrap.servers=host:port}. */
    private static final String BOOTSTRAP_PROPERTY = "kafka.bootstrap.servers";

    /**
     * Broker address used when {@link #BOOTSTRAP_PROPERTY} is not set.
     * NOTE(review): KafkaProducerDemo in this tutorial defaults to
     * 123.57.143.145:31002; both demos must point at the same cluster for the
     * consumer to see the producer's messages.
     */
    private static final String DEFAULT_BOOTSTRAP_SERVERS = "127.0.0.1:9092";

    /**
     * Polls the topic forever, printing each record value and committing the
     * consumed offsets after every non-empty batch.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources closes the consumer if the loop is left by an
        // exception, so the group coordinator is told the member has gone.
        try (Consumer<String, Object> consumer = createConsumer()) {
            while (true) {
                // NOTE(review): poll(long) is deprecated since kafka-clients 2.0
                // in favour of poll(java.time.Duration); switch once the
                // project's client version is confirmed to support it.
                ConsumerRecords<String, Object> records = consumer.poll(100);
                for (ConsumerRecord<String, Object> record : records) {
                    System.out.println("消费消息: " + record.value());
                }
                // Auto-commit is disabled in buildConsumerProperties(), so the
                // offsets must be committed explicitly or the group's position
                // is never persisted.
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }

    /**
     * Creates a consumer configured by {@link #buildConsumerProperties()} and
     * subscribes it to the {@code test} topic.
     *
     * @return a subscribed consumer; the caller is responsible for closing it
     */
    public static Consumer<String, Object> createConsumer() {
        Consumer<String, Object> consumer = new KafkaConsumer<>(buildConsumerProperties());
        consumer.subscribe(Arrays.asList(TOPIC));
        return consumer;
    }

    /**
     * Builds the consumer configuration.
     *
     * @return the properties passed to the {@link KafkaConsumer} constructor
     */
    public static Properties buildConsumerProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers",
                System.getProperty(BOOTSTRAP_PROPERTY, DEFAULT_BOOTSTRAP_SERVERS));
        properties.put("group.id", "test");
        // Offsets are committed manually in main().
        properties.put("enable.auto.commit", false);
        // Only takes effect if enable.auto.commit is switched back to true.
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        // Keys and values are both deserialized as strings.
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        return properties;
    }
}
然后分别运行生产者和消费者 demo。
在生产者的控制台中输入消息,
正常情况下,消费端会打印出生产者发送的数据。
欢迎来撩 : 汇总all