본문 바로가기
PROGRAM

[Apache Kafka] 개념, 설치 및 Producer/Consumer 사용 예제

by ojava 2022. 4. 20.
반응형

Apache Kafka

Apache Software Foundation의 Scalar 언어로 된 오픈 소스 메시지 브로커 프로젝트
: Open Source Message Broker Project
: 텍스트 형태의 메시지나 다양한 문서 Format을 가진 데이터에 대한 전달을 수행할 수 있음

고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 ​​및 미션 크리티컬 애플리케이션을 위해 수천 개의 회사에서 사용하는 오픈 소스 분산 이벤트 스트리밍 플랫폼 (https://kafka.apache.org/)

실시간 데이터 피드를 관리하기 위해 통일된 높은 처리량, 낮은 지연 시간을 지닌 플랫폼 제공

데이터 전달, 데이터 지연 시간을 낮추고 원하는 곳으로 데이터를 전달하는 용도로 사용

Rabbit MQ도 데이터 이관 용도로 사용하나 데이터 이관 용량이나 처리 속도 측면에서 Kafka가 더 좋은 성능을 가짐

 

기존 데이터 연결 방식은 아래와 같은 단점을 가짐

  • End-to-End 연결 방식의 아키텍처
    : 각 DB에서 원하는 시스템으로 각각 연결해줘야 함
  • 데이터 종류 증가로 인해 연동의 복잡성 증가
    : 연동하고자 하는 하드웨어, OS를 고려해야 하고 연결 시 장애 발생 요소 등도 신경 써야 됨
  • 서로 다른 데이터마다 각각의 파이프라인 연결 구조를 가져야 해서 구성 과정이 힘듦
  • 기존 방식의 연결은 기존 목적 이외에는 확장이 어려운 구조

 

이런 불편함을 해결하고자, 모든 시스템으로 데이터를 실시간 전송 및 처리할 수 있는 시스템이 필요해졌고 데이터가 많아지더라도 확장이 용이한 시스템에 대한 요구가 증가하면서 Kafka가 등장함

각 데이터베이스와 시스템 사이에 Kafka를 도입하여, End-to-End 방식이 아니라, 보내는 쪽(Producer)과 받는 쪽(Consumer)이 이 데이터가 어디서 왔고 어디로 연결할지 신경 쓸 필요 없이 Kafka에서 중계역할을 수행하게 됨.
그래서 메시지 '브로커' 라는 이름을 붙인 것으로 보인다.

  • Producer (데이터 보내는 주체, Database 등)와 Consumer (데이터 가공 주체, Hadoop, Search Engine 등) 분리
  • 전달받은 메시지를 다양한 형태의 여러 Consumer에게 보내는 것을 허용
  • 높은 처리량을 위해 메시지 최적화시켜서 내부적으로 보관
  • 연결 방식의 확장 (Scale-out) 가능
  • 다양한 형태의 Eco-system 연동되므로 보내고 받는 용도뿐 아니라, 다양한 기능으로 이용 가능
  • 높은 처리량, 확장 가능, 영구 보관, 고가용성

 

 

Kafka Broker

실행된 Kafka Application Server를 의미한다.

일반적으로 1대 이상, 기본적으로 3대 이상의 Broker Cluster 구성하는 것을 권장함
: 데이터 연동 과정에서 안정적인 서비스를 제공하기 위해서 여러대로 클러스터링 하는 것으로 보임
: 이 과정에서 데이터 연계 시 각 Broker에게 데이터를 전달하는 코디네이터 역할을 수행하는 것이 Zookeeper

Apache Zookeeper
- 여러 대의 Broker를 중재하고 연결하는 역할
- 내부 Broker의 메타데이터 (Broker ID, Controller ID 등) 저장
- Controller 정보 저장

N개 Broker로 구성한 Broker Cluster 중 1대는 Controller 기능 수행

Controller 역할
- 각 Broker에게 담당 파티션 할당 수행
- Broker 정상 동작 모니터링 관리

 

Kafka 설치 및 폴더 구조

https://kafka.apache.org/downloads

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

현재 기준 최신 버전은 3.1.0 버전이다. (2022.01.24 Release)

Scala 2.13 버전으로 다운로드할 예정임, 설치형이 아니기 때문에 OS별로 다운로드 링크가 별도로 있는 게 아니라 다운로드한 후 압축 해제하면 되는 구조이다.

 

kafka 다운로드 시 받게 되는 폴더 구조

Kafka는 3대 이상의 Broker로 Cluster를 구성하는 게 가장 좋지만, 예제에서는 Single Cluster 형태로 구성할 예정이다. 만약 구성할 일이 있다면 Zookeeper도 3대 이상, Broker도 3대 이상으로 구성하는게 안정적임

config 폴더에는 설정 파일들이 모여있는데, zookeeper.properties는 zookeeper 관련 설정, server.properties는 apache kafka 관련 설정임.

폴더 안에 connect-* 형태의 파일이 많은데 apache kafka는 데이터 연결 기능뿐 아니라 로그 연동, 다른 쪽 데이터를 가져와서 데이터 전달하는 source와 sync 기능이 있는데 이 내용과 관련한 내용임

bin 폴더에는 실행을 위한 각종 명령어가 모여있는데 Linux, Mac OS에서 사용할 수 있는 shell 프로그램이 모여있다. windows용 파일은 windows 폴더 밑에 별도로 bat 파일들만 따로 모여있다.

 

 

1) Kafka Client

