Running a Spark job from Java code throws the exception org.apache.spark.SparkException: Task not serializable


Running the following Java code throws a "Task not serializable" exception:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.junit.Test;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;

/**
 * Created by Feng
 */
public class SparkTest{
    @Test
    public void testFileOp(){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> input = sc.textFile("C:\\Users\\76348\\Desktop\\民国之文豪崛起.txt");

        // Anonymous inner class: as a non-static inner class it implicitly
        // captures the enclosing SparkTest instance (the this$0 field).
        JavaRDD<String> words = input.flatMap(
                new FlatMapFunction<String, String>() {
                    public Iterator<String> call(String x) {
                        // split("") breaks each line into individual characters
                        return Arrays.asList(x.split("")).iterator();
                    }});

        JavaPairRDD<String, Integer> counts = words.mapToPair(
                new PairFunction<String, String, Integer>(){
                    public Tuple2<String, Integer> call(String x){
                        return new Tuple2<>(x, 1);
                    }}).reduceByKey(new Function2<Integer, Integer, Integer>(){
            public Integer call(Integer x, Integer y){ return x + y;}});
        Map<String, Integer> map = counts.collectAsMap();
        counts.saveAsTextFile("C:\\Users\\76348\\Desktop\\test");
    }
}

Strangely, though, FlatMapFunction and PairFunction, as defined by the Spark dependency, both implement the Serializable interface. Reading the error message carefully, we find the following key lines:

Serialization stack:
	- object not serializable (class: SparkTest, value: SparkTest@...)
	- field (class: SparkTest$1, name: this$0, type: class SparkTest)
	- object (class SparkTest$1, SparkTest$1@...)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface org.apache.spark.api.java.JavaRDDLike, 

So it is SparkTest itself that cannot be serialized. The reason is that in Java, every non-static inner class (including the anonymous classes above) holds an implicit reference to its enclosing instance, so serializing the inner class necessarily serializes that reference as well. Besides making the outer class implement the Serializable interface, or moving the task functions out into standalone (static) classes, you can also replace the anonymous inner classes with lambda expressions.
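
Below is a minimal sketch of the lambda-based rewrite, keeping the same word-count logic and paths as the failing example above; the CharSplitter class at the end is a hypothetical illustration of the "define the function outside" alternative, not part of the original code.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.junit.Test;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;

public class SparkTest {
    @Test
    public void testFileOp() {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> input = sc.textFile("C:\\Users\\76348\\Desktop\\民国之文豪崛起.txt");

        // A lambda that touches no fields or methods of SparkTest captures
        // nothing, so Spark serializes only the lambda itself, not SparkTest.
        JavaRDD<String> words = input.flatMap(x -> Arrays.asList(x.split("")).iterator());

        JavaPairRDD<String, Integer> counts = words
                .mapToPair(x -> new Tuple2<>(x, 1))
                .reduceByKey((x, y) -> x + y);

        Map<String, Integer> map = counts.collectAsMap();
        counts.saveAsTextFile("C:\\Users\\76348\\Desktop\\test");
    }

    // Hypothetical alternative: a static nested class has no this$0 field,
    // so it serializes without dragging SparkTest along. Use it as
    // input.flatMap(new CharSplitter()).
    static class CharSplitter implements FlatMapFunction<String, String> {
        public Iterator<String> call(String x) {
            return Arrays.asList(x.split("")).iterator();
        }
    }
}

One caveat with the lambda approach: the moment a lambda body references an instance field or method of SparkTest, it captures this again and the same exception comes back.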