hadoop3.2.0, kafka_2.12-2.4.0: a Flume collection pipeline demo
Result
Files are split into directories by the event time carried in each message, partitioned by year, month, and day.
Background
We had a data-collection requirement at work. For historical reasons the company runs fairly recent versions of Hadoop and Kafka, so I built the collector on Flume 1.9.0. Flume memory tuning is not covered here; search for it if you are interested, but you will definitely need to tune it the first time you run this.
Environment
java 1.8
flume 1.9.0
kafka_2.12-2.4.0
hadoop3.2.0
Flume is deployed on the same cluster as Hadoop.
A few Hadoop dependencies must be copied into Flume's lib directory; they can be dug out of the hadoop-3.2.0 distribution:
hadoop-3.2.0/share/hadoop/common/lib/hadoop-auth-3.2.0.jar
hadoop-3.2.0/share/hadoop/common/hadoop-common-3.2.0.jar
hadoop-3.2.0/share/hadoop/common/lib/htrace-core4-4.1.0-incubating.jar
hadoop-3.2.0/share/hadoop/common/lib/commons-configuration2-2.1.1.jar
Four jars were enough in my case; some people also add commons-io-2.5.jar (found in hadoop-3.2.0/share/hadoop/hdfs/lib).
Approach
Parse each message from Kafka, extract the event time, and convert it to a 13-digit timestamp; it is 13 digits because it is milliseconds since the epoch, which is what the HDFS sink expects. Put that timestamp into the event header, where the HDFS sink uses it to split directories: the sink reads the event header and resolves the time escape sequences in the configured path. This only works if the sink's path is written with time escapes, as shown in the sketch below.
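To make the path resolution concrete, here is a minimal standalone sketch (plain Java with an assumed example timestamp, not Flume's actual internals):

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class PathEscapeDemo {
        public static void main(String[] args) {
            // 13 digits = milliseconds since the epoch, which is what the
            // HDFS sink expects to find in the "timestamp" header
            long ts = 1603790269896L;
            // %Y-%m-%d in the sink path corresponds to this date pattern
            String day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts));
            // prints /origin_data/ifcustomer/behavior-log/2020-10-27 (in UTC+8)
            System.out.println("/origin_data/ifcustomer/behavior-log/" + day);
        }
    }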
Implementation
Since this is test data, the source code is pasted directly.
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.imprexion</groupId>
    <artifactId>flume-collectifdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- fastjson, Alibaba's open-source JSON parsing framework -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Flume configuration file
# Name each component; a1 is the agent name
# a1.sources lists the sources configured on a1 (space-separated if multiple)
# a1.sinks lists the sinks configured on a1 (space-separated if multiple)
# a1.channels lists the channels configured on a1 (space-separated if multiple)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = yxyxyx001:9092
a1.sources.r1.kafka.topics = kettle04
# Make the Kafka consumer start from the earliest offset of the topic;
# this only takes effect for a group that has never committed an offset
#a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
a1.sources.r1.kafka.consumer.auto.offset.reset=latest
#a1.sources.r1.kafka.consumer.group.id = flume
# interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.interceptors.TestInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000000
# sink configuration
a1.sinks.k1.type = hdfs
#a1.sinks.k1.hdfs.path=hdfs://yxhdcluster/origin_data/ifcustomer/behavior-log/%Y-%m-%d
a1.sinks.k1.hdfs.path=/origin_data/ifcustomer/behavior-log/%Y-%m-%d
# prefix for files uploaded to HDFS
a1.sinks.k1.hdfs.filePrefix = iflog-
a1.sinks.k1.hdfs.batchSize=1000
# file rolling
a1.sinks.k1.hdfs.rollInterval = 1
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
#a1.sinks.k1.hdfs.useLocalTimeStamp = true
# store data as a plain data stream
a1.sinks.k1.hdfs.fileType = DataStream
# HDFS call timeout in milliseconds; note the property must be set on k1,
# the sink name defined above
a1.sinks.k1.hdfs.callTimeout = 300000
# store data in a compressed format
#a1.sinks.k1.hdfs.fileType=CompressedStream
# compress files with LZO
#a1.sinks.k1.hdfs.codeC=lzop
# bind and wire the components
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Custom interceptor
package com.interceptors;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.util.DateUtil;
import com.util.ETLUtil;
import com.util.JsonParseUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @ClassName:TestInterceptor
 * @author: zhengkw
 * @description: demo written against test data
 * @date: 20/10/27 1:49 PM
 * @version:1.0
 * @since: jdk 1.8
 */
public class TestInterceptor implements Interceptor {

    private List<Event> results = new ArrayList<>();

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // get the body
        byte[] body = event.getBody();
        Map<String, String> headers = event.getHeaders();
        // convert the body to a String
        String bodyStr = new String(body, StandardCharsets.UTF_8);
        // check that the record is valid
        if (ETLUtil.validStartLog(bodyStr)) {
            JSONObject jsonObj = JsonParseUtil.getJsonObj(bodyStr);
            JSONArray data = jsonObj.getJSONArray("data");
            String ts = (String) data.getJSONObject(0).get("create_time");
            // normalize the slashes and drop the fractional seconds
            String ts1 = ts.replaceAll("/", "-");
            String date = ts1.substring(0, ts1.indexOf("."));
            String formatTime = DateUtil.date2TimeStamp(date, "yyyy-MM-dd HH:mm:ss");
            // the HDFS sink reads this header to resolve the time escapes in its path
            headers.put("timestamp", formatTime);
            return event;
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // clear results first
        results.clear();
        for (Event event : events) {
            Event e = intercept(event);
            // keep only the events that passed validation
            if (e != null) {
                results.add(e);
            }
        }
        return results;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        // return an interceptor instance
        @Override
        public Interceptor build() {
            return new TestInterceptor();
        }

        // read parameters from the agent configuration file
        @Override
        public void configure(Context context) {
        }
    }
}
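The ETLUtil and JsonParseUtil helpers are omitted from this post (only the core code is shown). For anyone who wants a compilable interceptor, here is a minimal, hypothetical sketch of what they might look like, assuming each message is a JSON object with a data array whose first element carries create_time; the real validation logic is likely stricter:

    package com.util;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;

    // Hypothetical stand-in for the omitted helper
    public class JsonParseUtil {
        public static JSONObject getJsonObj(String body) {
            return JSON.parseObject(body);
        }
    }

    package com.util;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;

    // Hypothetical stand-in: accept the record only if it parses as JSON
    // and has a non-empty "data" array, which is all TestInterceptor uses
    public class ETLUtil {
        public static boolean validStartLog(String body) {
            try {
                JSONObject obj = JSON.parseObject(body);
                return obj != null
                        && obj.getJSONArray("data") != null
                        && !obj.getJSONArray("data").isEmpty();
            } catch (Exception e) {
                return false;
            }
        }
    }

(Each class goes in its own file under com/util/.)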
Timestamp conversion utility class
package com.util;

import java.text.SimpleDateFormat;

/**
 * @ClassName:DateUtil
 * @author: zhengkw
 * @description: date utility class
 * @date: 20/10/27 7:02 PM
 * @version:1.0
 * @since: jdk 1.8
 */
public class DateUtil {

    public static String date2TimeStamp(String date_str, String format) {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat(format);
            // getTime() returns milliseconds, i.e. a 13-digit timestamp
            return String.valueOf(sdf.parse(date_str).getTime());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }
}
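A quick usage sketch mirroring the interceptor's string handling (the values are hypothetical, and the exact output depends on the JVM's default time zone, which SimpleDateFormat uses):

    package com.util;

    public class DateUtilDemo {
        public static void main(String[] args) {
            // a create_time as it might appear in a message
            String ts = "2020/10/27 13:49:00.123";
            // same normalization as TestInterceptor: slashes to dashes,
            // then drop the fractional seconds
            String date = ts.replaceAll("/", "-").substring(0, ts.indexOf("."));
            // prints a 13-digit millisecond timestamp,
            // e.g. 1603777740000 when the default zone is UTC+8
            System.out.println(DateUtil.date2TimeStamp(date, "yyyy-MM-dd HH:mm:ss"));
        }
    }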
Summary
Only the core code is shown here; the validation code is left out. A few pitfalls came up along the way. The first was the network:
2020-10-27 17:18:21,630 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:454)] HDFS IO error
java.io.IOException: Callable timed out after 30000 ms on file: hdfs://yxhdcluster/origin_data/ifcustomer/behavior-log/2020-10-27/iflog-.1603790269896.tmp
at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:741)
at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:247)
at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:734)
The fix is to add one line to the agent's conf file: a1.sinks.k1.hdfs.callTimeout = 300000 (the sink name must match the one defined in the config, k1 here).
The other pitfall, discovered while the job was running, was that Flume's memory had not been tuned:
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000000 # increase this when data volume is large
Rule of thumb: byteCapacity > capacity = transactionCapacity. With the numbers above, 20% of the 800,000,000-byte byteCapacity is reserved for event headers, leaving roughly 640 MB for bodies; across 1,000,000 events, the byte limit will trip before the event-count limit once events average more than about 640 bytes.