2013年2月7日星期四

KAFKA在统计系统中的应用

Kafka

kafka是LinkedIn开源的一款分布式的发布-订阅消息系统,它具有:
1. 通过O(1)的磁盘结构持久化存储消息,即使TB级的数据也能保持长期稳定;
2. 高吞吐率:即使非常普通的硬件,kafka也能支持每秒数十万的消息;
3. 支持通过kafka服务器和消费集群来分区消息;
4. 支持Hadoop并行加载;

设计流程

通过集成kafka和log4j在各个需要采集日志的系统进行日志采集,日志统一发送到kafka的消息队列。再通过定时运行kafka-hadoop的作业,将数据从KAFKA同步到hadoop的HDFS中。hadoop的各类map-reduce作业可以对数据进行统计,存储到db或缓存中。

安装

首先安装kafka,下载kafka到本地,然后执行:
> tar xzf kafka-<VERSION>.tgz
> cd kafka-<VERSION>
> ./sbt update
> ./sbt package
启动zookeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2010-11-21 23:45:02,335] INFO Reading configuration from: config/zookeeper.properties...
启动kafka
> bin/kafka-server-start.sh config/server.properties
jkreps-mn-2:kafka-trunk jkreps$ bin/kafka-server-start.sh config/server.properties 
[2010-11-21 23:51:39,608] INFO starting log cleaner every 60000 ms (kafka.log.LogManager)
[2010-11-21 23:51:39,628] INFO connecting to ZK: localhost:2181 (kafka.server.KafkaZooKeeper)...
发送一些消息
> bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic test 
This is a message
This is another message
查看消息
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
至此,kafka已经启动成功。

安装hadoop,下载hadoop0.20.2,按照hadoop的操作手册安装,此处省略安装过程。

配置log4j和kafka-log

#配置KAFKA
#Hostname表示应用的名称
#Topic表示打印日志生成的目录名
log4j.appender.KAFKA=com.comp.kafka.AsyncKafkaAppender
log4j.appender.KAFKA.topic=TOPIC
log4j.appender.KAFKA.bufferSize = 10
log4j.appender.KAFKA.brokerList=0:kafka-server-ip:9092
log4j.appender.KAFKA.serializerClass=kafka.serializer.StringEncoder
log4j.appender.KAFKA.hostname=APP-NAME
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
log4j.logger.com.comp= DEBUG, KAFKA
配置中,配置了kafka的ip和端口,并设置了topic和序列化类和pattern。
AsyncKafkaAppender是继承了log4j的AsyncAppender,是log4j异步发送日志的模式,当log达到bufferSize的大小时,会统一由log4j异步执行打印操作。代码如下:
public class AsyncKafkaAppender extends AsyncAppender {
    private java.lang.String topic;
    private java.lang.String serializerClass;
    private java.lang.String zkConnect;
    private java.lang.String brokerList;
    private java.lang.String hostname;

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getSerializerClass() {
        return serializerClass;
    }

    public void setSerializerClass(String serializerClass) {
        this.serializerClass = serializerClass;
    }

    public String getZkConnect() {
        return zkConnect;
    }

    public void setZkConnect(String zkConnect) {
        this.zkConnect = zkConnect;
    }

    public String getBrokerList() {
        return brokerList;
    }

    public void setBrokerList(String brokerList) {
        this.brokerList = brokerList;
    }

    public String getHostname() {
        return hostname;
    }

    public void setHostname(String hostname) {
        this.hostname = hostname;
    }
    @Override
    public void activateOptions() {
        super.activateOptions();
        synchronized (this) {
            KafkaLog4jAppender kafka = new KafkaLog4jAppender();
            kafka.setLayout(getLayout());
            kafka.setHostname(getHostname());
            kafka.setBrokerList(getBrokerList());
            kafka.setSerializerClass(getSerializerClass());
            kafka.setZkConnect(getZkConnect());
            kafka.setTopic(getTopic());
            kafka.activateOptions();
            addAppender(kafka);
        }
    }

    @Override
    public boolean requiresLayout() {
        return true;
    }
}
这时我们就可以撰写一个测试代码,测试log是否已经发送到了kafka。

KAFKA消息发送到Hadoop

在kafka的contrib目录的hadoop-consumer中有一系列的文件,包括脚本,jar,配置文件等。我们可以直接使用这个目录下的脚本进行定时数据同步。
需要先修改test目录下的test.properties:
# kafka的topic名称
kafka.etl.topic=TOPIC

# hdfs location of jars
hdfs.default.classpath.dir=/tmp/kafka/lib

# number of test events to be generated
event.count=1000

# hadoop id and group
hadoop.job.ugi=kafka,hadoop

# kafka server uri
kafka.server.uri=tcp://localhost:9092

# hdfs location of input directory
input=hdfs://localhost:9000/tmp/kafka/data

# hdfs location of output directory
output=hdfs://localhost:9000/tmp/kafka/output

# limit the number of events to be fetched;
# value -1 means no limitation
kafka.request.limit=-1

# kafka parameters
client.buffer.size=1048576
client.so.timeout=60000
修改topic名称以及对应hadoop的input,output目录。
首先生成offset,执行:
./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
然后拷贝依赖的jar文件:
./copy-jars.sh ${hdfs.default.classpath.dir}
最后执行hadoop任务:
./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
执行完成后,运行hadoop脚本查看是否已经在output目录生成数据:
bin/hadoop fs -cat /tmp/kafka/output/part-00000 | wc -l
至此,从日志从各个系统已经成功收集到hadoop的HDFS文件系统中。
现在只需要撰写一些map-reduce作业,就可以利用hadoop进行数据统计了。

KAFKA消息清除

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.file.size=536870912
在配置中,如果一个log file距离上一次写入时间达到168小时,也就是一周,会自动清除这个日志文件;日志文件的上限大小是536870912,超过这个大小会创建新的日志文件。

没有评论:

发表评论