
Appendix C: Swapping Out the Infrastructure: Do Everything with CSVs

This appendix is a little illustration of the benefits of the Repository, Unit of Work, and Service Layer patterns. It follows on from [chapter_06_uow].

Just as we finish building out our Flask API and getting it ready for release, the business comes to us apologetically, saying they’re not ready to use our API and asking if we could build a thing that reads just batches and orders from a couple of CSVs and outputs a third CSV with allocations.

Ordinarily this is the kind of thing that might have a team cursing and spitting and making notes for their memoirs. But not us! Oh no, we’ve ensured that our infrastructure concerns are nicely decoupled from our domain model and service layer. Switching to CSVs will be a simple matter of writing a couple of new Repository and UnitOfWork classes, and then we’ll be able to reuse all of our logic from the domain layer and the service layer.

Here’s an E2E test to show you how the CSVs flow in and out:

A first CSV test (tests/e2e/test_csv.py)
def test_cli_app_reads_csvs_with_batches_and_orders_and_outputs_allocations(make_csv):
    sku1, sku2 = random_ref("s1"), random_ref("s2")
    batch1, batch2, batch3 = random_ref("b1"), random_ref("b2"), random_ref("b3")
    order_ref = random_ref("o")
    make_csv("batches.csv", [
        ["ref", "sku", "qty", "eta"],
        [batch1, sku1, 100, ""],
        [batch2, sku2, 100, "2011-01-01"],
        [batch3, sku2, 100, "2011-01-02"],
    ])
    orders_csv = make_csv("orders.csv", [
        ["orderid", "sku", "qty"],
        [order_ref, sku1, 3],
        [order_ref, sku2, 12],
    ])

    run_cli_script(orders_csv.parent)

    expected_output_csv = orders_csv.parent / "allocations.csv"
    with open(expected_output_csv) as f:
        rows = list(csv.reader(f))
    assert rows == [
        ["orderid", "sku", "qty", "batchref"],
        [order_ref, sku1, "3", batch1],
        [order_ref, sku2, "12", batch2],
    ]
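
The test leans on two helpers that aren't shown in this appendix: make_csv and run_cli_script. Here's a minimal sketch of what they might look like. The bodies, the make_csv_factory name, and the default script path are our assumptions for illustration, not the project's actual helpers. In a pytest suite you would probably wrap the factory in a fixture that passes in a temporary directory.

```python
import csv
import subprocess
import sys
from pathlib import Path


def make_csv_factory(folder):
    """Return a make_csv(filename, rows) callable that writes into folder.

    Hypothetical helper: the real test suite presumably exposes this
    as a pytest fixture backed by a temporary directory.
    """

    def make_csv(filename, rows):
        csvfile = Path(folder) / filename
        # newline="" is what the csv docs recommend for files it writes
        with csvfile.open("w", newline="") as f:
            csv.writer(f).writerows(rows)
        return csvfile

    return make_csv


def run_cli_script(folder, script=Path("src/bin/allocate-from-csv")):
    """Run the CLI script against folder; raise if it exits nonzero."""
    subprocess.run([sys.executable, str(script), str(folder)], check=True)
```

Note that csv.reader hands every field back as a string, which is why the test writes the quantity 3 but asserts on "3".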

Diving in and implementing without thinking about repositories and all that jazz, you might start with something like this:

A first cut of our CSV reader/writer (src/bin/allocate-from-csv)
#!/usr/bin/env python
import csv
import sys
from datetime import datetime
from pathlib import Path

from allocation.domain import model


def load_batches(batches_path):
    batches = []
    with batches_path.open() as inf:
        reader = csv.DictReader(inf)
        for row in reader:
            if row["eta"]:
                eta = datetime.strptime(row["eta"], "%Y-%m-%d").date()
            else:
                eta = None
            batches.append(
                model.Batch(
                    ref=row["ref"], sku=row["sku"], qty=int(row["qty"]), eta=eta
                )
            )
    return batches


def main(folder):
    batches_path = Path(folder) / "batches.csv"
    orders_path = Path(folder) / "orders.csv"
    allocations_path = Path(folder) / "allocations.csv"

    batches = load_batches(batches_path)

    with orders_path.open() as inf, allocations_path.open("w") as outf:
        reader = csv.DictReader(inf)
        writer = csv.writer(outf)
        # Include qty so the output matches the columns the test expects
        writer.writerow(["orderid", "sku", "qty", "batchref"])
        for row in reader:
            orderid, sku = row["orderid"], row["sku"]
            qty = int(row["qty"])
            line = model.OrderLine(orderid, sku, qty)
            batchref = model.allocate(line, batches)
            writer.writerow([line.orderid, line.sku, line.qty, batchref])


if __name__ == "__main__":
    main(sys.argv[1])

It’s not looking too bad! And we’re reusing our domain model objects and our domain service.

But it’s not going to work. Existing allocations need to also be part of our permanent CSV storage. We can write a second test to force us to improve things:

And another one, with existing allocations (tests/e2e/test_csv.py)
def test_cli_app_also_reads_existing_allocations_and_can_append_to_them(make_csv):
    sku = random_ref("s")
    batch1, batch2 = random_ref("b1"), random_ref("b2")
    old_order, new_order = random_ref("o1"), random_ref("o2")
    make_csv("batches.csv", [
        ["ref", "sku", "qty", "eta"],
        [batch1, sku, 10, "2011-01-01"],
        [batch2, sku, 10, "2011-01-02"],
    ])
    make_csv("allocations.csv", [
        ["orderid", "sku", "qty", "batchref"],
        [old_order, sku, 10, batch1],
    ])
    orders_csv = make_csv("orders.csv", [
        ["orderid", "sku", "qty"],
        [new_order, sku, 7],
    ])

    run_cli_script(orders_csv.parent)

    expected_output_csv = orders_csv.parent / "allocations.csv"
    with open(expected_output_csv) as f:
        rows = list(csv.reader(f))
    assert rows == [
        ["orderid", "sku", "qty", "batchref"],
        [old_order, sku, "10", batch1],
        [new_order, sku, "7", batch2],
    ]

And we could keep hacking about and adding extra lines to that load_batches function, and some sort of way of tracking and saving new allocations—but we already have a model for doing that! It’s called our Repository and Unit of Work patterns.

All we need to do ("all we need to do") is reimplement those same abstractions, but with CSVs underlying them instead of a database. And as you’ll see, it really is relatively straightforward.
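
As a reminder, here's roughly the shape of the two abstractions we need to satisfy. This is a sketch and not a copy of the real modules: the method names (add, get, list, commit, rollback) come from the code in this appendix, while the context-manager methods on the unit of work are our assumption about how [chapter_06_uow] left things.

```python
import abc


class AbstractRepository(abc.ABC):
    """The collection-like interface a storage backend must provide."""

    @abc.abstractmethod
    def add(self, batch):
        raise NotImplementedError

    @abc.abstractmethod
    def get(self, reference):
        raise NotImplementedError

    @abc.abstractmethod
    def list(self):
        raise NotImplementedError


class AbstractUnitOfWork(abc.ABC):
    """Owns a repository and decides when changes become permanent."""

    batches: AbstractRepository

    # Assumed: the base class supplies the context-manager behavior,
    # so subclasses only need to implement commit() and rollback().
    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Rolling back on exit is harmless after a successful commit
        self.rollback()

    @abc.abstractmethod
    def commit(self):
        raise NotImplementedError

    @abc.abstractmethod
    def rollback(self):
        raise NotImplementedError
```

Anything that fills in those five abstract methods can be handed to the service layer, whether it talks to Postgres, a dict in memory, or a folder full of CSVs.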

Implementing a Repository and Unit of Work for CSVs

Here’s what a CSV-based repository could look like. It abstracts away all the logic for reading CSVs from disk, including the fact that it has to read two different CSVs (one for batches and one for allocations), and it gives us just the familiar .list() API, which provides the illusion of an in-memory collection of domain objects:

A repository that uses CSV as its storage mechanism (src/allocation/service_layer/csv_uow.py)
class CsvRepository(repository.AbstractRepository):
    def __init__(self, folder):
        self._batches_path = Path(folder) / "batches.csv"
        self._allocations_path = Path(folder) / "allocations.csv"
        self._batches = {}  # type: Dict[str, model.Batch]
        self._load()

    def get(self, reference):
        return self._batches.get(reference)

    def add(self, batch):
        self._batches[batch.reference] = batch

    def _load(self):
        with self._batches_path.open() as f:
            reader = csv.DictReader(f)
            for row in reader:
                ref, sku = row["ref"], row["sku"]
                qty = int(row["qty"])
                if row["eta"]:
                    eta = datetime.strptime(row["eta"], "%Y-%m-%d").date()
                else:
                    eta = None
                self._batches[ref] = model.Batch(ref=ref, sku=sku, qty=qty, eta=eta)
        if not self._allocations_path.exists():
            return
        with self._allocations_path.open() as f:
            reader = csv.DictReader(f)
            for row in reader:
                batchref, orderid, sku = row["batchref"], row["orderid"], row["sku"]
                qty = int(row["qty"])
                line = model.OrderLine(orderid, sku, qty)
                batch = self._batches[batchref]
                batch._allocations.add(line)

    def list(self):
        return list(self._batches.values())

And here’s what a UoW for CSVs would look like:

A UoW for CSVs: commit = csv.writer (src/allocation/service_layer/csv_uow.py)
class CsvUnitOfWork(unit_of_work.AbstractUnitOfWork):
    def __init__(self, folder):
        self.batches = CsvRepository(folder)

    def commit(self):
        with self.batches._allocations_path.open("w") as f:
            writer = csv.writer(f)
            writer.writerow(["orderid", "sku", "qty", "batchref"])
            for batch in self.batches.list():
                for line in batch._allocations:
                    writer.writerow(
                        [line.orderid, line.sku, line.qty, batch.reference]
                    )

    def rollback(self):
        pass

And once we have that, our CLI app for reading and writing batches and allocations to CSV is pared down to what it should be—a bit of code for reading order lines, and a bit of code that invokes our existing service layer:

Allocation with CSVs in nine lines (src/bin/allocate-from-csv)
def main(folder):
    orders_path = Path(folder) / "orders.csv"
    uow = csv_uow.CsvUnitOfWork(folder)
    with orders_path.open() as f:
        reader = csv.DictReader(f)
        for row in reader:
            orderid, sku = row["orderid"], row["sku"]
            qty = int(row["qty"])
            services.allocate(orderid, sku, qty, uow)
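
You may notice that main() never calls commit() itself. That works as long as the service-layer function does it for us. Here's a self-contained sketch of that idea, under loud assumptions: OrderLine, Batch, domain_allocate, OutOfStock, and InvalidSku below are simplified stand-ins we made up for this illustration (no eta, so no ordering by arrival date), and the body of allocate is our guess at the shape of the function from [chapter_06_uow], not the real thing.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class OrderLine:
    """Stand-in for model.OrderLine."""

    orderid: str
    sku: str
    qty: int


@dataclass
class Batch:
    """Simplified stand-in for model.Batch (no eta, so no ordering)."""

    reference: str
    sku: str
    qty: int
    _allocations: set = field(default_factory=set)

    def can_allocate(self, line):
        available = self.qty - sum(l.qty for l in self._allocations)
        return self.sku == line.sku and available >= line.qty


class OutOfStock(Exception):
    pass


class InvalidSku(Exception):
    pass


def domain_allocate(line, batches):
    """Stand-in for model.allocate: the first batch with room wins."""
    for batch in batches:
        if batch.can_allocate(line):
            batch._allocations.add(line)
            return batch.reference
    raise OutOfStock(f"Out of stock for sku {line.sku}")


def allocate(orderid, sku, qty, uow):
    """Service-layer entry point: validate, allocate, then commit."""
    line = OrderLine(orderid, sku, qty)
    with uow:
        batches = uow.batches.list()
        if sku not in {b.sku for b in batches}:
            raise InvalidSku(f"Invalid sku {sku}")
        batchref = domain_allocate(line, batches)
        # With a CsvUnitOfWork, this is where allocations.csv gets rewritten
        uow.commit()
    return batchref
```

The point is that the service only ever talks to uow.batches and uow.commit(), so it has no idea whether a database or a CSV file is on the other end.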

Ta-da! Now are y’all impressed or what?

Much love,

Bob and Harry