1. Problem symptoms:
A custom source was developed for Flume, adding two extra columns to each event. With everything apparently in place, there was simply no data output on the local side.
The plan was Flume + Kafka + Spark for consumption. In local testing no data came through, so I opened a Kafka consumer to check: Kafka itself was normal, and data could be written in from the producer side (a sketch of that check follows), but after Flume collected the file nothing reached the consumer, even though Flume had started normally.
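For the producer-side check, here is a minimal sketch assuming the Kafka 0.9 Java producer API; the broker list and topic are taken from the configuration below, while the class name and test message are purely illustrative (the same check can also be done with kafka-console-producer):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sanity check: write one record to the topic to confirm the brokers
// accept data from the producer side.
public class KafkaSanityCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "172.17.4.16:9092,172.17.4.17:9092,172.17.217.124:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // try-with-resources closes (and flushes) the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("kunming", "test message"));
        }
    }
}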
Flume started successfully:
[root@hadoop002 bin]# nohup flume-ng agent -c conf -f /opt/software/flume/conf/exec_memory_kafka.properties -n a2 -Dflume.root.logger=INFO,console &
[2] 13379
[root@hadoop002 bin]# nohup: ignoring input and appending output to ‘nohup.out’
^C
[root@hadoop002 bin]# tail -f nohup.out
send.buffer.bytes = 131072
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
reconnect.backoff.ms = 50
metrics.num.samples = 2
ssl.keystore.type = JKS
19/02/28 21:52:44 INFO utils.AppInfoParser: Kafka version : 0.9.0.1
19/02/28 21:52:44 INFO utils.AppInfoParser: Kafka commitId : 23c69d62a0cabf06
19/02/28 21:52:44 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
19/02/28 21:52:44 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
The configuration file is as follows:
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the custom exec source
a2.sources.r1.type = com.onlinelog.analysis.ExecSource_JSON
a2.sources.r1.command = tail -F /var/log/hadoop-hdfs/hadoop002.log
a2.sources.r1.hostname = hadoop002
a2.sources.r1.servicename = datanode
# Describe the sink
a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k1.kafka.topic = kunming
a2.sinks.k1.kafka.bootstrap.servers = 172.17.4.16:9092,172.17.4.17:9092,172.17.217.124:9092
a2.sinks.k1.kafka.flumeBatchSize = 6000
a2.sinks.k1.kafka.producer.acks = 1
a2.sinks.k1.kafka.producer.linger.ms = 1
a2.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.keep-alive = 90
a2.channels.c1.capacity = 2000000
a2.channels.c1.transactionCapacity = 6000
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
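The custom source com.onlinelog.analysis.ExecSource_JSON itself is not shown in this post. As a rough sketch of the part that matters here, the two configured values (hostname, servicename) would be wrapped into each event body as JSON, roughly like the following; the class and method names are my assumptions, not the real implementation:

import java.nio.charset.StandardCharsets;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

// Sketch of the event-building step: wrap one tailed log line into a
// JSON body carrying the two extra columns from the source config.
public class JsonEventWrapper {
    private final String hostname;     // a2.sources.r1.hostname
    private final String servicename;  // a2.sources.r1.servicename

    public JsonEventWrapper(String hostname, String servicename) {
        this.hostname = hostname;
        this.servicename = servicename;
    }

    public Event wrap(String line) {
        String json = "{\"hostname\":\"" + hostname
                + "\",\"servicename\":\"" + servicename
                + "\",\"log\":\"" + line.replace("\"", "\\\"") + "\"}";
        return EventBuilder.withBody(json, StandardCharsets.UTF_8);
    }
}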
2. Troubleshooting:
2.1 Suspecting a Flume problem:
To rule out the Kafka sink, I switched Flume to a logger sink; there was still no content on the output side. Test configuration:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the custom exec source
a2.sources.r1.type = com.onlinelog.analysis.ExecSource_JSON
a2.sources.r1.command = tail -F /var/log/hadoop-hdfs/hadoop002.log
a2.sources.r1.hostname = hadoop002
a2.sources.r1.servicename = datanode
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
# Describe the sink
a2.sinks.k1.type = logger
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
Looking carefully through the log output (nohup.out), I found the cause: a date-format error in the custom source. After correcting it, everything worked normally. An illustrative sketch of the pitfall follows.
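The exact parsing code is not shown in this post, so the pattern below is an assumption: Hadoop service logs stamp each line like "2019-02-28 21:52:44,123", and if the source parses that prefix with a mismatched SimpleDateFormat pattern, every line throws ParseException; if that exception is swallowed and the event dropped, the source silently emits nothing, which matches the symptom here:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Sketch: parse the timestamp prefix of a tailed Hadoop log line.
// The pattern must match the file being tailed; e.g. the console
// format "yy/MM/dd HH:mm:ss" would NOT parse "2019-02-28 21:52:44,123".
public class LogTimestampParser {
    private static final SimpleDateFormat FMT =
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

    public static Date parse(String line) {
        try {
            return FMT.parse(line.substring(0, 23)); // "2019-02-28 21:52:44,123"
        } catch (ParseException | StringIndexOutOfBoundsException e) {
            // Swallowing this and dropping the event makes the source
            // produce nothing at all: the "no data" symptom above.
            return null;
        }
    }
}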
Checking the consumer side with the following command showed log output arriving:
kafka-console-consumer \
  --zookeeper 172.17.4.16:2181,172.17.4.17:2181,172.17.217.124:2181/kafka \
  --topic kunming \
  --from-beginning