Skip to content

DynamoDB API Reference

DynamoDBBackend

DynamoDBBackend

DynamoDBBackend(table_name: str, adapter: Optional[DynamoDBAdapter] = None, region_name: Optional[str] = None, endpoint_url: Optional[str] = None, **boto3_kwargs: Any)

Bases: Backend

DynamoDB storage backend.

Handles connection management, CRUD operations, and query building for AWS DynamoDB tables.

Example

from restmachine_orm.backends.dynamodb import DynamoDBBackend
from restmachine_orm.backends.adapters import DynamoDBAdapter

backend = DynamoDBBackend(
    table_name="my-table",
    adapter=DynamoDBAdapter()
)

class TodoItem(Model):
    class Meta:
        backend = backend

    user_id: str
    todo_id: str
    title: str

    @partition_key
    def pk(self) -> str:
        return f"USER#{self.user_id}"

    @sort_key
    def sk(self) -> str:
        return f"TODO#{self.todo_id}"

Initialize DynamoDB backend.

Parameters:

Name Type Description Default
table_name str

Name of the DynamoDB table

required
adapter Optional[DynamoDBAdapter]

DynamoDBAdapter instance (uses default if not provided)

None
region_name Optional[str]

AWS region (e.g., 'us-east-1')

None
endpoint_url Optional[str]

Custom endpoint URL (for local DynamoDB)

None
**boto3_kwargs Any

Additional arguments for boto3.resource()

{}
Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/backend.py
def __init__(
    self,
    table_name: str,
    adapter: Optional[DynamoDBAdapter] = None,
    region_name: Optional[str] = None,
    endpoint_url: Optional[str] = None,
    **boto3_kwargs: Any,
):
    """
    Record the backend configuration; no DynamoDB handle is created here.

    Args:
        table_name: Name of the DynamoDB table
        adapter: DynamoDBAdapter instance (a default one is built if omitted)
        region_name: AWS region (e.g., 'us-east-1')
        endpoint_url: Custom endpoint URL (for local DynamoDB)
        **boto3_kwargs: Additional arguments for boto3.resource()
    """
    # Keep the adapter under a private name so it cannot clash with a
    # property, then hand the very same object to the parent initializer.
    if adapter:
        self._dynamodb_adapter = adapter
    else:
        self._dynamodb_adapter = DynamoDBAdapter()
    super().__init__(self._dynamodb_adapter)

    # Connection settings, stored verbatim for later use.
    self.table_name = table_name
    self.region_name = region_name
    self.endpoint_url = endpoint_url
    self.boto3_kwargs = boto3_kwargs

    # Both handles start out empty (lazy initialization).
    self._dynamodb_resource: Optional[Any] = None
    self._table: Optional["Table"] = None

Functions

create

create(model_class: type[Model], data: dict[str, Any]) -> dict[str, Any]

Create a new record in DynamoDB.

Parameters:

Name Type Description Default
model_class type[Model]

Model class

required
data dict[str, Any]

Record data

required

Returns:

Type Description
dict[str, Any]

Created record data

Raises:

Type Description
DuplicateKeyError

If item with same key already exists

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/backend.py
def create(self, model_class: type["Model"], data: dict[str, Any]) -> dict[str, Any]:
    """
    Create a new record in DynamoDB.

    The write is conditional on the partition key attribute not existing yet,
    so an item that is already stored under the same key is never overwritten.

    Args:
        model_class: Model class
        data: Record data

    Returns:
        Created record data

    Raises:
        DuplicateKeyError: If item with same key already exists (the
            underlying ClientError is attached as ``__cause__``)
        ClientError: Any other DynamoDB client error is re-raised unchanged
    """
    # Ensure extensions are configured
    self._ensure_configured(model_class)

    # Create instance to generate keys
    instance = model_class(**data)

    # Transform to storage format (adds pk, sk, entity_type)
    item = self.adapter.model_to_storage(instance)

    # Call extension hooks
    item = self._call_serialize_hooks(model_class, item)
    self._call_validate_hooks(model_class, item)

    # Convert to DynamoDB types
    item = self._python_to_dynamodb(item)

    # Resolved outside the try block: it is needed by the error message too.
    pk_attr = self.adapter.pk_attribute

    try:
        # Use ConditionExpression to prevent overwriting existing items
        self.table.put_item(
            Item=item,
            ConditionExpression=f"attribute_not_exists({pk_attr})"
        )
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            # Chain explicitly so the original service error stays visible.
            raise DuplicateKeyError(
                f"Item with key {item.get(pk_attr)} already exists"
            ) from e
        raise

    # Convert back to Python types
    item = self._dynamodb_to_python(item)

    # Transform back to model format
    result_data = self.adapter.storage_to_model(model_class, item)
    result_data = self._call_deserialize_hooks(model_class, result_data)
    return result_data

