HomeBlogGuestbookLab 

JDM's Blog

온갖 테스트 결과가 기록되는 이곳은 JDM's Blog입니다. :3

Apache Kafka

Introduce

아파치 카프카를 공식 홈페이지에서 소개한 내용을 발췌해봤습니다.

Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.

Fast
A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients.

Scalable
Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers

Durable
Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact.

Distributed by Design
Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees.

http://kafka.apache.org/

아파치 카프카는 발행-구독 메시지를 분산 커밋 로그로 다시 생각한 것이고 빠르고, 확장적이고 지속적이면서 분산 환경에 알맞도록 디자인이 되었다고 하네요. 개인적인 느낌은 메시지 큐Message Queue의 분산 기능을 추가한 것처럼 보입니다.

위에서 확장적Scalable에서 언급된 것이 눈에 띕니다. 거대한 기관을 위한 중앙 데이터 백본으로써 싱글 클러스터만으로도 서비스 가능하다는 점인데요. 이건 벤치마킹 자료를 검색하면 많이 있어 보일 것 같아서 그냥 지나가는 걸로... :)

그럼 소개는 여기까지만 하고 아파치 카프카를 사용하는 법을 알아봅시다.

Getting Started :: Basic

아파치 카프카를 설치하고 구동하는 법을 알아봅시다. 참고한 사이트는 Apache Kafka :: Getting Started입니다.

Download Source

처음으로 할 것은 wget으로 카프카를 내려 받는 것입니다. 해당 커맨드가 허용 안된다면 scp 또는 FTP 프로그램 등을 이용해서 파일을 옮깁니다.

$ wget http://apache.tt.co.kr/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz
$ tar -xzf kafka_2.10-0.8.2.0.tgz
$ ln -s kafka_2.10-0.8.2.0 kafka
$ cd kafka

간단한 심볼릭 링크도 걸어뒀습니다.

Start Zookeeper

카프카가 구동되기 위해서는 주키퍼가 필요합니다. 아래의 커맨드는 주키퍼 서버 인스턴스를 시작합니다.

$ cd bin
$ ./zookeeper-server-start.sh ../config/zookeeper.properties &

Start Kafka

주키퍼 서버가 준비되었다면 카프카 브로커kafka broker를 구동합니다.

$ ./kafka-server-start.sh ../config/server.properties &

Create Topic

카프카 토픽을 만듭니다. 여기서는 "test" 라는 이름의 토픽을 만들었고 리플리케이션 팩터는 1, 파티션도 1인 토픽입니다.

$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

리플리케이션 팩터replication-factor와 파티션partition은 하단부에서 설명할 예정입니다.

Send Message

"test" 토픽에 메시지를 만들어서 보내봅시다.

$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
Test Message.
Hello World!
^C

여기서 9092라는 포트를 사용하는 것은 카프카 브로커 구동시 설정한 server.properties에 port=9092 라고 설정 되었기 때문입니다.

Receiving Message

"test" 토픽을 구독하고 메시지를 읽어봅시다.

$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test Message.
Hello World!
^C

Getting Started :: Single Host Multi-broker Cluster

Basic에서 본 내용은 아주 간단한 예제만을 위해 진행했던 코드입니다. 이제부터는 클러스터를 만들어 봅니다. Basic에서부터 예제를 진행했다면 현재는 주키퍼 서버 인스턴스 하나와 카프카 브로커 하나가 돌아가는 상태입니다. 여기서 2개의 카프카 브로커를 추가해봅시다.

즉, 하나의 호스트에 주키퍼와 카프카 브로커가 3개인 구성이 됩니다.

Make server.properies

우선 카프카 브로커 환경 설정을 해야 합니다. {kafka}/config 위치의 server.properties 파일을 각각 다음의 이름으로 복사합니다.

$ cp ./server.properties ./server-2.properties
$ cp ./server.properties ./server-3.properties

각각의 파일을 다음과 같은 내용으로 수정합니다.

# server-2.properties
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

# server-3.properties
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

Start Kafka

{kafka}/bin 에서 아래와 같은 커맨드로 카프카 브로커를 띄웁니다.

$ ./kafka-server-start.sh ../config/server-2.properties &
$ ./kafka-server-start.sh ../config/server-3.properties &

Create Topic

이번엔 카프카 토픽을 만들때 레플리카Replica를 만들수 있도록 생성해 봅시다. 여기서 레플리카를 생성할수 있도록 --replication-factor 옵션에 "3"이라는 값을 줍니다. 레플리카를 3곳으로 유지한다는 뜻이죠. 새로 만든 토픽 이름은 "rep_test" 입니다.

$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic rep_test

TIP. 레플리카(Replica)

레플리카라는 것은 사본이라는 뜻입니다. IT 용어로의 레플리카는 원본 데이터의 손실을 대비해 만들어두는 백업 데이터라 생각하면 됩니다.

Detail Topic Info