Kafka와 데이터를 주고받기 위해 사용하는 Java Library

실제 데이터 통신이 필요할 때, 원하는 DB와 시스템을 End-to-End 방식으로 연계하는 게 아니라 중간에 Kafka Cluster를 두고 Kafka-client Application 간에 데이터 통신을 하게 됨

https://docs.confluent.io/platform/current/clients/index.html

 

Kafka Clients | Confluent Documentation

Home Build Applications for Kafka Kafka Clients Kafka Clients This section describes the clients included with Confluent Platform. Confluent Platform includes client libraries for multiple languages that provide both low-level access to Apache Kafka® and

docs.confluent.io

Producer, Consumer, Admin, Stream 등 Kafka 관련 API를 제공

다양한 3rd party library가 존재하여 연계가 쉬움 (C/C++, Go, Java, Python, .NET 등)

** Spring Framework 중 Maven에 연결할 수 있는 denpendency는 아래와 같음
https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients

 

Kafka 서버 기동 및 Topic 생성, Producer/Consumer 만들기

아까 다운로드한 Kafka의 압축을 풀어둔 경로를 $KAFKA_HOME으로 지정했다는 가정하에 아래와 같이 명령어를 수행시키게 되면 각 기능 동작이 가능하다.

  1. Zookeeper 및 Kafka 서버 구동 (Zookeeper 먼저 구동하고 kafka 서버 구동)
    - $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
    - $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

    단, 윈도우 서버에서 수행 시 압축 풀어둔 폴더로 이동해서 아래와 같은 명령어 수행
    - .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
    - .\bin\windows\kafka-server-start.sh .\config\server.properties

    밑에 써둔 명령어 모두 sh 실행 기준인데 윈도우는 각각 경로에 맞춰서 변경만 해주면 동일함
  2. Topic 생성 (--topic 다음에는 원하는 이름 지정)
    $KAFKA_HOME/bin/kafka-topic.sh --create --topic ojava-kafka-test --bootstrap-server localhost:9092 --partitions 1

  3. Topic 목록 확인
    $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

  4. Topic 정보 확인
    $KAFKA_HOME/bin/kafka-topic.sh --describe --topic ojava-kafka-test --bootstrap-server localhost:9092

Producer가 메시지를 보내고자 하면 kafka에서는 기본적으로 Topic을 만들고 이곳에 메시지를 저장한다. consumer는 관심 있는 Topic에 관심을 신청하면 해당 Topic을 받게 된다.

localhost:9092는 Kafka 구동 시 켜지는 기본 포트를 이용한 것임 (변경 시 각자 맞춰서 명령어 수정)

 