upsert

upsert(model_class: type[Model], data: dict[str, Any]) -> dict[str, Any]

Create or update a record (upsert).

Parameters:

Name Type Description Default
model_class type[Model]

Model class

required
data dict[str, Any]

Record data

required

Returns:

Type Description
dict[str, Any]

Upserted record data

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/backend.py
def upsert(self, model_class: type["Model"], data: dict[str, Any]) -> dict[str, Any]:
    """
    Write a record unconditionally, replacing any item stored under its key.

    Args:
        model_class: Model class
        data: Record data

    Returns:
        Upserted record data
    """
    self._ensure_configured(model_class)

    # A model instance is needed so the adapter can derive the key attributes.
    model_instance = model_class(**data)
    storage_item = self.adapter.model_to_storage(model_instance)

    # Extension hooks: serialization may replace the item, validation only inspects it.
    storage_item = self._call_serialize_hooks(model_class, storage_item)
    self._call_validate_hooks(model_class, storage_item)

    ddb_item = self._python_to_dynamodb(storage_item)

    # No ConditionExpression here: an existing item is simply overwritten.
    self.table.put_item(Item=ddb_item)

    # Echo back what was written, in model format.
    python_item = self._dynamodb_to_python(ddb_item)
    model_data = self.adapter.storage_to_model(model_class, python_item)
    return self._call_deserialize_hooks(model_class, model_data)

get

get(model_class: type[Model], **filters: Any) -> Optional[dict[str, Any]]

Get a single record by key.

Parameters:

Name Type Description Default
model_class type[Model]

Model class

required
**filters Any

Filter conditions (must include enough to build key)

{}

Returns:

Type Description
Optional[dict[str, Any]]

Record data, or None if not found

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/backend.py
def get(self, model_class: type["Model"], **filters: Any) -> Optional[dict[str, Any]]:
    """
    Get a single record by key.

    When the filters are enough to build a partition key, the item is fetched
    directly with ``get_item``; otherwise the lookup falls back to a query and
    returns its first match.

    Args:
        model_class: Model class
        **filters: Filter conditions (must include enough to build key)

    Returns:
        Record data, or None if not found. None is also returned when the
        direct lookup raises a ClientError; that failure is logged as a
        warning so it can be told apart from a genuinely missing record.
    """
    import logging

    # Create instance without validation to generate keys
    instance = model_class.model_construct(**filters)

    # Get key attributes from adapter
    pk_value = self.adapter._get_partition_key_value(instance)
    sk_value = self.adapter._get_sort_key_value(instance)

    if not pk_value:
        # Fall back to query
        result = self.query(model_class).and_(**filters).first()
        if result:
            return result.model_dump()
        return None

    # Build key dict
    key = {self.adapter.pk_attribute: pk_value}
    if sk_value:
        key[self.adapter.sk_attribute] = sk_value

    try:
        response = self.table.get_item(Key=key)
        if "Item" in response:
            item = self._dynamodb_to_python(response["Item"])
            model_data = self.adapter.storage_to_model(model_class, item)
            model_data = self._call_deserialize_hooks(model_class, model_data)
            return model_data
    except ClientError as e:
        # Still best-effort (callers get None), but no longer silent: a
        # throttled or misconfigured lookup would otherwise look like
        # "record does not exist".
        logging.getLogger(__name__).warning(
            "DynamoDB get_item on table %s failed; returning None: %s",
            self.table_name,
            e,
        )

    return None

update

update(model_class: type[Model], instance: Model) -> dict[str, Any]

Update an existing record.

Parameters:

Name Type Description Default
model_class type[Model]

Model class

required
instance Model

Model instance with updated data

required

Returns:

Type Description
dict[str, Any]

Updated record data

Raises:

Type Description
NotFoundError

