{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}

-- |
-- Module      : Hasklepias.CohortApp
-- Description : Functions for running a cohort application.
-- Copyright   : (c) Target RWE 2023
-- License     : BSD3
-- Maintainer  : bbrown@targetrwe.com
--               ljackman@targetrwe.com
--               dpritchard@targetrwe.com
module Hasklepias.CohortApp (cohortMain) where

import Aws (Configuration (..))
import qualified Aws
import qualified Aws.S3 as S3
import Blammo.Logging.Simple
import Codec.Compression.GZip
  ( CompressionLevel,
    compress,
    decompress,
  )
import Cohort.Cohort
  ( Cohort,
    CohortSpec,
    SubjId (..),
    Subject (..),
    eventsToSubject,
  )
import qualified Cohort.Core as CCore
import qualified Cohort.Output as COutput
import Control.Exception (throwIO)
import Control.Monad (when)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Reader
import Control.Monad.Trans.Resource
import Data.Aeson (ToJSON (..), Value (..), encode)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Data.Conduit (runConduit, (.|))
import Data.Conduit.Binary (sinkLbs)
import Data.Foldable (foldrM)
import Data.Generics.Product (HasField (field))
import qualified Data.HashMap.Strict as HM
import qualified Data.Ini as Ini
import Data.List.NonEmpty (NonEmpty (..))
import Data.Map.Strict (Map, keys)
import Data.String
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import EventDataTheory
  ( Event,
    EventLineAble,
    Eventable,
    FromJSONEvent,
    LineParseError (..),
    ToJSONEvent,
    defaultParseEventLineOption,
    parseEventLinesL',
  )
import GHC.Arr (accumArray, assocs)
import Hasklepias.CohortApp.CohortAppCLI
import Lens.Micro (set, (<&>))
import Lens.Micro.Extras (view)
import Network.HTTP.Conduit
  ( RequestBody (..),
    newManager,
    responseBody,
    tlsManagerSettings,
  )
import Options.Applicative
import System.Directory
  ( doesFileExist,
    getHomeDirectory,
  )
import System.Exit
import System.FilePath ((</>))

-- | Create a command-line application for processing event-line data into
-- cohorts, using the logic provided in each 'CohortSpec'. See the top-level
-- `Hasklepias` module documentation for details.
--
-- The command-line options are parsed with 'cliParserInfo', combined with the
-- supplied @specs@ into the application settings, and the internal
-- 'cohortApp' pipeline is then run with those settings.
cohortMain :: (CohortConstraints t m a b) => Map Text (CohortSpec t m a) -> IO ()
cohortMain specs = do
  opts <- execParser cliParserInfo
  let cfg = MkCohortSettings specs opts
  runCohortApp cohortApp cfg

{- INTERNAL: CohortApp -}

-- | Internal. Bundles the many constraints that dependencies place on the
-- @t m a@ parameters. The @b@ parameter is present only because
-- 'EventLineAble' mentions it (it is taken from 'IntervalSizeable a b') and
-- is otherwise unused here. See interval-algebra issues.
type CohortConstraints t m a b =
  ( Eventable t m a,
    EventLineAble t m a b,
    ToJSONEvent t m a,
    FromJSONEvent t m a
  )

-- | Internal. Configuration type for a `CohortApp`. `t m a` correspond to the
-- underlying @'Event' t m a@ parameters via the 'CohortSpec'. A project
-- developer can only configure the 'cohortSpecs' field. Remaining options are
-- accessible via the compiled command-line application only.
data CohortSettings t m a = MkCohortSettings
  { -- | Named cohort specifications supplied by the project developer.
    cohortSpecs :: Map Text (CohortSpec t m a),
    -- | Options parsed from the command line.
    cliOpts :: CohortCLIOpts
  }