위에서 만든 Topic을 이용해서 Producer와 Consumer가 메시지를 만들고 사용하는 방식을 테스트해보자.

  1. Producer를 통한 메시지 생성
    $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ojava-kafka-test
    -- broker-list에 기동한 kafka 서버 주소 입력
    -- topic에 위에서 만든 topic 명칭 입력

  2. Consumer를 통한 메시지 사용
    $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ojava-kafka-test \ -- from-beginning
    -- bootstrap-server에는 메시지를 받아올 kafka 서버 주소 입력
    -- topic도 받아오고자 하는 Topic 이름을 지정한다.
    -- from-beginning은 처음부터 메시지 내용을 다 가져오고자 할 때 사용 (중간에 만든 Consumer라도 처음부터 받아올 수 있음)

여러 개의 Consumer를 만들어도 동일한 Topic에서 데이터 받아올 수 있음

 

 

반응형

2) Kafka Connect

앞에서 Kafka Client에 대해 알아봤다면, 이번에는 Kafka Connect에 대해 알아보자.

Kafka Connect는 데이터를 직접 Import / Export 할 수 있는 기능

코드 없이 Configuration으로 데이터 이동이 가능하고, Standalone mode와 Distribution mode를 지원한다.

  • RESTful API 통해서 지원
  • Stram 또는 Batch 형태로 데이터 전송 가능
  • Custom Connection을 통한 다양한 Plug-in 제공
    : File, AWS S3, hive, MySQL 등 다양한 스토리지 서비스 연계 가능

가져오는 쪽을 Kafka Connect Source, 보내는 쪽을 Kafka Connect Sink라고 한다.

Source System (보내고자 하는 데이터베이스 또는 파일)
-> Kafka Connect Source -> Kafka Cluster -> Kafka Connect Sink
-> Target System (데이터를 수신할 별도의 시스템)

 

Kafka Connect 설치

Linux 기반 시스템은 아래 URL을 통해 다운로드하고, 윈도우 기반은 해당 링크에 접속해서 다운로드하고 압축 풀면 된다.

curl -O http://packages.confluent.io/archive/7.1/confluent-community-7.1.0.tar.gz
tar xvf confluent-community-6.1.0.tar.gz
cd  $KAFKA_CONNECT_HOME

 

 

Kafka Connect 설정

./etc/kafka/connect-distributed.properties에서 설정 정보 관리

Kafak Connect 실행

./bin/connect-distributed ./etc/kafka/connect-distributed.properties (앞에서 설정한 설정 정보를 불러옴)

윈도우의 경우 .\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties

Topic 목록 확인

Kafka Connect 실행 후 앞에서 수행했던 Topic 목록 확인을 다시 하게 되면, 내가 만들지 않은 Topic이 추가로 보이게 된다. Kafka Connect에서 자동으로 만들어주는 내용들 (connect-* 로 시작하는 topic들이 이에 해당)

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 

 

 

JDBC Connector 설치 (Source AND Sink)

https://docs.confluent.io/kafka-connect-jdbc/current/index.html

현재 최신 버전은 confluentinc-kafka-connect-jdbc-10.4.1.zip 

JDBC 다운로드한 뒤에, Kafka Connect 압축 푼 폴더에서 etc/kafka/connect-distributed.properties 파일을 찾아 마지막에 아래 plugin 정보 추가

기존에 있던 내용 주석처리하고 아래 내용 추가
plugin.path=[confluentinc-kafka-connect-jdbc-10.4.1/lib 폴더 경로]

예를 들어 윈도우인 경우, 아래와 같이 추가
plugin.path=\C:\\DEV\\apache\\confluentinc-kafka-connect-jdbc-10.4.1/lib\\lib

JdbcSourceConnector에서 MariaDB 사용하기 위해 mariadb 드라이버 복사

Kafka Connect 경로에서 ./share/java/kafka/ 폴더에 mariadb-java-client-2.7.2.jar  파일 복사

 

 

Kafka Connect 관련 명령어 모음

RESTful API를 지원한다고 했으므로, 명령어라기보다는 특정 URL을 호출하는 내용이다.

  1. 데이터 입력 (POST)
    curl -x POST http://localhost:8083/connectors
  2. 목록 확인 (GET)
    curl -x GET http://localhost:8083/connectors 
  3. connect 내역 확인
    curl http://localhost:8083/connectors/ojava-kafka-connect(또는 등록한 Connector 이름)/status

 

