
Configuring Kafka for Object Serialization

In this guide, we'll cover how to configure Apache Kafka to handle object serialization and deserialization. We will look at two approaches: configuration through application.yml and Java-based configuration. Let's dive in!

Application.yml Configuration

To configure Kafka for object serialization using the application.yml file, follow these steps:

  1. Add Dependencies: Ensure that you have the necessary Kafka and Spring Kafka dependencies in your pom.xml or build.gradle file. With the Spring Boot parent or BOM, the spring-kafka version is managed for you and can be omitted.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  2. Configure application.yml: Add the following configuration to your application.yml file to set up the Kafka producer and consumer properties for object serialization. A short usage sketch follows the configuration.
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: '*'
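
With this YAML in place, Spring Boot auto-configures a KafkaTemplate that uses the serializers declared above. The following is a minimal usage sketch; the OrderEvent record and the "orders" topic are hypothetical placeholders rather than part of the configuration itself:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical payload type; any plain POJO or record works with JsonSerializer.
record OrderEvent(String orderId, double amount) {}

@Service
class OrderProducer {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    OrderProducer(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    void publish(OrderEvent event) {
        // JsonSerializer turns the object into a JSON payload;
        // "orders" is a placeholder topic name.
        kafkaTemplate.send("orders", event.orderId(), event);
    }
}

Because JsonSerializer adds type headers by default, the consumer side can map the JSON payload back to the original class.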

Java-based Configuration

If you prefer to configure Kafka using Java code, follow these steps:

  1. Create a Kafka Configuration Class: Create a class annotated with @Configuration and @EnableKafka that defines the producer factory, the KafkaTemplate, the consumer factory, and the listener container factory.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // String keys, JSON-serialized object values
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // group.id is required for consumers; this value is a placeholder.
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "object-serialization-group");
        // Wrap the real deserializers in ErrorHandlingDeserializer so a single
        // bad record fails gracefully instead of blocking the consumer.
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        configProps.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class.getName());
        configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Explanation

  • ProducerFactory: Configures the producer factory to use StringSerializer for the key and JsonSerializer for the value.
  • KafkaTemplate: Wraps the producer factory and provides high-level send operations for publishing objects to topics.
  • ConsumerFactory and ConcurrentKafkaListenerContainerFactory: Configure the consumer to wrap StringDeserializer and JsonDeserializer in ErrorHandlingDeserializer, so records that cannot be deserialized fail gracefully instead of blocking the listener. @KafkaListener methods pick up this container factory automatically and declare the topic they read from (see the listener sketch below).
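
To show how a consumer plugs into this configuration, here is a rough listener sketch. It reuses the hypothetical OrderEvent record from the producer sketch above; the topic name is again a placeholder, and the group id comes from the consumer factory unless you override it on the annotation.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical listener; "orders" is a placeholder topic and OrderEvent is the
// sample record from the producer sketch earlier in this guide.
@Component
class OrderListener {

    @KafkaListener(topics = "orders")
    public void onOrder(OrderEvent event) {
        // By the time this method runs, ErrorHandlingDeserializer has delegated to
        // JsonDeserializer, which mapped the JSON payload back to an OrderEvent.
        System.out.println("Received order " + event.orderId());
    }
}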

By following these steps, you can successfully configure Kafka to handle object serialization and deserialization either through application.yml or Java-based configuration. For more details on creating and sending custom objects, refer to the Creating and Sending Custom Objects page.

Read more

Introduction to Kafka Object Serialization

Setting Up the Project

Creating and Sending Custom Objects

Configuring Kafka for Object Serialization

Handling Common Errors

Conclusion and Best Practices