-- | Internal. Type used to run a cohort application pipeline: a 'ReaderT'
-- carrying the 'CohortSettings' on top of @'LoggingT' 'IO'@. The user does
-- not have access to this type and can only run the entire pipeline, with
-- custom 'cohortSpecs', via 'cohortMain'.
type CohortApp t m a = ReaderT (CohortSettings t m a) (LoggingT IO)

-- | Internal. Run a 'CohortApp' action in 'IO': supply the settings to the
-- reader layer with 'runReaderT', then discharge the logging layer with
-- 'runSimpleLoggingT'.
runCohortApp :: CohortApp t m a b -> CohortSettings t m a -> IO b
runCohortApp app = runSimpleLoggingT . runReaderT app

{- INTERNAL: CohortApp pipeline
    * Wraps pure cohort-building pipeline from `Cohort`
    * Provides logging, top-level error handling
    * Data read/write actions
    -}

-- | Internal. A simple `CohortApp` pipeline, which consists of a read step
-- ('readData'), a transformation step in which the pure cohort-building
-- functions are applied to data ('evalCohortMap'), and a write step
-- ('writeData'). Each step should handle its own logging and failure modes.
cohortApp :: (CohortConstraints t m a b) => CohortApp t m a ()
cohortApp = readData >>= evalCohortMap >>= writeData

-- | Internal. Evaluate the input data with respect to the user-provided
-- cohort-building logic in 'cohortSpecs'. If the input list is empty, log an
-- error and exit the process immediately with exit code one.
evalCohortMap :: [Subject t m a] -> CohortApp t m a (Map Text (Cohort a))
evalCohortMap [] = do
  logError "No valid subject data to process. Check logs for line parse errors."
  liftIO $ exitWith (ExitFailure 1)
evalCohortMap subjs = do
  sm <- asks cohortSpecs
  -- Only the cohort names (the map keys) are logged, not the specs themselves.
  logInfo $ "Building cohorts" :# ["cohorts" .= keys sm]
  pure $ CCore.evalCohortMap sm subjs

-- | Internal. Transform data to the output shape via
-- 'COutput.toCohortJSON', encode it as JSON, optionally gzip-compress it, and
-- write it to the location selected on the command line: standard output, a
-- file, or an S3 object.
writeData :: (ToJSON a) => Map Text (Cohort a) -> CohortApp t m a ()
writeData d = do
  opts <- asks cliOpts
  logInfo $ "Writing data to" :# ["location" .= show (output opts)]
  -- Compression, if requested, is applied to the encoded bytes before writing.
  let compfun = case outCompress opts of
        Compress -> compress
        NoCompress -> id
  -- Only the second component of 'output' determines the destination.
  let writefun = case snd $ output opts of
        StdOutput -> liftIO . BL.putStr
        FileOutput x -> liftIO . BL.writeFile x
        S3Output cred b k -> \d' -> do
          hash <- putS3Object cred b k d'
          logInfo $ "Put request complete" :# ["etag" .= show hash]

  writefun . compfun . encode $ COutput.toCohortJSON d

-- | Internal. Read data from the specified location (standard input, a file,
-- or an S3 object), optionally gzip-decompress it, attempt to parse it as
-- @['EventLine']@, then convert to a list of subjects. Note the result
-- can be an empty list, and there is no check at this stage that it is not.
-- For example, if no lines parse correctly the result will be @[]@.
readData :: (CohortConstraints t m a b) => CohortApp t m a [Subject t m a]
readData = do
  opts <- asks cliOpts
  logInfo $ "Reading data from" :# ["location" .= show (input opts)]

  let decompFun = case inDecompress opts of
        Decompress -> decompress
        NoDecompress -> id

  bs <-
    -- TODO: toStrict can be expensive: Only 'decompress' with default options
    -- prevents us from simply replacing BL with BS to use strict bytestrings.
    -- See #402
    BL.toStrict . decompFun <$> case snd $ input opts of
      StdInput -> liftIO BL.getContents
      FileInput x -> liftIO $ BL.readFile x
      S3Input cred b k -> do
        (d, meta) <- getS3Object cred b k
        -- TODO destr
        logInfo $
          "Get request complete"
            :# [ "object-last-modified" .= show (S3.omLastModified meta),
                 "etag" .= show (S3.omETag meta)
               ]
        pure d

  -- Parse the raw bytes into subjects; parse failures are logged there.
  parseSubjects bs

