リモート開発メインのソフトウェア開発企業のエンジニアブログです

Scala + Kinesis Client LibraryでKinesisコンシューマーアプリケーションを作る

ここ最近のプロジェクトでKinesisのコンシューマーアプリケーションをScalaで開発・メンテしていたので、何回かに分けてノウハウをメモしておきます。

今回はScalaでKinesis Client Libraryを使って、ストリームのコンシューマアプリケーション実装した内容を簡単に記載します。尚、今回採用したKinesis Client Libraryのバージョンは1.x系となります。

Kinesisデータストリームについてざっくり

AWSによるスケーラブルなフルマネージドのデータストリームサービスで、ログやユーザーアクティビティ等、大量のデータレコードをリアルタイムに処理する事ができます。類似サービスにApache Kafka等があります。
Amazon SQSのようなキューサービスとの違いは、コンシューマーがどこまでデータを処理したかをストリーム側が管理しないと言う点です。一度ストリームに投下されたデータは一定期間保管され、その間はいつでも取得する事が可能です。

Kinesisデータストリームのイメージ図

また、シャード単位で水平分割する事で、データ量に応じていつでも簡単にスケールイン・アウトする事ができます。
詳しくはAmazon Kinesisを参照。

Kinesis Client Library (以下KCL) について

https://github.com/awslabs/amazon-kinesis-client

AWSが公式で公開している、Kinesisのコンシューマーアプリケーションを容易に実装する為のライブラリです。
Kinesisも例外なくAPIが公開されているので自前でそれらを駆使して実装する事は可能ですが、Kinesisのコンシューマーアプリケーションは考慮すべき点が膨大にある為、よほどの理由がない限りはKCLを使うようにしましょう。

対応言語はJavaなのでScalaでも使えます。一応、他の言語(Node.jsやRuby, Python)にも展開されていますが、実質的にJVMで動くKCL Daemonを経由する事になるので、JVM言語を使うのが簡単です。

KCLを使うメリット

先程も言ったとおりKinesisデータストリームはシャード単位で水平分割するのですが、その場合当然ながらコンシューマアプリケーション側も分割して動作する必要があります。
KCLを実装したJVMアプリケーションは、シャードの増減に応じてワーカーをスレッド単位で自動的に増減する事で、均等に処理を割り振る事が可能です。
また、コンシューマアプリケーションを実行するプロセスを増やしたり、あるいは実行インスタンスそのものが新たに起動、または停止すると、それぞれのインスタンス上のワーカー同士がよしなに割り当てを譲り合う事で、マシンのスケールアウト・インが容易に可能となっています。

また、その他にもシャード毎に処理したレコード数や遅延ミリ秒等の統計情報をCloudWatchに送信してくれる等の面倒も見てくたりと大変便利です。

KCLの実装の基本

主な作業は、Javaで提供されているインターフェース IRecordProcessor を実装する事です。これはレコードプロセッサと呼ばれ、ワーカーの実態となります。1つのレコードプロセッサは1つのシャードに割り当てられます。シャードが10個ある場合、レコードプロセッサも10個立ち上がります。
このインターフェースに定義されているメソッドを具体的に実装するのですが、アプリケーションのユースケースによっていくつか注意しないといけない点があります。

public interface IRecordProcessor {
    void initialize(InitializationInput initializationInput);
    void processRecords(ProcessRecordsInput processRecordsInput);
    void shutdown(ShutdownInput shutdownInput);
}

initialize

レコードプロセッサが初期化されると実行されます。タイミングとしては最初にアプリケーションを起動した時や、シャードが増加、または実行インスタンスが増加した際に新しく割り当てが起きた場合等です。 InitializationInput から割り当てられたシャードのID等を取得する事ができます。

Scalaでの実装例:

class RecordProcessor extends IRecordProcessor {

  override def initialize(initializationInput: InitializationInput): Unit = {
    shardId = initializationInput.getShardId

    println(s"Initialized processor for $shardId")
  }
  
  // ...
  
}

processRecords

Kinesisストリームからレコードを受け取ると実行されます。アプリケーションが遅延なく正常にストリームを消費している場合、ストリームにデータがPushされるとすぐに実行されます。
また、ここではあまり深く触れませんが、同じくAWSから提供されているKinesis Producer Libraryと言う、Kinesisにデータを投入する為のライブラリでは、複数のレコードを可能な限り単一のレコードに集約して処理するので、コンシューマ側の責務としてこの集約の解除を行わないといけないのですが、KCLではこの処理も自動的に行なってくれます。

ProcessRecordsInput から受信した全てのレコードをList形式で取り出す事ができるので、後はアプリケーション側でレコードを処理するだけとなります。

Scalaでの実装例:

class RecordProcessor extends IRecordProcessor {
  
