org.apache.spark.SparkException: Task not serializable
Error details
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
at exam.Five$$anonfun$3.apply(Five.scala:66)
at exam.Five$$anonfun$3.apply(Five.scala:42)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:256)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:255)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: redis.clients.jedis.JedisPool
Serialization stack:
- object not serializable (class: redis.clients.jedis.JedisPool, value: redis.clients.jedis.JedisPool@…)
- field (class: exam.Five$$anonfun$3$$anonfun$apply$2, name: cs$1, type: class redis.clients.jedis.JedisPool)
- object (class exam.Five$$anonfun$3$$anonfun$apply$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 30 more
Cause
The "Task not serializable" error usually appears when the function passed to map, filter, foreachPartition, etc. uses a variable that is defined outside the closure and cannot be serialized. Spark must serialize the closure and ship it to the executors, so everything the closure captures has to be serializable. In particular, referencing a member function or field of a class (often the enclosing class) forces the entire class to be serialized. In the stack trace above, the "Serialization stack" shows the anonymous function capturing the field cs$1 of type redis.clients.jedis.JedisPool, and JedisPool does not implement Serializable.
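The following minimal sketch, which is not from the original program, reproduces the same failure by capturing a non-serializable driver-side object in an RDD closure; the object name and connection settings are assumptions for illustration only:
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.{JedisPool, JedisPoolConfig}

object NotSerializableDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))

    // Created on the driver; JedisPool does not implement Serializable
    val pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 1000)

    // The closure captures `pool`, so Spark tries to serialize it when shipping
    // the task to the executors -> SparkException: Task not serializable
    sc.parallelize(Seq("a" -> 1L, "b" -> 2L)).foreachPartition { it =>
      it.foreach { case (k, v) => pool.getResource.incrBy(k, v) }
    }
  }
}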
Problematic code
// Read the stream of messages from Kafka
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
// Process the Kafka messages
stream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    val lin = rdd.map(_.value())
    // lin.foreach(println(_))
    val res = lin.map(x => {
      val l = x.split(" ")
      (l(0), l(2), l(4).trim.toInt)
    })
    // ******************* Redis connection pool, created on the driver side of foreachRDD
    val rcon = new JedisPoolConfig()
    rcon.setMaxIdle(3)
    val cs = new JedisPool(rcon, "localhost", 6379, 1000)
    // (5) Compute the real-time sales total and save it to Redis (5 points)
    cs.getResource.select(6)
    val res2 = res.map(_._3).reduce(_ + _)
    println(res2)
    cs.getResource.incrBy("实时成交额", res2)
    // (6) Compute the real-time number of buyers and save it to Redis (5 points)
    val res3 = res.map(_._1).distinct().count().toInt
    println(res3)
    cs.getResource.incrBy("购买人数", res3)
    // (7) Compute the real-time per-category sales and save them to Redis (5 points)
    val res4 = res.map(xx => (xx._2, xx._3)).reduceByKey(_ + _)
    res4.foreach(println(_))
    res4.foreachPartition(c => {
      c.foreach(v => {
        // ******************* The pool `cs` is defined outside this closure, so it gets
        // captured and serialized with the task, which is what fails; it has to be
        // re-created inside foreachPartition instead
        cs.getResource.incrBy(v._1, v._2)
      })
    })
    cs.getResource.close()
    cs.close()
  }
})
Solution
Create the connection pool inside foreachPartition rather than in the driver-side code, so each partition obtains its own pool on the executor and the closure no longer captures anything non-serializable.
// (7) Compute the real-time per-category sales and save them to Redis (5 points)
val res4 = res.map(xx => (xx._2, xx._3)).reduceByKey(_ + _)
res4.foreach(println(_))
res4.foreachPartition(c => {
  val rconn = new JedisPoolConfig() // *********** re-create the Redis connection pool inside the partition
  rconn.setMaxIdle(3)
  val css = new JedisPool(rconn, "localhost", 6379, 1000)
  val jedis = css.getResource // borrow one connection for the whole partition
  c.foreach(v => {
    jedis.incrBy(v._1, v._2)
  })
  jedis.close() // return the connection to the pool
  css.close()
})
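A further refinement, not from the original post but a common Spark pattern, is to avoid building a new JedisPool for every partition of every micro-batch. A lazily initialized singleton object gives each executor JVM one shared pool; the name RedisPools and the localhost settings below are assumptions for illustration:
import redis.clients.jedis.{JedisPool, JedisPoolConfig}

// Defined at top level so it is accessed statically from the closure; the pool
// is initialized lazily on each executor JVM and is never serialized or shipped
object RedisPools { // hypothetical helper, not part of the original code
  lazy val pool: JedisPool = {
    val conf = new JedisPoolConfig()
    conf.setMaxIdle(3)
    new JedisPool(conf, "localhost", 6379, 1000)
  }
}

res4.foreachPartition(c => {
  val jedis = RedisPools.pool.getResource // one connection per partition
  try {
    c.foreach(v => jedis.incrBy(v._1, v._2))
  } finally {
    jedis.close() // return the connection to the pool
  }
})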