Logstash 集成 kafka 收集日志 笔记 (二) 搭建kafka
参考 kafka quick start 搭建kafka服务
使用 kafka output 将events 写入kafka topic
logstash myconfig.conf
output {
#stdout{codec => rubydebug}
kafka{
topic_id => "test"
bootstrap_servers => "localhost:9092"
}
}
input 使用 file input 具体使用参考 file
下面是input 的配置
input {
file {
path => "/path/to/file/*.log"
}
}
这样logstash 就完成收集数据并写入 kafka了。
接下来是java client 接收kafka的数据。
导入
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
参考Class KafkaConsumer<K,V> 以下为java 消费main方法代码
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}