Kafka Source Connect 데이터 입력

echo '
{
    "name" : "ojava-kafka-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url":"jdbc:mysql://localhost:3306/test", // DB 정보
        "connection.user":"root",  // DB 정보
        "connection.password":"비밀번호입력",  // DB 정보
        "mode": "incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist":"users", // MariaDB에서 데이터 변경을 감지하고자 할 테이블 목록
        "topic.prefix" : "my_topic_", // 감지되는 내용을 Topic에 저장할 때 사용할 명명규칙
        "tasks.max" : "1"
    }
}' 
| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

 

Kafka Sink Connect 데이터 입력

echo '
{
    "name" : "ojava-sink-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/test", // DB 정보
        "connection.user":"root",  // DB 정보
        "connection.password":"비밀번호입력",  // DB 정보
        "auto.create":"true", // Topic의 이름과 같은 테이블을 생성해주는 설정!!
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max" : "1",
        "topics":"my_topic_users" // 어느 데이터랑 연결할지를 지정해주는 부분
    }
}' 
| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

 

위에서 배운 내용을 조합해서 아래와 같은 처리 진행이 가능

1) Kafka-console-producer에서 데이터 전송 (특정 Topic) -> Topic에서 데이터 수신하여 추가 -> Kafka Source Connector를 통해 연결되어 MariaDB에 추가

2) MariaDB에 직접 데이터 변경 시, Kafka Source Connector를 통해 연결된 내용이 데이터 변경을 감지하여 Topic에 담게 되고 이 Topic을 바라보고 있는 Kafka Sink Connector에서 수신된 데이터를 담는 테이블에 데이터 입력 처리  

3) 위와 같은 내용을 이용하여 ETL에 대해 처리할 수 있음
: ETL이란 Extract Transform Load, 추출 변환 로드를 말하며 동일 기종 또는 다른 기종의 데이터 소스로부터 데이터 추출 후 다른 DB에 적재하는 방식
: 내가 알고 있는 건 TIBCO 한 가지였는데, Hevo Data, AWS Glud, Azure Data Facroty, Google Data Flow 등 다양한 상용 툴이 제공되고 있다. 특징적으로 Cloud 서비스 별로 ETL Tool을 제공하고 있다.
: 무료로 제공되는 ETL Tool 오픈 소스로는 Pentaho, Talend, Apache Nifi 등이 있음 

 

** Kafka Connect 윈도우에서 기동 시, 로그 누적 안 되는 이유 및 해결 방법
(인프런 커뮤니티 질의응답 게시글 내용 발췌)

/bin/windows/connect-distributed.bat 파일 내 로그 누적되는 설정 정보 경로가 잘못되어있음
kafka 서버 설치 경로에 있는 내용을 참조하도록 되어있는 듯

rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
    set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/config/connect-log4j.properties
)

위 경로를 %BASE_DIR%/etc/kafka/connect-log4j.properties 로 바꿔주면 해결 가능

[참고] connect-distributed.sh

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  LOG4J_CONFIG_NORMAL_INSTALL="/etc/kafka/connect-log4j.properties"
  LOG4J_CONFIG_ZIP_INSTALL="$base_dir/../etc/kafka/connect-log4j.properties"
  if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_NORMAL_INSTALL}"
  elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_ZIP_INSTALL}"
  else # Fallback to normal default
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
  fi
fi
export KAFKA_LOG4J_OPTS

 

 

참고자료
: 인프런 강좌 Spring Cloud로 개발하는 마이크로 서비스 애플리케이션 (MSA)
https://kafka.apache.org/
: What is Apache Kafka? (redhat.com)

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

Apache Kafka(아파치 카프카)란? 소개, 생성, 설치 및 성능

Apache Kafka(아파치 카프카)는 분산 환경에서 사용되는 데이터 스트리밍 플랫폼이고, 오픈소스를 특징으로 하며, 실시간 스트림의 처리 등에서 활용되는 솔루션입니다.

www.redhat.com

 

반응형