
Using Kafka Offset Monitor with Kafka 2.1+ - Fixing "Unknown offset schema version 3"

by 돈코츠라멘 2019. 12. 26.
2019-12-19 14:52:58 ERROR KafkaOffsetGetter$:103 - The message was malformed and does not conform to a type of (BaseKey, OffsetAndMetadata. Ignoring this message.
kafka.common.KafkaException: Unknown offset schema version 3
        at kafka.coordinator.GroupMetadataManager$.schemaForOffset(GroupMetadataManager.scala:739)
        at kafka.coordinator.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:884)
        at com.quantifind.kafka.core.KafkaOffsetGetter$.tryParseOffsetMessage(KafkaOffsetGetter.scala:277)
        at com.quantifind.kafka.core.KafkaOffsetGetter$.startCommittedOffsetListener(KafkaOffsetGetter.scala:351)
        at com.quantifind.kafka.OffsetGetter$$anon$3.run(OffsetGetter.scala:289)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

The Kafka Offset Monitor developers don't seem to be doing much work on it these days, so on Kafka 2.1 and later the error above occurs.

It just can't get past this point ㅠㅠㅠㅠㅠㅠㅠㅠ

Since it fails to read the offset schema, monitoring is of course impossible as well.
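
For context, here is a minimal sketch of what is going on (my own illustration, not part of the monitor's code). Every record on the internal __consumer_offsets topic begins with a two-byte schema version, and Kafka 2.1+ brokers write offset values with schema version 3, which the monitor's old decoder does not recognize:

import java.nio.ByteBuffer

// The first two bytes of a raw __consumer_offsets key or value hold the
// schema version; on a Kafka 2.1+ cluster the value version comes back
// as 3, which is what triggers "Unknown offset schema version 3".
def offsetSchemaVersion(rawValue: Array[Byte]): Short =
  ByteBuffer.wrap(rawValue).getShort()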

 

 

In the end: modifying the source code

I want to monitor with Kafka Offset Monitor badly enough that I'm going to take a crack at modifying the source code.

git clone

$ git clone https://github.com/quantifind/KafkaOffsetMonitor.git

Modifying KafkaOffsetGetter

Replace the com.quantifind.kafka.core.KafkaOffsetGetter class with the code below. You can simply copy it over the existing file.
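(In the cloned repo, that file should be src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala, assuming the standard sbt source layout.)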

package com.quantifind.kafka.core

import java.nio.ByteBuffer

import com.quantifind.kafka.OffsetGetter
import com.quantifind.kafka.OffsetGetter.OffsetInfo
import com.quantifind.utils.ZkUtilsWrapper
import com.twitter.util.Time
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.{ConsumerConnector, KafkaStream}
import kafka.message.MessageAndMetadata
import kafka.utils.Logging
import org.I0Itec.zkclient.ZkClient

import scala.collection._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.control.NonFatal

class KafkaOffsetGetter(theZkClient: ZkClient, zkUtils: ZkUtilsWrapper = new ZkUtilsWrapper) extends OffsetGetter {

  import KafkaOffsetGetter._

  override val zkClient = theZkClient

  override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = {
    try {
      zkUtils.getLeaderForPartition(zkClient, topic, pid) match {
        case Some(bid) =>
          val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(bid))
          consumerOpt flatMap { consumer =>
            val topicAndPartition = TopicAndPartition(topic, pid)
            offsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData =>
              val request = OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
              val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
              OffsetInfo(group = group,
                topic = topic,
                partition = pid,
                offset = offsetMetaData.offset,
                logSize = logSize,
                owner = Some("NA"),
                creation = Time.fromMilliseconds(offsetMetaData.timestamp),
                modified = Time.fromMilliseconds(offsetMetaData.timestamp))
            }
          }
        case None =>
          error("No broker for partition %s - %s".format(topic, pid))
          None
      }
    } catch {
      case NonFatal(t) =>
        error(s"Could not parse partition info. group: [$group] topic: [$topic]", t)
        None
    }
  }

  override def getGroups: Seq[String] = {
    topicAndGroups.groupBy(_.group).keySet.toSeq
  }

  override def getTopicList(group: String): List[String] = {
    topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList
  }

  override def getTopicMap: Map[String, scala.Seq[String]] = {
    topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq)
  }

  override def getActiveTopicMap: Map[String, Seq[String]] = {
    getTopicMap
  }
}

object KafkaOffsetGetter extends Logging {
  val ConsumerOffsetTopic = "__consumer_offsets"
  val offsetMap: mutable.Map[GroupTopicPartition, OffsetAndMetadata] = mutable.HashMap()
  val topicAndGroups: mutable.Set[TopicAndGroup] = mutable.HashSet()

  def startOffsetListener(consumerConnector: ConsumerConnector) = {
    Future {
      try {
        logger.info("Staring Kafka offset topic listener")
        val offsetMsgStream: KafkaStream[Array[Byte], Array[Byte]] = consumerConnector
          .createMessageStreams(Map(ConsumerOffsetTopic -> 1))
          .get(ConsumerOffsetTopic).map(_.head) match {
          case Some(s) => s
          case None => throw new IllegalStateException("Cannot create a consumer stream on offset topic")
        }
        val it = offsetMsgStream.iterator()
        while (true) {
          try {
            val offsetMsg: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
            val commitKey: GroupTopicPartition = readMessageKey(ByteBuffer.wrap(offsetMsg.key()))
            val commitValue: OffsetAndMetadata = readMessageValue(ByteBuffer.wrap(offsetMsg.message()))
            info("Processed commit message: " + commitKey + " => " + commitValue)
            offsetMap += (commitKey -> commitValue)
            topicAndGroups += TopicAndGroup(commitKey.topicPartition.topic, commitKey.group)
            info(s"topicAndGroups = $topicAndGroups")
          } catch {
            case e: RuntimeException =>
              // sometimes offsetMsg.key() || offsetMsg.message() throws NPE
              warn("Failed to process one of the commit message due to exception. The 'bad' message will be skipped", e)
          }
        }
      } catch {
        case e: Throwable =>
          fatal("Offset topic listener aborted dur to unexpected exception", e)
          System.exit(1)
      }
    }
  }

  // massive code stealing from kafka.server.OffsetManager

  import org.apache.kafka.common.protocol.types.Type.{INT32, INT64, STRING}
  import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}

  private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)

  private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
    new Field("topic", STRING),
    new Field("partition", INT32))
  private val KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("group")
  private val KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic")
  private val KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition")
  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
    new Field("metadata", STRING, "Associated metadata.", ""),
    new Field("timestamp", INT64))
  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
    new Field("metadata", STRING, "Associated metadata.", ""),
    new Field("commit_timestamp", INT64),
    new Field("expire_timestamp", INT64))
  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
    new Field("metadata", STRING, "Associated metadata.", ""),
    new Field("commit_timestamp", INT64))
  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(new Field("offset", INT64),
    new Field("leader_epoch", INT32),
    new Field("metadata", STRING, "Associated metadata.", ""),
    new Field("commit_timestamp", INT64))
  private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
  private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
  private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
  private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
  private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
  private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
  private val VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
  private val VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
  private val VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
  private val VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
  private val VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
  private val VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
  private val VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
  // private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
  // map of versions to schemas
  private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0),
    1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1),
    2 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V2),
    3 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V3))

  private def schemaFor(version: Int) = {
    val schemaOpt = OFFSET_SCHEMAS.get(version)
    schemaOpt match {
      case Some(schema) => schema
      case _ => throw new RuntimeException("Unknown offset schema version " + version)
    }
  }

  case class MessageValueStructAndVersion(value: Struct, version: Short)

  case class TopicAndGroup(topic: String, group: String)

  case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {

    def this(group: String, topic: String, partition: Int) =
      this(group, new TopicAndPartition(topic, partition))

    override def toString =
      "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
  }

  /**
   * Decodes the offset messages' key
   *
   * @param buffer input byte-buffer
   * @return a GroupTopicPartition object
   */
  private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
    val version = buffer.getShort()
    val keySchema = schemaFor(version).keySchema
    val key = keySchema.read(buffer).asInstanceOf[Struct]

    val group = key.get(KEY_GROUP_FIELD).asInstanceOf[String]
    val topic = key.get(KEY_TOPIC_FIELD).asInstanceOf[String]
    val partition = key.get(KEY_PARTITION_FIELD).asInstanceOf[Int]

    GroupTopicPartition(group, TopicAndPartition(topic, partition))
  }

  /**
   * Decodes the offset messages' payload and retrieves offset and metadata from it
   *
   * @param buffer input byte-buffer
   * @return an offset-metadata object from the message
   */
  private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
    val structAndVersion = readMessageValueStruct(buffer)

    if (structAndVersion.value == null) { // tombstone
      null
    } else {
      if (structAndVersion.version == 0) {
        val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
        val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
        val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]

        OffsetAndMetadata(offset, metadata, timestamp)
      } else if (structAndVersion.version == 1) {
        val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
        val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
        val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
        // not supported in 0.8.2
        // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
        OffsetAndMetadata(offset, metadata, commitTimestamp)
      } else if (structAndVersion.version == 2) {
        val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
        val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V2).asInstanceOf[String]
        val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
        // v2 has no expire_timestamp field
        OffsetAndMetadata(offset, metadata, commitTimestamp)
      } else if (structAndVersion.version == 3) {
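        // New for Kafka 2.1+: the v3 struct also carries leader_epoch,
        // but monitoring does not need it, so it is simply not read here.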
        val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
        val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V3).asInstanceOf[String]
        val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
        OffsetAndMetadata(offset, metadata, commitTimestamp)
      } else {
        throw new IllegalStateException("Unknown offset message version: " + structAndVersion.version)
      }
    }
  }

  private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
    if (buffer == null) { // tombstone
      MessageValueStructAndVersion(null, -1)
    } else {
      val version = buffer.getShort()
      val valueSchema = schemaFor(version).valueSchema
      val value = valueSchema.read(buffer).asInstanceOf[Struct]

      MessageValueStructAndVersion(value, version)
    }
  }
}
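
For reference, if you would rather patch the upstream file than overwrite it: the substantive additions here are OFFSET_COMMIT_VALUE_SCHEMA_V3 and its VALUE_*_V3 field accessors, the 3 -> ... entry in the OFFSET_SCHEMAS map, and the version == 3 branch in readMessageValue; as far as I can tell, the rest matches the original class.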

Also, for reasons I couldn't quite pin down, the dependency versions didn't line up, so I modified the build file as follows.

import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._

object KafkaUtilsBuild extends Build {

  def sharedSettings = Defaults.defaultSettings ++ assemblySettings ++ Seq(
    version := "0.3.0-SNAPSHOT",
    scalaVersion := "2.10.3",
    organization := "com.quantifind",
    scalacOptions := Seq("-deprecation", "-unchecked", "-optimize"),
    unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
    retrieveManaged := true,
    transitiveClassifiers in Scope.GlobalScope := Seq("sources"),
    resolvers ++= Seq(
      "sonatype-snapshots" at "http://oss.sonatype.org/content/repositories/snapshots",
      "sonatype-releases" at "http://oss.sonatype.org/content/repositories/releases",
      "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
      "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/"),
    libraryDependencies ++= Seq(
      "log4j" % "log4j" % "1.2.17",
      "org.scalatest" %% "scalatest" % "2.2.4" % "test",
      "org.mockito" % "mockito-all" % "1.10.19" % "test",
      "org.apache.kafka" %% "kafka" % "0.8.2.1"))

  val slf4jVersion = "1.6.1"

  //offsetmonitor project

  lazy val offsetmonitor = Project("offsetmonitor", file("."), settings = offsetmonSettings)

  def offsetmonSettings = sharedSettings ++ Seq(
    mergeStrategy in assembly := {
      case "about.html" => MergeStrategy.discard
      case x =>
        val oldStrategy = (mergeStrategy in assembly).value
        oldStrategy(x)
    },
    name := "KafkaOffsetMonitor",
    libraryDependencies ++= Seq(
      "net.databinder" %% "unfiltered-filter" % "0.8.4",
      "net.databinder" %% "unfiltered-jetty" % "0.8.4",
      "net.databinder" %% "unfiltered-json4s" % "0.8.4",
      "com.quantifind" %% "sumac" % "0.3.0",
      "com.typesafe.slick" %% "slick" % "2.0.0",
      "org.xerial" % "sqlite-jdbc" % "3.7.2",
      "com.twitter" % "util-core_2.10" % "6.1.0",
      "org.reflections" % "reflections" % "0.9.10"),
    resolvers ++= Seq(
      "java m2" at "http://download.java.net/maven/2",
      "twitter repo" at "http://maven.twttr.com"))
}
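
Note that this build stays on Scala 2.10.3 and the kafka 0.8.2.1 client on purpose: only the decoding of __consumer_offsets messages needed to learn about schema v3, while the monitor itself still consumes the topic through the old consumer API (ConsumerConnector).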

build

$ sbt assembly

Running the assembly command generates the jar file under target/.

Test

$ java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
        com.quantifind.kafka.offsetapp.OffsetGetterWeb \
        --offsetStorage kafka \
        --zk host1:19941,host2:19941 \
        --port 19943 \
        --refresh 1.seconds

Upload the jar to your server, run it, and open the site. Monitoring now works without any errors.

(+) Update

Since some of you asked for the jar file, I'm attaching it to the post.

Tistory's attachment limit is only 10MB per file, so I split the archive into parts 😁

 

KafkaOffsetMonitor.zip (2.50MB)
KafkaOffsetMonitor.z01 (10.00MB)
KafkaOffsetMonitor.z02 (10.00MB)
KafkaOffsetMonitor.z03 (10.00MB)
KafkaOffsetMonitor.z04 (10.00MB)
