I attempted to open a PR for this but ran into some issues (strange type inference on getTypeMap, something about Class[_] vs Class[_ <: Object]), so I don't feel comfortable submitting the PR.
Could this project be upgraded to RC4?
The Transactor now needs a LogHandler, which is effectful.
Here's the code I came up with:
package com.ovoenergy.natchez.extras.doobie
import cats.data.Kleisli
import cats.effect.Async
import cats.implicits._
import com.ovoenergy.natchez.extras.core.Config
import com.ovoenergy.natchez.extras.core.Config.ServiceAndResource
import doobie.WeakAsync
import doobie.free.KleisliInterpreter
import doobie.util.log
import doobie.util.log.LogHandler
import doobie.util.transactor.Transactor
import natchez.{Span, Trace}
import java.sql.{Connection, PreparedStatement, ResultSet}
/**
 * Builds doobie transactors whose prepared-statement executions run inside a
 * span obtained from `Trace[F]`.
 */
object TracedTransactor {

  /** Resource name used for the span configuration that [[apply]] builds. */
  private val DefaultResourceName = "db.execute"

  /** An `F` computation that receives its `Span[F]` through a Kleisli environment. */
  type Traced[F[_], A] = Kleisli[F, Span[F], A]

  /**
   * Lifts `transactor` into `Traced[F, *]` and passes it to [[trace]].
   *
   * The configuration handed to [[trace]] is a `ServiceAndResource` built from
   * `service` with "-db" appended and the default resource name.
   *
   * @param service    base service name
   * @param transactor the transactor to lift
   * @return the lifted transactor with the tracing interpreter installed
   */
  def apply[F[_]: Async: LogHandler](
    service: String,
    transactor: Transactor[F]
  ): Transactor[Traced[F, *]] = {
    val lifted = transactor
      .mapK(Kleisli.liftK[F, Span[F]])(implicitly, Async.asyncForKleisli(implicitly))

    // `trace` needs a LogHandler for the Kleisli effect: delegate every event
    // to F's handler and lift the result.
    implicit val tracedLogHandler: LogHandler[Traced[F, *]] =
      new LogHandler[Traced[F, *]] {
        override def run(logEvent: log.LogEvent): Traced[F, Unit] =
          Kleisli.liftF(implicitly[LogHandler[F]].run(logEvent))
      }

    val spanConfig: Config = ServiceAndResource(s"$service-db", DefaultResourceName)
    trace(spanConfig, lifted)
  }

  /**
   * Flattens a query onto a single line: newlines become spaces, every run of
   * whitespace collapses to one space, and the result is trimmed.
   */
  private def formatQuery(q: String): String = {
    val withoutNewlines = q.replace("\n", " ")
    val singleSpaced = withoutNewlines.replaceAll("\\s+", " ")
    singleSpaced.trim()
  }

  /**
   * Returns a copy of `transactor` whose interpreter is the tracing
   * `ConnectionInterpreter` produced by `createInterpreter`.
   *
   * @param config     passed to the interpreter, which uses it to build span names
   * @param transactor the transactor to copy
   */
  def trace[F[_]: Trace: Async: LogHandler](
    config: Config,
    transactor: Transactor[F]
  ): Transactor[F] = {
    val tracingInterpreter = createInterpreter(config)(WeakAsync.doobieWeakAsyncForAsync[F])
    transactor.copy(interpret0 = tracingInterpreter.ConnectionInterpreter)
  }

  /**
   * Creates a `KleisliInterpreter` with two overridden sub-interpreters:
   *
   *  - the connection interpreter wraps each statement returned by
   *    `prepareStatement` in a `TracedStatement` together with its SQL text;
   *  - the prepared-statement interpreter runs the five `execute*` operations
   *    inside a span when the statement it receives is a `TracedStatement`.
   *
   * @param config used to build the span name from the formatted SQL
   * @param W      the `WeakAsync` instance handed to `KleisliInterpreter`
   */
  private def createInterpreter[F[_]: Trace: LogHandler](
    config: Config
  )(W: WeakAsync[F]): KleisliInterpreter[F] =
    new KleisliInterpreter[F](implicitly[LogHandler[F]])(W) {

      override lazy val ConnectionInterpreter: ConnectionInterpreter =
        new ConnectionInterpreter {
          // Keep the SQL next to the statement so it can be recovered at execution time.
          override def prepareStatement(a: String): Kleisli[F, Connection, PreparedStatement] =
            super.prepareStatement(a).map { statement =>
              TracedStatement(statement, a): PreparedStatement
            }

          // The return type is left to inference.
          // NOTE(review): presumably this avoids the Class[_] vs Class[_ <: Object]
          // mismatch the author reported for getTypeMap; confirm before annotating.
          override def getTypeMap = primitive(_.getTypeMap)
        }

      override lazy val PreparedStatementInterpreter: PreparedStatementInterpreter =
        new PreparedStatementInterpreter {
          type TracedOp[A] = Kleisli[F, PreparedStatement, A] // PreparedStatement => F[A]

          /**
           * Runs `f` inside a span tagged "span.type" -> "db" when the statement
           * is a `TracedStatement`; any other statement is passed to `f` unchanged.
           */
          def runTraced[A](f: TracedOp[A]): TracedOp[A] =
            Kleisli { (statement: PreparedStatement) =>
              statement match {
                case TracedStatement(underlying, sql) =>
                  val spanName = config.fullyQualifiedSpanName(formatQuery(sql))
                  Trace[F].span(spanName)(
                    Trace[F].put("span.type" -> "db") >> f(underlying)
                  )
                case untraced =>
                  f(untraced)
              }
            }

          override val execute: TracedOp[Boolean] =
            runTraced(super.execute)

          override val executeQuery: TracedOp[ResultSet] =
            runTraced(super.executeQuery)

          override val executeUpdate: TracedOp[Int] =
            runTraced(super.executeUpdate)

          override val executeBatch: TracedOp[Array[Int]] =
            runTraced(super.executeBatch)

          override val executeLargeBatch: TracedOp[Array[Long]] =
            runTraced(super.executeLargeBatch)
        }
    }
}