Skip to content

Commit

Permalink
Merge pull request #34 from TinkoffCreditSystems/daemons
Browse files Browse the repository at this point in the history
daemon extensions, instances and syntax
  • Loading branch information
Odomontois authored Nov 18, 2019
2 parents cef28a0 + 3d7df6f commit ba78be5
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 8 deletions.
55 changes: 51 additions & 4 deletions concurrent/src/main/scala/tofu/concurrent/Daemon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import cats.effect.concurrent.{MVar, TryableDeferred}
import cats.effect.syntax.bracket._
import cats.syntax.applicativeError._
import cats.tagless.autoApplyK
import cats.{Apply, FlatMap, Monad}
import cats.{Applicative, Apply, FlatMap, Monad}
import tofu.control.ApplicativeZip
import tofu.higherKind.Function2K
import tofu.syntax.monadic._
import tofu.syntax.start._
Expand Down Expand Up @@ -51,7 +52,9 @@ trait DaemonicInstances { self: Daemonic.type =>

/** instance making Daemons keeping default underlying behaviour*/
implicit def nativeInstance[F[_]: Start: TryableDeferreds: Bracket[*[_], E], E]: Daemonic[F, E] =
mkInstance[F, E](Function2K[Fiber[F, *], Promise[F, E, *], Daemon[F, E, *]]((fib, promise) => new Daemon.Impl(fib, promise)))
mkInstance[F, E](
Function2K[Fiber[F, *], Promise[F, E, *], Daemon[F, E, *]]((fib, promise) => new Daemon.Impl(fib, promise))
)
}

@autoApplyK
Expand All @@ -65,7 +68,7 @@ trait Daemon[F[_], E, A] extends Fiber[F, A] {
}

