Avro 설정
2025. 3. 14. 09:55ㆍk8s/Kafka
pom.xml
<dependencies>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.5.1</version> <!-- Match your Confluent version -->
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version> <!-- Match your Avro version -->
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
application.yml
spring:
kafka:
bootstrap-servers: {접속정보}
# Non-Avro Kafka Producer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# Custom properties for Avro (under kafka.avro)
avro:
schema-registry-url: {접속정보}
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
Config.java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.avro.schema-registry-url}")
private String schemaRegistryUrl;
// Generic Non-Avro Producer Factory
@Bean
public ProducerFactory<String, String> nonAvroProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> nonAvroKafkaTemplate() {
return new KafkaTemplate<>(nonAvroProducerFactory());
}
// Avro Producer Factory for GenericRecord
@Bean
public ProducerFactory<String, Object> avroProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put("schema.registry.url", schemaRegistryUrl);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> avroKafkaTemplate() {
return new KafkaTemplate<>(avroProducerFactory());
}
}
Sample.java
@Service
public class KafkaService {
private final KafkaTemplate<String, String> nonAvroKafkaTemplate;
private final KafkaTemplate<String, Object> avroKafkaTemplate;
@Autowired
public KafkaService(KafkaTemplate<String, String> nonAvroKafkaTemplate,
KafkaTemplate<String, Object> avroKafkaTemplate) {
this.nonAvroKafkaTemplate = nonAvroKafkaTemplate;
this.avroKafkaTemplate = avroKafkaTemplate;
}
// Send normal String message
public void sendNonAvroMessage(String topic, String message) {
nonAvroKafkaTemplate.send(topic, message);
}
// Send Avro GenericRecord (or SpecificRecord)
public void sendAvroMessage(String topic, Object avroRecord) {
avroKafkaTemplate.send(topic, avroRecord);
}
}
Producer
@PostMapping("/{topic}")
public void sendMessage(HttpServletRequest request, @PathVariable String topic, @RequestBody Map<String, Object> message) {
// 1. Extract schema and records
List<Map<String, Object>> records = (List<Map<String, Object>>) message.get("records");
Map<String, Object> valueRecord = (Map<String, Object>) records.get(0).get("value"); // Assuming single record
// 2. Parse schema
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse((String)message.get("value_schema"));
// 3. Build GenericRecord
GenericRecord avroRecord = new GenericData.Record(schema);
for (Schema.Field field : schema.getFields()) {
String fieldName = field.name();
Object fieldValue = valueRecord.get(fieldName);
avroRecord.put(fieldName, fieldValue);
}
// 4. Send GenericRecord to Kafka (Avro serializer will auto-register schema)
producerService.sendMessage(topic, avroRecord);
}
'k8s > Kafka' 카테고리의 다른 글
java.lang.NullPointerException: Cannot invoke "java.lang.CharSequence.toString()" because "charSequence" is null (1) | 2025.03.14 |
---|---|
org.apache.avro.SchemaParseException: Illegal character in: x-forwarded-for (0) | 2025.03.14 |
Broker 외부에 노출하기 (0) | 2025.03.10 |
kafka-ui에 schema registry 연결하기 (0) | 2025.03.09 |
Kafka Topic 생성 및 Producer/Consumer 테스트 (0) | 2025.03.03 |