Kafka Connect → Elasticsearch Sink 설정

2026. 2. 10. 22:53k8s/Kafka

도입부
Kafka 로그를 Elasticsearch에 쌓는다고 하면 그냥 “토픽 → ES”라고 생각하기 쉬운데, 실제로는 Kafka Connect → Elasticsearch Sink 설정이 핵심이다. 나도 이번에 그걸 처음 알았다.
처음엔 커넥터 JSON만 넣으면 끝날 줄 알았는데, 403 권한 에러와 자동 인덱스 생성 문제까지 만나면서 꽤 깊게 파고들었다. 이 글은 그 과정을 정리한 기록이다.


환경

  • Kafka Connect
  • Elasticsearch (Kubernetes 내부 서비스)
  • 커넥터: minecraft-es-sink
  • 토픽: minecraft-logs

1. 커넥터 등록 (기본 흐름)
포트포워드 후 커넥터 JSON을 등록한다.

kubectl -n kafka port-forward svc/kafka-connect 8083:8083
curl -sS -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  --data-binary @k8s/connectors/minecraft-es-sink.json

2. 커넥터 JSON 예시

{
  "name": "minecraft-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "minecraft-logs",
    "connection.url": "http://<ES_HOST>:<ES_PORT>",
    "connection.username": "${env:ES_USERNAME}",
    "connection.password": "${env:ES_PASSWORD}",
    "key.ignore": "true",
    "schema.ignore": "true",
    "behavior.on.null.values": "ignore",
    "write.method": "insert",
    "type.name": "_doc",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "external.resource.usage": "INDEX",
    "topic.to.external.resource.mapping": "minecraft-logs:minecraft-logs",
    "name": "minecraft-es-sink"
  }
}

3. 409 Conflict (이미 존재하는 커넥터)
이미 존재하면 409가 뜬다. 이때는 PUT 업데이트를 사용한다.

jq '.config' k8s/connectors/minecraft-es-sink.json | \
curl -sS -X PUT http://localhost:8083/connectors/minecraft-es-sink/config \
  -H "Content-Type: application/json" \
  --data-binary @-

4. 상태가 EMPTY/FAILED인 이유
로그에 다음 에러가 반복된다면 원인은 자동 인덱스 생성 권한 문제다.

action [indices:admin/create] is unauthorized for user [kafka-connect]

5. 권한 확인

kubectl -n kafka exec -it deploy/kafka-connect -- sh -c \
'curl -sS -u "$ES_USERNAME:$ES_PASSWORD" \
http://es-es-http.elastic.svc.cluster.local:9200/_security/_authenticate'
kubectl -n kafka exec -it deploy/kafka-connect -- sh -c \
'curl -sS -u "$ES_USERNAME:$ES_PASSWORD" -H "Content-Type: application/json" \
http://es-es-http.elastic.svc.cluster.local:9200/_security/user/_has_privileges -d "\
{\"index\":[{\"names\":[\"minecraft-logs\"],\"privileges\":[\"create_index\",\"write\",\"create_doc\",\"index\"]}]}"'

6. 인덱스 수동 생성

kubectl -n kafka exec -it deploy/kafka-connect -- sh -c \
'curl -sS -u "$ES_USERNAME:$ES_PASSWORD" -X PUT \
http://es-es-http.elastic.svc.cluster.local:9200/minecraft-logs'

7. 자동 생성 끄고 기존 인덱스 고정

jq '.config + {
  "external.resource.usage":"INDEX",
  "topic.to.external.resource.mapping":"minecraft-logs:minecraft-logs"
}' k8s/connectors/minecraft-es-sink.json > /tmp/minecraft-es-sink.json

curl -sS -X PUT http://localhost:8083/connectors/minecraft-es-sink/config \
  -H "Content-Type: application/json" \
  --data-binary @/tmp/minecraft-es-sink.json

8. task 재시작 및 상태 확인

curl -sS -X POST http://localhost:8083/connectors/minecraft-es-sink/tasks/0/restart
curl -sS http://localhost:8083/connectors/minecraft-es-sink/status

정상이라면 tasks[0].state = RUNNING.


9. ES 적재 확인

kubectl -n kafka exec -it deploy/kafka-connect -- sh -c \
'curl -sS -u "$ES_USERNAME:$ES_PASSWORD" \
http://es-es-http.elastic.svc.cluster.local:9200/minecraft-logs/_search?size=1'

10. 실제 삽질 포인트

  • 커넥터 플러그인 없어서 이미지 직접 제작
    기본 Kafka Connect 이미지에 ES Sink 커넥터가 없어서 플러그인을 포함한 이미지를 따로 만들어야 했다.

  • API Key 미지원 → 계정/비번 방식 사용
    환경에서 API Key가 안 되어서 connection.username/password로 인증했다.


(추가) Dockerfile 예시 — Elasticsearch Sink 플러그인 포함 이미지

FROM confluentinc/cp-kafka-connect:<VERSION>

ENV CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components

RUN confluent-hub install --no-prompt \
  confluentinc/kafka-connect-elasticsearch:<CONNECTOR_VERSION>

정리

  • 자동 인덱스 생성 모드는 편하지만 권한 이슈가 있으면 계속 실패
  • 인덱스를 미리 만들고 external.resource.usage=INDEX로 고정하면 안정적
  • RUNNING 상태 + ES 검색 결과가 나오면 정상