Locks & Versioning
To ensure changes to platform data are properly recorded,
ixmp4 has a data versioning mechanism in place on PostgreSQL
databases.
This mechanism records all rows changed by statements executed
on marked tables and associates them with transaction records.
In order to coordinate between many concurrent clients, a locking
mechanism is implemented around the Run class.
This locking mechanism is also enabled on SQLite databases.
Locking Runs
To acquire the lock on a Run,
the transact() context manager may be used.
- Run.transact(message: str, timeout: float | None = None, revert_platform_on_error: bool = False) Generator[None, None, None]
Context manager to lock the run before yielding control back to the caller. The run is unlocked and a checkpoint with the provided message is created after the context manager exits. If an exception occurs, the run is reverted to the last checkpoint or if no checkpoint exists, to the transaction the run was locked at.
If the run is already locked, the context manager will throw Run.IsLocked or if timeout is provided retry until the timeout has passed (and then throw the original Run.IsLocked exception).
with run.transact("My message"): # perform operations that require locking the run run.meta["new-key"] = 1 #> Checkpoint created and run unlocked
- Parameters:
- Raises:
ixmp4.core.exceptions.RunIsLocked – If the run is already locked and no timeout is provided or the provided timeout is exceeded.
Internally, ixmp4 will save the last transaction id in the database as
the lock_transaction column for the run’s row.
Any subsequent attempt to lock the run will result in an exception as
the column will be read and only updated if it currently contains NULL / None.
No additional checks for lock ownership are implemented in the data layer.
Any well-behaved ixmp4 client must remember that it has acquired this
run’s lock, for example with the owns_lock property.
Lock ownership is bound to a single run, not the entire platform, so only one of
these instances may acquire a lock.
run1 = platform.runs.get("Model", "Scenario")
run2 = platform.runs.get("Model", "Scenario")
with run1.transact("Add xyz data"):
run2.iamc.add(...)
# ^ `RunIsLocked`
with run1.transact("Add xyz data"):
with run2.transact("Second locking attempt.")
# ^ `RunIsLocked`
pass
with run1.transact("Add xyz data"):
run1.iamc.add(...)
# ^ works!
Note
All other operations that potentially result in a
RunIsLocked exception do so solely
because of client-side checks, making the locking mechanism essentially
a method of coordination for clients that rely on versioning facilities
and thus not a reliable security mechanism in any manner.
Version Tables
Any table with versioning enabled, has a sibling table with at least three additional columns to which all changes are recorded.
- transaction_id:
The transaction at which the row was created, updated or deleted.
- end_transaction_id:
The transaction at which a newer row in this table represents the current state.
- operation_type:
The
Operationthat was performed to result in the row.
The two tables are then “linked” via a set of database triggers that record the changes made by emitted statements.
- class ixmp4.data.versions.PostgresVersionTriggers(table: Table | FromClause, version_table: Table | FromClause, transaction_table: Table = Table('transaction', MetaData(), Column('issued_at', DateTime(), table=<transaction>, nullable=False), Column('id', Integer(), table=<transaction>, primary_key=True, nullable=False, server_default=Identity(on_null=True, start=1, increment=1)), schema=None), transaction_id_column: ColumnElement[int] | None = None, end_transaction_id_column: ColumnElement[int] | None = None, operation_type_column: ColumnElement[int] | None = None)
Represents a set of triggers on a source table that record changes to a version table.
from sqlalchemy import orm from toolkit.db.types import String from ixmp4.data import versions from ixmp4.data.base.db import BaseModel class Example(BaseModel): # ... name: String = orm.mapped_column(unique=True) class ExampleVersion(versions.BaseVersionModel): # ... name: String # omit any constraints version_triggers = versions.PostgresVersionTriggers( Example.__table__, ExampletVersion.__table__ )
A set of listeners will be attached to the provided tables. On creation (for example via
BaseModel.metadata.create_all()) each source table will also emit statements to create the triggers. On deletion (f.e..drop_all()) the triggers will also be dropped.Since these triggers don’t support alembic autogeneration, a migration has to be added manually to create or update them.
def upgrade(): conn = op.get_bind() dialect_name = conn.dialect.name metadata = sa.MetaData() if dialect_name == "postgresql": transaction_table = sa.Table( "transaction", metadata, autoload_with=conn ) version_table = sa.Table( "example", metadata, autoload_with=conn ) data_table = sa.Table( "example_version", metadata, autoload_with=conn ) triggers = PostgresVersionTriggers( data_table, version_table, transaction_table ) triggers.create_entities(conn) def downgrade(): # ... if dialect_name == "postgresql": # ... triggers.drop_entities(conn)
- sync_entities(con: Session | Connection) None
Recreate all DDL entities for this trigger group on a supplied connection.
- create_entities(con: Session | Connection) None
Create all DDL entities for this trigger group on a supplied connection.
Checkpoints
Checkpoints are used to label important transactions in the transaction history
as to provide “anchors” to revert to. They are created automatically at the exit
point of the transact() context manager and can be manually
created at any other point.
with run.transact("Add data"):
run.iamc.add(...)
run.checkpoints.create("Add IAMC Data")
#> Checkpoint "Add IAMC Data" created
run.meta["key"] = "value"
#> Checkpoint "Add data" created
If an exception occurs within a transact() block,
data in the run will be rolled back to the latest checkpoint on platforms
that support versioning.