Configuring Kafka for Object Serialization
In this guide, we'll cover how to configure Apache Kafka to handle object serialization and deserialization. We will explore both application.yml configuration and Java-based configuration approaches. Let's dive in!
Application.yml Configuration
To configure Kafka for object serialization using the application.yml file, follow these steps:
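- Add Dependencies: Ensure that you have the Spring Kafka dependency in your pom.xml or build.gradle file. The Maven snippet below omits the version, which assumes that Spring Boot's dependency management supplies it. The JSON serializer and deserializer also need Jackson (jackson-databind) on the classpath.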
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
- Configure Application.yml: Add the following configuration to your application.yml file to set up the Kafka producer and consumer properties for object serialization.
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: '*'
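As a rough sketch of how an application might use this configuration, the example below publishes and consumes a custom object. The OrderEvent class, the OrderEventMessaging service, and the order-consumers group id are hypothetical names chosen for illustration; the topic name is the placeholder used elsewhere in this guide. It assumes Spring Boot auto-configures the KafkaTemplate and the listener infrastructure from the properties above, and that producer and consumer share the same OrderEvent class so the type information that JsonSerializer adds by default can be resolved on the consumer side.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical payload type. The JSON (de)serializers use Jackson, so a
// no-argument constructor plus getters and setters is sufficient.
// In a real project each class would usually live in its own file.
class OrderEvent {
    private String orderId;
    private int quantity;

    public OrderEvent() {
    }

    public OrderEvent(String orderId, int quantity) {
        this.orderId = orderId;
        this.quantity = quantity;
    }

    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
}

@Service
class OrderEventMessaging {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public OrderEventMessaging(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // The key is sent as a plain string; the value is written as JSON
    // by the configured JsonSerializer.
    public void publish(OrderEvent event) {
        kafkaTemplate.send("yourTopicName", event.getOrderId(), event);
    }

    // The configured JsonDeserializer turns the JSON payload back into
    // an OrderEvent before this method is invoked.
    @KafkaListener(topics = "yourTopicName", groupId = "order-consumers")
    public void onOrder(OrderEvent event) {
        System.out.println("Received order " + event.getOrderId()
                + " x" + event.getQuantity());
    }
}
```

Typing the template as KafkaTemplate<String, Object> keeps the sketch compatible with both the auto-configured template and a template declared explicitly with those type parameters.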
Java-based Configuration
If you prefer to configure Kafka using Java code, follow these steps:
- Create a Kafka Configuration Class: Create a class annotated with @Configuration to set up the Kafka producer and consumer factories, as well as the Kafka template.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // ErrorHandlingDeserializer wraps the real deserializers, so each one
        // needs a delegate class for keys and for values.
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        configProps.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class.getName());
        configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        // "*" trusts every package; consider listing only your own packages.
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // @KafkaListener methods look up a container factory bean with this name
    // by default. Topics are chosen per listener, for example
    // @KafkaListener(topics = "yourTopicName").
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Explanation
- ProducerFactory: Configures producers to use StringSerializer for the key and JsonSerializer for the value.
- KafkaTemplate: Wraps the producer factory and provides high-level operations for sending messages.
- ConsumerFactory: Configures consumers to use ErrorHandlingDeserializer, which delegates to StringDeserializer for keys and JsonDeserializer for values, so a malformed record is reported as an error instead of breaking the consumer.
- ConcurrentKafkaListenerContainerFactory: Creates the listener containers behind @KafkaListener methods, using the consumer factory above.
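To make the role of the JSON serializer pair more concrete, here is a minimal sketch that runs an object through JsonSerializer and JsonDeserializer directly, without a broker. The JsonRoundTrip class and its Greeting payload are hypothetical and exist only for this illustration. The sketch assumes spring-kafka and Jackson are on the classpath, and it passes the target type to the deserializer explicitly rather than relying on type headers or trusted packages.

```java
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class JsonRoundTrip {

    // Hypothetical payload type used only for this illustration.
    public static class Greeting {
        private String text;

        public Greeting() {
        }

        public Greeting(String text) {
            this.text = text;
        }

        public String getText() { return text; }
        public void setText(String text) { this.text = text; }
    }

    // Serializes a Greeting to JSON bytes and reads it back, roughly what
    // happens on the producer and consumer sides of a topic.
    public static Greeting roundTrip(Greeting original) {
        JsonSerializer<Greeting> serializer = new JsonSerializer<>();
        JsonDeserializer<Greeting> deserializer = new JsonDeserializer<>(Greeting.class);
        try {
            byte[] json = serializer.serialize("yourTopicName", original);
            return deserializer.deserialize("yourTopicName", json);
        } finally {
            serializer.close();
            deserializer.close();
        }
    }

    public static void main(String[] args) {
        Greeting copy = roundTrip(new Greeting("hello"));
        System.out.println(copy.getText());
    }
}
```

In a running application the factories create and configure these classes for you; calling them by hand like this is mainly useful in unit tests that check whether a payload class survives the JSON round trip.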
By following these steps, you can configure Kafka to handle object serialization and deserialization through either application.yml or Java-based configuration. For more details on creating and sending custom objects, refer to the Creating and Sending Custom Objects page.