/** Probably Infinite processes */
object Daemon {
object Daemon extends DaemonInstances {
private[tofu] class Impl[F[_], E, A](process: Fiber[F, A], end: TryableDeferred[F, Exit[E, A]])
extends Daemon[F, E, A] {

Expand All @@ -80,7 +83,7 @@ object Daemon {
extends Impl[F, Throwable, A](process, end) {
override def join: F[A] = exit.flatMap {
case Exit.Error(e) => e.raiseError
case Exit.Completed(a) => a.pure
case Exit.Completed(a) => a.pure[F]
case Exit.Canceled => new TofuCanceledJoinException[F, A](this).raiseError
}
}
Expand All @@ -96,14 +99,24 @@ object Daemon {

def repeat[F[_]: Monad: Daemonic[*[_], E], E, A, B](step: F[A]): F[Daemon[F, E, B]] = apply(step.foreverM)

def repeatThrow[F[_]: Monad: DaemonicThrow, A, B](step: F[A]): F[DaemonThrow[F, B]] = repeat(step)

def repeatTask[F[_]: Monad: DaemonicThrow, A](step: F[A]): F[DaemonTask[F]] = repeat(step)

def iterate[F[_]: Monad: Daemonic[*[_], E], E, A, B](init: A)(step: A => F[A]): F[Daemon[F, E, B]] =
apply(init.iterateForeverM(step))

def iterateThrow[F[_]: Monad: DaemonicThrow, A, B](init: A)(step: A => F[A]): F[DaemonThrow[F, B]] =
iterate(init)(step)

def iterateTask[F[_]: Monad: DaemonicThrow, A](init: A)(step: A => F[A]): F[DaemonTask[F]] = iterate(init)(step)

def state[F[_]: Monad: Daemonic[*[_], E], E, S, A, B](init: S)(state: StateT[F, S, A]): F[Daemon[F, E, B]] =
iterate(init)(state.runS)

def resource[F[_]: Monad: Daemonic[*[_], E], E, A](daemon: F[Daemon[F, E, A]]): Resource[F, Daemon[F, E, A]] =
Resource.make(daemon)(_.cancel)

}

final class Actor[F[_], E, A] private (queue: MVar[F, A], val daemon: Daemon[F, E, Void]) {
Expand Down Expand Up @@ -163,3 +176,37 @@ object Actor {

final class TofuCanceledJoinException[F[_], A] private[tofu] (val daemon: Daemon[F, Throwable, A])
extends InterruptedException("trying to join canceled fiber")

trait DaemonInstances {
implicit def daemonApplicative[F[_], E](implicit F: Monad[F]): Applicative[Daemon[F, E, *]] =
new ApplicativeZip[Daemon[F, E, *]] {
def pure[A](x: A): Daemon[F, E, A] = new Daemon[F, E, A] {
def join: F[A] = F.pure(x)
def cancel: F[Unit] = F.unit
def poll: F[Option[Exit[E, A]]] = F.pure(Some(Exit.Completed(x)))
def exit: F[Exit[E, A]] = F.pure(Exit.Completed(x))
}

override def map[A, B](fa: Daemon[F, E, A])(f: A => B): Daemon[F, E, B] = new Daemon[F, E, B] {
def join: F[B] = fa.join.map(f)
def cancel: F[Unit] = fa.cancel
def poll: F[Option[Exit[E, B]]] = fa.poll.map(_.map(_.map(f)))
def exit: F[Exit[E, B]] = fa.exit.map(_.map(f))
}

def zipWith[A, B, C](fa: Daemon[F, E, A], fb: Daemon[F, E, B])(f: (A, B) => C): Daemon[F, E, C] =
new Daemon[F, E, C] {
def join: F[C] = fa.join.map2(fb.join)(f)
def cancel: F[Unit] = fa.cancel *> fb.cancel
def poll: F[Option[Exit[E, C]]] = fa.poll.flatMap {
case fe @ (None | Some(_: Exit.Incomplete[E])) => F.pure(fe.asInstanceOf[Option[Exit[E, C]]])
case Some(Exit.Completed(a)) =>
fb.poll.map {
case fe @ (None | Some(_: Exit.Incomplete[E])) => fe.asInstanceOf[Option[Exit[E, C]]]
case Some(Exit.Completed(b)) => Some(Exit.Completed(f(a, b)))
}
}
def exit: F[Exit[E, C]] = fa.exit.map2(fb.exit)(_.map2(_)(f))
}
}
}
50 changes: 48 additions & 2 deletions concurrent/src/main/scala/tofu/concurrent/Exit.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,67 @@
package tofu.concurrent

import cats.{Applicative, Eval, Traverse}
import cats.effect.ExitCase
import tofu.control.ApplicativeZip
import tofu.syntax.monadic._

sealed trait Exit[+E, +A] {
def exitCase: ExitCase[E]
}

object Exit {
case object Canceled extends Exit[Nothing, Nothing] {

sealed trait Incomplete[+E] extends Exit[E, Nothing]

case object Canceled extends Incomplete[Nothing] {
def exitCase = ExitCase.Canceled
}
final case class Error[+E](e: E) extends Exit[E, Nothing] {
final case class Error[+E](e: E) extends Incomplete[E] {
def exitCase = ExitCase.Error(e)
}
final case class Completed[+A](a: A) extends Exit[Nothing, A] {
override def exitCase = ExitCase.Completed
}


private[this] object exitInstanceAny extends Traverse[Exit[Any, *]] with ApplicativeZip[Exit[Any, *]] {
def traverse[G[_], A, B](fa: Exit[Any, A])(f: A => G[B])(implicit G: Applicative[G]): G[Exit[Any, B]] =
fa match {
case Canceled => G.pure(Canceled)
case Error(e) => G.pure(Error(e))
case Completed(a) => f(a).map(Completed(_))
}
def foldLeft[A, B](fa: Exit[Any, A], b: B)(f: (B, A) => B): B =
fa match {
case Canceled | Error(_) => b
case Completed(a) => f(b, a)
}
def foldRight[A, B](fa: Exit[Any, A], lb: Eval[B])(f: (A, Eval[B]) => Eval[B]): Eval[B] =
fa match {
case Canceled | Error(_) => lb
case Completed(a) => f(a, lb)
}
override def map[A, B](fa: Exit[Any, A])(f: A => B): Exit[Any, B] =
fa match {
case Canceled => Canceled
case e: Error[Any] => e
case Completed(a) => Completed(f(a))
}

def zipWith[A, B, C](fa: Exit[Any, A], fb: Exit[Any, B])(f: (A, B) => C): Exit[Any, C] =
fa match {
case Canceled => Canceled
case err: Error[Any] => err
case Completed(a) =>
fb match {
case Canceled => Canceled
case err: Error[Any] => err
case Completed(b) => Completed(f(a, b))
}
}
def pure[A](x: A): Exit[Any, A] = Completed(x)
}

implicit def exitInstance[E]: Traverse[Exit[E, *]] with Applicative[Exit[E, *]] =
exitInstanceAny.asInstanceOf[Traverse[Exit[E, *]] with Applicative[Exit[E, *]]]
}
5 changes: 5 additions & 0 deletions concurrent/src/main/scala/tofu/concurrent/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ package object concurrent {
def newSemaphore[F[_]: Semaphores] = MakeSemaphore[F, F]
def newVar[F[_]: MVars] = MakeMVar[F, F]
def newDeffered[F[_]: Deferreds, A] = MakeDeferred[F, F, A]

type DaemonThrow[F[_], A] = Daemon[F, Throwable, A]
type DaemonTask[F[_]] = DaemonThrow[F, Unit]

type DaemonicThrow[F[_]] = Daemonic[F, Throwable]
}
9 changes: 9 additions & 0 deletions concurrent/src/test/scala/tofu/concurrent/SyntaxCheck.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package tofu
package concurrent
import tofu.lift.Lift
import tofu.syntax.lift._

object SyntaxCheck {
def liftDaemon[F[_], G[_], E, A](daemon: Daemon[F, E, A])(implicit lift: Lift[F, G]): Daemon[G, E, A] =
daemon.lift2[G]
}
14 changes: 14 additions & 0 deletions core/src/main/scala/tofu/control/ApplicativeZip.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package tofu.control
import cats.{Applicative, Apply}

/** mix-in for Apply instances via `map2`*/
trait ApplyZip[F[_]] extends Apply[F] {
def zipWith[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]

def ap[A, B](ff: F[A => B])(fa: F[A]): F[B] = zipWith(ff, fa)(_ apply _)
}

/** mix-in for Applicative instances via `map2`*/
trait ApplicativeZip[F[_]] extends ApplyZip[F] with Applicative[F] {
override def map[A, B](fa: F[A])(f: A => B): F[B] = zipWith(fa, unit)((a, _) => f(a))
}
26 changes: 24 additions & 2 deletions core/src/main/scala/tofu/syntax/lift.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package tofu.syntax

import cats.Functor
import cats.{Functor, ~>}
import cats.effect.concurrent.{Deferred, MVar, Ref, Semaphore}
import tofu.lift.{IsoK, Lift, Unlift}
import cats.tagless.{FunctorK, InvariantK}
Expand All @@ -25,10 +25,32 @@ object lift {
G.map(unlift.unlift)(backf => semaphore.imapK(unlift.liftF, backf))
}

implicit class CatsFunctorKLiftSyntax[T[_[_]], F[_]](val tf: T[F]) extends AnyVal {
implicit class CatsTaglessLiftSyntax[T[_[_]], F[_]](val tf: T[F]) extends AnyVal {
def lift[G[_]](implicit lift: Lift[F, G], fk: FunctorK[T]): T[G] = fk.mapK(tf)(lift.liftF)
def ilift[G[_]](implicit lift: IsoK[F, G], fk: InvariantK[T]): T[G] = fk.imapK(tf)(lift.tof)(lift.fromF)
def unlift[G[_]](implicit unlift: Unlift[F, G], G: Functor[G], fk: InvariantK[T]): G[T[G]] =
G.map(unlift.unlift)(backf => fk.imapK(tf)(unlift.liftF)(backf))
}

implicit class CatsTagless1LiftSyntax[T[_[_], _], F[_], A](val tf: T[F, A]) extends AnyVal {
def mapK1[G[_]](f: F ~> G)(implicit fk: FunctorK[T[*[_], A]]): T[G, A] = fk.mapK(tf)(f)
def imapK1[G[_]](f: F ~> G)(g: G ~> F)(implicit fk: InvariantK[T[*[_], A]]): T[G, A] = fk.imapK(tf)(f)(g)

def lift1[G[_]](implicit lift: Lift[F, G], fk: FunctorK[T[*[_], A]]): T[G, A] = fk.mapK(tf)(lift.liftF)
def ilift1[G[_]](implicit lift: IsoK[F, G], fk: InvariantK[T[*[_], A]]): T[G, A] =
fk.imapK(tf)(lift.tof)(lift.fromF)
def unlift1[G[_]](implicit unlift: Unlift[F, G], G: Functor[G], fk: InvariantK[T[*[_], A]]): G[T[G, A]] =
G.map(unlift.unlift)(backf => fk.imapK(tf)(unlift.liftF)(backf))
}

implicit class CatsTagless2LiftSyntax[T[_[_], _, _], F[_], A, B](val tf: T[F, A, B]) extends AnyVal {
def mapK2[G[_]](f: F ~> G)(implicit fk: FunctorK[T[*[_], A, B]]): T[G, A, B] = fk.mapK(tf)(f)
def imapK2[G[_]](f: F ~> G)(g: G ~> F)(implicit fk: InvariantK[T[*[_], A, B]]): T[G, A, B] = fk.imapK(tf)(f)(g)

def lift2[G[_]](implicit lift: Lift[F, G], fk: FunctorK[T[*[_], A, B]]): T[G, A, B] = fk.mapK(tf)(lift.liftF)
def ilift2[G[_]](implicit lift: IsoK[F, G], fk: InvariantK[T[*[_], A, B]]): T[G, A, B] =
fk.imapK(tf)(lift.tof)(lift.fromF)
def unlift2[G[_]](implicit unlift: Unlift[F, G], G: Functor[G], fk: InvariantK[T[*[_], A, B]]): G[T[G, A, B]] =
G.map(unlift.unlift)(backf => fk.imapK(tf)(unlift.liftF)(backf))
}
}

0 comments on commit ba78be5

Please sign in to comment.