The full configuration file, flume-conf.properties, is as follows:
#############################################
# producer config
#############################################
# agent section
producer.sources = s
producer.channels = c c1
producer.sinks = r r1

# source section
#producer.sources.s.type = exec
#producer.sources.s.command = tail -f -n+1 /usr/local/test.log
producer.sources.s.type = spooldir
producer.sources.s.spoolDir = /usr/local/testlog
producer.sources.s.fileHeader = true
producer.sources.s.batchSize = 100
producer.sources.s.channels = c c1

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=topcar

# store in HDFS
producer.sinks.r1.type = hdfs
producer.sinks.r1.channel = c1
producer.sinks.r1.hdfs.path=hdfs://node2:9000/user/flume/events/%Y-%m-%d-%H
producer.sinks.r1.hdfs.filePrefix=events-
#producer.sinks.r1.hdfs.fileSuffix = .log   (sets the file suffix)
producer.sinks.r1.hdfs.round = true
producer.sinks.r1.hdfs.roundValue = 10
producer.sinks.r1.hdfs.roundUnit = minute
# File format: default is SequenceFile; alternatives are DataStream / CompressedStream
producer.sinks.r1.hdfs.fileType=DataStream
# Format for sequence file records: "Text" or "Writable"
producer.sinks.r1.hdfs.writeFormat=Text
producer.sinks.r1.hdfs.rollInterval=0
# File size that triggers a roll, in bytes (0: never roll based on file size)
producer.sinks.r1.hdfs.rollSize=128000000
# Number of events written before a roll (0 = never roll based on number of events)
producer.sinks.r1.hdfs.rollCount=0
producer.sinks.r1.hdfs.idleTimeout=60
# Use local time when expanding escape sequences (instead of the timestamp in the event header)
producer.sinks.r1.hdfs.useLocalTimeStamp = true

producer.channels.c1.type = memory
producer.channels.c1.capacity = 1000
producer.channels.c1.transactionCapacity=1000
producer.channels.c1.keep-alive=30

# Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000

#############################################
# consumer config
#############################################
consumer.sources = s
consumer.channels = c
consumer.sinks = r

consumer.sources.s.type = seq
consumer.sources.s.channels = c
consumer.sinks.r.type = logger
consumer.sinks.r.channel = c
consumer.channels.c.type = memory
consumer.channels.c.capacity = 100

# Kafka source (overrides the seq type set above)
consumer.sources.s.type = org.apache.flume.plugins.KafkaSource
consumer.sources.s.zookeeper.connect=127.0.0.1:2181
consumer.sources.s.group.id=testGroup
consumer.sources.s.zookeeper.session.timeout.ms=400
consumer.sources.s.zookeeper.sync.time.ms=200
consumer.sources.s.auto.commit.interval.ms=1000
consumer.sources.s.custom.topic.name=topcar
consumer.sources.s.custom.thread.per.consumer=4
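Before the producer agent can run against this configuration, the spooling directory and the Kafka topic it references must already exist. A minimal preparation sketch, assuming a local Kafka 0.8.x broker with ZooKeeper on 127.0.0.1:2181 (the partition and replication counts are placeholder assumptions; on Kafka 0.8.0 the topic script is named kafka-create-topic.sh instead):

```shell
# Create the directory watched by the spooldir source
mkdir -p /usr/local/testlog

# Create the "topcar" topic referenced by producer.sinks.r.custom.topic.name
# (Kafka 0.8.1+ syntax; adjust partitions/replication for your cluster)
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 \
    --replication-factor 1 --partitions 1 --topic topcar
```

Note that the spooldir source treats files dropped into /usr/local/testlog as immutable: copy completed log files in rather than writing to them in place.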
The Flume startup command is as follows:
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console
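The same properties file also defines the consumer agent, so it can be started from a second terminal by switching the --name argument; afterwards, the HDFS sink's output can be inspected with an hdfs dfs listing. A sketch (the path matches producer.sinks.r1.hdfs.path above; actual subdirectory names depend on the current date and hour):

```shell
# Start the consumer agent defined in the same properties file
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties \
    --name consumer -Dflume.root.logger=INFO,console

# Verify that events are landing in HDFS (run on a node with HDFS access)
hdfs dfs -ls hdfs://node2:9000/user/flume/events/
```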