  // ...
  
  override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
    val records = processRecordsInput.getRecords.asScala

    println(s"Processing ${records.length} records, ${processRecordsInput.getMillisBehindLatest} msec behind latest")

    records.foreach { record =>
      // ...
    }
  }
  
  // ...

}

shutdown

その名の通り、レコードプロセッサが終了する時に呼ばれ、主に後述するチェックポイント処理の為に使います。
ShutdownInput からはシャットダウンの理由を知る事ができます。シャットダウンの理由は2つあります:

  • シャードの割り当てが別のプロセッサに移った時(他のインスタンスにスケールアウト、あるいはフェイルオーバーする際)
  • 割り当てれているシャードが、リシャーディングにより終了した時

それとは別に、KCLアプリケーションのプロセスが終了した事を検知するフックもあるのですが、そちらも後述します。

チェックポイントについて

先述したとおり、Kinesisストリームは、コンシューマがどこまでデータを読み込んだかを管理しません。その代わりに、読み出しを開始するポイントをピンポイントに指定ができますので、アプリケーション側でどこまで読み込んだかを記録しておく必要があります。

KCLはチェックポイントをDynamoDB駆動で簡単に記録できる仕組みも用意されていて、これを使う事ができます。
ただし、チェックポイント記録の戦略はアプリケーションのニーズによって異なる為か、自動的には行われません。従って、処理は自前で実装する必要がありますが、 IRecordProcessor のメソッド引数では簡単にチェックポイント記録が行えるようになっています。

チェックポイント処理の共通化

チェックポイントの記録処理はDynamoDBを介して行われる為、書き込みスループット上昇時のスロットリングエラーに対処する必要があります。今回は、以下のようにDynamoDBへの過去込み失敗時には一定期間を置いてリトライする仕組みを準備しました:

class RecordProcessor extends IRecordProcessor {

  private val CHECKPOINT_RETRY_COUNT = 10
  
  // ...

  @scala.annotation.tailrec
  private def retryable(retryCount: Int)(f: => Unit): Unit = {
    try f
    catch {
      case e: Throwable =>
        if (retryCount > 0) {
          println(s"An error occurred, will be retried at most $retryCount time(s)...", e)

          try Thread.sleep(3000)
          catch {
            case e: InterruptedException => println("Interrupted sleep", e)
          }

          retryable(retryCount - 1)(f)
        } else {
          println(s"An error occurred", e)
        }
    }
  }
  
  private def checkpoint(checkpointer: IRecordProcessorCheckpointer): Unit =
    retryable(CHECKPOINT_RETRY_COUNT) {
      checkpointer.checkpoint()
    }
}

チェックポイントを記録する際は、 checkpoint メソッドにチェックポインタ(各インターフェースメソッドで取り出せる)を渡す必要があります。

processRecords

まずは、 processRecords の実装を次のようにします:

class RecordProcessor extends IRecordProcessor {

  private val CHECKPOINT_INTERVAL_MILLIS = 30000L
  private var nextCheckpointTimeInMillis = 0L
  
  // ...

  override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
    val records = processRecordsInput.getRecords.asScala

    println(s"Processing ${records.length} records, ${processRecordsInput.getMillisBehindLatest} msec behind latest")

    records.foreach { record =>
      // ...
    }

    // チェックポイント処理. DateTimeは `org.joda.time.DateTime` 
    if (DateTime.now().getMillis > nextCheckpointTimeInMillis) {
      checkpoint(processRecordsInput.getCheckpointer)

      nextCheckpointTimeInMillis = DateTime.now().getMillis + CHECKPOINT_INTERVAL_MILLIS
    }
  }
  
  // ...
}

先程のprocessRecordsのサンプルに処理を追加しました。今回のポイントとしては以下の2点です:

  • 全てのレコードが正常に処理された場合のみチェックポイントを記録している
    • 処理中に例外が発生した場合は記録を行わない
  • 30秒以内に次のprocessRecordsが実行された際は記録は行わない
    • DynamoDBのスロットリング軽減の為

ただし、これが正解と言うわけではありません。先述の通り、チェックポイントの戦略は多種多様です。例えば、全てのレコード (processRecordsInput.getRecords の要素) はシーケンスナンバーを保持しており、これをチェックポインタにわたす事で、ピンポイントにそのレコードまでを記録する事が可能です。

以下はシンプルな一例です (※今回はこの戦略を採用していないので自前実装した checkpoiot は使ってません):

var lastSequenceNumber = ""

records.foreach { record =>
  try {
    // ...

    lastSequenceNumber = record.getSequenceNumber
  }
  catch {
    case e: Throable => {
      if (lastSequenceNumber.nonEmpty) 
        processRecordsInput.getCheckpointer.checkpoint(lastSequenceNumber)

      throw e
    }
  }
}