지금 만들어낸 "rep_test" 토픽의 환경설정을 한번 살펴봅시다. 아래와 같은 커맨드를 입력합니다.

$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic rep_test
Topic:rep_test  PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: rep_test Partition: 0    Leader: 1   Replicas: 1,0,2 Isr: 1,0,2

여기서 리더Leader는 토픽의 파티션들 중에 어떤 파티션이 읽고 쓰기 위한 권한을 가졌는지 인덱스를 표현합니다. 현재 0번 파티션에 1이 출력되는데 이것은 1번 브로커가 모든 읽기/쓰기에 대한 권한을 가지고 있다는 뜻입니다.

레플리카스Replicas는 토픽의 사본Replica이 어떤 브로커에 저장되었는지 목록을 출력합니다. 여기서는 0번 파티션에 레플리카스가 1,0,2가 찍히는데 다시 말하면 0,1,2번 브로커에 사본이 있다는 뜻입니다.

Isr은 레플리카스의 부분집합이기도 하지만 "현재 살아있는 브로커를 표현한다." 보면 편합니다. 즉 현재 동기화 되어 있는 레플리카스의 집합도 됩니다.

Send Message to Cluster

이번엔 메시지를 만들어서 보내봅시다. 사실 카프카에서는 "Log"라고 하지만 맥락상 메시지로 임의 표현했습니다.

$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic rep_test
Hello World! Cluster!
^C

0번 카프카 브로커에 "rep_test"라는 토픽으로 메시지를 보낸것입니다.

Receiving Message from Cluster

이번엔 카프카 클러스터에서 메시지를 받아봅시다.

$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic rep_test
Hello World! Cluster!
^C

Fail Over

이번엔 클러스터를 이루는 카프카 브로커 하나를 Kill 하고 어떤 변화가 있는지 알아봅시다.

Kill Kafka broker

$ ps -ef | grep server-3.properties
$ kill -9 1234

Check Topic Info

이번엔 토픽의 정보를 확인해 봅시다.

$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic rep_test
Topic:rep_test  PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: rep_test Partition: 0    Leader: 1   Replicas: 1,0,2 Isr: 1,0

Isr 항목에서 2번 브로커가 빠진 것을 알 수 있습니다.

Multi Node Cluster

지금까지 하나의 노드에서 카프카 브로커를 하나 또는 세개를 만들어 테스트를 했습니다. 이번엔 멀티 노드 카프카 클러스터를 만들어봅시다. 앞서 말했던 내용과 다른 부분만 언급하겠습니다.

Architecture

지금 만들어볼 카프카 클러스터는 다음과 같은 구성입니다.

┏━━━━━━━━┓┏━━━━━━━━┓┏━━━━━━━━┓
┃kafka-1.jdm.kr  ┃┃kafka-2.jdm.kr  ┃┃kafka-3.jdm.kr  ┃ * 해당 라인에 있는 것은 Host 이름!
┃┏━━━━━━┓┃┃┏━━━━━━┓┃┃┏━━━━━━┓┃
┃┃Zookeeper   ┃┃┃┃Zookeeper   ┃┃┃┃Zookeeper   ┃┃
┃┗━━━━━━┛┃┃┗━━━━━━┛┃┃┗━━━━━━┛┃
┃┏━━━━━━┓┃┃┏━━━━━━┓┃┃┏━━━━━━┓┃
┃┃Kafka Broker┃┃┃┃Kafka Broker┃┃┃┃Kafka Broker┃┃
┃┗━━━━━━┛┃┃┗━━━━━━┛┃┃┗━━━━━━┛┃
┗━━━━━━━━┛┗━━━━━━━━┛┗━━━━━━━━┛

Zookeeper Configration

주키퍼 환경 설정을 해봅시다. 다음과 같은 주키퍼 환경 설정 파일을 각 노드 별로 만들어줍니다.

dataDir=/kafka/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=kafka-1.jdm.kr:2888:3888
server.2=kafka-2.jdm.kr:2888:3888
server.3=kafka-3.jdm.kr:2888:3888

환경 설정 파일 변경 이후 각 주키퍼 인스턴스에서는 자신의 인스턴스 아이디를 판별하기 위해 myid 라는 파일이 dataDir 속성에 정의된 위치에 존재해야 합니다. 따라서 myid 파일 안에 각 노드에 맞는 아이디를 넣습니다. (ex:1,2,3...)

예를 들어 1번 노드에는 /kafka/zookeeper/myid 라는 파일 내부에 1이라는 값이 있어야 합니다.

TIP. 주키퍼 스냅샷 파일(Zookeeper snapshot file)

주키퍼는 일정 간격으로 스냅샷을 떠서 파일로 만듭니다. 해당 파일들이 쌓이게 되면 불필요한 디스크 공간을 차지하기 때문에 주기적으로 지워줘야 합니다.

