Kafka
1. Typical Kafka use cases
- Log aggregation: a company can use Kafka to collect logs from all of its services and expose them through a single, uniform interface to consumers such as Hadoop, HBase, or Solr.
- Messaging: decoupling producers from consumers, buffering messages, and so on.
- User activity tracking: Kafka is often used to record user activity in web or mobile apps (page views, searches, clicks, etc.). Servers publish these events to Kafka topics; subscribers then consume the topics for real-time monitoring and analysis, or load them into Hadoop or a data warehouse for offline analysis and mining.
- Operational metrics: Kafka is also commonly used to record operational monitoring data, collecting metrics from distributed applications and producing centralized feeds of operational data such as alerts and reports.
2.消息的基础术语
名称 |
解释 |
Broker |
消息中间件处理节点,⼀个Kafka节点就是⼀个broker,⼀个或者 多个Broker可以组成⼀个Kafka集群 |
Topic |
Kafka根据topic对消息进⾏归类,发布到Kafka集群的每条消息都需 要指定⼀个topic |
producer |
kafka的生产者向kafka集群发送消息 |
Consumer |
kafka的消费者接受kafka集群的消息 |
ConsumerGroup |
每个Consumer属于⼀个特定的Consumer Group,⼀条消息可以 被多个不同的Consumer Group消费,但是⼀个Consumer Group 中只能有⼀个Consumer能够消费该消息 |
Partition |
物理上的概念,⼀个topic可以分为多个partition,每个partition内部消息是有序的 |
3. Basic CLI usage (commands may vary across Kafka versions)

| Option | Description |
| --- | --- |
| --bootstrap-server | Host name and port of the Kafka broker to connect to. |
| --topic | Topic name. |
| --create | Create a topic. |
| --delete | Delete a topic. |
| --alter | Modify a topic. |
| --list | List all topics. |
| --describe | Show detailed information about a topic. |
| --partitions | Set the number of partitions. |
| --replication-factor | Set the number of replicas per partition. |
```bash
# Create topic "second" with 2 partitions and a replication factor of 3
kafka-topics --bootstrap-server IP:9092 --create --partitions 2 --replication-factor 3 --topic second

# Describe topic "first"
kafka-topics --bootstrap-server IP:9092 --describe --topic first

# Increase topic "first" to 3 partitions (the partition count can only grow)
kafka-topics --bootstrap-server IP:9092 --alter --topic first --partitions 3

# Verify the change
kafka-topics --bootstrap-server IP:9092 --describe --topic first

# Delete topic "first"
kafka-topics --bootstrap-server IP:9092 --delete --topic first

# Console producer
kafka-console-producer --broker-list node09:9092 --topic first

# Console consumer, reading from the beginning of the topic
kafka-console-consumer --bootstrap-server IP:9092 --from-beginning --topic first
```
4. Kafka message delivery (unicast and multicast)

```bash
kafka-console-consumer --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup --topic first
```

Unicast: when several consumers belong to the same consumer group, only one of them will consume a given message.

```bash
kafka-console-consumer --bootstrap-server IP:9092 --consumer-property group.id=testGroup1 --topic test
kafka-console-consumer --bootstrap-server IP:9092 --consumer-property group.id=testGroup2 --topic test
```

Multicast: when the consumers sit in different consumer groups, each of them will consume the message.

```bash
# List the consumer groups known to the broker
kafka-consumer-groups --bootstrap-server node09:9092 --list
# Inspect a consumer group: current offset, log-end offset, and message lag
kafka-consumer-groups --bootstrap-server node09:9092 --describe --group testGroup
```
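The same group semantics carry over to the Java client. Below is a minimal consumer sketch, assuming the node09 broker, the first topic, and the testGroup group used elsewhere in these notes:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumers sharing this group.id split the topic's partitions (unicast within the group)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition: " + record.partition() + " | value: " + record.value());
            }
        }
    }
}
```

Starting a second copy with the same group.id demonstrates unicast; changing the group.id gives each instance its own copy of every message (multicast).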
5. Developing with the Kafka API
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
```
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Basic producer: asynchronous fire-and-forget send of a single record
public class CustomProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        kafkaProducer.send(new ProducerRecord<String, String>("first", "方某"));
        kafkaProducer.close();
    }
}
```
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Asynchronous send with a callback reporting the destination topic and partition
public class CustomProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        kafkaProducer.send(new ProducerRecord<String, String>("first", "", "卷王方"), new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic: " + recordMetadata.topic() + " | partition: " + recordMetadata.partition());
                } else {
                    System.out.println("send failed: " + e);
                }
            }
        });

        kafkaProducer.close();
    }
}
```
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Synchronous send: get() blocks until the broker acknowledges the record
public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        kafkaProducer.send(new ProducerRecord<String, String>("first", "", "卷王方"), new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic: " + recordMetadata.topic() + " | partition: " + recordMetadata.partition());
                } else {
                    System.out.println("send failed: " + e);
                }
            }
        }).get();

        kafkaProducer.close();
    }
}
```
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Send to an explicit partition: partition 1 of topic "second"
public class CustomProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        kafkaProducer.send(new ProducerRecord<String, String>("second", 1, "", "卷王方"), new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic: " + recordMetadata.topic() + " | partition: " + recordMetadata.partition());
                } else {
                    System.out.println("send failed: " + e);
                }
            }
        });

        kafkaProducer.close();
    }
}
```
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Producer configured with a custom partitioner (see the class below)
public class CustomProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Fully qualified name of the custom partitioner class
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.myPartitioner");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        kafkaProducer.send(new ProducerRecord<String, String>("second", "f", "卷王方"), new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic: " + recordMetadata.topic() + " | partition: " + recordMetadata.partition());
                } else {
                    System.out.println("send failed: " + e);
                }
            }
        });

        kafkaProducer.close();
    }
}
```

```java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

// Minimal custom partitioner: routes every record to partition 0.
// A real implementation would typically hash the key or inspect the value.
public class myPartitioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    public void close() {
    }

    public void configure(Map<String, ?> configs) {
    }
}
```
6. Increasing producer throughput
- Increase the batch size (batch.size)
- Increase the linger time (linger.ms)
- Compress the data (compression.type)
- Increase the buffer size (buffer.memory)

Note: tuning these settings for throughput may increase latency.
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Partition_1 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");   // buffer.memory: 32 MB
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");        // batch.size: 16 KB
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);               // linger.ms: wait up to 5 ms to fill a batch
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // compress batches with snappy

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        kafkaProducer.send(new ProducerRecord<String, String>("second", "方某"));
        kafkaProducer.close();
    }
}
```
7. Data reliability (acks)
Reliability summary:
- acks=0: the producer sends without waiting for any acknowledgment; low reliability, high efficiency.
- acks=1: the producer waits for the leader's acknowledgment; medium reliability, medium efficiency.
- acks=-1: the producer waits for the leader and every follower in the ISR to acknowledge; high reliability, low efficiency. (In practice this guarantee also hinges on min.insync.replicas: with a replication factor of 3 and min.insync.replicas=2, a committed write survives the loss of one broker.)
```java
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
properties.put(ProducerConfig.ACKS_CONFIG, "1");   // wait for the leader's acknowledgment
properties.put(ProducerConfig.RETRIES_CONFIG, 3);  // retry failed sends up to 3 times
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
kafkaProducer.send(new ProducerRecord<String, String>("second", "方某"));
kafkaProducer.close();
```
8. Duplicate data
Using transactions requires idempotence, and idempotence is enabled by default. It guarantees that the broker persists only one copy of a message: when a retry delivers a duplicate, the broker detects it (by producer id, partition, and sequence number) and persists it only once.
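A sketch of what this looks like with the producer API; the transactional.id value tx-demo-01 is a made-up example, and the rest mirrors the producer setup used above:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionalProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence is on by default; setting it explicitly documents the requirement
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // Transactions additionally require a transactional.id (example value)
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-demo-01");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        try {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "tx message"));
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            kafkaProducer.abortTransaction();
        } finally {
            kafkaProducer.close();
        }
    }
}
```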
9. Out-of-order data
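Kafka guarantees ordering only within a single partition, and producer retries can reorder in-flight batches. A hedged sketch of the common producer-side settings, added to the same Properties setup as the earlier examples:

```java
// Idempotence (the default in recent clients) preserves ordering across retries
// for up to 5 in-flight batches per connection
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
// Without idempotence, cap in-flight requests at 1 to avoid reordering on retry:
// properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
```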