Locks & Versioning

To ensure changes to platform data are properly recorded, ixmp4 has a data versioning mechanism in place on PostgreSQL databases. This mechanism records all rows changed by statements executed on marked tables and associates them with transaction records. In order to coordinate between many concurrent clients, a locking mechanism is implemented around the Run class. This locking mechanism is also enabled on SQLite databases.

Locking Runs

To acquire the lock on a Run, the transact() context manager may be used.

Run.transact(message: str, timeout: float | None = None, revert_platform_on_error: bool = False) Generator[None, None, None]

Context manager to lock the run before yielding control back to the caller. The run is unlocked and a checkpoint with the provided message is created after the context manager exits. If an exception occurs, the run is reverted to the last checkpoint or if no checkpoint exists, to the transaction the run was locked at.

If the run is already locked, the context manager will throw Run.IsLocked or if timeout is provided retry until the timeout has passed (and then throw the original Run.IsLocked exception).

with run.transact("My message"):
    # perform operations that require locking the run
    run.meta["new-key"] = 1

#> Checkpoint created and run unlocked
Parameters:
  • messsage (str) – The message for the checkpoint created after conclusion of the context manager.

  • timeout (int, optional) – Timeout in seconds.

  • revert_platform_on_error (bool, optional) – Whether to revert units when encountering an error; default False.

Raises:

ixmp4.core.exceptions.RunIsLocked – If the run is already locked and no timeout is provided or the provided timeout is exceeded.

Internally, ixmp4 will save the last transaction id in the database as the lock_transaction column for the run’s row. Any subsequent attempt to lock the run will result in an exception as the column will be read and only updated if it currently contains NULL / None. No additional checks for lock ownership are implemented in the data layer.

Any well-behaved ixmp4 client must remember that it has acquired this run’s lock, for example with the owns_lock property. Lock ownership is bound to a single run, not the entire platform, so only one of these instances may acquire a lock.

run1 = platform.runs.get("Model", "Scenario")
run2 = platform.runs.get("Model", "Scenario")

with run1.transact("Add xyz data"):
    run2.iamc.add(...)
    # ^ `RunIsLocked`

with run1.transact("Add xyz data"):
    with run2.transact("Second locking attempt.")
        # ^ `RunIsLocked`
        pass

with run1.transact("Add xyz data"):
    run1.iamc.add(...)
    # ^ works!

Note

All other operations that potentially result in a RunIsLocked exception do so solely because of client-side checks, making the locking mechanism essentially a method of coordination for clients that rely on versioning facilities and thus not a reliable security mechanism in any manner.

Version Tables

Any table with versioning enabled, has a sibling table with at least three additional columns to which all changes are recorded.

  • transaction_id:

    The transaction at which the row was created, updated or deleted.

  • end_transaction_id:

    The transaction at which a newer row in this table represents the current state.

  • operation_type:

    The Operation that was performed to result in the row.

class ixmp4.data.versions.model.Operation(*values)
INSERT = 0
UPDATE = 1
DELETE = 2

The two tables are then “linked” via a set of database triggers that record the changes made by emitted statements.

class ixmp4.data.versions.PostgresVersionTriggers(table: Table | FromClause, version_table: Table | FromClause, transaction_table: Table = Table('transaction', MetaData(), Column('issued_at', DateTime(), table=<transaction>, nullable=False), Column('id', Integer(), table=<transaction>, primary_key=True, nullable=False, server_default=Identity(on_null=True, start=1, increment=1)), schema=None), transaction_id_column: ColumnElement[int] | None = None, end_transaction_id_column: ColumnElement[int] | None = None, operation_type_column: ColumnElement[int] | None = None)

Represents a set of triggers on a source table that record changes to a version table.

from sqlalchemy import orm
from toolkit.db.types import String

from ixmp4.data import versions
from ixmp4.data.base.db import BaseModel

class Example(BaseModel):
    # ...
    name: String = orm.mapped_column(unique=True)

class ExampleVersion(versions.BaseVersionModel):
    # ...
    name: String # omit any constraints

version_triggers = versions.PostgresVersionTriggers(
    Example.__table__, ExampletVersion.__table__
)

A set of listeners will be attached to the provided tables. On creation (for example via BaseModel.metadata.create_all()) each source table will also emit statements to create the triggers. On deletion (f.e. .drop_all()) the triggers will also be dropped.

Since these triggers don’t support alembic autogeneration, a migration has to be added manually to create or update them.

def upgrade():
    conn = op.get_bind()
    dialect_name = conn.dialect.name
    metadata = sa.MetaData()

    if dialect_name == "postgresql":
        transaction_table = sa.Table(
            "transaction", metadata, autoload_with=conn
        )
        version_table = sa.Table(
            "example", metadata, autoload_with=conn
        )
        data_table = sa.Table(
            "example_version", metadata, autoload_with=conn
        )
        triggers = PostgresVersionTriggers(
            data_table, version_table, transaction_table
        )
        triggers.create_entities(conn)

def downgrade():
    # ...
    if dialect_name == "postgresql":
        # ...
        triggers.drop_entities(conn)
sync_entities(con: Session | Connection) None

Recreate all DDL entities for this trigger group on a supplied connection.

create_entities(con: Session | Connection) None

Create all DDL entities for this trigger group on a supplied connection.

drop_entities(con: Session | Connection) None

Drop all DDL entities for this trigger group on a supplied connection.

create_listeners() None

Creates listeners for trigger creation when using BaseModel.metadata.create_all(…), for example when testing.

Checkpoints

Checkpoints are used to label important transactions in the transaction history as to provide “anchors” to revert to. They are created automatically at the exit point of the transact() context manager and can be manually created at any other point.

with run.transact("Add data"):
    run.iamc.add(...)
    run.checkpoints.create("Add IAMC Data")
    #> Checkpoint "Add IAMC Data" created
    run.meta["key"] = "value"
#> Checkpoint "Add data" created

If an exception occurs within a transact() block, data in the run will be rolled back to the latest checkpoint on platforms that support versioning.