
org.apache.spark.SparkException: Task not serializable

程序员文章站 2022-07-15 12:55:12
...

Preface

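This article is part of the column "Spark Exception Summary" (《Spark异常问题汇总》). The column is the author's original work, so please credit the source when quoting it. If you spot shortcomings or mistakes, please point them out in the comments. Thanks!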

For the column's table of contents and references, see Spark Exception Summary.

Main Text

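Cause analysis

If you see "org.apache.spark.SparkException: Task not serializable", the usual cause is that a function passed to an operator such as map or filter references a variable defined outside it, and that variable cannot be serialized. Referencing outside variables is allowed; they just have to be serializable, because Spark serializes the function as part of the task it sends to executors.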
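A minimal sketch of the mechanism, without Spark: before submitting a task, Spark tries to Java-serialize the function (the stack traces below go through ClosureCleaner.ensureSerializable and JavaSerializer). The code below approximates that check with a plain ObjectOutputStream. It assumes Scala 2.12 or later, where function literals compile to serializable lambdas (the SerializedLambda entries in the logs below reflect this). Connection, ClosureCheck and their methods are hypothetical names for illustration only.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a resource such as SparkContext: it does not extend Serializable.
class Connection(val host: String)

object ClosureCheck {
  // Approximates the check Spark performs on a closure: try to Java-serialize it.
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The lambda captures only a String, which is serializable.
  def capturesString(suffix: String): String => Boolean =
    item => item.endsWith(suffix)

  // The lambda captures a Connection, which is not serializable.
  def capturesConnection(conn: Connection): String => Boolean =
    item => item.contains(conn.host)

  def main(args: Array[String]): Unit = {
    println(isSerializable(capturesString("com")))                       // true
    println(isSerializable(capturesConnection(new Connection("a.com")))) // false
  }
}
```

Both lambdas work fine on the local JVM; only the one that drags a non-serializable object along fails the serialization check.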

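The most common case is a closure that refers to a member function or member variable of some class (often the current class). Such a reference goes through this, so the whole instance, with all of its members, has to be serializable.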

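In many cases the class does declare "extends Serializable", but if some of its fields are not serializable, serializing the class still fails, and the result is the "Task not serializable" error.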
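The same kind of check shows why "extends Serializable" alone is not enough. In the hypothetical DomainFilter below (shaped like MyTest1 in the example that follows, with a made-up LegacyClient field standing in for SparkContext), the lambda reads the field rootDomain, which really means this.rootDomain. The lambda therefore captures the whole instance, and serializing the instance reaches the non-serializable field. Scala 2.12+ is assumed.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable resource, standing in for SparkContext.
class LegacyClient(val host: String)

// Declares Serializable, but one of its fields is not serializable.
class DomainFilter(root: String) extends Serializable {
  private val client = new LegacyClient("localhost")
  private val rootDomain = root

  // Using client in a method keeps it as a real field of the class.
  def host: String = client.host

  // rootDomain means this.rootDomain here, so the lambda captures `this`.
  def predicate: String => Boolean = item => item.contains(rootDomain)
}

object DomainFilterDemo {
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val filter = new DomainFilter("com")
    println(filter.predicate("a.com"))        // true: the lambda works locally
    println(isSerializable(filter.predicate)) // false: serialization fails on the client field
  }
}
```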

Practice

1

Scenario

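When an operator such as map or filter in a Spark program refers to a member function or member variable of a class, every member of that class must be serializable. If some member variable is not serializable, the task cannot be serialized.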

To verify this explanation, we wrote the following sample program.

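The class filters a list of domain names (held in an RDD) down to those belonging to a given top-level domain (rootDomain, e.g. .com, .cn, .org); the top-level domain is passed in as the constructor argument. Note that the filter uses contains, which is a loose match: with "com", a.com.cn is kept as well, as the output in Log 2 shows.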

Code 1

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest1 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest1")
  private val sc = new SparkContext(sparkConf) // SparkContext is not serializable
  private val rdd = sc.parallelize(list)
  private val rootDomain = conf

  private def getResult(): Unit = {
    val result = rdd.filter(item => item.contains(rootDomain)) // rootDomain is a member, so the closure captures this
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest1 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest1("com")
    test.getResult()
    test.stop()
  }
}

Log 1

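Following the analysis above, because the closure depends on a member variable of the current class, the entire class has to be serialized.

Some fields of the class are not serializable, so serialization fails.

What actually happens matches this analysis; the error raised at run time is shown below.

The serialization stack in the log shows that the error is caused by sc (SparkContext): the filter closure captured the MyTest1 instance, and its field sc is not serializable.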

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:50:42 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:50:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:50:43 INFO ResourceUtils: ==============================================================
21/07/25 15:50:43 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:50:43 INFO ResourceUtils: ==============================================================
21/07/25 15:50:43 INFO SparkContext: Submitted application: MyTest1
21/07/25 15:50:43 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:50:43 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:50:43 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:50:43 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:50:43 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:50:43 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:50:43 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:50:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:50:43 INFO Utils: Successfully started service 'sparkDriver' on port 63559.
21/07/25 15:50:43 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:50:43 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:50:43 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:50:43 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:50:43 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:50:43 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-862d818a-ff7d-473b-a321-84ca60962ada
21/07/25 15:50:43 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:50:43 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:50:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:50:44 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:50:44 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:50:44 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63560.
21/07/25 15:50:44 INFO NettyBlockTransferService: Server created on 192.168.0.105:63560
21/07/25 15:50:44 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:50:44 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63560 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63560, None)
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2459)
	at org.apache.spark.rdd.RDD.$anonfun$filter$1(RDD.scala:439)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.filter(RDD.scala:438)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1.com$shockang$study$bigdata$spark$errors$serializable$MyTest1$$getResult(MyTest1.scala:13)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1$.main(MyTest1.scala:25)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1.main(MyTest1.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
	- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@<hash>)
	- field (class: com.shockang.study.bigdata.spark.errors.serializable.MyTest1, name: sc, type: class org.apache.spark.SparkContext)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest1, com.shockang.study.bigdata.spark.errors.serializable.MyTest1@<hash>)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.shockang.study.bigdata.spark.errors.serializable.MyTest1, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/shockang/study/bigdata/spark/errors/serializable/MyTest1.$anonfun$getResult$1$adapted:(Lcom/shockang/study/bigdata/spark/errors/serializable/MyTest1;Ljava/lang/String;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/Object;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest1$$Lambda$689/1155399955, com.shockang.study.bigdata.spark.errors.serializable.MyTest1$$Lambda$689/1155399955@<hash>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
	... 11 more
21/07/25 15:50:44 INFO SparkContext: Invoking stop() from shutdown hook
21/07/25 15:50:44 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:50:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:50:45 INFO MemoryStore: MemoryStore cleared
21/07/25 15:50:45 INFO BlockManager: BlockManager stopped
21/07/25 15:50:45 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:50:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:50:45 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:50:45 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:50:45 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-f084a4b0-2f36-4e9d-a1f6-fb2bbbbd99d9

2

Code 2

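To test this conclusion, we annotate the member variables that do not need to be serialized with the @transient annotation, so that these two members (sparkConf and sc) are excluded when the class is serialized.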

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest2 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  @transient private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest2") // skipped during serialization
  @transient private val sc = new SparkContext(sparkConf) // skipped during serialization
  private val rdd = sc.parallelize(list)
  private val rootDomain = conf

  private def getResult(): Unit = {
    val result = rdd.filter(item => item.contains(rootDomain))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest2 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest2("com")
    test.getResult()
    test.stop()
  }
}

Log 2

Running the program again, it now completes normally.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:51:17 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:51:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:51:18 INFO ResourceUtils: ==============================================================
21/07/25 15:51:18 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:51:18 INFO ResourceUtils: ==============================================================
21/07/25 15:51:18 INFO SparkContext: Submitted application: MyTest2
21/07/25 15:51:18 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:51:18 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:51:18 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:51:18 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:51:18 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:51:18 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:51:18 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:51:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:51:18 INFO Utils: Successfully started service 'sparkDriver' on port 63584.
21/07/25 15:51:18 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:51:18 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:51:18 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:51:18 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:51:18 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:51:18 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-292dce43-a0c2-4ba7-aed2-7786b9c34b6d
21/07/25 15:51:18 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:51:18 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:51:19 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:51:19 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:51:19 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:51:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63585.
21/07/25 15:51:19 INFO NettyBlockTransferService: Server created on 192.168.0.105:63585
21/07/25 15:51:19 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:51:19 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63585 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO SparkContext: Starting job: foreach at MyTest2.scala:14
21/07/25 15:51:19 INFO DAGScheduler: Got job 0 (foreach at MyTest2.scala:14) with 12 output partitions
21/07/25 15:51:19 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at MyTest2.scala:14)
21/07/25 15:51:19 INFO DAGScheduler: Parents of final stage: List()
21/07/25 15:51:19 INFO DAGScheduler: Missing parents: List()
21/07/25 15:51:19 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at filter at MyTest2.scala:13), which has no missing parents
21/07/25 15:51:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.4 KiB, free 2004.6 MiB)
21/07/25 15:51:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1922.0 B, free 2004.6 MiB)
21/07/25 15:51:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.105:63585 (size: 1922.0 B, free: 2004.6 MiB)
21/07/25 15:51:19 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
21/07/25 15:51:19 INFO DAGScheduler: Submitting 12 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at filter at MyTest2.scala:13) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
21/07/25 15:51:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 12 tasks resource profile 0
21/07/25 15:51:19 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.0.105, executor driver, partition 0, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (192.168.0.105, executor driver, partition 1, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (192.168.0.105, executor driver, partition 2, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (192.168.0.105, executor driver, partition 3, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4) (192.168.0.105, executor driver, partition 4, PROCESS_LOCAL, 4461 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5) (192.168.0.105, executor driver, partition 5, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6) (192.168.0.105, executor driver, partition 6, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7) (192.168.0.105, executor driver, partition 7, PROCESS_LOCAL, 4456 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8) (192.168.0.105, executor driver, partition 8, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9) (192.168.0.105, executor driver, partition 9, PROCESS_LOCAL, 4460 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 10.0 in stage 0.0 (TID 10) (192.168.0.105, executor driver, partition 10, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 11.0 in stage 0.0 (TID 11) (192.168.0.105, executor driver, partition 11, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
21/07/25 15:51:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/07/25 15:51:19 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
21/07/25 15:51:19 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
21/07/25 15:51:19 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/07/25 15:51:19 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
21/07/25 15:51:19 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
21/07/25 15:51:19 INFO Executor: Running task 10.0 in stage 0.0 (TID 10)
21/07/25 15:51:19 INFO Executor: Running task 11.0 in stage 0.0 (TID 11)
21/07/25 15:51:19 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
21/07/25 15:51:19 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
21/07/25 15:51:19 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
www.b.com
a.com
a.com.cn
21/07/25 15:51:20 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 11.0 in stage 0.0 (TID 11). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 10.0 in stage 0.0 (TID 10). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 880 bytes result sent to driver
21/07/25 15:51:20 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 595 ms on 192.168.0.105 (executor driver) (1/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 600 ms on 192.168.0.105 (executor driver) (2/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 601 ms on 192.168.0.105 (executor driver) (3/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 599 ms on 192.168.0.105 (executor driver) (4/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 603 ms on 192.168.0.105 (executor driver) (5/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 602 ms on 192.168.0.105 (executor driver) (6/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 606 ms on 192.168.0.105 (executor driver) (7/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 603 ms on 192.168.0.105 (executor driver) (8/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 607 ms on 192.168.0.105 (executor driver) (9/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 606 ms on 192.168.0.105 (executor driver) (10/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 609 ms on 192.168.0.105 (executor driver) (11/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 637 ms on 192.168.0.105 (executor driver) (12/12)
21/07/25 15:51:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
21/07/25 15:51:20 INFO DAGScheduler: ResultStage 0 (foreach at MyTest2.scala:14) finished in 0.763 s
21/07/25 15:51:20 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/07/25 15:51:20 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
21/07/25 15:51:20 INFO DAGScheduler: Job 0 finished: foreach at MyTest2.scala:14, took 0.807256 s
21/07/25 15:51:20 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:51:20 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:51:20 INFO MemoryStore: MemoryStore cleared
21/07/25 15:51:20 INFO BlockManager: BlockManager stopped
21/07/25 15:51:20 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:51:20 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:51:20 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:51:20 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:51:20 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-b6b9cd75-c3cc-4701-8b63-93e25180bd56

Preliminary conclusion

The examples above lead to the following conclusion:

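When an operator such as map or filter references a member function or member variable of a class, all members of that class must be serializable; if some member variable is not serializable, the task cannot be serialized.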

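Conversely, once the members that cannot be serialized are annotated with @transient, the class serializes normally and the Task not serializable error goes away.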
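A minimal sketch of what @transient does, again with plain Java serialization and Scala 2.12+ assumed: the annotated field is skipped when the instance is written, so the instance (and any closure that captures it) serializes, but on the receiving side that field comes back as null. That is harmless in Code 2 because sc is only used on the driver; closure code that read a transient field on an executor would see null. CacheHandle, TransientDomainFilter and roundTrip are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical non-serializable resource, standing in for SparkContext.
class CacheHandle(val name: String)

class TransientDomainFilter(root: String) extends Serializable {
  // @transient: Java serialization skips this field entirely.
  @transient private val handle = new CacheHandle("driver-only")
  private val rootDomain = root

  def handleIsNull: Boolean = handle == null
  def predicate: String => Boolean = item => item.contains(rootDomain)
}

object TransientDemo {
  // Serialize and deserialize an object, roughly what happens when a task is shipped.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val filter = new TransientDomainFilter("com")
    val copy = roundTrip(filter)      // no NotSerializableException now
    println(filter.handleIsNull)      // false: the original still holds its handle
    println(copy.handleIsNull)        // true: the transient field was not serialized
    println(copy.predicate("a.com"))  // true: rootDomain was serialized normally
  }
}
```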

3

Example: referencing a member function

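Member functions affect serialization the same way member variables do: referencing a member function of a class also requires every member of that class to be serializable.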

To test this hypothesis, we call a member function of the current class inside map. It adds the prefix "www." to any domain that does not already start with "www.".
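The hypothesis can be checked outside Spark with the same Java-serialization test (Scala 2.12+ assumed; DbSession, Formatter and DomainUtils are hypothetical). Calling a private member method inside a lambda compiles to this.addWWW(...), so the lambda captures the instance; calling an identical method defined in an object captures nothing.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable resource, standing in for SparkContext.
class DbSession(val url: String)

// A Scala object method behaves like a Java static method: no instance is involved.
object DomainUtils {
  def addWWW(str: String): String = if (str.startsWith("www.")) str else "www." + str
}

class Formatter extends Serializable {
  private val session = new DbSession("jdbc:example")
  def url: String = session.url // keeps session as a real field

  private def addWWW(str: String): String = if (str.startsWith("www.")) str else "www." + str

  // Calls a member method (this.addWWW), so the lambda captures `this`.
  def viaMember: String => String = item => addWWW(item)

  // Calls the object's method, so nothing from `this` is captured.
  def viaObject: String => String = item => DomainUtils.addWWW(item)
}

object MemberMethodDemo {
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val fmt = new Formatter
    println(isSerializable(fmt.viaMember)) // false
    println(isSerializable(fmt.viaObject)) // true
    println(fmt.viaObject("a.com"))        // www.a.com
  }
}
```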

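Note: because rootDomain is now defined inside getResult, the closure no longer references a class member variable, which rules out the problem discussed in the previous example. This example therefore isolates the effect of referencing a member function.
Incidentally, avoiding direct references to class member variables is itself one way to fix this kind of problem: here, copying the value into a local variable inside the function removes the dependency on the member variable.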
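The local-variable technique from the note can be sketched the same way (Scala 2.12+ assumed; MetricsSink and LocalCopyFilter are hypothetical). Copying the field into a local val before building the lambda means the lambda captures only that String, so the class's non-serializable field never gets involved, even though the class still has one.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable resource, standing in for SparkContext.
class MetricsSink(val endpoint: String)

class LocalCopyFilter(root: String) extends Serializable {
  private val sink = new MetricsSink("localhost")
  private val rootDomain = root

  def endpoint: String = sink.endpoint // keeps sink as a real field

  // Same idea as `val rootDomain = conf` inside getResult in Code 3:
  // copy the member into a local val, then let the lambda capture only the copy.
  def predicate: String => Boolean = {
    val domain = rootDomain
    item => item.contains(domain)
  }
}

object LocalCopyDemo {
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val filter = new LocalCopyFilter("com")
    println(isSerializable(filter.predicate)) // true
    println(filter.predicate("a.cn"))         // false
  }
}
```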

Code 3

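The code below fails as well. As in the previous example, the current class has a member that is not serializable, so serializing the class fails; the serialization stack again points to sc (SparkContext). (SparkConf itself implements Serializable, so sparkConf is not the culprit.)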

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest3 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest3")
  private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)

  private def getResult(): Unit = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => addWWW(item)) // addWWW is a member method, so this closure captures this
    result.foreach(println)
  }

  private def addWWW(str: String): String = {
    if (str.startsWith("www.")) str else "www." + str
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest3 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest3("com")
    test.getResult()
    test.stop()
  }
}

Log 3

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:52:26 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:52:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:52:27 INFO ResourceUtils: ==============================================================
21/07/25 15:52:27 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:52:27 INFO ResourceUtils: ==============================================================
21/07/25 15:52:27 INFO SparkContext: Submitted application: MyTest3
21/07/25 15:52:27 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:52:27 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:52:27 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:52:27 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:52:27 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:52:27 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:52:27 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:52:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:52:27 INFO Utils: Successfully started service 'sparkDriver' on port 63658.
21/07/25 15:52:27 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:52:27 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:52:27 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:52:27 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:52:27 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:52:27 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-135805bc-d9ab-4d01-a374-f5631e2cb311
21/07/25 15:52:27 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:52:27 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:52:28 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:52:28 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:52:28 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:52:28 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63659.
21/07/25 15:52:28 INFO NettyBlockTransferService: Server created on 192.168.0.105:63659
21/07/25 15:52:28 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:52:28 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63659, None)
21/07/25 15:52:28 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63659 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63659, None)
21/07/25 15:52:28 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63659, None)
21/07/25 15:52:28 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63659, None)
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2459)
	at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.map(RDD.scala:421)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest3.com$shockang$study$bigdata$spark$errors$serializable$MyTest3$$getResult(MyTest3.scala:13)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest3$.main(MyTest3.scala:29)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest3.main(MyTest3.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
	- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@<hash>)
	- field (class: com.shockang.study.bigdata.spark.errors.serializable.MyTest3, name: sc, type: class org.apache.spark.SparkContext)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest3, com.shockang.study.bigdata.spark.errors.serializable.MyTest3@<hash>)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.shockang.study.bigdata.spark.errors.serializable.MyTest3, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/shockang/study/bigdata/spark/errors/serializable/MyTest3.$anonfun$getResult$2:(Lcom/shockang/study/bigdata/spark/errors/serializable/MyTest3;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest3$$Lambda$703/393481646, com.shockang.study.bigdata.spark.errors.serializable.MyTest3$$Lambda$703/393481646@<hash>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
	... 11 more
21/07/25 15:52:28 INFO SparkContext: Invoking stop() from shutdown hook
21/07/25 15:52:28 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:52:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:52:28 INFO MemoryStore: MemoryStore cleared
21/07/25 15:52:28 INFO BlockManager: BlockManager stopped
21/07/25 15:52:28 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:52:28 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:52:28 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:52:28 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:52:28 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-43b61841-b823-4c8d-9589-725962d49c2a

4

Code 4

As before, annotating the two member variables sc (SparkContext) and sparkConf (SparkConf) with "@transient" keeps them out of the class's serialized form, and the program then runs normally.

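There is also an option that differs slightly from the member-variable case: since this member function does not depend on any member variable, it can be defined in a Scala object (similar to a static method in Java), which removes the dependency on the class instance altogether.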

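In the example below, addWWW is moved into the object and called directly inside the map operation. With this change the program runs normally, even though sc and sparkConf are not annotated with @transient.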

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest4 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest4")
  private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)

  private def getResult(): Unit = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => MyTest4.addWWW(item)) // calls the object's method; nothing from this is captured
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest4 {

  private def addWWW(str: String): String = {
    if (str.startsWith("www.")) str else "www." + str
  }

  def main(args: Array[String]): Unit = {
    val test = new MyTest4("com")
    test.getResult()
    test.stop()
  }
}

Log 4

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:53:24 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:53:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:53:24 INFO ResourceUtils: ==============================================================
21/07/25 15:53:24 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:53:24 INFO ResourceUtils: ==============================================================
21/07/25 15:53:24 INFO SparkContext: Submitted application: MyTest4
21/07/25 15:53:24 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:53:24 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:53:24 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:53:24 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:53:24 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:53:24 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:53:24 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:53:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:53:25 INFO Utils: Successfully started service 'sparkDriver' on port 63720.
21/07/25 15:53:25 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:53:25 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:53:25 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:53:25 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:53:25 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:53:25 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-44d2c8c8-0c79-4d90-b0c5-94ea0528a8ba
21/07/25 15:53:25 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:53:25 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:53:25 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:53:25 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:53:25 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:53:25 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63721.
21/07/25 15:53:25 INFO NettyBlockTransferService: Server created on 192.168.0.105:63721
21/07/25 15:53:25 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:53:25 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:25 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63721 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:25 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:25 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:26 INFO SparkContext: Starting job: foreach at MyTest4.scala:14
21/07/25 15:53:26 INFO DAGScheduler: Got job 0 (foreach at MyTest4.scala:14) with 12 output partitions
21/07/25 15:53:26 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at MyTest4.scala:14)
21/07/25 15:53:26 INFO DAGScheduler: Parents of final stage: List()
21/07/25 15:53:26 INFO DAGScheduler: Missing parents: List()
21/07/25 15:53:26 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at map at MyTest4.scala:13), which has no missing parents
21/07/25 15:53:26 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.6 KiB, free 2004.6 MiB)
21/07/25 15:53:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2003.0 B, free 2004.6 MiB)
21/07/25 15:53:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.105:63721 (size: 2003.0 B, free: 2004.6 MiB)
21/07/25 15:53:26 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
21/07/25 15:53:26 INFO DAGScheduler: Submitting 12 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at map at MyTest4.scala:13) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
21/07/25 15:53:26 INFO TaskSchedulerImpl: Adding task set 0.0 with 12 tasks resource profile 0
21/07/25 15:53:26 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.0.105, executor driver, partition 0, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (192.168.0.105, executor driver, partition 1, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (192.168.0.105, executor driver, partition 2, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (192.168.0.105, executor driver, partition 3, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4) (192.168.0.105, executor driver, partition 4, PROCESS_LOCAL, 4461 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5) (192.168.0.105, executor driver, partition 5, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6) (192.168.0.105, executor driver, partition 6, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7) (192.168.0.105, executor driver, partition 7, PROCESS_LOCAL, 4456 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8) (192.168.0.105, executor driver, partition 8, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9) (192.168.0.105, executor driver, partition 9, PROCESS_LOCAL, 4460 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 10.0 in stage 0.0 (TID 10) (192.168.0.105, executor driver, partition 10, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 11.0 in stage 0.0 (TID 11) (192.168.0.105, executor driver, partition 11, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO Executor: Running task 11.0 in stage 0.0 (TID 11)
21/07/25 15:53:26 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/07/25 15:53:26 INFO Executor: Running task 10.0 in stage 0.0 (TID 10)
21/07/25 15:53:26 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
21/07/25 15:53:26 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/07/25 15:53:26 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
21/07/25 15:53:26 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
21/07/25 15:53:26 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
21/07/25 15:53:26 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
21/07/25 15:53:26 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
21/07/25 15:53:26 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
21/07/25 15:53:26 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
www.a.com.cn
www.b.com
www.a.com
21/07/25 15:53:27 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 10.0 in stage 0.0 (TID 10). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 11.0 in stage 0.0 (TID 11). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 880 bytes result sent to driver
21/07/25 15:53:27 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 574 ms on 192.168.0.105 (executor driver) (1/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 539 ms on 192.168.0.105 (executor driver) (2/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 542 ms on 192.168.0.105 (executor driver) (3/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 539 ms on 192.168.0.105 (executor driver) (4/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 546 ms on 192.168.0.105 (executor driver) (5/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 546 ms on 192.168.0.105 (executor driver) (6/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 549 ms on 192.168.0.105 (executor driver) (7/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 546 ms on 192.168.0.105 (executor driver) (8/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 549 ms on 192.168.0.105 (executor driver) (9/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 545 ms on 192.168.0.105 (executor driver) (10/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 551 ms on 192.168.0.105 (executor driver) (11/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 544 ms on 192.168.0.105 (executor driver) (12/12)
21/07/25 15:53:27 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
21/07/25 15:53:27 INFO DAGScheduler: ResultStage 0 (foreach at MyTest4.scala:14) finished in 0.716 s
21/07/25 15:53:27 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/07/25 15:53:27 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
21/07/25 15:53:27 INFO DAGScheduler: Job 0 finished: foreach at MyTest4.scala:14, took 0.760258 s
21/07/25 15:53:27 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:53:27 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:53:27 INFO MemoryStore: MemoryStore cleared
21/07/25 15:53:27 INFO BlockManager: BlockManager stopped
21/07/25 15:53:27 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:53:27 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:53:27 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:53:27 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:53:27 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-8a24c6de-c463-40e9-817a-8ad40561f024

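Analysis 4

As explained above, referencing a member function of a class from inside a closure requires the class, together with all of its members, to be serializable.

Therefore, when a closure uses a member variable or member function of a class, the class must first be made serializable (extends Serializable), and member variables that do not need to be serialized must be marked with "@transient" so that they do not break serialization.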
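To see why calling an instance method drags in the whole object while calling an object function does not, here is a minimal sketch that uses only the JDK. It assumes Scala 2.12 or later, where function literals compile to serializable lambdas, and uses a plain ObjectOutputStream as a stand-in for Spark's default Java closure serialization. DomainJob, addWWWMember, viaMember, viaObject and MemberVsObjectDemo are hypothetical names invented for this illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical driver-side class; deliberately does NOT extend Serializable
class DomainJob {
  // Instance method: a closure that calls it must capture `this`
  def addWWWMember(str: String): String =
    if (str.startsWith("www")) str else "www." + str

  // Captures the whole DomainJob instance
  def viaMember: String => String = item => addWWWMember(item)

  // Calls the object-level function, so nothing from the instance is captured
  def viaObject: String => String = item => DomainJob.addWWW(item)
}

object DomainJob {
  // Object-level function, comparable to a Java static method
  def addWWW(str: String): String =
    if (str.startsWith("www")) str else "www." + str
}

object MemberVsObjectDemo {
  // Returns true if plain Java serialization of `obj` succeeds
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new DomainJob
    println(serializes(job.viaMember)) // false: the lambda holds a DomainJob reference
    println(serializes(job.viaObject)) // true: the lambda captures nothing
  }
}
```

Both closures compute the same result; only what they capture differs, which is exactly what decides whether Spark can ship them.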

5

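Removing extends Serializable

In the two examples above, the closures referenced a member variable or member function of the class, so the class and all of its members had to be serializable; to keep certain member variables from breaking serialization, they were annotated with "@transient".

To further verify the hypothesis that the whole class has to be serialized, remove the serialization declaration from the class (drop extends Serializable).

The program then fails with an error saying that the class is not serializable:

Caused by: java.io.NotSerializableException: com.shockang.study.bigdata.spark.errors.serializable.MyTest5

This example therefore confirms the hypothesis above.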
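The same failure, including the class name in the NotSerializableException message, can be reproduced with the JDK alone. This is a minimal sketch, assuming Scala 2.12 or later; RootDomainFilter and CaptureDemo are hypothetical stand-ins for MyTest5, and ObjectOutputStream stands in for Spark's Java closure serialization.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for MyTest5: a class that does NOT extend Serializable
class RootDomainFilter(conf: String) {
  private val rootDomain = conf

  // Reading the member `rootDomain` inside the lambda makes the lambda capture `this`
  def predicate: String => Boolean = item => item.contains(rootDomain)
}

object CaptureDemo {
  // None if serialization succeeds, otherwise the NotSerializableException message
  def serializationError(obj: AnyRef): Option[String] =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    val filter = new RootDomainFilter("com")
    // The message is the name of the class that could not be serialized
    println(serializationError(filter.predicate)) // Some(RootDomainFilter)
  }
}
```

For a class declared inside a package, the message is the fully qualified class name, which is why the Spark log names com.shockang.study.bigdata.spark.errors.serializable.MyTest5.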

Code 5

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest5 private(conf: String) { // extends Serializable removed
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  @transient private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest5")
  @transient private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)
  private val rootDomain = conf // member field: reading it inside filter captures this

  private def getResult(): Unit = {
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => MyTest5.addWWW(item))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest5 {

  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  def main(args: Array[String]): Unit = {
    val test = new MyTest5("com")
    test.getResult()
    test.stop()
  }
}

Log 5

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 16:01:25 INFO SparkContext: Running Spark version 3.1.2
21/07/25 16:01:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 16:01:25 INFO ResourceUtils: ==============================================================
21/07/25 16:01:25 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 16:01:25 INFO ResourceUtils: ==============================================================
21/07/25 16:01:25 INFO SparkContext: Submitted application: MyTest5
21/07/25 16:01:25 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 16:01:25 INFO ResourceProfile: Limiting resource is cpu
21/07/25 16:01:25 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 16:01:25 INFO SecurityManager: Changing view acls to: shockang
21/07/25 16:01:25 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 16:01:25 INFO SecurityManager: Changing view acls groups to: 
21/07/25 16:01:25 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 16:01:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 16:01:26 INFO Utils: Successfully started service 'sparkDriver' on port 64099.
21/07/25 16:01:26 INFO SparkEnv: Registering MapOutputTracker
21/07/25 16:01:26 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 16:01:26 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 16:01:26 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 16:01:26 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 16:01:26 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-2276d227-1ad9-4742-96c1-aa4507fb17f5
21/07/25 16:01:26 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 16:01:26 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 16:01:26 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 16:01:26 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 16:01:26 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 16:01:26 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64100.
21/07/25 16:01:26 INFO NettyBlockTransferService: Server created on 192.168.0.105:64100
21/07/25 16:01:26 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 16:01:26 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 64100, None)
21/07/25 16:01:26 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:64100 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 64100, None)
21/07/25 16:01:26 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 64100, None)
21/07/25 16:01:26 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 64100, None)
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2459)
	at org.apache.spark.rdd.RDD.$anonfun$filter$1(RDD.scala:439)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.filter(RDD.scala:438)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest5.com$shockang$study$bigdata$spark$errors$serializable$MyTest5$$getResult(MyTest5.scala:13)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest5$.main(MyTest5.scala:30)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest5.main(MyTest5.scala)
Caused by: java.io.NotSerializableException: com.shockang.study.bigdata.spark.errors.serializable.MyTest5
Serialization stack:
	- object not serializable (class: com.shockang.study.bigdata.spark.errors.serializable.MyTest5, value: com.shockang.study.bigdata.spark.errors.serializable.MyTest5@…aa5f2)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.shockang.study.bigdata.spark.errors.serializable.MyTest5, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/shockang/study/bigdata/spark/errors/serializable/MyTest5.$anonfun$getResult$1$adapted:(Lcom/shockang/study/bigdata/spark/errors/serializable/MyTest5;Ljava/lang/String;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/Object;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest5$$Lambda$689/1155399955, com.shockang.study.bigdata.spark.errors.serializable.MyTest5$$Lambda$689/1155399955@…)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
	... 11 more
21/07/25 16:01:27 INFO SparkContext: Invoking stop() from shutdown hook
21/07/25 16:01:27 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 16:01:27 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 16:01:27 INFO MemoryStore: MemoryStore cleared
21/07/25 16:01:27 INFO BlockManager: BlockManager stopped
21/07/25 16:01:27 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 16:01:27 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 16:01:27 INFO SparkContext: Successfully stopped SparkContext
21/07/25 16:01:27 INFO ShutdownHookManager: Shutdown hook called
21/07/25 16:01:27 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-a750f6ba-13a5-4c0a-a79d-8fa86fa19f0b

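Analysis 5

The examples above show that operators such as map may reference external variables and member variables of a class, provided the class is properly prepared for serialization.

First, the class must extend Serializable. Second, any member variables that would fail to serialize must be dealt with; such members are the main cause of the "Task not serializable" error.

When this error appears, first identify which member variable could not be serialized (the "Serialization stack" section of the log names the offending object); member variables that do not need to be serialized can be annotated with "@transient".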
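The effect of "@transient" can be checked the same way. This is a minimal sketch, assuming Scala 2.12 or later: FakeContext is a hypothetical placeholder for a non-serializable resource such as SparkContext, and TransientJob, PlainJob and TransientDemo are hypothetical classes. It round-trips an instance through Java serialization, which is what happens to the enclosing object when a closure captures `this`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical placeholder for a resource that cannot be serialized (e.g. SparkContext)
class FakeContext

// Serializable class whose non-serializable member is excluded with @transient
class TransientJob(conf: String) extends Serializable {
  @transient private val ctx = new FakeContext
  private val rootDomain = conf

  def hasContext: Boolean = ctx != null
  def predicate: String => Boolean = item => item.contains(rootDomain)
}

// Same shape, but without @transient on the non-serializable member
class PlainJob(conf: String) extends Serializable {
  private val ctx = new FakeContext
  private val rootDomain = conf

  def hasContext: Boolean = ctx != null
  def predicate: String => Boolean = item => item.contains(rootDomain)
}

object TransientDemo {
  // Serialize and then deserialize an object with plain Java serialization
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new TransientJob("com"))
    println(copy.hasContext)         // false: the @transient field is not restored
    println(copy.predicate("a.com")) // true: rootDomain was serialized as usual

    val plainFails =
      try { roundTrip(new PlainJob("com")); false }
      catch { case _: NotSerializableException => true }
    println(plainFails)              // true: FakeContext is not Serializable
  }
}
```

Note that a @transient field comes back as null after deserialization, so this fix is only safe when the code running inside the closure never touches that field on the executor side; in the examples above sc is used only on the driver.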

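Also, the class in which map is called does not strictly have to be serializable (extend Serializable): if the closure references no member variable or member function of a class, that class is not required to be serializable, as the following example shows.

Here the closures passed to filter and map reference only the local val rootDomain and the object function MyTest6.addWWW, not any member of the class, so the current class does not need to be serializable and the program runs normally.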

6

Code 6

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest6 private(conf: String) {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  @transient private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest6")
  @transient private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)

  private def getResult(): Unit = {
    val rootDomain = conf // local copy: the filter closure captures only this String
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => MyTest6.addWWW(item))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest6 {

  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  def main(args: Array[String]): Unit = {
    val test = new MyTest6("com")
    test.getResult()
    test.stop()
  }
}

Log 6

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 16:16:37 INFO SparkContext: Running Spark version 3.1.2
21/07/25 16:16:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 16:16:37 INFO ResourceUtils: ==============================================================
21/07/25 16:16:37 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 16:16:37 INFO ResourceUtils: ==============================================================
21/07/25 16:16:37 INFO SparkContext: Submitted application: MyTest6
21/07/25 16:16:37 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 16:16:37 INFO ResourceProfile: Limiting resource is cpu
21/07/25 16:16:37 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 16:16:37 INFO SecurityManager: Changing view acls to: shockang
21/07/25 16:16:37 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 16:16:37 INFO SecurityManager: Changing view acls groups to: 
21/07/25 16:16:37 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 16:16:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 16:16:37 INFO Utils: Successfully started service 'sparkDriver' on port 64622.
21/07/25 16:16:37 INFO SparkEnv: Registering MapOutputTracker
21/07/25 16:16:37 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 16:16:37 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 16:16:37 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 16:16:37 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 16:16:37 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-24ef1968-ab12-416b-ba8a-6b0c79994d55
21/07/25 16:16:37 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 16:16:37 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 16:16:38 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 16:16:38 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 16:16:38 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 16:16:38 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64623.
21/07/25 16:16:38 INFO NettyBlockTransferService: Server created on 192.168.0.105:64623
21/07/25 16:16:38 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 16:16:38 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 64623, None)
21/07/25 16:16:38 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:64623 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 64623, None)
21/07/25 16:16:38 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 64623, None)
21/07/25 16:16:38 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 64623, None)
21/07/25 16:16:38 INFO SparkContext: Starting job: foreach at MyTest6.scala:14
21/07/25 16:16:38 INFO DAGScheduler: Got job 0 (foreach at MyTest6.scala:14) with 12 output partitions
21/07/25 16:16:38 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at MyTest6.scala:14)
21/07/25 16:16:38 INFO DAGScheduler: Parents of final stage: List()
21/07/25 16:16:38 INFO DAGScheduler: Missing parents: List()
21/07/25 16:16:38 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at map at MyTest6.scala:13), which has no missing parents
21/07/25 16:16:39 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.4 KiB, free 2004.6 MiB)
21/07/25 16:16:39 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1919.0 B, free 2004.6 MiB)
21/07/25 16:16:39 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.105:64623 (size: 1919.0 B, free: 2004.6 MiB)
21/07/25 16:16:39 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
21/07/25 16:16:39 INFO DAGScheduler: Submitting 12 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at map at MyTest6.scala:13) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
21/07/25 16:16:39 INFO TaskSchedulerImpl: Adding task set 0.0 with 12 tasks resource profile 0
21/07/25 16:16:39 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.0.105, executor driver, partition 0, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (192.168.0.105, executor driver, partition 1, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (192.168.0.105, executor driver, partition 2, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (192.168.0.105, executor driver, partition 3, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4) (192.168.0.105, executor driver, partition 4, PROCESS_LOCAL, 4461 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5) (192.168.0.105, executor driver, partition 5, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6) (192.168.0.105, executor driver, partition 6, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7) (192.168.0.105, executor driver, partition 7, PROCESS_LOCAL, 4456 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8) (192.168.0.105, executor driver, partition 8, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9) (192.168.0.105, executor driver, partition 9, PROCESS_LOCAL, 4460 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 10.0 in stage 0.0 (TID 10) (192.168.0.105, executor driver, partition 10, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 11.0 in stage 0.0 (TID 11) (192.168.0.105, executor driver, partition 11, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/07/25 16:16:39 INFO Executor: Running task 11.0 in stage 0.0 (TID 11)
21/07/25 16:16:39 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
21/07/25 16:16:39 INFO Executor: Running task 10.0 in stage 0.0 (TID 10)
21/07/25 16:16:39 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
21/07/25 16:16:39 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
21/07/25 16:16:39 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/07/25 16:16:39 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
21/07/25 16:16:39 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
21/07/25 16:16:39 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
21/07/25 16:16:39 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
21/07/25 16:16:39 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
www.b.com
www.a.com
www.a.com.cn
21/07/25 16:16:39 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 10.0 in stage 0.0 (TID 10). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 923 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 923 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 11.0 in stage 0.0 (TID 11). 880 bytes result sent to driver
21/07/25 16:16:39 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 706 ms on 192.168.0.105 (executor driver) (1/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 665 ms on 192.168.0.105 (executor driver) (2/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 663 ms on 192.168.0.105 (executor driver) (3/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 664 ms on 192.168.0.105 (executor driver) (4/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 667 ms on 192.168.0.105 (executor driver) (5/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 666 ms on 192.168.0.105 (executor driver) (6/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 672 ms on 192.168.0.105 (executor driver) (7/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 667 ms on 192.168.0.105 (executor driver) (8/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 671 ms on 192.168.0.105 (executor driver) (9/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 670 ms on 192.168.0.105 (executor driver) (10/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 672 ms on 192.168.0.105 (executor driver) (11/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 667 ms on 192.168.0.105 (executor driver) (12/12)
21/07/25 16:16:39 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
21/07/25 16:16:39 INFO DAGScheduler: ResultStage 0 (foreach at MyTest6.scala:14) finished in 0.863 s
21/07/25 16:16:39 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/07/25 16:16:39 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
21/07/25 16:16:39 INFO DAGScheduler: Job 0 finished: foreach at MyTest6.scala:14, took 0.910534 s
21/07/25 16:16:39 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 16:16:39 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 16:16:39 INFO MemoryStore: MemoryStore cleared
21/07/25 16:16:39 INFO BlockManager: BlockManager stopped
21/07/25 16:16:39 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 16:16:39 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 16:16:39 INFO SparkContext: Successfully stopped SparkContext
21/07/25 16:16:39 INFO ShutdownHookManager: Shutdown hook called
21/07/25 16:16:39 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-6e765980-e5b6-410a-b0e7-5635b476cb11

Summary

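To sum up, this kind of problem is mainly caused by referencing a member variable or member function of a class inside an operator such as map or filter when that class has not been made properly serializable.

There are two ways to solve it: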

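1. Avoid referencing a class's member functions or member variables inside closures passed to map and similar operators, or at least avoid referencing them directly.

If the closure depends on a member variable of a class: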

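  1. If the value the program depends on is relatively fixed, hard-code it, define it inside the map or filter operation itself, or define it in a Scala object (similar to a static variable in Java).
  2. If the value must be supplied dynamically when the program is invoked (as a function argument), do not reference the member variable directly in map, filter, and similar operations. Instead, in a method such as getResult in the example above, define a local variable initialized from the member variable. The map or filter operator then references only that local variable and no longer needs any member of the class.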
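Both options for member variables can be sketched as follows. This is a minimal illustration, assuming Spark 3.x on Scala 2.12 with a local master, as in the article's own runs; the names `DomainDefaults`, `DomainFilter` and `DomainFilterDemo` are hypothetical. The class is deliberately not `Serializable`, and neither closure captures `this`, so Spark has nothing problematic to serialize.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical object holding a fixed value (plays the role of a Java static field)
object DomainDefaults {
  val RootDomain: String = "com"
}

// Hypothetical class, deliberately NOT Serializable: neither closure below captures `this`
class DomainFilter(rdd: RDD[String], rootDomain: String) {

  // Option 1: the closure reads a value from a Scala object, not from this instance
  def filterByDefault(): Array[String] =
    rdd.filter(item => item.contains(DomainDefaults.RootDomain)).collect()

  // Option 2: copy the member into a local val, so the closure captures only a String
  def filterByArgument(): Array[String] = {
    val suffix = rootDomain
    rdd.filter(item => item.contains(suffix)).collect()
  }
}

object DomainFilterDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("DomainFilterDemo"))
    try {
      val rdd = sc.parallelize(List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org"))
      val domainFilter = new DomainFilter(rdd, "cn")
      println(domainFilter.filterByDefault().mkString(", "))  // domains containing "com"
      println(domainFilter.filterByArgument().mkString(", ")) // domains containing "cn"
    } finally {
      sc.stop()
    }
  }
}
```

Writing `rdd.filter(item => item.contains(rootDomain))` in `filterByArgument` instead would read `this.rootDomain` inside the closure and, since `DomainFilter` is not serializable, should reproduce the "Task not serializable" error.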

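If the closure depends on a member function of a class:

If the function is self-contained, define it in a Scala object (similar to a static method in Java), so that no particular class instance is needed.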
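A minimal sketch of moving a member function into an object, under the same assumptions (Spark 3.x, Scala 2.12, local master). `DomainUtils`, `matchesRootDomain`, `DomainJob` and `DomainJobDemo` are hypothetical names. The closure calls the object's method and captures only a local `String`, so the non-serializable `DomainJob` instance, which even holds the `SparkContext`, is never shipped.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object: its methods act like Java static methods,
// so calling them inside a closure does not pull in any class instance
object DomainUtils {
  def matchesRootDomain(item: String, rootDomain: String): Boolean =
    item.contains(rootDomain)
}

// Hypothetical class, deliberately NOT Serializable (it holds the SparkContext)
class DomainJob(sc: SparkContext, rootDomain: String) {
  def getResult(items: Seq[String]): Array[String] = {
    // Local copy of the member, as described for member variables
    val suffix = rootDomain
    // Only `suffix` is captured; DomainUtils is reached statically, not via `this`.
    // If matchesRootDomain were a method of DomainJob, the closure would capture
    // `this` and Spark would report "Task not serializable".
    sc.parallelize(items).filter(item => DomainUtils.matchesRootDomain(item, suffix)).collect()
  }
}

object DomainJobDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("DomainJobDemo"))
    try {
      val result = new DomainJob(sc, "org").getResult(Seq("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org"))
      println(result.mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```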

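2. If a closure does reference a member function or variable of a class, make that class properly serializable.

In this case, first have the class extend Serializable, then annotate each member variable that cannot be serialized with @transient, which tells the serializer to skip that field.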
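Applied to the MyTest1 example from earlier in the article, this approach might look like the sketch below. It is a hypothetical variant (`MyTest1Transient`), not the author's code, and assumes Spark 3.x on Scala 2.12 with a local master. The closure still captures `this`, so the whole instance travels with the task; the `@transient` fields are skipped during serialization and will be `null` after deserialization on executors, which is safe only because the closure reads nothing but `rootDomain`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical variant of MyTest1: the class stays Serializable and the
// fields that must not be shipped to executors are marked @transient
class MyTest1Transient(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  @transient private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest1Transient")
  // SparkContext is not serializable; @transient makes Java serialization skip it
  @transient private val sc = new SparkContext(sparkConf)
  // The RDD is only used on the driver, so it is not shipped either
  @transient private val rdd = sc.parallelize(list)
  private val rootDomain = conf

  // This closure captures `this`; only the non-transient fields (list, rootDomain)
  // are serialized with the task, and the closure uses only rootDomain
  def getResult(): Array[String] = rdd.filter(item => item.contains(rootDomain)).collect()

  def stop(): Unit = sc.stop()
}

object MyTest1Transient {
  def main(args: Array[String]): Unit = {
    val test = new MyTest1Transient("com")
    try println(test.getResult().mkString(", "))
    finally test.stop()
  }
}
```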

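In addition, where possible, move the variables the closure depends on into a small, separate class and make that class serializable. Only this small object is then shipped with each task, which reduces network traffic and improves efficiency.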
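One way to sketch the "small serializable class" idea, assuming Spark 3.x on Scala 2.12; `DomainRule`, `DomainService` and `DomainServiceDemo` are hypothetical names. A case class is `Serializable` by default, and copying it into a local `val` keeps the closure from capturing the large outer instance.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical small holder: a case class is Serializable by default
// and carries only what the closure needs
case class DomainRule(rootDomain: String) {
  def matches(item: String): Boolean = item.contains(rootDomain)
}

// Hypothetical large class: it need not be Serializable
class DomainService(sc: SparkContext, rootDomain: String) {
  private val rule = DomainRule(rootDomain)

  def getResult(items: Seq[String]): Array[String] = {
    // Copy the field into a local val so the closure captures the small
    // DomainRule instead of `this` (which holds the SparkContext)
    val localRule = rule
    sc.parallelize(items).filter(item => localRule.matches(item)).collect()
  }
}

object DomainServiceDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("DomainServiceDemo"))
    try {
      val service = new DomainService(sc, "cn")
      println(service.getResult(Seq("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")).mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

The design choice here is that only `DomainRule` has to be serializable, so the heavy class never needs `extends Serializable` or any `@transient` annotations.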