2019-12-19 14:52:58 ERROR KafkaOffsetGetter$:103 - The message was malformed and does not conform to a type of (BaseKey, OffsetAndMetadata. Ignoring this message.
kafka.common.KafkaException: Unknown offset schema version 3
at kafka.coordinator.GroupMetadataManager$.schemaForOffset(GroupMetadataManager.scala:739)
at kafka.coordinator.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:884)
at com.quantifind.kafka.core.KafkaOffsetGetter$.tryParseOffsetMessage(KafkaOffsetGetter.scala:277)
at com.quantifind.kafka.core.KafkaOffsetGetter$.startCommittedOffsetListener(KafkaOffsetGetter.scala:351)
at com.quantifind.kafka.OffsetGetter$$anon$3.run(OffsetGetter.scala:289)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The Kafka Offset Monitor maintainers don't seem to be very active these days, so on Kafka 2.1 and later the error above occurs.
Because the tool cannot parse the offset schema, monitoring is of course impossible as well.
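For context, every record on the internal __consumer_offsets topic starts its value with a 2-byte schema version; Kafka 2.1+ brokers write version 3 (which adds a leader_epoch field), and the stock monitor only knows the older versions, hence the exception. A minimal sketch of how that version would be read (peekValueSchemaVersion is just an illustrative name, not part of the tool):
import java.nio.ByteBuffer
// The first two bytes of an offset commit value encode the schema version.
// On Kafka 2.1+ this yields 3, which the unmodified KafkaOffsetMonitor does not recognize.
def peekValueSchemaVersion(valueBytes: Array[Byte]): Short =
  ByteBuffer.wrap(valueBytes).getShort()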
Fixing the source code
Since I still really wanted to monitor with Kafka Offset Monitor, I decided to try modifying the source code myself.
git clone
- GitHub: https://github.com/quantifind/KafkaOffsetMonitor
- Git clone URL: https://github.com/quantifind/KafkaOffsetMonitor.git
$ git clone https://github.com/quantifind/KafkaOffsetMonitor.git
Modifying KafkaOffsetGetter
com.quantifind.kafka.core.KafkaOffsetGetter
Replace this class with the code below. You can simply copy it and overwrite the existing file.
package com.quantifind.kafka.core
import java.nio.ByteBuffer
import com.quantifind.kafka.OffsetGetter
import com.quantifind.kafka.OffsetGetter.OffsetInfo
import com.quantifind.utils.ZkUtilsWrapper
import com.twitter.util.Time
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.{ConsumerConnector, KafkaStream}
import kafka.message.MessageAndMetadata
import kafka.utils.Logging
import org.I0Itec.zkclient.ZkClient
import scala.collection._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.control.NonFatal
class KafkaOffsetGetter(theZkClient: ZkClient, zkUtils: ZkUtilsWrapper = new ZkUtilsWrapper) extends OffsetGetter {
import KafkaOffsetGetter._
override val zkClient = theZkClient
override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = {
try {
zkUtils.getLeaderForPartition(zkClient, topic, pid) match {
case Some(bid) =>
val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(bid))
consumerOpt flatMap { consumer =>
val topicAndPartition = TopicAndPartition(topic, pid)
offsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData =>
val request = OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
OffsetInfo(group = group,
topic = topic,
partition = pid,
offset = offsetMetaData.offset,
logSize = logSize,
owner = Some("NA"),
creation = Time.fromMilliseconds(offsetMetaData.timestamp),
modified = Time.fromMilliseconds(offsetMetaData.timestamp))
}
}
case None =>
error("No broker for partition %s - %s".format(topic, pid))
None
}
} catch {
case NonFatal(t) =>
error(s"Could not parse partition info. group: [$group] topic: [$topic]", t)
None
}
}
override def getGroups: Seq[String] = {
topicAndGroups.groupBy(_.group).keySet.toSeq
}
override def getTopicList(group: String): List[String] = {
topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList
}
override def getTopicMap: Map[String, scala.Seq[String]] = {
topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq)
}
override def getActiveTopicMap: Map[String, Seq[String]] = {
getTopicMap
}
}
object KafkaOffsetGetter extends Logging {
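// Internal broker topic where consumer group offset commits are stored.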
val ConsumerOffsetTopic = "__consumer_offsets"
val offsetMap: mutable.Map[GroupTopicPartition, OffsetAndMetadata] = mutable.HashMap()
val topicAndGroups: mutable.Set[TopicAndGroup] = mutable.HashSet()
def startOffsetListener(consumerConnector: ConsumerConnector) = {
Future {
try {
logger.info("Staring Kafka offset topic listener")
val offsetMsgStream: KafkaStream[Array[Byte], Array[Byte]] = consumerConnector
.createMessageStreams(Map(ConsumerOffsetTopic -> 1))
.get(ConsumerOffsetTopic).map(_.head) match {
case Some(s) => s
case None => throw new IllegalStateException("Cannot create a consumer stream on offset topic ")
}
val it = offsetMsgStream.iterator()
while (true) {
try {
val offsetMsg: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
val commitKey: GroupTopicPartition = readMessageKey(ByteBuffer.wrap(offsetMsg.key()))
val commitValue: OffsetAndMetadata = readMessageValue(ByteBuffer.wrap(offsetMsg.message()))
info("Processed commit message: " + commitKey + " => " + commitValue)
offsetMap += (commitKey -> commitValue)
topicAndGroups += TopicAndGroup(commitKey.topicPartition.topic, commitKey.group)
info(s"topicAndGroups = $topicAndGroups")
} catch {
case e: RuntimeException =>
// sometimes offsetMsg.key() || offsetMsg.message() throws NPE
warn("Failed to process one of the commit message due to exception. The 'bad' message will be skipped", e)
}
}
} catch {
case e: Throwable =>
fatal("Offset topic listener aborted dur to unexpected exception", e)
System.exit(1)
}
}
}
// massive code stealing from kafka.server.OffsetManager
import java.nio.ByteBuffer
import org.apache.kafka.common.protocol.types.Type.{INT32, INT64, STRING}
import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}
private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
new Field("topic", STRING),
new Field("partition", INT32))
private val KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("group")
private val KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic")
private val KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition")
private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
new Field("metadata", STRING, "Associated metadata.", ""),
new Field("timestamp", INT64))
private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
new Field("metadata", STRING, "Associated metadata.", ""),
new Field("commit_timestamp", INT64),
new Field("expire_timestamp", INT64))
private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
new Field("metadata", STRING, "Associated metadata.", ""),
new Field("commit_timestamp", INT64))
private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(new Field("offset", INT64),
new Field("leader_epoch", INT32),
new Field("metadata", STRING, "Associated metadata.", ""),
new Field("commit_timestamp", INT64))
private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
private val VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
private val VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
private val VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
private val VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
private val VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
private val VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
private val VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
// private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
// map of versions to schemas
private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0),
1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1),
2 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V2),
3 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V3))
private def schemaFor(version: Int) = {
val schemaOpt = OFFSET_SCHEMAS.get(version)
schemaOpt match {
case Some(schema) => schema
case _ => throw new RuntimeException("Unknown offset schema version " + version)
}
}
case class MessageValueStructAndVersion(value: Struct, version: Short)
case class TopicAndGroup(topic: String, group: String)
case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
def this(group: String, topic: String, partition: Int) =
this(group, new TopicAndPartition(topic, partition))
override def toString =
"[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
}
/**
* Decodes the offset messages' key
*
* @param buffer input byte-buffer
* @return a GroupTopicPartition object
*/
private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
val version = buffer.getShort()
val keySchema = schemaFor(version).keySchema
val key = keySchema.read(buffer).asInstanceOf[Struct]
val group = key.get(KEY_GROUP_FIELD).asInstanceOf[String]
val topic = key.get(KEY_TOPIC_FIELD).asInstanceOf[String]
val partition = key.get(KEY_PARTITION_FIELD).asInstanceOf[Int]
GroupTopicPartition(group, TopicAndPartition(topic, partition))
}
/**
* Decodes the offset messages' payload and retrieves offset and metadata from it
*
* @param buffer input byte-buffer
* @return an offset-metadata object from the message
*/
private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
val structAndVersion = readMessageValueStruct(buffer)
if (structAndVersion.value == null) { // tombstone
null
} else {
if (structAndVersion.version == 0) {
val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, timestamp)
} else if (structAndVersion.version == 1) {
val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
// not supported in 0.8.2
// val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, commitTimestamp)
} else if (structAndVersion.version == 2) {
val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V2).asInstanceOf[String]
val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
// not supported in 0.8.2
// val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, commitTimestamp)
} else if (structAndVersion.version == 3) {
val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V3).asInstanceOf[String]
val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
// not supported in 0.8.2
// val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, commitTimestamp)
} else {
throw new IllegalStateException("Unknown offset message version: " + structAndVersion.version)
}
}
}
private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
if (buffer == null) { // tombstone
MessageValueStructAndVersion(null, -1)
} else {
val version = buffer.getShort()
val valueSchema = schemaFor(version).valueSchema
val value = valueSchema.read(buffer).asInstanceOf[Struct]
MessageValueStructAndVersion(value, version)
}
}
}
Also, for reasons I'm not entirely sure of, the dependency versions didn't line up, so I modified the Build file as follows.
import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._
object KafkaUtilsBuild extends Build {
def sharedSettings = Defaults.defaultSettings ++ assemblySettings ++ Seq(
version := "0.3.0-SNAPSHOT",
scalaVersion := "2.10.3",
organization := "com.quantifind",
scalacOptions := Seq("-deprecation", "-unchecked", "-optimize"),
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
transitiveClassifiers in Scope.GlobalScope := Seq("sources"),
resolvers ++= Seq(
"sonatype-snapshots" at "http://oss.sonatype.org/content/repositories/snapshots",
"sonatype-releases" at "http://oss.sonatype.org/content/repositories/releases",
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/"),
libraryDependencies ++= Seq(
"log4j" % "log4j" % "1.2.17",
"org.scalatest" %% "scalatest" % "2.2.4" % "test",
"org.mockito" % "mockito-all" % "1.10.19" % "test",
"org.apache.kafka" %% "kafka" % "0.8.2.1"))
val slf4jVersion = "1.6.1"
//offsetmonitor project
lazy val offsetmonitor = Project("offsetmonitor", file("."), settings = offsetmonSettings)
def offsetmonSettings = sharedSettings ++ Seq(
mergeStrategy in assembly := {
case "about.html" => MergeStrategy.discard
case x =>
val oldStrategy = (mergeStrategy in assembly).value
oldStrategy(x)
},
name := "KafkaOffsetMonitor",
libraryDependencies ++= Seq(
"net.databinder" %% "unfiltered-filter" % "0.8.4",
"net.databinder" %% "unfiltered-jetty" % "0.8.4",
"net.databinder" %% "unfiltered-json4s" % "0.8.4",
"com.quantifind" %% "sumac" % "0.3.0",
"com.typesafe.slick" %% "slick" % "2.0.0",
"org.xerial" % "sqlite-jdbc" % "3.7.2",
"com.twitter" % "util-core_2.10" % "6.1.0",
"org.reflections" % "reflections" % "0.9.10"),
resolvers ++= Seq(
"java m2" at "http://download.java.net/maven/2",
"twitter repo" at "http://maven.twttr.com"))
}
Build
$ sbt assembly
Running the assembly command generates the jar file under target/.
Test
$ java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--zk host1:19941,host2:19941 \
--port 19943 \
--refresh 1.seconds
Upload the jar to the server, run it, and open the site in a browser. Monitoring now works without errors.
(+) Update
Some readers asked for the jar file, so I've attached it to this post.
Tistory's attachment size limit is only 10MB, so the file is split into compressed parts 😁