Kafka

1. Kafka use cases

  • Log aggregation: a company can use Kafka to collect logs from all of its services and expose them through a unified interface to consumers such as Hadoop, HBase, Solr, and so on.
  • Messaging: decoupling producers from consumers, buffering messages, etc.
  • User activity tracking: Kafka is often used to record the activity of web or app users, such as page views, searches, and clicks. Servers publish these events to Kafka topics, and subscribers consume the topics for real-time monitoring and analysis, or load the data into Hadoop or a data warehouse for offline analysis and mining.
  • Operational metrics: Kafka is also used to record operational monitoring data, collecting metrics from distributed applications and producing centralized feedback such as alerts and reports.

2. Basic terminology

| Term | Description |
| --- | --- |
| Broker | A message-processing node. One Kafka node is one broker; one or more brokers form a Kafka cluster. |
| Topic | Kafka categorizes messages by topic; every message published to a Kafka cluster must specify a topic. |
| Producer | Sends messages to the Kafka cluster. |
| Consumer | Receives messages from the Kafka cluster. |
| ConsumerGroup | Each consumer belongs to a specific consumer group. A message can be consumed by multiple different consumer groups, but within a single group only one consumer will consume it. |
| Partition | A physical concept: a topic can be split into multiple partitions, and messages within each partition are ordered. |

3. Basic Kafka commands (syntax may differ between Kafka versions)

  • Create a topic (kafka-topics); common options:
| Option | Description |
| --- | --- |
| --bootstrap-server | Hostname and port of the Kafka broker to connect to. |
| --topic | Topic name. |
| --create | Create a topic. |
| --delete | Delete a topic. |
| --alter | Modify a topic. |
| --list | List all topics. |
| --describe | Show the details of a topic. |
| --partitions | Set the number of partitions. |
| --replication-factor | Set the replication factor of the partitions. |

```bash
kafka-topics --bootstrap-server <IP>:9092 --create --partitions 2 --replication-factor 3 --topic second
```
  • Describe a topic
```bash
kafka-topics --bootstrap-server <IP>:9092 --describe --topic first
```
  • Change the number of partitions (note: the partition count can only be increased, never decreased)
```bash
kafka-topics --bootstrap-server <IP>:9092 --alter --topic first --partitions 3
```
  • Describe the first topic again
```bash
kafka-topics --bootstrap-server <IP>:9092 --describe --topic first
```
  • Delete a topic
```bash
kafka-topics --bootstrap-server <IP>:9092 --delete --topic first
```
  • Console producer
```bash
kafka-console-producer --broker-list node09:9092 --topic first
```
  • Console consumer
```bash
kafka-console-consumer --bootstrap-server <IP>:9092 --from-beginning --topic first
```

4. Message delivery patterns

  • Unicast
```bash
kafka-console-consumer --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup --topic first
```

Unicast: when several consumers share the same consumer group, only one of them consumes each message.

  • Multicast
```bash
kafka-console-consumer --bootstrap-server <IP>:9092 --consumer-property group.id=testGroup1 --topic test
kafka-console-consumer --bootstrap-server <IP>:9092 --consumer-property group.id=testGroup2 --topic test
```

Multicast: when consumers sit in different consumer groups, each group consumes every message, so each of those consumers receives it.

  • Inspect consumer groups
```bash
# List the consumer groups
kafka-consumer-groups --bootstrap-server node09:9092 --list
# Describe a consumer group: current offset, log-end offset, and lag (how many messages are pending)
kafka-consumer-groups --bootstrap-server node09:9092 --describe --group testGroup
```

5. Kafka API development

  • Add the Maven dependency
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
```
  • Code: asynchronous send
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        // Connection settings
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Producer client
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // Asynchronous send: the call returns immediately, delivery happens in the background
        kafkaProducer.send(new ProducerRecord<>("first", "方某"));
        // Flush and close
        kafkaProducer.close();
    }
}
```
  • Code: asynchronous send with a callback
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // The callback fires once the broker acknowledges the record (or the send fails)
        kafkaProducer.send(new ProducerRecord<>("first", "", "卷王方"), new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic: " + recordMetadata.topic() + " | partition: " + recordMetadata.partition());
                } else {
                    System.out.println("send failed: " + e);
                }
            }
        });

        kafkaProducer.close();
    }
}
```
  • Code: synchronous send
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // .get() blocks until the broker responds, turning the asynchronous send into a synchronous one
        kafkaProducer.send(new ProducerRecord<>("first", "", "卷王方"), new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic: " + recordMetadata.topic() + " | partition: " + recordMetadata.partition());
                } else {
                    System.out.println("send failed: " + e);
                }
            }
        }).get();

        kafkaProducer.close();
    }
}
```
  • Sending to a specific partition
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // ProducerRecord accepts an explicit partition (here: partition 1).
        // Without one, the partition is derived from the key's hash; with no key,
        // the sticky partitioner keeps writing to one partition until the batch
        // is full, then switches to another partition.
        kafkaProducer.send(new ProducerRecord<>("second", 1, "", "卷王方"), new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic: " + recordMetadata.topic() + " | partition: " + recordMetadata.partition());
                } else {
                    System.out.println("send failed: " + e);
                }
            }
        });

        kafkaProducer.close();
    }
}
```
  • Custom partitioner
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.myPartitioner");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.send(new ProducerRecord<>("second", "f", "卷王方"), new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic: " + recordMetadata.topic() + " | partition: " + recordMetadata.partition());
                } else {
                    System.out.println("send failed: " + e);
                }
            }
        });

        kafkaProducer.close();
    }
}
```

The partitioner itself (registered above via PARTITIONER_CLASS_CONFIG):

```java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class myPartitioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Route every record to partition 0; custom routing logic
        // (e.g. based on the key or value) goes here
        return 0;
    }

    public void close() {
    }

    public void configure(Map<String, ?> configs) {
    }
}
```

6. Improving producer throughput

  • Increase the batch size (batch.size)
  • Increase the linger time (linger.ms)
  • Compress the data (compression.type)
  • Increase the buffer size (buffer.memory); note that tuning these settings for throughput may increase latency
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Partition_1 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Buffer size (32 MB)
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        // Batch size (16 KB)
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        // Linger time (ms)
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        // Compression
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // Send a message
        kafkaProducer.send(new ProducerRecord<>("second", "方某"));
        // Close the producer
        kafkaProducer.close();
    }
}
```

7. Data reliability (acks)

  • Acknowledgment (acks) levels

Reliability summary:

acks=0: the producer sends data without waiting for any acknowledgment. Lowest reliability, highest throughput.

acks=1: the producer waits for an acknowledgment from the partition leader. Medium reliability, medium throughput.

acks=-1 (all): the producer waits for acknowledgments from the leader and every follower in the ISR. Highest reliability, lowest throughput.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class AckProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Buffer size (32 MB)
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        // Batch size (16 KB)
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        // Linger time (ms)
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        // Acknowledgment level
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        // Retries
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Compression
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // Send a message
        kafkaProducer.send(new ProducerRecord<>("second", "方某"));
        // Close the producer
        kafkaProducer.close();
    }
}
```

8. Duplicate data

  • Enabling transactions requires idempotence, which is enabled by default. With idempotence, the broker persists each message only once: when duplicates arrive (for example from producer retries), only one copy is kept. See the sketch below.
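A minimal sketch of a transactional send, assuming the broker address and topic used in the earlier examples; the transactional.id value is made up for illustration and just needs to be unique per producer:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Transactions require a unique transactional.id; setting it also requires idempotence
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_01");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        try {
            kafkaProducer.send(new ProducerRecord<>("first", "hello transaction"));
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            // On abort, none of the records in the transaction become visible
            // to consumers reading with isolation.level=read_committed
            kafkaProducer.abortTransaction();
        } finally {
            kafkaProducer.close();
        }
    }
}
```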

9. Out-of-order data
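Kafka preserves ordering only within a single partition, and even there a retried batch can arrive after a later one. A minimal sketch of the producer settings that keep per-partition order, assuming the idempotence behavior described in section 8 (the class name is illustrative):

```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class OrderingConfig {
    public static Properties producerProps() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence lets the broker restore order when a retried batch arrives late
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // With idempotence enabled, per-partition order holds for up to 5 in-flight
        // requests per connection; without idempotence, this must be set to 1
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        return properties;
    }
}
```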