Storage

Напомню абстракцию хранилища:

(ns publicator.use-cases.abstractions.storage
  (:require
   [clojure.spec.alpha :as s]
   [publicator.domain.abstractions.id-generator :as id-generator]
   [publicator.domain.abstractions.aggregate :as aggregate]
   [publicator.domain.identity :as identity]
   [publicator.utils.ext :as ext]))

(defprotocol Storage
  (-wrap-tx [this body]))

(defprotocol Transaction
  (-get-many [t ids])
  (-create [t state]))

(s/fdef get-many
  :args (s/cat :tx any?
               :ids (s/coll-of ::id-generator/id :distinct true))
  :ret (s/map-of ::id-generator/id ::identity/identity))

(s/fdef create
  :args (s/cat :tx any?
               :state ::aggregate/aggregate)
  :ret ::identity/identity)

(defn get-many [t ids] (-get-many t ids))
(defn create   [t state] (-create t state))

(declare ^:dynamic *storage*)

(defmacro with-tx
  "Note that body forms may be called multiple times,
   and thus should be free of side effects."
  [tx-name & body-forms-free-of-side-effects]
  `(-wrap-tx *storage*
            (fn [~tx-name]
              [email protected]body-forms-free-of-side-effects)))

(s/fdef get-one
  :args (s/cat :tx any?
               :id ::id-generator/id)
  :ret (s/nilable ::identity/identity))

(defn get-one [t id]
  (let [res (get-many t [id])]
    (get res id)))

;; ...

Ранее я рассказывал, что есть 2 стратегии выполнения бизнес транзакций: оптимистический и пессимистический. Эта реализация будет основываться на оптимистической стратегии.

Исходя из специфики бизнес-транзакций можно реализовать и пессимистическую стратегию. Отмечу, что для этого не нужно переписывать сами бизнес-транзакции.

При использовании оптимистических блокировок мы свободно читаем любые агрегаты, но запоминаем их версии, а при фиксации проверяем, что версии не изменились. Если версии изменились, то повторяем бизнес-транзакцию.

Каждому агрегату будет соответствовать свой маппер, который реализует специфичную для этого агрегата persistence логику. Каждый маппер должен:

  • извлекать (select) агрегаты по списку их идентификаторов
  • вставлять (insert) агрегаты в базу
  • удалять (delete) агрегаты из базы
  • блокировать агрегат и извлекать его версию

Отмечу, что маппер поддерживает только вставку и удаление, но не изменение. Для надежности и упрощения кода изменение сведено к удалению и вставке. Да, это не оптимально с точки зрения производительности, зато просто и надежно. Если начнутся проблемы с производительностью, можно применить описанную далее оптимизацию.

В конце бизнес-транзакции нужно начать sql транзакцию и вычитать версии изменившихся агрегатов. Т.к. транзакции идут параллельно, то запрос должен содержать блокировку FOR UPDATE, чтобы другие транзакции дождались изменений в текущей транзакции.

Протокол маппера:

(defprotocol Mapper
  (-lock   [this conn ids])
  (-select [this conn ids])
  (-insert [this conn aggregates])
  (-delete [this conn ids]))

(s/def ::mapper #(satisfies? Mapper %))

(s/fdef lock
  :args (s/cat :this ::mapper, :conn any?, :ids (s/coll-of ::id-generator/id))
  :ret (s/coll-of ::versioned-id))

(s/fdef select
  :args (s/cat :this ::mapper, :conn any?, :ids (s/coll-of ::id-generator/id))
  :ret (s/coll-of ::versioned-aggregate))

(s/fdef insert
  :args (s/cat :this ::mapper, :conn any?, :aggregates (s/coll-of ::aggregate/aggregate))
  :ret any?)

(s/fdef delete
  :args (s/cat :this ::mapper, :conn any?, :ids (s/coll-of ::id-generator/id))
  :ret any?)

Для тестирования мы будем использовать тестовый агрегат, содержащий одно поле - счетчик:

(defrecord TestEntity [id counter]
  aggregate/Aggregate
  (id [_] id)
  (spec [_] any?))

Эта сущность будет храниться в таблице:

CREATE TABLE "test-entity" (
  "id" bigint PRIMARY KEY,
  "counter" integer
);

Отмечу, что эта таблица не имеет поля «версия». В PostgreSQL каждая таблица содержит служебную колонку xmin, будем использовать ее для отслеживания версии, т.к. нам достаточно определить совпадают версии или нет.

xmin - Идентификатор (код) транзакции, добавившей строку этой версии. (Версия строки — это её индивидуальное состояние; при каждом изменении создаётся новая версия одной и той же логической строки.) Подробнее.

Обратите внимание, xmin может переполняться и в этом случае начинает считать сначала. Иными словами, есть гипотетическая вероятность ложно-положительного сравнения версий, когда выполняется очень долгая бизнес-транзакция, а другая транзакция БД с тем же xmin изменила нашу запись.

Тестовый маппер использует следующие запросы:

-- :name- drop-test-entity-table :! :raw
DROP TABLE "test-entity"

-- :name- test-entity-insert :!
INSERT INTO "test-entity" VALUES :tuple*:vals;

-- :name- test-entity-select :? :*
SELECT *, xmin AS version FROM "test-entity" WHERE id IN (:v*:ids)

-- :name- test-entity-delete :!
DELETE FROM "test-entity" WHERE id IN (:v*:ids)

-- :name- test-entity-locks :? :*
SELECT id, xmin AS version FROM "test-entity" WHERE id IN (:v*:ids) FOR UPDATE

В процессе исполнения бизнес-транзакции мы отслеживаем какие сущности извлекались, создавались или изменялись, так же как отслеживали в фейковой реализации.

В конце бизнес-транзакции мы выбираем с блокировкой версии измененных агрегатов, если версии не изменились, то группируем агрегаты по типу и производим удаление и вставку с помощью мапперов.

(ns publicator.persistence.storage
  (:require
   [jdbc.core :as jdbc]
   [publicator.use-cases.abstractions.storage :as storage]
   [publicator.domain.abstractions.aggregate :as aggregate]
   [publicator.domain.abstractions.id-generator :as id-generator]
   [publicator.domain.identity :as identity]
   [publicator.utils.ext :as ext]
   [clojure.spec.alpha :as s])
  (:import
   [java.util.concurrent TimeoutException]
   [java.time Instant]))

(s/def ::version some?)
(s/def ::versioned-id (s/keys :req-un [::id-generator/id ::version]))
(s/def ::versioned-aggregate (s/keys :req-un [::aggregate/aggregate ::version]))

;; ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

(defprotocol Mapper
  (-lock   [this conn ids])
  (-select [this conn ids])
  (-insert [this conn aggregates])
  (-delete [this conn ids]))

(s/def ::mapper #(satisfies? Mapper %))

(s/fdef lock
  :args (s/cat :this ::mapper, :conn any?, :ids (s/coll-of ::id-generator/id))
  :ret (s/coll-of ::versioned-id))

(s/fdef select
  :args (s/cat :this ::mapper, :conn any?, :ids (s/coll-of ::id-generator/id))
  :ret (s/coll-of ::versioned-aggregate))

(s/fdef insert
  :args (s/cat :this ::mapper, :conn any?, :aggregates (s/coll-of ::aggregate/aggregate))
  :ret any?)

(s/fdef delete
  :args (s/cat :this ::mapper, :conn any?, :ids (s/coll-of ::id-generator/id))
  :ret any?)

(defn- default-for-empty [f default]
  (fn [this conn coll]
    (if (empty? coll)
      default
      (f this conn coll))))

(def lock   (default-for-empty -lock   []))
(def select (default-for-empty -select []))
(def insert (default-for-empty -insert nil))
(def delete (default-for-empty -delete nil))

;; ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

(defrecord Transaction [data-source mappers identity-map]
  storage/Transaction
  (-get-many [this ids]
    (with-open [conn (jdbc/connection data-source)]
      (let [ids-for-select (remove #(contains? @identity-map %) ids)
            selected       (->> mappers
                                (vals)
                                (mapcat #(select % conn ids-for-select)) ;; <1>
                                (map (fn [{:keys [aggregate version]}]
                                       (let [iaggregate (identity/build aggregate)]
                                         (alter-meta! iaggregate assoc
                                                      ::version version
                                                      ::initial aggregate)
                                         iaggregate)))
                                (group-by #(-> % deref aggregate/id))
                                (ext/map-vals first))]
        ;; Здесь принципиально использование reverse-merge,
        ;; т.к. другой поток может успеть извлечь данные из базы,
        ;; создать объект-идентичность, записать его в identity map
        ;; и сделать в нем изменения.
        ;; Если использовать merge, то этот поток затрет идентичность
        ;; другим объектом-идентичностью с начальным состоянием.
        ;; Фактически это нарушает саму идею identity-map -
        ;; сопоставление ссылки на объект с его идентификатором
        (-> identity-map
            (swap! ext/reverse-merge selected)
            (select-keys ids)))))

  (-create [this state]
    (let [id     (aggregate/id state)
          istate (identity/build state)]
      (swap! identity-map (fn [map]
                            {:pre [(not (contains? map id))]}
                            (assoc map id istate)))
      istate)))

(defn- build-tx [data-source mappers]
  (Transaction. data-source mappers (atom {})))

(defn- need-insert? [identity]
  (not= @identity
        (-> identity meta ::initial)))

(defn- need-delete? [identity]
  (let [initial (-> identity meta ::initial)]
    (and (some? initial)
         (not= @identity initial))))

(defn- lock-all [conn mappers identities]
  (let [ids             (->> identities
                             (vals)
                             (filter need-delete?)
                             (map deref)
                             (map aggregate/id))
        db-versions     (->> mappers
                             (vals)
                             (mapcat #(lock % conn ids))
                             (group-by :id)
                             (ext/map-vals #(-> % first :version)))
        memory-versions (->> (select-keys identities ids)
                             (ext/map-vals #(-> % meta ::version)))]
    (= db-versions memory-versions)))

(defn- delete-all [conn mappers identities]
  (let [groups (->> identities
                    (vals)
                    (filter need-delete?)
                    (map deref)
                    (group-by class)
                    (ext/map-keys #(get mappers %))
                    (ext/map-vals #(map aggregate/id %)))]
    (doseq [[manager ids] groups]
      (delete manager conn ids))))

(defn- insert-all [conn mappers identities]
  (let [groups (->> identities
                    (vals)
                    (filter need-insert?)
                    (map deref)
                    (group-by class)
                    (ext/map-keys #(get mappers %)))]
    (doseq [[manager aggregates] groups]
      (insert manager conn aggregates))))

(defn- commit [tx mappers]
  (let [data-source (:data-source tx)
        identities  @(:identity-map tx)]
    (with-open [conn (jdbc/connection data-source)]
      (jdbc/atomic conn
                   (when (lock-all conn mappers identities)
                     (delete-all conn mappers identities)
                     (insert-all conn mappers identities)
                     true)))))

(defn- timestamp []
  (inst-ms (Instant/now)))

(deftype Storage [data-source mappers opts]
  storage/Storage
  (-wrap-tx [this body]
    (let [soft-timeout (get opts :soft-timeout-ms 500)
          stop-after   (+ (timestamp) soft-timeout)]
      (loop [attempt 0]
        (let [tx       (build-tx data-source mappers)
              res      (body tx)
              success? (commit tx mappers)]
          (cond
            success?                   res
            (< (timestamp) stop-after) (recur (inc attempt))
            :else                      (throw (TimeoutException.
                                               (str "Can't run transaction after "
                                                    attempt " attempts")))))))))



(s/fdef binding-map
  :args (s/cat :data-source any?
               :mappers (s/map-of class? ::mapper)
               :opts (s/? map?))
  :ret map?)

(defn binding-map
  ([data-source mappers]
   (binding-map data-source mappers {}))
  ([data-source mappers opts]
   {#'storage/*storage* (Storage. data-source mappers opts)}))

Тест повторяет тест фейковой реализации:

(ns publicator.persistence.storage-test
  (:require
   [publicator.utils.test.instrument :as instrument]
   [clojure.test :as t]
   [hugsql.core :as hugsql]
   [jdbc.core :as jdbc]
   [publicator.domain.abstractions.aggregate :as aggregate]
   [publicator.use-cases.abstractions.storage :as storage]
   [publicator.persistence.test.db :as db]
   [publicator.persistence.storage :as sut]))

(defrecord TestEntity [id counter]
  aggregate/Aggregate
  (id [_] id)
  (spec [_] any?))

(defn build-test-entity []
  (TestEntity. 42 0))

;; ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

(hugsql/def-db-fns "publicator/persistence/storage_test.sql")

(defn- aggregate->sql [aggregate]
  (vals aggregate))

(defn- row->versioned-aggregate [row]
  {:aggregate (-> row (dissoc :version) map->TestEntity)
   :version   (-> row (get :version))})

(def mapper (reify sut/Mapper
              (-lock [_ conn ids]
                (test-entity-locks conn {:ids ids}))
              (-select [_ conn ids]
                (map row->versioned-aggregate (test-entity-select conn {:ids ids})))
              (-insert [_ conn states]
                (test-entity-insert conn {:vals (map aggregate->sql states)}))
              (-delete [_ conn ids]
                (test-entity-delete conn {:ids ids}))))

;; ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

(defn- setup [t]
  (with-bindings (sut/binding-map db/*data-source* {TestEntity mapper})
    (t)))

(defn- test-table [t]
  (with-open [conn (jdbc/connection db/*data-source*)]
    (drop-test-entity-table conn)
    (create-test-entity-table conn))
  (t))

(t/use-fixtures :once
  instrument/fixture
  db/once-fixture)

(t/use-fixtures :each
  db/each-fixture
  test-table
  setup)

(t/deftest create
  (let [entity (storage/tx-create (build-test-entity))]
    (t/is (some? (storage/tx-get-one (aggregate/id entity))))))

(t/deftest change
  (let [entity (storage/tx-create (build-test-entity))
        _      (storage/tx-alter entity update :counter inc)
        entity (storage/tx-get-one (:id entity))]
    (t/is (= 1 (:counter entity)))))

(t/deftest identity-map-persisted
  (let [id (:id (storage/tx-create (build-test-entity)))]
    (storage/with-tx t
      (let [x (storage/get-one t id)
            y (storage/get-one t id)]
        (t/is (identical? x y))))))

(t/deftest identity-map-in-memory
  (storage/with-tx t
    (let [x (storage/create t (build-test-entity))
          y (storage/get-one t (aggregate/id @x))]
      (t/is (identical? x y)))))

(t/deftest identity-map-swap
  (storage/with-tx t
    (let [x (storage/create t (build-test-entity))
          y (storage/get-one t (aggregate/id @x))]
       (dosync (alter x update :counter inc))
      (t/is (= 1 (:counter @x) (:counter @y))))))

(t/deftest concurrency
  (let [test (storage/tx-create (build-test-entity))
        id   (aggregate/id test)
        n    10
        _    (->> (repeatedly #(future (storage/tx-alter test update :counter inc)))
                  (take n)
                  (doall)
                  (map deref)
                  (doall))
        test (storage/tx-get-one id)]
    (t/is (= n (:counter test)))))

(t/deftest inner-concurrency
  (let [test (storage/tx-create (build-test-entity))
        id   (aggregate/id test)
        n    10
        _    (storage/with-tx t
               (->> (repeatedly #(future (as-> id <>
                                           (storage/get-one t <>)
                                           (dosync (alter <> update :counter inc)))))
                    (take n)
                    (doall)
                    (map deref)
                    (doall)))
        test (storage/tx-get-one id)]
    (t/is (= n (:counter test)))))

Оптимизация

Например, у нас есть агрегат Пост, содержащий вложенные комментарии. Пост и Комментарий сохраняются в отдельных таблицах. Для начальной и новой версии агрегата нужно сгенерировать списки операций вставки:

;; initial
[[:post {:id 1, :title "123", :content "123"}]
 [:comment {:id 1, :title "awesome!"}]]

;; current
[[:post {:id 1, :title "123", :content "123 - additional text"}]
 [:comment {:id 1, :title "awesome!"}]]

Сравнивая эти списки получаем набор sql операций. В данном случае нужно только удалить и вставить строку с постом, т.к. комментарии не изменились.


Возможно, вы обратили внимание на <1>. Чтобы найти одну запись, нужно выполнить select для всех мапперов. Такой подход сильно упрощает логику, но ухудшает производительность. Если ожидается, что в вашем приложении будет большое кол-во агрегатов, то стоит добавить в абстракции поддержку пространств для идентификаторов:

;; было
(id-generator/generate)
(storage/get-many t some-ids)

;; стало
(id-generator/generate :user)
(storage/get-many t :user some-ids)
(aggregate/space user) ;; => :user

Mappers

Самостоятельно разберите мапперы Пользователя и Поста: