Kafka使用总结:Producer

Kafka作为消息中间件是各公司平台架构绕不开的话题。

不管你是把Kafka作为队列,还是消息通道,都需要在应用中通过producer写数据到Kafka,再用consumer从Kafka中消费。应用往Kafka写数据的原因有很多:用户行为分析、日志存储、异步通信等。多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量?消息的延迟?

这么苛刻的要求Kafka能满足吗?

image

Kafka Producer

首先,创建ProducerRecord必须包含Topic和Value,key和partition可选。然后,序列化key和value对象为ByteArray,并发送到网络。

接下来,消息发送到partitioner。如果创建ProducerRecord时指定了partition,此时partitioner啥也不用做,简单的返回指定的partition即可。如果未指定partition,partitioner会基于ProducerRecord的key生成partition。producer选择好partition后,增加record到对应topic和partition的batch record。最后,专有线程负责发送batch record到合适的Kafka broker。

当broker收到消息时,它会返回一个应答(response)。如果消息成功写入Kafka,broker将返回RecordMetadata对象(包含topic,partition和offset);相反,broker将返回error。这时producer收到error会尝试重试发送消息几次,直到producer返回error。

Producer实战
构造Kafka Producer

创建Properties对象,配置producer参数。根据Properties创建producer对象。Kafka producer必选参数有3个:

  • bootstrap.servers :Kafka broker的列表,包含host和port。此处不必包含Kafka集群所有的broker,因为producer会通过其它broker查询到所需信息。但至少包含2个broker;
  • key.serializer:序列化key参数,值为类名,org.apache.kafka.common.serialization.Serializer接口的实现;
  • value.serializer:序列化value参数,值为类名,使用方式同key.serializer

最简代码实现如下:

1
2
3
4
5
6
7
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.String-
Serializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serializa-
tion.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);

创建Properties对象,key和value为String类型,选用Kafka自带的StringSerializer。通过属性配置可以控制Producer的行为。

实例化producer后,接着发送消息。这里主要有3种发送消息的方法:

  • 立即发送:只管发送消息到server端,不care消息是否成功发送。大部分情况下,这种发送方式会成功,因为Kafka自身具有高可用性,producer会自动重试;但有时也会丢失消息;
  • 同步发送:通过send()方法发送消息,并返回Future对象。get()方法会等待Future对象,看send()方法是否成功;
  • 异步发送:通过带有回调函数的send()方法发送消息,当producer收到Kafka broker的response会触发回调函数

以上所有情况,一定要时刻考虑发送消息可能会失败,想清楚如何去处理异常。

通常我们是一个producer起一个线程开始发送消息。为了优化producer的性能,一般会有下面几种方式:单个producer起多个线程发送消息;使用多个producer。

下面开始详细展示上面所提到的三种发送消息的方法,以及各种类型错误的处理方式。

发送消息到Kafka

最简单的方法如下:

1
2
3
4
5
6
7
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products","France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}

创建ProducerRecord对象,Producer使用send()方法发送ProducerRecord。send()方法会返回带有RecordMetadata的Future对象,这里只简单的忽略返回值,所以我们并不会知道消息是否发送成功;

但即使如此简单,Producer发送消息到Kafka也仍然得处理些异常:当序列化消息失败会抛出SerializationException;buffer溢出会抛出BufferExhaustedException;当发送线程终止会抛出InterruptException。

同步发消息
1
2
3
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
producer.send(record).get();

这里,我们使用Future.get()方法等待Kafka的状态返回。Producer可以实现自己的Future来处理Kafka broker返回的异常。如果Producer发送消息成功,它会返回RecordMetadata对象(可用来检索消息的offset)。

Kafka Producer一般有两类错误。可重试错误会通过重试发送消息解决。比如,连接重连可解决连接错误;partition重新选举leader可解决“no leader”错误。Kafka Producer能配置重试次数,超过重试次数还不能解决的会抛出错误。另外一类就是不能通过重试处理的错误,比如,消息大小太大,这种情况下Kafka Producer会立即报错。

异步发送消息

如果应用和Kafka集群间的网络质量太差,那么同步发送消息的方式发送每条消息后需要等待较长时间才收到应答。这对高并发海量消息发送简直就是灾难,因为等待应答的时间远超过消息发送时间。另外,有些app压根就不要求返回值。况且,即使发送消息失败了,只要写下对应的错误日志即可。

为了异步发送消息,同时可以处理错误。Producer支持带有回调函数的发送消息方法。

1
2
3
4
5
6
7
8
9
10
11
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
} }
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

使用回调函数的前提是实现org.apache.kafka.clients.producer.Callback 接口。如果Kafka返回错误,onCompletion捕获到非null异常。示例代码仅仅打印出了异常信息,实际应用开发需根据实际情况添加业务逻辑处理。

序列化

在前面的列子中,可看出Producer配置必须指定序列化方法(serializer,默认是String serializer)。

这里将讲解如何构建定制化的序列化器,然后介绍Avro序列化器。

定制序列化器

当你需要发送到Kafka的对象非String和Integer,那你要么自己实现对应的序列化器,要么使用像Avro、Thrift或者Protobuf之类的业界通用的序列化库。这里强烈推荐使用这些工业化的通用的序列化库。

为了让大家理解序列化器的工作原理,这里还是先讲讲如何构建定制化的序列化器。下面先建一个简单的Customer类:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
} }

接着创建Customer类的序列化器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}
@Override
/**
We are serializing Customer as:
4 byte int representing customerId
4 byte int representing length of customerName in UTF-8 bytes (0 if name is
Null)
N bytes representing customerName in UTF-8
**/
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializeName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
@Override
public void close() {
// nothing to close
}
}
Avro的序列化

Avro详细说明见官方文档,这里只列出部分要用到的特性。

Avro Schema是用Json描述,如下:

1
2
3
4
5
6
7
8
{"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": "null"}
] }

Avro依赖模式(Schema)来实现数据结构定义,所以读写Avro文件都得依赖其Schema。Kafka中Schema Registry提供元数据的存储和解析。那Producer的序列化和Consumer的反序列化都会去Schema Registry读取对应的Schema。

image

Avro的使用有两种:一种是使用Avro Schema生成的类(官方提供生成工具,比如,avro-tools-1.7.0.jar);一种是直接Avro Schema。Kafka Producer使用Avro序列化器的方式与其它序列化器相同。下面先说使用Avro Schema生成的类的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
int wait = 500;
Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);
// We keep producing new events until someone ctrl-c
while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " + customer.toString());
ProducerRecord<String, Customer> record =
new ProducerRecord<>(topic, customer.getId(), customer);
producer.send(record);
}

其中,schema.registry.url是schema存储的位置,KafkaAvroSerializer 是Avro的序列化器,Customer 是生成的类。

如果你想直接使用Avro Schema,方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);
String schemaString = "{\"namespace\": \"customerManagement.avro\",
\"type\": \"record\", " +
"\"name\": \"Customer\"," +
"\"fields\": [" +
"{\"name\": \"id\", \"type\": \"int\"}," +
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"email\", \"type\": [\"null\",\"string
\"], \"default\":\"null\" }" +
"]}";
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
String name = "exampleCustomer" + nCustomers;
String email = "example " + nCustomers + "@example.com";
GenericRecord customer = new GenericData.Record(schema);
customer.put("id", nCustomer);
customer.put("name", name);
customer.put("email", email);
ProducerRecord<String, GenericRecord> data =
new ProducerRecord<String, GenericRecord>("customerContacts", name, customer);
producer.send(data);
} }

未完待续。。。

Enjoy!


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image