Avro 설정

2025. 3. 14. 09:55 · k8s/Kafka

pom.xml

	<dependencies>
		<dependency>
		    <groupId>io.confluent</groupId>
		    <artifactId>kafka-avro-serializer</artifactId>
		    <version>7.5.1</version> <!-- Match your Confluent version -->
		</dependency>
		<dependency>
		    <groupId>org.apache.avro</groupId>
		    <artifactId>avro</artifactId>
		    <version>1.11.1</version> <!-- Match your Avro version -->
		</dependency>
	</dependencies>
    
	<repositories>
	    <repository>
	        <id>confluent</id>
	        <url>https://packages.confluent.io/maven/</url>
	    </repository>
	</repositories>

application.yml

spring:
  kafka:
    bootstrap-servers: {접속정보}
    # Non-Avro Kafka Producer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

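    # Custom properties for Avro (under spring.kafka.avro).
    # Note: KafkaProducerConfig below reads only schema-registry-url; the two
    # serializer entries are informational because the config class hard-codes them.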
    avro:
      schema-registry-url: {접속정보}
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

KafkaProducerConfig.java

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Registers two independent Kafka producer stacks as Spring beans:
 * one that writes plain {@code String} values and one that writes values
 * through {@link KafkaAvroSerializer}.
 *
 * <p>Both stacks use the same bootstrap servers and a {@link StringSerializer}
 * for keys; they differ only in the value serializer and in the Avro stack
 * additionally carrying the schema registry URL.
 */
@Configuration
public class KafkaProducerConfig {

    /** Broker list, read from {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Schema registry endpoint, read from {@code spring.kafka.avro.schema-registry-url}. */
    @Value("${spring.kafka.avro.schema-registry-url}")
    private String schemaRegistryUrl;

    /**
     * Producer factory for plain text messages (String key, String value).
     *
     * @return factory configured with {@link StringSerializer} for keys and values
     */
    @Bean
    public ProducerFactory<String, String> nonAvroProducerFactory() {
        Map<String, Object> props = commonProducerProps();
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Template backed by {@link #nonAvroProducerFactory()}.
     *
     * @return template for sending String values
     */
    @Bean
    public KafkaTemplate<String, String> nonAvroKafkaTemplate() {
        ProducerFactory<String, String> factory = nonAvroProducerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Producer factory for Avro messages (String key, Avro-serialized value).
     *
     * @return factory configured with {@link KafkaAvroSerializer} for values and
     *         the schema registry URL
     */
    @Bean
    public ProducerFactory<String, Object> avroProducerFactory() {
        Map<String, Object> props = commonProducerProps();
        props.put("schema.registry.url", schemaRegistryUrl);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Template backed by {@link #avroProducerFactory()}.
     *
     * @return template for sending Avro values
     */
    @Bean
    public KafkaTemplate<String, Object> avroKafkaTemplate() {
        ProducerFactory<String, Object> factory = avroProducerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Builds a fresh, mutable property map holding the settings shared by both
     * producers: the bootstrap servers and the String key serializer.
     */
    private Map<String, Object> commonProducerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}

KafkaService.java

/**
 * Small facade over two {@link KafkaTemplate} instances, one for plain String
 * payloads and one for Avro payloads.
 *
 * <p>Both send methods publish without a record key and ignore the value
 * returned by {@code KafkaTemplate.send}.
 */
@Service
public class KafkaService {

    /** Template used for Avro record values. */
    private final KafkaTemplate<String, Object> avroKafkaTemplate;

    /** Template used for plain String values. */
    private final KafkaTemplate<String, String> nonAvroKafkaTemplate;

    /**
     * Creates the service from the two injected templates.
     *
     * @param nonAvroKafkaTemplate template for String values
     * @param avroKafkaTemplate    template for Avro values
     */
    @Autowired
    public KafkaService(KafkaTemplate<String, String> nonAvroKafkaTemplate,
                        KafkaTemplate<String, Object> avroKafkaTemplate) {
        this.avroKafkaTemplate = avroKafkaTemplate;
        this.nonAvroKafkaTemplate = nonAvroKafkaTemplate;
    }

    /**
     * Publishes a plain String message.
     *
     * @param topic   destination topic
     * @param message payload to send as the record value
     */
    public void sendNonAvroMessage(String topic, String message) {
        this.nonAvroKafkaTemplate.send(topic, message);
    }

    /**
     * Publishes an Avro record (a GenericRecord or a SpecificRecord).
     *
     * @param topic      destination topic
     * @param avroRecord record to send as the record value
     */
    public void sendAvroMessage(String topic, Object avroRecord) {
        this.avroKafkaTemplate.send(topic, avroRecord);
    }
}

Producer

    /**
     * Accepts a JSON envelope, converts each entry of {@code records} into an Avro
     * {@link GenericRecord} using the supplied schema, and forwards the records to
     * {@code producerService}.
     *
     * <p>Expected body shape:
     * <pre>
     * {
     *   "value_schema": "&lt;Avro schema as a JSON string&gt;",
     *   "records": [ { "value": { ...fields... } }, ... ]
     * }
     * </pre>
     *
     * <p>All records are converted before any is sent, so a malformed entry
     * rejects the whole request instead of leaving it partially published.
     *
     * <p>NOTE(review): field values are passed to Avro exactly as they were
     * deserialized from JSON; no numeric widening or nested-record conversion is
     * done here. Confirm the schemas in use only need types that map directly.
     *
     * @param request the servlet request (not used by this method)
     * @param topic   destination topic taken from the URL path
     * @param message the deserialized JSON body
     * @throws IllegalArgumentException if {@code value_schema} is not a string,
     *         {@code records} is missing or empty, or an entry has no object-valued
     *         {@code value}
     */
    @PostMapping("/{topic}")
    public void sendMessage(HttpServletRequest request, @PathVariable String topic, @RequestBody Map<String, Object> message) {

        // 1. Validate the envelope up front so callers get a descriptive error
        //    instead of a NullPointerException / IndexOutOfBoundsException.
        Object rawSchema = message.get("value_schema");
        if (!(rawSchema instanceof String)) {
            throw new IllegalArgumentException("'value_schema' must be a string containing the Avro schema");
        }
        Object rawRecords = message.get("records");
        if (!(rawRecords instanceof List) || ((List<?>) rawRecords).isEmpty()) {
            throw new IllegalArgumentException("'records' must be a non-empty array");
        }
        List<?> records = (List<?>) rawRecords;

        // 2. Parse schema (once; it is shared by every record in the request)
        Schema schema = new Schema.Parser().parse((String) rawSchema);

        // 3. Build a GenericRecord for every entry, not just the first one
        GenericRecord[] avroRecords = new GenericRecord[records.size()];
        for (int i = 0; i < avroRecords.length; i++) {
            Object entry = records.get(i);
            Object value = (entry instanceof Map) ? ((Map<?, ?>) entry).get("value") : null;
            if (!(value instanceof Map)) {
                throw new IllegalArgumentException("records[" + i + "].value must be a JSON object");
            }
            avroRecords[i] = toGenericRecord(schema, (Map<?, ?>) value);
        }

        // 4. Send GenericRecords to Kafka (Avro serializer will auto-register schema)
        for (GenericRecord avroRecord : avroRecords) {
            producerService.sendMessage(topic, avroRecord);
        }
    }

    /**
     * Copies the fields named by {@code schema} out of {@code value} into a new record.
     * A field absent from {@code value} takes the schema's default when one is
     * declared; otherwise it is left unset.
     */
    private static GenericRecord toGenericRecord(Schema schema, Map<?, ?> value) {
        GenericRecord avroRecord = new GenericData.Record(schema);
        for (Schema.Field field : schema.getFields()) {
            String fieldName = field.name();
            if (value.containsKey(fieldName)) {
                avroRecord.put(fieldName, value.get(fieldName));
            } else if (field.hasDefaultValue()) {
                avroRecord.put(fieldName, GenericData.get().getDefaultValue(field));
            }
        }
        return avroRecord;
    }