StatefulFunction
This typeclass provides access to all the functionality of Flink Stateful Functions.
Its methods center on managing state and sending messages.
Working with State
StatefulFunction provides methods for accessing and mutating state. This state is committed in Flink and has ACID, exactly-once guarantees.
Example
import cats.effect._, cats.implicits._
import com.bcf.statefun4s._
import scala.annotation.nowarn

case class GreeterRequest(name: String)
case class GreeterState(num: Int)

def greeter[F[_]: StatefulFunction[*[_], GreeterState]: Sync](
    @nowarn input: GreeterRequest
): F[Unit] = {
  val statefun = StatefulFunction[F, GreeterState]
  for {
    // Read the current count from state and compute the next value
    newCount <- statefun.insideCtx(_.num + 1)
    // Write the incremented count back to state
    _ <- statefun.modifyCtx(_.copy(newCount))
  } yield ()
}
This greeter function accepts a message containing a user's name. Tied to each instance of the function is a GreeterState instance.
For example, if a consumer sends a message to example/greeter with the ID John, Flink creates a new GreeterState for John, and any future messages sent to that function ID use the current GreeterState for John. So if we send one message and increment num, then send another message with the same ID, Flink gives us the previous value of num to increment. Since this state is committed in lockstep with Kafka, we can safely increment this number without keeping a unique set of IDs and counting them idempotently.
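The per-ID behavior described above can be pictured with a small in-memory model. This is only a sketch in plain Scala and does not use statefun4s or Flink: the `GreeterModel` object, the `Store` map, and the starting count of 0 are assumptions made for illustration, and the real state lives in Flink with the guarantees described earlier.

```scala
object GreeterModel {
  final case class GreeterState(num: Int)

  // Hypothetical stand-in for Flink's state store: function ID -> state.
  type Store = Map[String, GreeterState]

  // Handle one message addressed to `id`. State is created on first contact
  // (assumed to start at 0); later messages continue from the stored value.
  def handle(store: Store, id: String): Store = {
    val current = store.getOrElse(id, GreeterState(0))
    store.updated(id, current.copy(num = current.num + 1))
  }

  def main(args: Array[String]): Unit = {
    val ids = List("John", "Jane", "John")
    val finalStore = ids.foldLeft(Map.empty[String, GreeterState])(handle)
    println(finalStore("John").num) // 2: John's second message saw the earlier count
    println(finalStore("Jane").num) // 1: Jane has her own, separate state
  }
}
```

Each ID gets its own counter, which is why two messages for John yield 2 while Jane's single message yields 1.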
Sending Messages
Regular messages are sent to another function immediately. If the target function ID doesn't exist yet, Flink creates a new inbox/state record for that function and delivers the message.
Example
With the following protobuf definitions:
message PrinterRequest {
  string msg = 1;
}
message GreeterRequest {
  string name = 1;
}
message GreeterState {
  int64 num = 1;
}
import cats.effect._, cats.implicits._
import com.bcf.statefun4s._

def greeter[F[_]: StatefulFunction[*[_], GreeterState]: Sync](
    input: GreeterRequest
): F[Unit] = {
  val statefun = StatefulFunction[F, GreeterState]
  for {
    newCount <- statefun.insideCtx(_.num + 1)
    _ <- statefun.modifyCtx(_.copy(newCount))
    // Forward the new count to the printer function
    _ <- statefun.sendMsg(
      "example",
      "printer",
      "universal",
      PrinterRequest(s"Saw ${input.name} ${newCount} time(s)")
    )
  } yield ()
}

def printer[F[_]: StatefulFunction[*[_], Unit]: Sync](
    input: PrinterRequest
): F[Unit] = Sync[F].delay(println(input.msg))
This will send the output of each counter increment to the printer and print it to stdout.
Note that in this case, printer is stateless, so we use Unit as the state structure and pass some arbitrary string like "universal" as the function ID.
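To make the addressing explicit, here is a minimal sketch in plain Scala that treats the greeter as a pure function returning its new count together with the messages it wants to send. `Address` and `Envelope` are hypothetical types introduced only for this illustration; they are not part of statefun4s. They simply mirror the three strings passed to sendMsg in the example above.

```scala
object RoutingModel {
  // Hypothetical address type: the three strings passed to sendMsg above.
  final case class Address(namespace: String, fnType: String, id: String)
  final case class Envelope(to: Address, payload: String)

  // Greeter logic as a pure function: the new count plus outgoing messages.
  def greeter(count: Int, name: String): (Int, List[Envelope]) = {
    val newCount = count + 1
    // The printer keeps no state, so every sender can share one arbitrary ID.
    val target = Address("example", "printer", "universal")
    (newCount, List(Envelope(target, s"Saw $name $newCount time(s)")))
  }

  def main(args: Array[String]): Unit = {
    val (c1, out1) = greeter(0, "John")
    val (_, out2) = greeter(c1, "John")
    (out1 ++ out2).foreach(e => println(e.payload))
  }
}
```

Because the target ID is always "universal", every greeter instance in this model sends to the same printer address regardless of which user triggered it.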
Delayed Messages
Delayed messages are stored in Flink state and scheduled to run at a different point in time. They do not require any events flowing through Flink to trigger.
This is very useful for simulating windows or other more complex behavior.
Example
import cats.effect._, cats.implicits._
import com.bcf.statefun4s._
import com.google.protobuf.any
import scala.concurrent.duration._
import java.util.UUID

sealed trait Cron
object Cron {
  final case class Create(cronString: String, event: any.Any) extends Cron
  final case object Trigger extends Cron
}
case class CronState(event: Option[Cron.Create] = None)

// Easy to imagine how this would be implemented
def nextRun(cronStr: String): FiniteDuration = ???

def cron[F[_]: StatefulFunction[*[_], CronState]: Sync](
    input: Cron
): F[Unit] = {
  val statefun = StatefulFunction[F, CronState]
  input match {
    case create @ Cron.Create(cronStr, _) =>
      for {
        // Remember the job so later triggers know what to publish
        _ <- statefun.modifyCtx(_.copy(Some(create)))
        id <- statefun.functionId
        // Schedule a delayed message to this same function for the next run
        _ <- statefun.sendDelayedMsg(
          "example",
          "cron",
          id,
          nextRun(cronStr)
        )
      } yield ()
    case Cron.Trigger =>
      for {
        id <- statefun.functionId
        state <- statefun.getCtx
        create <- Sync[F].fromOption(state.event, new RuntimeException("Event trigger was empty"))
        // Publish the stored event to the Kafka egress
        _ <- statefun.sendEgressMsg(
          "example",
          "triggers",
          KafkaProducerRecord(
            UUID.randomUUID().toString,
            create.event.toByteString,
            "triggers"
          )
        )
        // Re-arm the timer for the following run
        _ <- statefun.sendDelayedMsg(
          "example",
          "cron",
          id,
          nextRun(create.cronString)
        )
      } yield ()
  }
}
This is a basic idea for how one could implement a cron scheduler using delayed messages. Users who want to create a Cron would send a message to cron with the ID of the job name (or some unique identifier), which would create some state to store the event we are using cron to trigger on a schedule. The function then continuously sends itself delayed messages for the next run time and publishes the triggered event exactly once to Kafka.
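The nextRun helper is left unimplemented in the example. As one possible shape for it, the sketch below computes the delay until the next occurrence of a daily time. It deliberately does not parse real cron syntax: the "HH:mm" schedule format is an assumption made to keep the example short, and the extra `now` parameter (not present in the signature above) is added so the result is deterministic and easy to test.

```scala
import java.time.{Duration => JDuration, LocalDateTime, LocalTime}
import scala.concurrent.duration._

object NextRunSketch {
  // Assumption: `schedule` is a daily time such as "14:30", not a cron string.
  // Throws java.time.format.DateTimeParseException on malformed input.
  def nextRun(schedule: String, now: LocalDateTime): FiniteDuration = {
    val at = LocalTime.parse(schedule)
    val today = now.toLocalDate.atTime(at)
    // If today's slot has passed (or is exactly now), wait until tomorrow's.
    val next = if (today.isAfter(now)) today else today.plusDays(1)
    JDuration.between(now, next).toMillis.millis
  }

  def main(args: Array[String]): Unit = {
    val now = LocalDateTime.of(2024, 1, 1, 14, 0)
    println(nextRun("14:30", now).toMinutes) // 30
    println(nextRun("13:00", now).toHours)   // 23
  }
}
```

A real scheduler would swap the body for a proper cron expression parser; the surrounding function only needs a FiniteDuration to hand to sendDelayedMsg.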