2026. 2. 10. 22:53ㆍk8s/Kafka
도입부
Kafka 로그를 Elasticsearch에 쌓는다고 하면 그냥 “토픽 → ES”라고 생각하기 쉬운데, 실제로는 Kafka Connect → Elasticsearch Sink 설정이 핵심이다. 나도 이번에 그걸 처음 알았다.
처음엔 커넥터 JSON만 넣으면 끝날 줄 알았는데, 403 권한 에러와 자동 인덱스 생성 문제까지 만나면서 꽤 깊게 파고들었다. 이 글은 그 과정을 정리한 기록이다.
환경
- Kafka Connect
- Elasticsearch (Kubernetes 내부 서비스)
- 커넥터:
minecraft-es-sink - 토픽:
minecraft-logs
1. 커넥터 등록 (기본 흐름)
포트포워드 후 커넥터 JSON을 등록한다.
kubectl -n kafka port-forward svc/kafka-connect 8083:8083
curl -sS -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
--data-binary @k8s/connectors/minecraft-es-sink.json
2. 커넥터 JSON 예시
{
"name": "minecraft-es-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "minecraft-logs",
"connection.url": "http://<ES_HOST>:<ES_PORT>",
"connection.username": "${env:ES_USERNAME}",
"connection.password": "${env:ES_PASSWORD}",
"key.ignore": "true",
"schema.ignore": "true",
"behavior.on.null.values": "ignore",
"write.method": "insert",
"type.name": "_doc",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"external.resource.usage": "INDEX",
"topic.to.external.resource.mapping": "minecraft-logs:minecraft-logs",
"name": "minecraft-es-sink"
}
}
3. 409 Conflict (이미 존재하는 커넥터)
이미 존재하면 409가 뜬다. 이때는 PUT 업데이트를 사용한다.
jq '.config' k8s/connectors/minecraft-es-sink.json | \
curl -sS -X PUT http://localhost:8083/connectors/minecraft-es-sink/config \
-H "Content-Type: application/json" \
--data-binary @-
4. 상태가 EMPTY/FAILED인 이유
로그에 다음 에러가 반복된다면 원인은 자동 인덱스 생성 권한 문제다.
action [indices:admin/create] is unauthorized for user [kafka-connect]5. 권한 확인
kubectl -n kafka exec -it deploy/kafka-connect -- sh -c \
'curl -sS -u "$ES_USERNAME:$ES_PASSWORD" \
http://es-es-http.elastic.svc.cluster.local:9200/_security/_authenticate'
kubectl -n kafka exec -it deploy/kafka-connect -- sh -c \
'curl -sS -u "$ES_USERNAME:$ES_PASSWORD" -H "Content-Type: application/json" \
http://es-es-http.elastic.svc.cluster.local:9200/_security/user/_has_privileges -d "\
{\"index\":[{\"names\":[\"minecraft-logs\"],\"privileges\":[\"create_index\",\"write\",\"create_doc\",\"index\"]}]}"'
6. 인덱스 수동 생성
kubectl -n kafka exec -it deploy/kafka-connect -- sh -c \
'curl -sS -u "$ES_USERNAME:$ES_PASSWORD" -X PUT \
http://es-es-http.elastic.svc.cluster.local:9200/minecraft-logs'
7. 자동 생성 끄고 기존 인덱스 고정
jq '.config + {
"external.resource.usage":"INDEX",
"topic.to.external.resource.mapping":"minecraft-logs:minecraft-logs"
}' k8s/connectors/minecraft-es-sink.json > /tmp/minecraft-es-sink.json
curl -sS -X PUT http://localhost:8083/connectors/minecraft-es-sink/config \
-H "Content-Type: application/json" \
--data-binary @/tmp/minecraft-es-sink.json
8. task 재시작 및 상태 확인
curl -sS -X POST http://localhost:8083/connectors/minecraft-es-sink/tasks/0/restart
curl -sS http://localhost:8083/connectors/minecraft-es-sink/status
정상이라면 tasks[0].state = RUNNING.
9. ES 적재 확인
kubectl -n kafka exec -it deploy/kafka-connect -- sh -c \
'curl -sS -u "$ES_USERNAME:$ES_PASSWORD" \
http://es-es-http.elastic.svc.cluster.local:9200/minecraft-logs/_search?size=1'
10. 실제 삽질 포인트
커넥터 플러그인 없어서 이미지 직접 제작
기본 Kafka Connect 이미지에 ES Sink 커넥터가 없어서 플러그인을 포함한 이미지를 따로 만들어야 했다.API Key 미지원 → 계정/비번 방식 사용
환경에서 API Key가 안 되어서connection.username/password로 인증했다.
(추가) Dockerfile 예시 — Elasticsearch Sink 플러그인 포함 이미지
FROM confluentinc/cp-kafka-connect:<VERSION>
ENV CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components
RUN confluent-hub install --no-prompt \
confluentinc/kafka-connect-elasticsearch:<CONNECTOR_VERSION>
정리
- 자동 인덱스 생성 모드는 편하지만 권한 이슈가 있으면 계속 실패
- 인덱스를 미리 만들고
external.resource.usage=INDEX로 고정하면 안정적 RUNNING상태 + ES 검색 결과가 나오면 정상
'k8s > Kafka' 카테고리의 다른 글
| Filebeat 멀티라인 적용 후 Kafka Connect ES Sink 장애 해결기 (DLQ + 오프셋 스킵) (0) | 2026.02.17 |
|---|---|
| Kafka Connect로 Minecraft 로그를 Elasticsearch에 적재하고 운영하는 방법 (커넥터 수정/확인 포함) (0) | 2026.02.14 |
| elasticsearch의 index 생성 주기 설정 (4) | 2025.08.16 |
| Kafka에서 지난 데이터 전송하기 (0) | 2025.08.16 |
| Kafka Consumer에서 elasticsearch @timestamp 전달할때 (0) | 2025.08.16 |