if (lastSequenceNumber.nonEmpty) 
  processRecordsInput.getCheckpointer.checkpoint(lastSequenceNumber)

レコード処理で例外が発生した時点で、正常に処理できた最後のチェックポイントを記録しています。

shutdown

さて、先程ちらっと書きましたが、チェックポイントの記録は shutdown 時にも行う事ができます。
以下に例を示します:

class RecordProcessor extends IRecordProcessor {

  // ...

  override def shutdown(shutdownInput: ShutdownInput): Unit = {
    println(s"Shutdown processor, reason: ${shutdownInput.getShutdownReason}")

    // シャットダウンの理由が "TERMINATE" の時だけ書き込み
    if (shutdownInput.getShutdownReason == ShutdownReason.TERMINATE) {
      checkpoint(shutdownInput.getCheckpointer)
    }
  }
}

shutdown はレコードプロセッサが主に2つの理由で閉じられる時にコールされますが、その内の理由が TERMINATE の時だけチェックポイントを記録します。
TERMINATE はリシャーディングにより、割り当てられたシャードの全てのレコードを処理した時に呼ばれます。KCLアプリケーションでは、リシャーディング時に作られた新しいシャードへは、この終了するシャードのチェックポイントが記録されるまでレコードプロセッサが割り当てられないので、必ずこの処理は実装する必要があります。

For a split or merge operation, the KCL won’t start processing the new shards until the processors for the original shards have called checkpoint to signal that all processing on the original shards is complete.

https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-implementation-app-java.html#kinesis-record-processor-implementation-interface-java

反対にもう1つの理由である ZOMBIE の時はチェックポイントの記録は行ってはいけません。この理由でプロセッサが閉じられるのは、スケールアウト、またはフェイルオーバーにより他のプロセッサにシャードの割り当てが奪われるケースであり、すでに新しいプロセッサがレコードの処理を開始している可能性がある為です。

Applications SHOULD NOT checkpoint their progress (as another record processor may have already started processing data).

https://github.com/awslabs/amazon-kinesis-client/blob/8873b1346ff033de01fa1a237c0436d8fb762d9a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownReason.java#L36-L37
shutdownRequested

さて、KCLにはワーカーそのものが終了する事による、レコードプロセッサの終了処理を作る事もできます。通常、Kinesisストリームのコンシューマーは24/7で稼働するのでワーカーが終了する事はまれですが、ユースケースとして必要であればここでもチェックポイントを記録できます。

まず、レコードプロセッサ自体に IShutdownNotificationAware インターフェースを実装します。このインターフェースは単一の shutdownRequested メソッドを提供します:

class RecordProcessor extends IRecordProcessor with IShutdownNotificationAware {

  // ...

  override def shutdownRequested(checkpointer: IRecordProcessorCheckpointer): Unit = {
    println("Shutdown processor requested")

    checkpoint(checkpointer)
  }
  
  // ...
}

エントリポイントの実装

最後に、エントリポイントとなるメインクラスを準備すればOKです。

object MyConsumer extends App 

  val workerId = s"${InetAddress.getLocalHost.getCanonicalHostName}:${UUID.randomUUID}"
  val credentialsProvider = InstanceProfileCredentialsProvider.getInstance()
  val region = Regions.getCurrentRegion
  val kclConf = new KinesisClientLibConfiguration(cfg.kclApplicationName, cfg.kclStreamName, credentialsProvider, workerId)
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
    .withRegionName(region)

  val recordProcessorFactory = new IRecordProcessorFactory {
    override def createProcessor: IRecordProcessor = new RecordProcessor()
  }

  val worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(kclConf)
    .build()

  // Javaプロセスの終了を検出し、ワーカーを安全にシャットダウンする. ただし90秒以内に終了できなければ強制終了とする.
  // レコードプロセッサの `requestedShutdown` が呼ばれる.
  sys.addShutdownHook {
    info("Shutting down...")

    try {
      Await.result(
        Future(worker.startGracefulShutdown().get()),
        Duration(90, SECONDS)
      )
    } catch {
      case _: TimeoutException => println("Shutdown duration timed out")
    }
  }

  worker.run()

InitialPositionについては、これもユースケースによるのですが、今回は極力取りこぼしをなくす為にTRIM_HORIZONにしました。また、今回はワーカーを安全にシャットダウンできるようにJavaのシャットダウンフックを利用しています。

運用について

運用面でも色々学びが多かったのですが、それはまた後日別に書く事にします。

← 前の投稿

24時間稼働でないKinesisストリームの運用で手こずった点

次の投稿 →

Azure App Service + Docker で Rails アプリを動かす

コメントを残す