
2019-04-14 18:01发布


对Flume的source进行了开发,增加了两列,最终要实现的效果如下。可是一切就绪后,在本地端就是没有数据输出。(在这里插入图片描述)
flume 启动成功
[root@hadoop002 bin]# nohup flume-ng agent -c conf -f /opt/software/flume/conf/exec_memory_kafka.properties -n a2 -Dflume.root.logger=INFO,console &
[2] 13379
[root@hadoop002 bin]# nohup: ignoring input and appending output to ‘nohup.out’
[root@hadoop002 bin]# tail -f nohup.out
send.buffer.bytes = 131072
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
reconnect.backoff.ms = 50
metrics.num.samples = 2
ssl.keystore.type = JKS
19/02/28 21:52:44 INFO utils.AppInfoParser: Kafka version :
19/02/28 21:52:44 INFO utils.AppInfoParser: Kafka commitId : 23c69d62a0cabf06
19/02/28 21:52:44 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
19/02/28 21:52:44 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started

配置文件如下:

a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the custom exec source
a2.sources.r1.type = com.onlinelog.analysis.ExecSource_JSON
a2.sources.r1.command = tail -F /var/log/hadoop-hdfs/hadoop002.log
a2.sources.r1.hostname = hadoop002
a2.sources.r1.servicename = datanode

# Describe the sink
a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k1.kafka.topic = kunming
# 注:原文此处的 broker 地址缺失,只剩下分隔用的逗号
a2.sinks.k1.kafka.bootstrap.servers =,,
a2.sinks.k1.kafka.flumeBatchSize = 6000
a2.sinks.k1.kafka.producer.acks = 1
a2.sinks.k1.kafka.producer.linger.ms = 1
a2.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.keep-alive = 90
a2.channels.c1.capacity = 2000000
a2.channels.c1.transactionCapacity = 6000

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1



把flume更改为logger输出,输出端没有内容。

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the custom exec source
a2.sources.r1.type = com.onlinelog.analysis.ExecSource_JSON
a2.sources.r1.command = tail -F /var/log/hadoop-hdfs/hadoop002.log
a2.sources.r1.hostname = hadoop002
a2.sources.r1.servicename = datanode

# Describe the sink
#a2.sinks.k1.type =logger

# Use a channel which buffers events in memory
a2.channels.c1.type = memory

# Describe the sink
a2.sinks.k1.type =logger

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

仔细查看日志内容,发现是日期格式错误导致的,更改后一切正常:
通过命令查看消费端情况,有日志输出:
kafka-console-consumer --topic kunming