Kafka
kafka是LinkedIn开源的一款分布式的发布-订阅消息系统,它具有:
1. 通过O(1)的磁盘结构持久化存储消息,即使TB级的数据也能保持长期稳定;
2. 高吞吐率:即使非常普通的硬件,kafka也能支持每秒数十万的消息;
3. 支持通过kafka服务器和消费集群来分区消息;
4. 支持Hadoop并行加载;
设计流程
通过集成kafka和log4j在各个需要采集日志的系统进行日志采集,日志统一发送到kafka的消息队列。再通过定时运行kafka-hadoop的作业,将数据从KAFKA同步到hadoop的HDFS中。hadoop的各类map-reduce作业可以对数据进行统计,存储到db或缓存中。
安装
首先安装kafka,下载kafka到本地,然后执行:
> tar xzf kafka-<VERSION>.tgz > cd kafka-<VERSION> > ./sbt update > ./sbt package
启动zookeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties [2010-11-21 23:45:02,335] INFO Reading configuration from: config/zookeeper.properties...
启动kafka
> bin/kafka-server-start.sh config/server.properties jkreps-mn-2:kafka-trunk jkreps$ bin/kafka-server-start.sh config/server.properties [2010-11-21 23:51:39,608] INFO starting log cleaner every 60000 ms (kafka.log.LogManager) [2010-11-21 23:51:39,628] INFO connecting to ZK: localhost:2181 (kafka.server.KafkaZooKeeper)...发送一些消息
> bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic test This is a message This is another message
查看消息
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning This is a message This is another message
至此,kafka已经启动成功。
安装hadoop,下载hadoop0.20.2,按照hadoop的操作手册安装,此处省略安装过程。
配置log4j和kafka-log
#配置KAFKA #Hostname表示应用的名称 #Topic表示打印日志生成的目录名 log4j.appender.KAFKA=com.comp.kafka.AsyncKafkaAppender log4j.appender.KAFKA.topic=TOPIC log4j.appender.KAFKA.bufferSize = 10 log4j.appender.KAFKA.brokerList=0:kafka-server-ip:9092 log4j.appender.KAFKA.serializerClass=kafka.serializer.StringEncoder log4j.appender.KAFKA.hostname=APP-NAME log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout log4j.appender.KAFKA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n log4j.logger.com.comp= DEBUG, KAFKA
配置中,配置了kafka的ip和端口,并设置了topic和序列化类和pattern。
AsyncKafkaAppender是继承了log4j的AsyncAppender,是log4j异步发送日志的模式,当log达到bufferSize的大小时,会统一由log4j异步执行打印操作。代码如下:
AsyncKafkaAppender是继承了log4j的AsyncAppender,是log4j异步发送日志的模式,当log达到bufferSize的大小时,会统一由log4j异步执行打印操作。代码如下:
public class AsyncKafkaAppender extends AsyncAppender { private java.lang.String topic; private java.lang.String serializerClass; private java.lang.String zkConnect; private java.lang.String brokerList; private java.lang.String hostname; public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getSerializerClass() { return serializerClass; } public void setSerializerClass(String serializerClass) { this.serializerClass = serializerClass; } public String getZkConnect() { return zkConnect; } public void setZkConnect(String zkConnect) { this.zkConnect = zkConnect; } public String getBrokerList() { return brokerList; } public void setBrokerList(String brokerList) { this.brokerList = brokerList; } public String getHostname() { return hostname; } public void setHostname(String hostname) { this.hostname = hostname; } @Override public void activateOptions() { super.activateOptions(); synchronized (this) { KafkaLog4jAppender kafka = new KafkaLog4jAppender(); kafka.setLayout(getLayout()); kafka.setHostname(getHostname()); kafka.setBrokerList(getBrokerList()); kafka.setSerializerClass(getSerializerClass()); kafka.setZkConnect(getZkConnect()); kafka.setTopic(getTopic()); kafka.activateOptions(); addAppender(kafka); } } @Override public boolean requiresLayout() { return true; } }这时我们就可以撰写一个测试代码,测试log是否已经发送到了kafka。
KAFKA消息发送到Hadoop
在kafka的contrib目录的hadoop-consumer中有一系列的文件,包括脚本,jar,配置文件等。我们可以直接使用这个目录下的脚本进行定时数据同步。
需要先修改test目录下的test.properties:
# kafka的topic名称 kafka.etl.topic=TOPIC # hdfs location of jars hdfs.default.classpath.dir=/tmp/kafka/lib # number of test events to be generated event.count=1000 # hadoop id and group hadoop.job.ugi=kafka,hadoop # kafka server uri kafka.server.uri=tcp://localhost:9092 # hdfs location of input directory input=hdfs://localhost:9000/tmp/kafka/data # hdfs location of output directory output=hdfs://localhost:9000/tmp/kafka/output # limit the number of events to be fetched; # value -1 means no limitation kafka.request.limit=-1 # kafka parameters client.buffer.size=1048576 client.so.timeout=60000
修改topic名称以及对应hadoop的input,output目录。
首先生成offset,执行:
现在只需要撰写一些map-reduce作业,就可以利用hadoop进行数据统计了。
首先生成offset,执行:
./run-class.sh kafka.etl.impl.DataGenerator test/test.properties然后拷贝依赖的jar文件:
./copy-jars.sh ${hdfs.default.classpath.dir}最后执行hadoop任务:
./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties执行完成后,运行hadoop脚本查看是否已经在output目录生成数据:
bin/hadoop fs -cat /tmp/kafka/output/part-00000 | wc -l至此,从日志从各个系统已经成功收集到hadoop的HDFS文件系统中。
现在只需要撰写一些map-reduce作业,就可以利用hadoop进行数据统计了。
KAFKA消息清除
# The minimum age of a log file to be eligible for deletion log.retention.hours=168 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.file.size=536870912
在配置中,如果一个log file距离上一次写入时间达到168小时,也就是一周,会自动清除这个日志文件;日志文件的上限大小是536870912,超过这个大小会创建新的日志文件。
没有评论:
发表评论