
hadoop3.2.0, kafka_2.12-2.4.0 Flume collection demo


Result

Output directories are bucketed by the event time carried in each message, split by year, month, and day.

Preface

The company had a data collection requirement. For historical reasons we run fairly new Hadoop and Kafka versions, so I built this on Flume 1.9.0. Flume memory tuning is not covered here; search for it if you are interested, but you will certainly need to tune it the first time you use Flume.

Environment

Java 1.8, Flume 1.9.0, kafka_2.12-2.4.0, Hadoop 3.2.0

Flume is deployed on the same cluster as Hadoop.

A few Hadoop dependencies need to go into Flume's lib directory; dig them out of the hadoop-3.2.0 distribution:

hadoop-3.2.0/share/hadoop/common/lib/hadoop-auth-3.2.0.jar
hadoop-3.2.0/share/hadoop/common/hadoop-common-3.2.0.jar
hadoop-3.2.0/share/hadoop/common/lib/htrace-core4-4.1.0-incubating.jar
hadoop-3.2.0/share/hadoop/common/lib/commons-configuration2-2.1.1.jar

Four jars were enough for me. I have seen others also add commons-io-2.5.jar (from hadoop-3.2.0/share/hadoop/hdfs/lib).

Approach

Parse each Kafka message, extract the event time, and convert it to a 13-digit timestamp (epoch milliseconds). Put that timestamp into the event header, where the HDFS sink uses it to bucket directories: the sink reads the event headers and resolves the time escape sequences (%Y, %m, %d, ...) in the path, provided hdfs.path is actually written with those escapes.
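The 13 digits matter because the sink expects the timestamp header in epoch milliseconds; a 10-digit epoch-seconds value would be misread as milliseconds and every path would land in January 1970. A quick check:

import java.util.Date;

public class TimestampDigits {
    public static void main(String[] args) {
        long millis = System.currentTimeMillis();  // epoch milliseconds, 13 digits for current dates
        long seconds = millis / 1000L;             // epoch seconds, only 10 digits
        System.out.println(String.valueOf(millis).length());  // 13
        // if seconds were put in the header, the sink would treat them as millis:
        System.out.println(new Date(seconds));     // a date in January 1970
    }
}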

Implementation

Since this is test data, the source is pasted directly.

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.imprexion</groupId>
    <artifactId>flume-collectifdata</artifactId>
    <version>1.0-SNAPSHOT</version>


    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>

        <!--阿里巴巴开源json解析框架-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
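Build with mvn clean package; the assembly plugin produces target/flume-collectifdata-1.0-SNAPSHOT-jar-with-dependencies.jar, which goes into Flume's lib directory next to the Hadoop jars listed above so the agent can load the interceptor class. One optional tweak, if bundle size bothers you: marking flume-ng-core as provided keeps Flume's own classes out of the fat jar, since the agent already ships them.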

Flume configuration file

# Name each component; a1 is the agent name
#a1.sources lists the sources configured on a1, space-separated
#a1.sinks lists the sinks configured on a1, space-separated
#a1.channels lists the channels configured on a1, space-separated
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = yxyxyx001:9092
a1.sources.r1.kafka.topics = kettle04
#Make the Kafka consumer read from the earliest offset in the topic; this only takes effect for a consumer group that has never committed an offset
#a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
a1.sources.r1.kafka.consumer.auto.offset.reset=latest
#a1.sources.r1.kafka.consumer.group.id = flume

#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.interceptors.TestInterceptor$Builder

#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000000

# sink configuration
a1.sinks.k1.type = hdfs
#a1.sinks.k1.hdfs.path=hdfs://yxhdcluster/origin_data/ifcustomer/behavior-log/%Y-%m-%d
a1.sinks.k1.hdfs.path=/origin_data/ifcustomer/behavior-log/%Y-%m-%d
#Prefix for files written to HDFS
a1.sinks.k1.hdfs.filePrefix = iflog-
a1.sinks.k1.hdfs.batchSize=1000
#File rolling policy
a1.sinks.k1.hdfs.rollInterval = 1
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
#a1.sinks.k1.hdfs.useLocalTimeStamp = true
#Write events as an uncompressed stream
a1.sinks.k1.hdfs.fileType = DataStream
#must be addressed to the sink's name (k1); a1.sinks.hdfs-sink... would be silently ignored
a1.sinks.k1.hdfs.callTimeout = 300000

#Or store the data compressed:
#a1.sinks.k1.hdfs.fileType=CompressedStream
#using LZO compression:
#a1.sinks.k1.hdfs.codeC=lzop
# Bind source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
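To sanity-check how the sink will expand hdfs.path, you can call Flume's BucketPath helper directly; it is what HDFSEventSink uses internally for the time escapes. A minimal sketch (the timestamp value and the UTC+8 zone are illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.formatter.output.BucketPath;

public class PathEscapeDemo {
    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        // 13-digit epoch millis, as the interceptor puts it
        headers.put("timestamp", "1603781340000");
        String path = BucketPath.escapeString(
                "/origin_data/ifcustomer/behavior-log/%Y-%m-%d", headers);
        // prints /origin_data/ifcustomer/behavior-log/2020-10-27 on a UTC+8 JVM
        System.out.println(path);
    }
}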

Custom interceptor

package com.interceptors;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.util.DateUtil;
import com.util.ETLUtil;
import com.util.JsonParseUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @ClassName:TestInterceptor
 * @author: zhengkw
 * @description: demo interceptor written for the test data
 * @date: 20/10/27 1:49 PM
 * @version:1.0
 * @since: jdk 1.8
 */
public class TestInterceptor implements Interceptor {
    private List<Event> results = new ArrayList<>();

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // get the raw body and headers
        byte[] body = event.getBody();
        Map<String, String> headers = event.getHeaders();
        // decode the body as a UTF-8 string
        String bodyStr = new String(body, StandardCharsets.UTF_8);
        // validate the record before touching it
        if (ETLUtil.validStartLog(bodyStr)) {
            JSONObject jsonObj = JsonParseUtil.getJsonObj(bodyStr);
            JSONArray data = jsonObj.getJSONArray("data");
            // create_time is expected to look like "2020/10/27 13:49:00.0"
            String ts = (String) data.getJSONObject(0).get("create_time");
            // normalize the date separators: "/" -> "-"
            String ts1 = ts.replaceAll("/", "-");
            // strip the fractional seconds
            String date = ts1.substring(0, ts1.indexOf("."));
            // convert to a 13-digit epoch-millis string
            String formatTime = DateUtil.date2TimeStamp(date, "yyyy-MM-dd HH:mm:ss");

            // the HDFS sink reads this header to resolve %Y-%m-%d in hdfs.path
            headers.put("timestamp", formatTime);
            return event;
        }
        // drop events that fail validation
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // clear results left over from the previous batch
        results.clear();

        for (Event event : events) {
            Event e = intercept(event);
            // keep only the events that passed validation
            if (e != null) {
                results.add(e);
            }
        }
        return results;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        // return a new interceptor instance
        @Override
        public Interceptor build() {
            return new TestInterceptor();
        }

        // read parameters from the agent configuration (none needed here)
        @Override
        public void configure(Context context) {

        }
    }
}
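ETLUtil and JsonParseUtil belong to the validation code that is not shown. For completeness, here is a minimal sketch of what they might look like: the class and method names match the imports above, but the bodies are my assumptions, not the original implementation.

// com/util/JsonParseUtil.java
package com.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Hypothetical stand-in for the omitted JsonParseUtil.
public class JsonParseUtil {
    public static JSONObject getJsonObj(String body) {
        return JSON.parseObject(body);
    }
}

// com/util/ETLUtil.java (same package). Hypothetical stand-in for the omitted
// ETLUtil: accept only parseable JSON with a non-empty "data" array, which is
// what the interceptor dereferences.
public class ETLUtil {
    public static boolean validStartLog(String body) {
        try {
            JSONObject obj = JSON.parseObject(body);
            return obj != null
                    && obj.getJSONArray("data") != null
                    && !obj.getJSONArray("data").isEmpty();
        } catch (Exception e) {
            return false;
        }
    }
}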

Timestamp conversion utility

package com.util;

import java.text.SimpleDateFormat;

/**
 * @ClassName:DateUtil
 * @author: zhengkw
 * @description: date utility
 * @date: 20/10/27 7:02 PM
 * @version:1.0
 * @since: jdk 1.8
 */
public class DateUtil {
    public static String date2TimeStamp(String date_str, String format) {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat(format);
            // Date#getTime() returns epoch milliseconds, i.e. a 13-digit value
            return String.valueOf(sdf.parse(date_str).getTime());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }
}
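JUnit is already in the POM with test scope, so a quick sanity check on the conversion is cheap:

package com.util;

import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class DateUtilTest {
    @Test
    public void convertsTo13DigitMillis() {
        String ts = DateUtil.date2TimeStamp("2020-10-27 13:49:00", "yyyy-MM-dd HH:mm:ss");
        // epoch millis for any date in 2020 is 13 digits long
        assertEquals(13, ts.length());
    }
}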

Summary

Beyond the core code above, the validation helpers and the like are not shown. A few pitfalls came up along the way. The first was the network:


2020-10-27 17:18:21,630 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:454)] HDFS IO error
java.io.IOException: Callable timed out after 30000 ms on file: hdfs://yxhdcluster/origin_data/ifcustomer/behavior-log/2020-10-27/iflog-.1603790269896.tmp
	at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:741)
	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:247)
	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:734)

Fix

Add one line to the agent's conf file. Note it must be addressed to the sink's actual name, which is k1 in this config (a property under a1.sinks.hdfs-sink would be silently ignored):
a1.sinks.k1.hdfs.callTimeout = 300000

hdfs.callTimeout caps the time allowed for each HDFS operation (open, write, flush, close), so 300000 ms = 5 minutes gives slow or congested DataNodes room to respond.

The other was discovering mid-run that I had never tuned the Flume memory channel:

#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000
a1.channels.c1.byteCapacityBufferPercentage = 20
#increase byteCapacity when the data volume is high
a1.channels.c1.byteCapacity = 800000000

Rule of thumb: byteCapacity > capacity = transactionCapacity.
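Plugging in the values above: the channel holds at most 1,000,000 events, and the 20% byteCapacityBufferPercentage is set aside for header overhead, leaving roughly 800000000 × 0.8 = 640,000,000 bytes (about 640 MB) for event bodies.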