If record not found

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/backend.py
def update(
    self,
    model_class: type["Model"],
    instance: "Model"
) -> dict[str, Any]:
    """
    Update an existing record.

    Every non-key attribute of the stored representation is written with a
    single ``SET`` update expression, conditional on the item already existing.

    Args:
        model_class: Model class
        instance: Model instance with updated data

    Returns:
        Updated record data

    Raises:
        RuntimeError: If the instance yields no partition key value
        NotFoundError: If record not found (the underlying ClientError is
            attached as ``__cause__``)
        ClientError: Any other DynamoDB client error is re-raised unchanged
    """
    # Get key values from adapter
    pk_value = self.adapter._get_partition_key_value(instance)
    sk_value = self.adapter._get_sort_key_value(instance)

    if not pk_value:
        raise RuntimeError("Cannot update without partition key")

    # Build key dict
    key = {self.adapter.pk_attribute: pk_value}
    if sk_value:
        key[self.adapter.sk_attribute] = sk_value

    # Transform to storage format
    item = self.adapter.model_to_storage(instance)

    # Call extension hooks
    item = self._call_serialize_hooks(model_class, item)
    self._call_validate_hooks(model_class, item)

    item = self._python_to_dynamodb(item)

    # Build update expression
    key_attributes = (self.adapter.pk_attribute, self.adapter.sk_attribute)
    update_parts = []
    expression_attribute_names = {}
    expression_attribute_values = {}

    for field_name, value in item.items():
        # Key attributes cannot be modified by an update expression
        if field_name in key_attributes:
            continue

        # Use attribute name aliases to handle reserved words
        attr_alias = f"#{field_name}"
        value_alias = f":{field_name}"

        update_parts.append(f"{attr_alias} = {value_alias}")
        expression_attribute_names[attr_alias] = field_name
        expression_attribute_values[value_alias] = value

    if not update_parts:
        # No fields to update, so no request is sent. The item still goes
        # through the same conversions as a successful update so that both
        # return paths hand back data in the same shape.
        unchanged_item = self._dynamodb_to_python(item)
        result_data = self.adapter.storage_to_model(model_class, unchanged_item)
        result_data = self._call_deserialize_hooks(model_class, result_data)
        return result_data

    update_expression = "SET " + ", ".join(update_parts)

    try:
        response = self.table.update_item(
            Key=key,
            UpdateExpression=update_expression,
            ExpressionAttributeNames=expression_attribute_names,
            ExpressionAttributeValues=expression_attribute_values,
            ConditionExpression=f"attribute_exists({self.adapter.pk_attribute})",
            ReturnValues="ALL_NEW"
        )

        updated_item = self._dynamodb_to_python(response["Attributes"])
        result_data = self.adapter.storage_to_model(model_class, updated_item)
        result_data = self._call_deserialize_hooks(model_class, result_data)
        return result_data

    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            # Chain explicitly so the original service error stays visible.
            raise NotFoundError("Record not found") from e
        raise

delete

delete(model_class: type[Model], instance: Model) -> bool

Delete a record.

Parameters:

Name Type Description Default
model_class type[Model]

Model class

required
instance Model

Model instance to delete

required

Returns:

Type Description
bool

True if deleted successfully

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/backend.py
def delete(self, model_class: type["Model"], instance: "Model") -> bool:
    """
    Remove the item identified by the instance's key.

    The delete is conditional on the partition key attribute existing, which
    is how a missing record is detected.

    Args:
        model_class: Model class
        instance: Model instance to delete

    Returns:
        True if deleted successfully, False if the conditional check failed
    """
    partition_value = self.adapter._get_partition_key_value(instance)
    sort_value = self.adapter._get_sort_key_value(instance)

    if not partition_value:
        raise RuntimeError("Cannot delete without partition key")

    # The sort key only becomes part of the key when the adapter yields one.
    item_key = {self.adapter.pk_attribute: partition_value}
    if sort_value:
        item_key[self.adapter.sk_attribute] = sort_value

    try:
        self.table.delete_item(
            Key=item_key,
            ConditionExpression=f"attribute_exists({self.adapter.pk_attribute})"
        )
    except ClientError as err:
        # Only the failed existence check maps to False; anything else propagates.
        if err.response["Error"]["Code"] != "ConditionalCheckFailedException":
            raise
        return False

    return True

query

query(model_class: type[Model]) -> QueryBuilder

Create a query builder for complex queries.

Returns:

Type Description
QueryBuilder