{- UTILITIES -}

-- Input data utilities

-- TODO need to consider whether to use lazy or strict bytestrings here.

-- | Internal. Parse a 'BS.ByteString' into event lines. Process successfully
-- parsed lines into subjects. Logs each line that failed to parse, with its
-- line number and error, but does not stop on such failures.
-- Note input must be UTF-8. See documentation for @BS.'BS.ByteString'@.
parseSubjects :: (CohortConstraints t m a b) => BS.ByteString -> CohortApp t m a [Subject t m a]
parseSubjects bs = do
  ivopt <- asks (intervalOpt . cliOpts)
  let (errs, bss) = parseEventLinesL' ivopt bs
  let ss = eventsToSubject bss
  -- Note this does not exit.
  mapM_
    ( \(MkLineParseError (n, e)) ->
        logError $ "parse-error" :# ["line-number" .= n, "error" .= e]
    )
    errs
  pure ss

-- AWS utilities
-- TODO convert all uses of 'fail' to throwIO when we do a full error-handling rework.

-- | Utility for 'getAwsConfig'. `Aws.loadCredentialsFromFile` does not read
-- the Ini file format. This does so in order to maintain compatibility with
-- standard aws tools, e.g. boto and the cli.
--
-- Reads @.aws/credentials@ under the user's home directory and looks up
-- @aws_access_key_id@ and @aws_secret_access_key@ in the section named by the
-- profile in the given 'CredentialsCfg'. Returns 'Nothing' when the file does
-- not exist or when the profile or either key is missing. Fails (via 'fail')
-- when the file exists but cannot be parsed as an Ini file.
getAwsCredentialsIniFile :: CredentialsCfg -> CohortApp t m a (Maybe Aws.Credentials)
getAwsCredentialsIniFile (MkCredentialsCfg prof) = do
  -- TODO this will throw an exception if unable to get the Home dir.
  credfile <- liftIO $ (</> ".aws/credentials") <$> getHomeDirectory
  chk <- liftIO $ doesFileExist credfile
  -- NOTE: getAwsConfig will handle missing file error after checking other
  -- sources, if needed.
  if chk
    then do
      ini <- liftIO $ Ini.readIniFile credfile
      case ini of
        Left _ -> do
          logDebug $ "cred-file-parse" :# ["cred-file" .= credfile]
          -- IMPORTANT: Do not print or log the readIniFile error, as it could
          -- leak secrets.
          fail "Credentials file must be in format required by aws cli."
        Right d -> do
          let prof_cred = HM.lookup prof $ Ini.unIni d
          -- Both keys must be present in the profile section to build
          -- credentials; otherwise the result is Nothing.
          let creds =
                (,)
                  <$> (HM.lookup "aws_access_key_id" =<< prof_cred)
                  <*> (HM.lookup "aws_secret_access_key" =<< prof_cred)
          case creds of
            Nothing -> do
              logDebug $ "cred-file-missing-var" :# ["profile" .= prof]
              pure Nothing
            Just (i, k) -> Just <$> Aws.makeCredentials (TE.encodeUtf8 i) (TE.encodeUtf8 k)
    else do
      logError $ "cred-file-does-not-exist" :# ["cred-file" .= credfile]
      pure Nothing