아파치 카프카에 내장된 주키퍼를 사용할때 써볼만한 내용을 공유해봅니다. 자세한 내용은 Maintenance 항목에서 찾을 수 있어요.

아래와 같은 커맨드로 사용 가능합니다. 아래의 커맨드를 수행하면 지정한 개수의 로그만 남기고 삭제를 해줍니다.

# ex
$ java -cp zookeeper.jar:log4j.jar:conf org.apache.zookeeper.server.PurgeTxnLog <dataDir> <snapDir> -n <count>
# my
$ cd {kafka}
$ java -cp config:libs/zookeeper-3.4.6.jar:libs/log4j-1.2.16.jar:libs/slf4j-log4j12-1.6.1.jar:libs/slf4j-api-1.7.6.jar org.apache.zookeeper.server.PurgeTxnLog {dataDir} {snapshotDir} {count}

주의할 점으로는 count 속성은 최소 3 이상이어야 합니다. *별도로 주키퍼를 설치할땐 bin 디렉토리에 zkCleanup.sh 으로 시작하는 .sh 파일이 있습니다.

이외에도 주키퍼 설정에 대한 자세한 문서는 zookeeperStarted를 참조해주세요.

Broker Configration

주키퍼 설정이 끝났고 각 노드별로 주키퍼를 실행해둔 뒤에 아래와 같은 카프카 브로커 설정을 각 노드별로 해줍니다.

broker.id=1 # 이부분은 브로커마다 유일한 값을 가져야 합니다.
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=kafka-1.jdm.kr:2181,kafka-2.jdm.kr:2181,kafka-3.jdm.kr:2181
zookeeper.connection.timeout.ms=6000

broker.id 값은 반드시 유일한 값을 가져야 합니다.
그 외의 부분은 동일하게 작성합니다. 다만, 카프카 브로커가 노드별로 더 추가 된다면 log.dirs 속성값을 각 브로커별로 바꿔줄 필요는 있습니다. 예를 들면 /kafka/logs/{broker.id} 처럼 말이죠.

그외 속성들은 카프카 문서에서 찾아볼 수 있지만 하나를 짚고 가자면 log.retention.hours라는 속성이 있는데 이것이 카프카 메시지(or 로그)를 얼마나 보관해둘지 결정합니다. 단위는 시간(hour)입니다.

브로커 환경 설정까지 끝난 뒤 브로커들을 각 노드별로 실행해주면 멀티 노드 클러스터 구축은 끝난겁니다.

Topic & Partition & Replication

소제목에 기술된 용어들의 자세한 내용은 documentation에서 확인 할 수 있습니다. 아래의 내용은 개인적으로 정리해본것입니다.

토픽을 생성하면 기본적으로 파티션 하나는 반드시 가지고 있습니다. 하지만 클러스터에서는 하나일 필요는 없기 때문에 파티션의 개수가 많아지는 경향이 있습니다.

파티션이 여러개 있는 토픽인 경우, 메시지를 전달할 때 파티셔닝 알고리즘(기본은 랜덤)에 의해 하나의 파티션을 정해 보내게 됩니다. 또한 파티션들은 하나의 노드가 아니라 클러스터를 이루는 모든 노드에 분산됩니다.

따라서 하나의 토픽이 클러스터를 이루는 노드들에 고르게 분산될 정도의 충분한 파티션 개수를 가진다면 하나의 노드가 받는 부하가 줄어들 수 있습니다.

그리고 고가용성을 위해 파티션을 복제하게 되는데 이것을 리플리케이션 팩터 값을 통해 정의할 수 있습니다. 파티션 자체를 복제하고 나중에 문제가 생기면 파티션 단위로 Fail Over를 하게 됩니다. 심지어 노드가 다운되더라도 Fail Over가 되는데 이것은 복제한 파티션들이 다른 노드에도 존재하기 때문에 가능합니다. (물론 이 경우는 모든 노드에 파티션이 할당 될 정도의 충분한 파티션 개수를 가진 토픽에 한정합니다.)

Monitoring tool

깃헙에 https://github.com/yahoo/kafka-manager라는 도구가 있네요. 설치는 추후에 시간이 나면... :)

https://github.com/linkedin/Burrow 이것도 한번 보세요!

System Tools

모니터링 툴 대신 https://cwiki.apache.org/confluence/display/KAFKA/System+Tools을 사용해 볼 수도 있어요.

Closing Remarks

너무 정신없이 정리했습니다. 잘못된 부분 있으면 댓글 부탁드립니다. ^^ 추가적인 내용도 좋습니다!

추가사항(2016.06.01)

카프카 브로커 구동시에 다음과 같은 옵션을 주면 모니터링 도구에서 JMX을 이용한 값을 조회할 수 있습니다. 또한 데몬으로 구동시에 옵션을 주면 좋습니다.

$ JMX_PORT=9997 ./kafka-server-start.sh -daemon ./server.properties