欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Exception in thread “main“ org.apache.spark.SparkException: Task not serializable

程序员文章站 2022-07-15 12:55:06
...

Exception in thread “main” org.apache.spark.SparkException: Task not serializable

原因:

当需要在Driver之间传递变量时,这个变量是需要可以序列化的

错误代码

错误分析:
获取和关闭jedis连接应该在分区内进行,此处相当于在driver处获取了连接,所以需要序列化jedis变量.

解决方法:
把jedis连接获取放入分区内即可解决此问题.
代码

    val filteredStream: DStream[JSONObject] = jsonObjDStream.mapPartitions {
      //获取和关闭jedis连接应该在分区内进行,此处相当于在driver处获取了连接,所以需要序列化jedis变量
      val jedis: Jedis = RedisUtil.getJedisClient
      datas => {
        println("该分区过滤前数据量-----" + datas.size)
        val filterd: Iterator[JSONObject] = datas.filter(
          jsonObj => {
            val dt: String = jsonObj.getString("dt")
            val mid: String = jsonObj.getJSONObject("common").getString("mid")
            val dau: String = "dau:" + dt
            val isNew: lang.Long = jedis.sadd(dau, mid)
            if (isNew == 1L) true else false
          }
        )
        jedis.close()
        println("该分区过滤后数据量*******" + filterd.size)
        filterd
      }
    }

修正代码

val filteredStream: DStream[JSONObject] = jsonObjDStream.mapPartitions {
      datas => {
      	//只用修改此处即可------>
        val jedis: Jedis = RedisUtil.getJedisClient
        println("该分区过滤前数据量-----" + datas.size)
        val filterd: Iterator[JSONObject] = datas.filter(
          jsonObj => {
            val dt: String = jsonObj.getString("dt")
            val mid: String = jsonObj.getJSONObject("common").getString("mid")
            val dau: String = "dau:" + dt
            val isNew: lang.Long = jedis.sadd(dau, mid)
            if (isNew == 1L) true else false
          }
        )
        jedis.close()
        println("该分区过滤后数据量*******" + filterd.size)
        filterd
      }
    }