-- | This is 'Aws.baseConfiguration' but with a different default credentials
-- location. This checks credentials from the following sources, from first to last:
-- environment variables, credentials for the profile given in the
-- 'CredentialsCfg', read from a file in the
-- [@aws@ cli supported format](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html)
-- within @$HOME/.aws@, and finally Ec2 instance metadata. The first source
-- that yields credentials is used; if none does, an error is logged and the
-- action fails (via 'fail').
--
-- Some explanation about why we are replicating the 'Aws.baseConfiguration' logic here:
--
-- 'Aws.baseConfiguration' uses 'Aws.loadCredentialsFromFile' to fetch
-- credentials from that source should loading from environment variables fail.
-- However, that function uses a custom format, not the Ini file format the @aws@
-- cli uses, as described
-- [here](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html).
--
-- So, we can either require the user to provide credentials in the supported format,
-- or this module can provide a function to parse Ini files. We opted for the latter,
-- implemented in 'getAwsCredentialsIniFile' and using it here.
getAwsConfig :: CredentialsCfg -> CohortApp t m a Aws.Configuration
getAwsConfig cfg = do
  credfromenv <- Aws.loadCredentialsFromEnv
  creds <- case credfromenv of
    Nothing -> do
      logDebug $ "aws-credentials-env" :# ["cred-env" .= Null]
      credfromfile <- getAwsCredentialsIniFile cfg
      case credfromfile of
        Nothing -> do
          -- Some logging done in getAwsCredentialsIniFile
          logDebug $ "aws-credentials-file" :# ["cred-cfg" .= toJSON cfg]
          credim <- Aws.loadCredentialsFromInstanceMetadata
          case credim of
            Nothing -> do
              logError $ "credentials-not-found" :# ["cred-cfg" .= toJSON cfg]
              fail "Could not load Aws credentials from any source."
            -- Instance metadata is the last source tried; use it when present.
            Just cr -> pure cr
        Just cr -> pure cr
    Just cr -> pure cr
  pure $
    Aws.Configuration
      { timeInfo = Aws.Timestamp,
        credentials = creds,
        logger = Aws.defaultLog Aws.Warning,
        proxy = Nothing
      }

-- | Get an object from S3 as a @BL.'BL.ByteString'@, and return it along with
-- its metadata. Credentials are resolved with 'getAwsConfig', and the whole
-- response body is collected into a single lazy bytestring before returning.
getS3Object :: CredentialsCfg -> BucketName -> ObjectKey -> CohortApp t m a (BL.ByteString, S3.ObjectMetadata)
getS3Object credcfg b k = do
  -- NOTE: This is largely copy-pasted from the Aws package top-level docs.
  -- Aws.baseConfiguration doesn't have the credentials defaults we want.
  cfg <- getAwsConfig credcfg
  let s3cfg = Aws.defServiceConfig :: S3.S3Configuration Aws.NormalQuery
  -- TODO according to docs, creating a manager is "expensive." evaluate
  -- whether to use one (for retries) at all, and if so pass it as an
  -- argument to getS3Object, so it can be shared with putS3Object. likely
  -- "expensive" is nothing compared to processing a cohort.
  mgr <- liftIO $ newManager tlsManagerSettings
  liftIO $ runResourceT $ do
    rsp <- Aws.pureAws cfg s3cfg mgr $ S3.getObject b k
    -- Drain the streaming response body into a lazy bytestring.
    d <- runConduit $ responseBody (S3.gorResponse rsp) .| sinkLbs
    pure (d, S3.gorMetadata rsp)

-- | Put an object to S3. Returns the object hash 'ETag'. See S3 [object
-- documentation](https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html).
-- Credentials are resolved with 'getAwsConfig'; the payload is sent as a
-- lazy-bytestring request body.
putS3Object :: CredentialsCfg -> BucketName -> ObjectKey -> BL.ByteString -> CohortApp t m a Text
putS3Object credcfg b k o = do
  cfg <- getAwsConfig credcfg
  let s3cfg = Aws.defServiceConfig :: S3.S3Configuration Aws.NormalQuery
  mgr <- liftIO $ newManager tlsManagerSettings
  liftIO $ runResourceT $ do
    S3.PutObjectResponse {S3.porETag = etag} <-
      Aws.pureAws cfg s3cfg mgr $ S3.putObject b k (RequestBodyLBS o)
    pure etag