欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka日志模块(七):LogManager周期性定时任务

程序员文章站 2022-07-14 18:32:58
...

LogManager总共有以下几个任务,由startup启动。

  def startup(): Unit = {
    /* Schedule the cleanup task to delete old logs */
    if (scheduler != null) {
      // 1. 启动 kafka-log-retention 周期性任务,对过期或过大的日志文件执行清理工作
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention",
                         cleanupLogs _,
                         delay = InitialTaskDelayMs,
                         period = retentionCheckMs,
                         TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      // 2. 启动 kafka-log-flusher 周期性任务,对日志文件执行刷盘操作
      scheduler.schedule("kafka-log-flusher",
                         flushDirtyLogs _,
                         delay = InitialTaskDelayMs,
                         period = flushCheckMs,
                         TimeUnit.MILLISECONDS)
      // 3. 启动 kafka-recovery-point-checkpoint 周期性任务,更新 recovery-point-offset-checkpoint 文件
      scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointLogRecoveryOffsets _,
                         delay = InitialTaskDelayMs,
                         period = flushRecoveryOffsetCheckpointMs,
                         TimeUnit.MILLISECONDS)
      // 4. 启动 kafka-log-start-offset-checkpoint 周期性任务,更新 kafka-log-start-offset-checkpoint 文件
      scheduler.schedule("kafka-log-start-offset-checkpoint",
                         checkpointLogStartOffsets _,
                         delay = InitialTaskDelayMs,
                         period = flushStartOffsetCheckpointMs,
                         TimeUnit.MILLISECONDS)
      // 5. 启动 kafka-delete-logs 周期性任务,删除标记为需要被删除的 log 目录
      scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                         deleteLogs _,
                         delay = InitialTaskDelayMs,
                         unit = TimeUnit.MILLISECONDS)
    }
    if (cleanerConfig.enableCleaner)
      cleaner.startup()
  }

cleanupLogs任务

        先看cleanupLogs任务,该方法会遍历所有的 Log 对象,并从两个维度对执行清理工作:一个是时间维度,即保证 Log 对象中所有的 LogSegment 都是有效的,对于过期的 LogSegment 执行删除操作;另外一个是空间维度,既保证 Log 对象不应过大,对于超出的部分会执行删除操作。

  def cleanupLogs(): Unit = {
    debug("Beginning log cleanup...")
    var total = 0
    val startMs = time.milliseconds

    // clean current logs.
    // 遍历处理每个 topic 分区对应的 Log 对象,只有对应 Log 配置了 cleanup.policy=delete 才会执行删除
    val deletableLogs = {
      if (cleaner != null) {
        // prevent cleaner from working on same partitions when changing cleanup policy
        cleaner.pauseCleaningForNonCompactedPartitions()
      } else {
        currentLogs.filter {
          case (_, log) => !log.config.compact
        }
      }
    }

    try {
      deletableLogs.foreach {
        case (topicPartition, log) =>
          debug(s"Garbage collecting '${log.name}'")
          // 遍历删除当前 Log 对象中过期的 LogSegment 对象,并保证 Log 的大小在允许范围内(对应 retention.bytes 配置)
          total += log.deleteOldSegments()

          val futureLog = futureLogs.get(topicPartition)
          if (futureLog != null) {
            // clean future logs
            debug(s"Garbage collecting future log '${futureLog.name}'")
            total += futureLog.deleteOldSegments()
          }
      }
    } finally {
      if (cleaner != null) {
        cleaner.resumeCleaning(deletableLogs.map(_._1))
      }
    }

    debug(s"Log cleanup completed. $total files deleted in " +
                  (time.milliseconds - startMs) / 1000 + " seconds")
  }
def deleteOldSegments(): Int = {
    if (config.delete) {
        // 根据时间判断,根据空间判断,根据baseOffset < logStartOffset判断。
      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
    } else {
      deleteLogStartOffsetBreachedSegments()
    }
  }

这三个方法,到最后,都会调用带参数版的 deleteOldSegments 方法

private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
    lock synchronized {
      val deletable = deletableSegments(predicate)
      if (deletable.nonEmpty)
        info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
      deleteSegments(deletable)
    }
  }

该方法只有两个步骤:使用传入的函数计算哪些日志段对象能够被删除;调用 deleteSegments 方法删除这些日志段。

  protected def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
    if (segments.isEmpty) {
      // 如果当前压根就没有任何日志段对象,直接返回
      Seq.empty
    } else {
      val deletable = ArrayBuffer.empty[LogSegment]
      var segmentEntry = segments.firstEntry
      // 从具有最小起始位移值的日志段对象开始遍历,直到满足以下条件之一便停止遍历:
      // 1. 测定条件函数predicate = false
      // 2. 扫描到包含Log对象高水位值所在的日志段对象
      // 3. 最新的日志段对象不包含任何消息
      // 最新日志段对象是segments中Key值最大对应的那个日志段,也就是我们常说的Active Segment。完全为空的Active Segment如果被允许删除,后面还要重建它,故代码这里不允许删除大小为空的Active Segment。
      // 在遍历过程中,同时不满足以上3个条件的所有日志段都是可以被删除的!
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
        // 判断segments比当前segmentsEntry大的第一个segments
        val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
          (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
        else
          (null, logEndOffset, segment.size == 0)
        // 高水位比当前segments的下个segments的baseOffset大。
        // 如果是最后一个segments,且大小是0,则不删除
        if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
          deletable += segment
          segmentEntry = nextSegmentEntry
        } else {
          segmentEntry = null
        }
      }
      deletable
    }
  }