DynamoDBQueryBuilder instance

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/backend.py
def query(self, model_class: type["Model"]) -> "QueryBuilder":
    """
    Build a query builder for the given model.

    A fresh DynamoDBQueryBuilder bound to this backend is passed through the
    modify-query extension hooks, and whatever they return is handed back.

    Returns:
        DynamoDBQueryBuilder instance (as returned by the hooks)
    """
    base_builder: QueryBuilder = DynamoDBQueryBuilder(model_class, self)
    # Extensions get the chance to alter or replace the builder.
    return self._call_modify_query_hooks(model_class, base_builder)

batch_create

batch_create(model_class: type[Model], records: list[dict[str, Any]]) -> list[dict[str, Any]]

Create multiple records in batch.

Parameters:

Name Type Description Default
model_class type[Model]

Model class

required
records list[dict[str, Any]]

List of record data

required

Returns:

Type Description
list[dict[str, Any]]

List of created record data

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/backend.py
def batch_create(
    self,
    model_class: type["Model"],
    records: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """
    Write several records through the table's batch writer.

    All records are converted (and validated by the extension hooks) before
    the first write is issued.

    Args:
        model_class: Model class
        records: List of record data

    Returns:
        List of created record data, in the order of ``records``
    """
    if not records:
        return []

    self._ensure_configured(model_class)

    # Phase 1: turn every record into a DynamoDB-typed item.
    pending_items = []
    for record in records:
        model_instance = model_class(**record)
        storage_item = self.adapter.model_to_storage(model_instance)
        storage_item = self._call_serialize_hooks(model_class, storage_item)
        self._call_validate_hooks(model_class, storage_item)
        pending_items.append(self._python_to_dynamodb(storage_item))

    # Phase 2: write them (the batch writer handles batching of 25 items).
    with self.table.batch_writer() as writer:
        for ddb_item in pending_items:
            writer.put_item(Item=ddb_item)

    # Phase 3: convert the written items back to model format.
    def _to_model_data(ddb_item):
        python_item = self._dynamodb_to_python(ddb_item)
        model_data = self.adapter.storage_to_model(model_class, python_item)
        return self._call_deserialize_hooks(model_class, model_data)

    return [_to_model_data(ddb_item) for ddb_item in pending_items]

batch_get

batch_get(model_class: type[Model], keys: list[dict[str, Any]]) -> list[dict[str, Any]]

Get multiple items by their keys.

Parameters:

Name Type Description Default
model_class type[Model]

Model class

required
keys list[dict[str, Any]]

List of key dicts (e.g., [{"id": "1"}, {"id": "2"}])

required

Returns:

Type Description
list[dict[str, Any]]

List of record data

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/backend.py
def batch_get(  # type: ignore[override]
    self,
    model_class: type["Model"],
    keys: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """
    Get multiple items by their keys.

    A single BatchGetItem request accepts at most 100 keys and may return part
    of the request as ``UnprocessedKeys``. The keys are therefore sent in
    chunks, and each unprocessed remainder is requested again until every key
    has been served.

    Args:
        model_class: Model class
        keys: List of key dicts (e.g., [{"id": "1"}, {"id": "2"}])

    Returns:
        List of record data, in the order the service returned the items
    """
    if not keys:
        return []

    # Documented BatchGetItem limit on keys per request.
    max_keys_per_request = 100

    # Convert keys to DynamoDB format
    dynamodb_keys = []
    for key_dict in keys:
        # Create instance without validation to generate keys
        instance = model_class.model_construct(**key_dict)
        pk_value = self.adapter._get_partition_key_value(instance)
        sk_value = self.adapter._get_sort_key_value(instance)

        ddb_key = {self.adapter.pk_attribute: pk_value}
        if sk_value:
            ddb_key[self.adapter.sk_attribute] = sk_value
        dynamodb_keys.append(ddb_key)

    results = []
    for start in range(0, len(dynamodb_keys), max_keys_per_request):
        chunk = dynamodb_keys[start:start + max_keys_per_request]
        request_items = {self.table_name: {"Keys": chunk}}

        # Per the BatchGetItem documentation a successful response has served
        # at least one key (otherwise the call raises), so re-requesting the
        # unprocessed remainder always makes progress.
        while request_items:
            response = self.dynamodb.batch_get_item(RequestItems=request_items)

            for item in response.get("Responses", {}).get(self.table_name, []):
                item = self._dynamodb_to_python(item)
                model_data = self.adapter.storage_to_model(model_class, item)
                model_data = self._call_deserialize_hooks(model_class, model_data)
                results.append(model_data)

            # UnprocessedKeys already has the RequestItems shape.
            request_items = response.get("UnprocessedKeys") or {}

    return results

DynamoDBAdapter

DynamoDBAdapter

DynamoDBAdapter(*, pk_attribute: str = 'pk', sk_attribute: str = 'sk', entity_type_attribute: str = 'entity_type', include_type_in_sk: bool = False)

Bases: ModelAdapter

Adapter for DynamoDB single-table design.

Maps models to DynamoDB items with:

- pk (partition key) - from @partition_key method or primary key field
- sk (sort key) - from @sort_key method or default value
- entity_type - for filtering different entities in same table
- All model fields as attributes
- GSI keys from @gsi_partition_key and @gsi_sort_key methods

Initialize DynamoDB adapter.

Parameters:

Name Type Description Default
pk_attribute str

DynamoDB attribute name for partition key

'pk'
sk_attribute str

DynamoDB attribute name for sort key

'sk'
entity_type_attribute str

Attribute name for entity type

'entity_type'
include_type_in_sk bool

Whether to include entity type in sort key

False
Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/adapter.py
def __init__(
    self,
    *,
    pk_attribute: str = "pk",
    sk_attribute: str = "sk",
    entity_type_attribute: str = "entity_type",
    include_type_in_sk: bool = False,
):
    """
    Store the attribute names and options used when mapping models to items.

    Args:
        pk_attribute: DynamoDB attribute name for partition key
        sk_attribute: DynamoDB attribute name for sort key
        entity_type_attribute: Attribute name for entity type
        include_type_in_sk: Whether to include entity type in sort key
    """
    # Names of the two key attributes on the stored item.
    self.pk_attribute, self.sk_attribute = pk_attribute, sk_attribute

    # Entity-type handling: where the type is stored, and whether the
    # default sort key is prefixed with it.
    self.entity_type_attribute = entity_type_attribute
    self.include_type_in_sk = include_type_in_sk

Functions

model_to_storage

model_to_storage(instance: Model) -> dict[str, Any]

Transform model instance to DynamoDB item format.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/adapter.py
def model_to_storage(self, instance: "Model") -> dict[str, Any]:
    """
    Build the DynamoDB item for a model instance.

    Starts from ``instance.model_dump()`` and adds the partition key, the sort
    key, the entity type and any GSI keys.

    Raises:
        ValueError: If there is neither a partition key value nor a field
            marked as primary key
    """
    model_cls = instance.__class__
    item = instance.model_dump()

    # Partition key: explicit value first, primary-key field as fallback.
    partition_value = self._get_partition_key_value(instance)
    if not partition_value:
        from restmachine_orm.models.fields import get_field_orm_metadata

        primary_field = next(
            (
                name
                for name, info in model_cls.model_fields.items()
                if get_field_orm_metadata(info).get("primary_key")
            ),
            None,
        )
        if not primary_field:
            raise ValueError(
                f"No partition key method or primary key field defined for {model_cls.__name__}"
            )
        partition_value = f"{self.get_entity_type(model_cls)}#{item[primary_field]}"
    item[self.pk_attribute] = partition_value

    # Sort key: explicit value first, otherwise the METADATA placeholder
    # (prefixed with the entity type when so configured).
    sort_value = self._get_sort_key_value(instance)
    if not sort_value:
        if self.include_type_in_sk:
            sort_value = f"{self.get_entity_type(model_cls)}#METADATA"
        else:
            sort_value = "METADATA"
    item[self.sk_attribute] = sort_value

    # Entity type, used for filtering.
    item[self.entity_type_attribute] = self.get_entity_type(model_cls)

    # GSI keys go in last.
    item.update(self._get_gsi_keys(instance))

    return item

storage_to_model

storage_to_model(model_class: type[Model], data: dict[str, Any]) -> dict[str, Any]

Transform DynamoDB item to model data.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/adapter.py
def storage_to_model(
    self,
    model_class: type["Model"],
    data: dict[str, Any]
) -> dict[str, Any]:
    """
    Strip storage-only attributes from a DynamoDB item.

    Returns a new dict (the input is left untouched) without the partition
    key, sort key and entity type attributes and without any attribute whose
    name starts with ``gsi_``.
    """
    storage_only = (
        self.pk_attribute,
        self.sk_attribute,
        self.entity_type_attribute,
    )
    return {
        name: value
        for name, value in dict(data).items()
        if name not in storage_only and not name.startswith("gsi_")
    }

get_primary_key_value

get_primary_key_value(instance: Model) -> dict[str, str]

Get composite key for DynamoDB.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/adapter.py
def get_primary_key_value(self, instance: "Model") -> dict[str, str]:
    """
    Build the composite key dict (partition key and sort key) for an instance.

    The partition key entry is left out when there is neither a partition key
    value nor a field marked as primary key; the sort key entry is always
    present.
    """
    model_cls = instance.__class__
    composite_key = {}

    # Partition key: explicit value first, primary-key field as fallback.
    partition_value = self._get_partition_key_value(instance)
    if partition_value:
        composite_key[self.pk_attribute] = partition_value
    else:
        from restmachine_orm.models.fields import get_field_orm_metadata

        for name, info in model_cls.model_fields.items():
            if get_field_orm_metadata(info).get("primary_key"):
                # Only the first field marked as primary key is used.
                composite_key[self.pk_attribute] = (
                    f"{self.get_entity_type(model_cls)}#{getattr(instance, name)}"
                )
                break

    # Sort key: explicit value first, otherwise the METADATA placeholder
    # (prefixed with the entity type when so configured).
    sort_value = self._get_sort_key_value(instance)
    if not sort_value:
        if self.include_type_in_sk:
            sort_value = f"{self.get_entity_type(model_cls)}#METADATA"
        else:
            sort_value = "METADATA"
    composite_key[self.sk_attribute] = sort_value

    return composite_key

Testing

DynamoDBDriver

DynamoDBDriver

DynamoDBDriver(table_name: str = 'test-table', region_name: str = 'us-east-1')

Bases: DriverInterface

Driver that tests against the DynamoDB backend.

This driver uses moto to mock DynamoDB for testing.

Initialize with DynamoDB backend.

Parameters:

Name Type Description Default
table_name str

DynamoDB table name for testing

'test-table'
region_name str

AWS region name

'us-east-1'
Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def __init__(self, table_name: str = "test-table", region_name: str = "us-east-1"):
    """
    Create the DynamoDB backend under test and remember its settings.

    Args:
        table_name: DynamoDB table name for testing
        region_name: AWS region name
    """
    # Kept on the driver as well, next to the backend built from them.
    self.table_name = table_name
    self.region_name = region_name
    self.backend = DynamoDBBackend(table_name=table_name, region_name=region_name)

Functions

execute_create

execute_create(operation: CreateOperation) -> OperationResult

Execute a create operation.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def execute_create(self, operation: CreateOperation) -> OperationResult:
    """
    Run a create operation and wrap its outcome.

    A failure is returned as an unsuccessful result when the operation was not
    expected to fail, or when it was expected to fail with ``expected_error``
    and the exception is an instance of it. Any other exception propagates.
    """
    try:
        self.setup_backend(operation.model_class)
        created = operation.model_class.create(**operation.data)
        snapshot = created.model_dump()
        return OperationResult(success=True, instance=created, data=snapshot)
    except Exception as exc:
        anticipated = (
            operation.should_fail
            and operation.expected_error
            and isinstance(exc, operation.expected_error)
        )
        if anticipated or not operation.should_fail:
            return OperationResult(success=False, error=exc)
        # Expected to fail, but not like this: let the caller see it.
        raise

execute_get

execute_get(operation: GetOperation) -> OperationResult

Execute a get operation.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def execute_get(self, operation: GetOperation) -> OperationResult:
    """
    Run a get operation and wrap its outcome.

    A lookup that finds nothing is still a success (with ``data=None``); any
    exception is returned as an unsuccessful result.
    """
    try:
        self.setup_backend(operation.model_class)
        found = operation.model_class.get(**operation.filters)
        if found:
            payload = found.model_dump()
        else:
            payload = None
        return OperationResult(success=True, instance=found, data=payload)
    except Exception as exc:
        return OperationResult(success=False, error=exc)

execute_update

execute_update(operation: UpdateOperation) -> OperationResult

Execute an update operation.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def execute_update(self, operation: UpdateOperation) -> OperationResult:
    """
    Execute an update operation.

    Applies ``operation.changes`` to ``operation.instance`` attribute by
    attribute, saves the instance and wraps the outcome. Any exception is
    returned as an unsuccessful result, whether or not the operation was
    expected to fail.
    """
    try:
        self.setup_backend(operation.model_class)

        for key, value in operation.changes.items():
            setattr(operation.instance, key, value)

        operation.instance.save()

        return OperationResult(
            success=True,
            instance=operation.instance,
            data=operation.instance.model_dump()
        )
    except Exception as e:
        # Expected and unexpected failures are reported identically, so no
        # branch on operation.should_fail is needed.
        return OperationResult(success=False, error=e)

execute_delete

execute_delete(operation: DeleteOperation) -> OperationResult

Execute a delete operation.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def execute_delete(self, operation: DeleteOperation) -> OperationResult:
    """
    Run a delete operation and wrap its outcome.

    The result's success flag mirrors what ``instance.delete()`` returned; any
    exception is returned as an unsuccessful result.
    """
    try:
        self.setup_backend(operation.model_class)
        was_deleted = operation.instance.delete()
        return OperationResult(success=was_deleted, data={"deleted": was_deleted})
    except Exception as exc:
        return OperationResult(success=False, error=exc)

execute_upsert

execute_upsert(operation: UpsertOperation) -> OperationResult

Execute an upsert operation.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def execute_upsert(self, operation: UpsertOperation) -> OperationResult:
    """
    Execute an upsert operation.

    Calls ``model_class.upsert`` with ``operation.data`` and wraps the outcome.
    Any exception is returned as an unsuccessful result, whether or not the
    operation was expected to fail.
    """
    try:
        self.setup_backend(operation.model_class)
        instance = operation.model_class.upsert(**operation.data)

        return OperationResult(
            success=True,
            instance=instance,
            data=instance.model_dump()
        )
    except Exception as e:
        # Expected and unexpected failures are reported identically, so no
        # branch on operation.should_fail is needed.
        return OperationResult(success=False, error=e)

execute_query

execute_query(operation: QueryOperation) -> OperationResult

Execute a query operation.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def execute_query(self, operation: QueryOperation) -> OperationResult:
    """
    Run a query operation and wrap its outcome.

    Filters, ordering, limit and offset are applied only when the operation
    carries a truthy value for them. Any exception is returned as an
    unsuccessful result.
    """
    try:
        self.setup_backend(operation.model_class)

        if operation.filters:
            builder = operation.model_class.where(**operation.filters)
        else:
            builder = operation.model_class.where()

        # Orderings are applied one by one, in the given order.
        for ordering in operation.order_by or ():
            builder = builder.order_by(ordering)

        if operation.limit:
            builder = builder.limit(operation.limit)
        if operation.offset:
            builder = builder.offset(operation.offset)

        return OperationResult(success=True, data=builder.all())
    except Exception as exc:
        return OperationResult(success=False, error=exc)

count

count(model_class: Type, **filters: Any) -> int

Count instances matching filters.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def count(self, model_class: Type, **filters: Any) -> int:
    """Return how many instances of the model match the filters."""
    self.setup_backend(model_class)
    # With no filters this is the same call as a bare where().
    matching = model_class.where(**filters)
    total: int = matching.count()
    return total

exists

exists(model_class: Type, **filters: Any) -> bool

Check if instances exist matching filters.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def exists(self, model_class: Type, **filters: Any) -> bool:
    """Report whether any instance of the model matches the filters."""
    self.setup_backend(model_class)
    matching = model_class.where(**filters)
    found: bool = matching.exists()
    return found

clear

clear(model_class: Optional[Type] = None) -> None

Clear storage for testing - DynamoDB requires table scan and delete.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def clear(self, model_class: Optional[Type] = None) -> None:
    """Clear storage for testing - intentionally a no-op for DynamoDB."""
    # Clearing would mean scanning the table and deleting every item.
    # DynamoDB tests typically clean up after themselves instead, so
    # nothing is done here.
    return None

get_backend_name

get_backend_name() -> str

Get the name of the backend being tested.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def get_backend_name(self) -> str:
    """Return the identifier of the backend this driver exercises."""
    backend_name = "dynamodb"
    return backend_name

setup_backend

setup_backend(model_class: Type) -> None

Set up backend for a model class.

Source code in packages/restmachine-orm-dynamodb/src/restmachine_orm_dynamodb/testing/drivers.py
def setup_backend(self, model_class: Type) -> None:
    """Point the model class at this driver's backend."""
    # Assigned on the class itself, so it applies to the class as a whole.
    setattr(model_class, "model_backend", self.backend)