最后是 deleteSegments 方法,这个方法执行真正的日志段删除操作。

  protected def deleteSegments(deletable: Iterable[LogSegment]): Int = {
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        // 不允许删除所有日志段对象。如果一定要做,先创建出一个新的来,然后再把前面N个删掉
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        if (segments.size == numToDelete)
          roll()
        lock synchronized {
          // 确保Log对象没有被关闭
          checkIfMemoryMappedBufferClosed()
          // remove the segments for lookups
          removeAndDeleteSegments(deletable, asyncDelete = true)
          // 尝试更新日志的Log Start Offset值
          // 如果我们删除了日志段对象,很有可能对外可见消息的范围发生了变化,自然要看一下是否需要更新 Log Start Offset 值。这就是 deleteSegments 方法最后要更新 Log Start Offset 值的原因。
          maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
        }
      }
      numToDelete
    }
  }

kafka-log-flusher 任务

用于定期对日志文件执行刷盘操作。相关逻辑实现位于 LogManager#flushDirtyLogs 方法中,该方法会遍历处理每个 topic 分区对应的 Log 对象,通过记录在 Log 对象中的上次执行 flush 的时间戳与当前时间对比,如果时间差值超过一定的阈值(对应 flush.ms 配置),则调用 Log#flush 方法执行刷盘操作。

  private def flushDirtyLogs(): Unit = {
    debug("Checking for dirty logs to flush...")

    for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
      try {
        val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
        debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
              s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
        if(timeSinceLastFlush >= log.config.flushMs)
          log.flush
      } catch {
        case e: Throwable =>
          error(s"Error flushing topic ${topicPartition.topic}", e)
      }
    }
  }

kafka-recovery-point-checkpoint 任务

该任务用于定期更新每个 log 目录名下的 recovery-point-offset-checkpoint 文件,具体在checkpointRecoveryPointOffsets 中。

  def checkpointLogRecoveryOffsets(): Unit = {
    logsByDir.foreach { case (dir, partitionToLogMap) =>
      liveLogDirs.find(_.getAbsolutePath.equals(dir)).foreach { f =>
        checkpointRecoveryOffsetsAndCleanSnapshot(f, partitionToLogMap.values.toSeq)
      }
    }
  }
  private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
    try {
      checkpointLogRecoveryOffsetsInDir(dir)
      logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
    } catch {
      case e: IOException =>
        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
          s"file in directory $dir", e)
    }
  }
  private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
    for {
      // 获取指定 log 目录对应的 Map[TopicPartition, Log] 集合
      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
      checkpoint <- recoveryPointCheckpoints.get(dir)
    } {
      // 更新对应的 recovery-point-offset-checkpoint 文件
      checkpoint.write(partitionToLog.map { case (tp, log) => tp -> log.recoveryPoint })
    }
  }

kafka-log-start-offset-checkpoint 任务

更新 kafka-log-start-offset-checkpoint 文件

  private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
    for {
      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
      // 取出logStartOffset
      checkpoint <- logStartOffsetCheckpoints.get(dir)
    } {
      try {
        val logStartOffsets = partitionToLog.collect {
          case (k, log) if log.logStartOffset > log.logSegments.head.baseOffset => k -> log.logStartOffset
        }
        checkpoint.write(logStartOffsets)
      } catch {
        case e: IOException =>
          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to logStartOffset file in directory $dir", e)
      }
    }
  }

kafka-delete-logs 任务

周期检测删除标记为需要被删除的 log 目录

  /**
   *  Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs`
   *  has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
   *  considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed
   *  after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`,
   *  `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`.
    *  在 LogManager 启动时(对应 LogManager#startup 方法)会注册一个名为 kafka-delete-logs 的周期性任务,
    *  该任务会周期性调用 LogManager#deleteLogs 方法对标记为“-delete”的目录执行删除操作。
   */
  private def deleteLogs(): Unit = {
    var nextDelayMs = 0L
    try {
      def nextDeleteDelayMs: Long = {
        if (!logsToBeDeleted.isEmpty) {
          val (_, scheduleTimeMs) = logsToBeDeleted.peek()
          scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
        } else
          currentDefaultConfig.fileDeleteDelayMs
      }
      // 如果存在需要删除的目录
      while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
        val (removedLog, _) = logsToBeDeleted.take()
        if (removedLog != null) {
          try {
            removedLog.delete()
            info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
          } catch {
            case e: KafkaStorageException =>
              error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
          }
        }
      }
    } catch {
      case e: Throwable =>
        error(s"Exception in kafka-delete-logs thread.", e)
    } finally {
      try {
        scheduler.schedule("kafka-delete-logs",
          deleteLogs _,
          delay = nextDelayMs,
          unit = TimeUnit.MILLISECONDS)
      } catch {
        case e: Throwable =>
          if (scheduler.isStarted) {
            // No errors should occur unless scheduler has been shutdown
            error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
          }
      }
    }
  }

 

相关标签: kafka-2.4