
Kosmos-Py

Kosmos-Py is a high-performance, asynchronous MongoDB ORM/ODM-like framework and data persistence layer for Python. It is designed to establish a consistent, type-safe environment for backend services, combining Pydantic models with MongoDB operations, pandas/polars/pyarrow data science integration, secret resolution, timezone-aware timeframing, and hierarchical execution profiling.


Table of Contents

  1. Core Philosophy & Architecture
  2. Ignition & Configuration
  3. Liminal Configuration & Secrets
  4. Defining Models & Lifecycles
  5. Atomic Counters
  6. Field Transformers & Custom Metadata
  7. Querying and Filtering (MongoDetector)
  8. Data Saving & Recording (MongoRecorder)
  9. GridFS Blobs (PersistableBlob)
  10. Time & Profiling Utilities

1. Core Philosophy & Architecture

  • State of Matter (ModelState):
    • Unset: The model exists only in memory and has not been prepared for database synchronization.
    • Transition: The model is collapsing its pending changes into a database update instruction.
    • Material: The model is fully synchronized/entangled with the database and is considered stable.
  • Collapse (collapse()): Compiles the current attributes of a model instance (including auto-timestamps, atomic increments, etc.) into a database update instruction, called a Ripple.
  • Decohere (decohere()): Finalizes the model's state in-memory after database update confirmation (e.g. converting a delta increment to a static value, setting IDs).

2. Ignition & Configuration

Before touching a database or any configuration property, initialize the environment by calling ignite with the path to your environment file.

import kosmos as km

# Load environment variables and initialize standard observers
km.ignite("app.env")

Each connection is selected by a PurposeAffinity, which determines the environment variable its URI is read from:

  • PurposeAffinity.Observer: Read-only / read-heavy connections (uses MONGODB_URI).
  • PurposeAffinity.Creator: Write / creation operations (uses MONGODB_CREATOR_URI).
  • PurposeAffinity.Admin: Schema modification / administrative operations (uses MONGODB_ADMIN_URI).
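
As a rough illustration of that mapping, the sketch below models the three affinities as an enum whose values are the variable names listed above. The uri_for helper is hypothetical and only shows the lookup idea; how Kosmos-Py resolves connections internally is not documented here.

```python
import os
from enum import Enum
from typing import Mapping, Optional


class PurposeAffinity(Enum):
    # Values are the environment variable names listed above.
    Observer = "MONGODB_URI"
    Creator = "MONGODB_CREATOR_URI"
    Admin = "MONGODB_ADMIN_URI"


def uri_for(affinity: PurposeAffinity, env: Optional[Mapping[str, str]] = None) -> str:
    """Look up the connection URI for an affinity (hypothetical helper)."""
    source = os.environ if env is None else env
    try:
        return source[affinity.value]
    except KeyError:
        raise RuntimeError(f"{affinity.value} is not set") from None


env = {"MONGODB_URI": "mongodb://localhost:27017/app"}
print(uri_for(PurposeAffinity.Observer, env))
```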

3. Liminal Configuration & Secrets

Kosmos-Py provides a configuration mapper called LiminalStructure that matches environment variables to Python dataclass fields using the MapStruct annotation.

Liminal Configuration Dataclass

from dataclasses import dataclass
from typing import Annotated
import kosmos as km

@dataclass
class CustomAppConstants:
    app_port: Annotated[str, km.ether.MapStruct("PORT")] = "8080"
    db_name: Annotated[str, km.ether.MapStruct("DB_NAME")] = "default_db"

# Wrap inside LiminalStructure to enable lazy environment resolution
liminal_constants = km.ether.struct.LiminalStructure(CustomAppConstants())

# Retrieve resolved constants
constants = liminal_constants.collapse()
print(constants.db_name)
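
For readers curious how an annotation-driven mapper of this kind can work, here is a minimal sketch using only the standard library. EnvKey and resolve_from_env are hypothetical names standing in for MapStruct and LiminalStructure; the real implementation may differ, for example in how lazily it resolves values.

```python
import os
from dataclasses import dataclass, fields, replace
from typing import Annotated, get_type_hints


class EnvKey:
    """Hypothetical marker playing the role of MapStruct."""

    def __init__(self, name: str) -> None:
        self.name = name


@dataclass
class AppConstants:
    app_port: Annotated[str, EnvKey("PORT")] = "8080"
    db_name: Annotated[str, EnvKey("DB_NAME")] = "default_db"


def resolve_from_env(instance, env=None):
    """Return a copy with annotated fields overridden from the environment."""
    env = os.environ if env is None else env
    # include_extras=True keeps the Annotated metadata on each hint.
    hints = get_type_hints(type(instance), include_extras=True)
    overrides = {}
    for f in fields(instance):
        for meta in getattr(hints[f.name], "__metadata__", ()):
            if isinstance(meta, EnvKey) and meta.name in env:
                overrides[f.name] = env[meta.name]
    return replace(instance, **overrides)


constants = resolve_from_env(AppConstants(), env={"DB_NAME": "orders"})
print(constants.app_port, constants.db_name)  # 8080 orders
```

Fields whose variable is absent keep their dataclass default, which matches the role the defaults play in the example above.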

Secrets Resolution

If strings in your environment configuration follow the __secret:SECRET_NAME:VERSION__ pattern, Kosmos-Py will automatically intercept them and query Google Cloud Secret Manager via GCPSecretManager.

# Example Env File: app.env
PROJECT_ID=my-gcp-project-123
MONGODB_URI=mongodb://user:__secret:mongodb-prod-password:latest__@localhost:27017/prod

When ignite parses the URI, the secret is resolved transparently to the caller, and credentials are masked in standard log output.
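
The placeholder substitution and log masking described above can be sketched as follows. The regular expressions, the function names, and the fake_fetch stub are assumptions for illustration; a real resolver would call Secret Manager instead of returning a fixed string, and the library's actual pattern handling may be stricter.

```python
import re
from typing import Callable

# Matches __secret:NAME:VERSION__ as described above.
SECRET_PATTERN = re.compile(r"__secret:(?P<name>[^:]+):(?P<version>[^:_]+)__")


def resolve_secrets(value: str, fetch: Callable[[str, str], str]) -> str:
    """Replace every secret placeholder using the supplied fetch callback."""
    return SECRET_PATTERN.sub(lambda m: fetch(m["name"], m["version"]), value)


def mask_credentials(uri: str) -> str:
    """Hide the password part of user:password@host for logging."""
    return re.sub(r"(://[^:/@]+:)[^@]+@", r"\1***@", uri)


def fake_fetch(name: str, version: str) -> str:
    # Stand-in for a Secret Manager lookup; returns a fixed dummy value.
    return "s3cr3t"


raw = "mongodb://user:__secret:mongodb-prod-password:latest__@localhost:27017/prod"
resolved = resolve_secrets(raw, fake_fetch)
print(mask_credentials(resolved))  # mongodb://user:***@localhost:27017/prod
```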


4. Defining Models & Lifecycles

Every model builds on Model, a Pydantic wrapper that maps id to _id. Persisted models inherit either from ParticleBase, which handles database metadata, or from Persistable, which adds automatic creation and update timestamps.

To declare database mappings, use the @declare_persist_db decorator.

from bson import ObjectId
from pydantic import Field
import kosmos as km

@km.declare_persist_db(db_name="store_db", collection_name="products", version=1)
class Product(km.Persistable):
    name: str = Field(alias="title")
    price: float
    category_id: ObjectId | None = None

Lifecycle & Update Tracking

A model instantiated directly is marked as modified (has_update = True); a model loaded from the database starts clean (has_update = False). To flag a loaded model as needing to be persisted on the next save, call mark_updated().

# Query database
detector = km.MongoDetector[Product](Product)
product = detector.filter(km.fld("title") == "Laptop").load_one()

# Modify properties and notify the persistence layer
product.price = 1200.00
product.mark_updated()

# Save modifications back to MongoDB
km.record(product)

5. Atomic Counters

For concurrent systems, Kosmos-Py provides IncrCounter (aliased to IntCounter / ZeroCounter), which compiles to MongoDB $inc operations.

import kosmos as km
from pydantic import Field

@km.declare_persist_db(db_name="store_db", collection_name="stock_inventory")
class Inventory(km.Persistable):
    item_id: str
    stock: km.IncrCounter = Field(default=km.ZeroCounter)

# Initialize Inventory
inv = Inventory(item_id="sku-102")
km.record(inv)

# Load and increment atomically
detector = km.detect(Inventory)
loaded_inv = detector.filter(km.fld("item_id") == "sku-102").load_one()

loaded_inv.stock += 5  # Queues an atomic increment of +5
km.record(loaded_inv)  # Sends {"$inc": {"stock": 5}} to MongoDB
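
The idea behind a delta-tracking counter can be shown without the library. DeltaCounter below is a hypothetical class, not IncrCounter itself: it keeps the last confirmed value separate from the increments queued since the last save, so the save can be expressed as a $inc rather than overwriting whatever other writers stored.

```python
class DeltaCounter:
    """Hypothetical counter that tracks a confirmed value plus a pending delta."""

    def __init__(self, value: int = 0) -> None:
        self.value = value   # last value confirmed by the database
        self.delta = 0       # increments queued since the last save

    def __iadd__(self, amount: int) -> "DeltaCounter":
        self.delta += amount
        return self

    def collapse(self, field: str) -> dict:
        # Only emit an instruction when there is something to apply.
        return {"$inc": {field: self.delta}} if self.delta else {}

    def decohere(self) -> None:
        # Fold the confirmed delta into the static value.
        # The local value ignores increments made by other writers.
        self.value += self.delta
        self.delta = 0


stock = DeltaCounter(value=10)
stock += 5
stock += 2
update = stock.collapse("stock")
stock.decohere()
print(update, stock.value)  # {'$inc': {'stock': 7}} 17
```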

6. Field Transformers & Custom Metadata

Kosmos-Py uses type annotations to hook lifecycle transformations into Pydantic models.

| Annotation / Type | Target Operations | Action |
|---|---|---|
| StrUpper | Initialization / Set | Converts string to uppercase |
| StrLower | Initialization / Set | Converts string to lowercase |
| TimeInserted | Document Creation | Automatically sets current UTC timestamp once on creation |
| TimeUpdated | Document Update | Sets current UTC timestamp on every save |

Example Hook Usage:

import kosmos as km

@km.declare_persist_db(db_name="analytics_db", collection_name="logs")
class LogEntry(km.ParticleBase):
    log_level: km.meta.annotation.StrUpper  # Always converts e.g. "info" to "INFO"
    message: str
    created_at: km.meta.annotation.TimeInserted  # Auto set on insert
    updated_at: km.meta.annotation.TimeUpdated   # Auto updated on save

7. Querying and Filtering (MongoDetector)

The MongoDetector class (instantiated with km.detect(Model)) exposes a chainable query builder supporting projection, filtering, pagination, lookups, aggregation, and conversion to pandas, Polars, and PyArrow containers.

Fluent Aggregation & Querying

import kosmos as km

detector = km.detect(Product)

# Chain a filter, a sort, and pagination
results = (
    detector
    .filter((km.fld("price") > 100) & (km.fld("category") == "electronics"))
    .sort("price", descending=True)
    .skip(10)
    .limit(5)
    .load_many()
)
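
To make the filter syntax less magical, here is a minimal sketch of how comparison and & operators can be overloaded to build a MongoDB query document. Expr, FieldRef, and this fld are illustrative re-implementations under the assumption that the expression compiles to standard $and / $gt operators; they are not the library's own classes.

```python
class Expr:
    """A compiled filter fragment (hypothetical; mirrors the query style above)."""

    def __init__(self, query: dict) -> None:
        self.query = query

    def __and__(self, other: "Expr") -> "Expr":
        return Expr({"$and": [self.query, other.query]})

    def __or__(self, other: "Expr") -> "Expr":
        return Expr({"$or": [self.query, other.query]})


class FieldRef:
    """Reference to a document field; comparisons return Expr, not bool."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __eq__(self, value) -> Expr:  # type: ignore[override]
        return Expr({self.name: value})

    def __gt__(self, value) -> Expr:
        return Expr({self.name: {"$gt": value}})

    def __lt__(self, value) -> Expr:
        return Expr({self.name: {"$lt": value}})


def fld(name: str) -> FieldRef:
    return FieldRef(name)


expr = (fld("price") > 100) & (fld("category") == "electronics")
print(expr.query)
# {'$and': [{'price': {'$gt': 100}}, {'category': 'electronics'}]}
```

The parentheses around each comparison matter: in Python, & binds more tightly than > and ==, so omitting them would change how the expression is grouped.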

Type Resolution & Aliasing

Using fld() automatically resolves your Pydantic alias fields to their DB representation.

# In Product, "name" is aliased to "title"
# km.fld("name") compiles to query field: "title"
product = detector.filter(km.fld("name") == "UltraBook").load_one()
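
Alias resolution of this kind amounts to a lookup from attribute name to stored field name. The sketch below uses a plain dataclass with field metadata in place of a Pydantic alias; ProductSketch and db_field are hypothetical, and names that are not attributes are passed through unchanged so that a stored name such as "title" still works.

```python
from dataclasses import dataclass, field, fields


@dataclass
class ProductSketch:
    # metadata["alias"] plays the role of Field(alias=...) in this sketch.
    name: str = field(metadata={"alias": "title"})
    price: float = 0.0


def db_field(model: type, attr: str) -> str:
    """Map a Python attribute name to its stored field name."""
    for f in fields(model):
        if f.name == attr:
            return f.metadata.get("alias", attr)
    # Unknown names are assumed to already be stored field names.
    return attr


print(db_field(ProductSketch, "name"))   # title
print(db_field(ProductSketch, "price"))  # price
```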

Loading to Pandas, Polars, and PyArrow

Kosmos-Py can return query results directly as pandas, Polars, or PyArrow objects via pymongoarrow:

# Load query results directly to Pandas
df_pandas = detector.filter(km.fld("price") > 50).load_dataframe()

# Load query results directly to Polars
df_polars = detector.filter(km.fld("price") > 50).load_polars()

# Load query results directly to PyArrow Table
arrow_table = detector.filter(km.fld("price") > 50).load_table()

Async Operations

All loading and aggregation pipeline methods support async execution:

# Async load query
await detector.filter(km.fld("price") > 500).load_one_async()
await detector.filter(km.fld("price") > 500).load_many_async()
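
The await calls above have to run inside a coroutine driven by an event loop. The following sketch shows one way to structure that with asyncio, using stub coroutines in place of load_one_async and load_many_async; the stubs and their return values are invented for illustration and do not query anything.

```python
import asyncio


async def load_one_stub(min_price: float) -> dict:
    # Stand-in for a load_one_async() call; sleeps instead of querying.
    await asyncio.sleep(0.01)
    return {"title": "UltraBook", "price": min_price + 1}


async def load_many_stub(min_price: float) -> list:
    # Stand-in for a load_many_async() call.
    await asyncio.sleep(0.01)
    return [{"title": "UltraBook", "price": min_price + 1}]


async def main() -> tuple:
    # gather() runs both awaitables concurrently on the same event loop.
    one, many = await asyncio.gather(load_one_stub(500), load_many_stub(500))
    return one, many


one, many = asyncio.run(main())
print(one["title"], len(many))  # UltraBook 1
```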

8. Data Saving & Recording (MongoRecorder)

Saving and updating operations are handled by MongoRecorder or the wrapper functions km.record(obj) and km.record_async(obj).

Single Document Saving

# Sync saving
km.record(product)

# Async saving
await km.record_async(product)

Dataframe Bulk Operations

A DataFrame with hundreds of rows can be inserted or upserted in a single call; the rows are sent to MongoDB as bulk write operations.

import pandas as pd
import kosmos as km

recorder = km.MongoRecorder(Product)
df = pd.DataFrame([
    {"title": "Tablet A", "price": 299.99},
    {"title": "Tablet B", "price": 499.99}
])

# Insert bulk rows into database
recorder.insert_dataframe(df)

# Upsert dataframes based on specific keys
recorder.update_dataframe(df, on=["title"], upsert=True)
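
Conceptually, an upsert keyed on certain columns turns each row into a filter on those columns plus a $set of the remaining ones. The helper below is a hypothetical sketch of that translation over plain dictionaries; whether update_dataframe builds its operations exactly this way is an assumption.

```python
def build_upsert_ops(rows: list, on: list) -> list:
    """Turn rows into one update description per row, matched on the key columns."""
    ops = []
    for row in rows:
        missing = [key for key in on if key not in row]
        if missing:
            raise KeyError(f"row is missing key column(s): {missing}")
        ops.append({
            "filter": {key: row[key] for key in on},
            "update": {"$set": {k: v for k, v in row.items() if k not in on}},
            "upsert": True,
        })
    return ops


rows = [
    {"title": "Tablet A", "price": 299.99},
    {"title": "Tablet B", "price": 499.99},
]
ops = build_upsert_ops(rows, on=["title"])
print(ops[0])
# {'filter': {'title': 'Tablet A'}, 'update': {'$set': {'price': 299.99}}, 'upsert': True}
```

Each description lines up with the arguments of pymongo's UpdateOne(filter, update, upsert=True), which is the usual building block for bulk_write.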

9. GridFS Blobs (PersistableBlob)

To store large files, inherit from PersistableBlob and specify is_blob=True in your DB declaration. Kosmos-Py handles the upload, associates the metadata, and automatically deletes old chunks from GridFS when a file is replaced.

from typing import Optional
import io
import kosmos as km

@km.declare_persist_db(db_name="files_db", collection_name="attachments", is_blob=True)
class DocumentAttachment(km.PersistableBlob):
    data: bytes = b""
    metadata: Optional[dict] = None

    def dump_buffer(self) -> io.BytesIO:
        return io.BytesIO(self.data)

# 1. Upload a new file
new_attachment = DocumentAttachment(
    filename="invoice.pdf",
    data=b"Raw PDF bytes content",
    metadata={"customer_id": "1002"}
)
km.record(new_attachment)  # Uploads to GridFS

# 2. Retrieve file content
detector = km.detect(DocumentAttachment)
loaded = detector.filter(km.fld("filename") == "invoice.pdf").load_one()

# Open download stream to read contents
stream = km.open_blob(loaded)
file_bytes = stream.read()
print(file_bytes)

Both sync (open_blob, record) and async (open_blob_async, record_async) methods are supported.


10. Time & Profiling Utilities

Timeframes and Chronological Alignment

The TimeFrame utilities align a moment to the bounds of its enclosing period (hourly, daily, weekly, monthly, quarterly, or yearly) in a given timezone.

from datetime import datetime, timezone
import kosmos.time as kt

moment = datetime(2026, 6, 1, 15, 0, 0, tzinfo=timezone.utc)

# Align date to Daily bounds
day_frame = kt.DailyFrame.create(moment=moment, tzone=timezone.utc)
print(day_frame.floor)    # 2026-06-01 00:00:00+00:00
print(day_frame.ceiling)  # 2026-06-02 00:00:00+00:00

# Step back to earlier frames
yesterday = day_frame.get_previous_frame()
week_ago = day_frame.get_previous_x_frame(7)  # seven daily frames back
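
The floor and ceiling arithmetic for a daily frame can be reproduced with the standard library alone. DayFrame below is a hypothetical stand-in that treats a frame as the half-open window from floor to ceiling; it is a sketch of the alignment idea, not the library's DailyFrame.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone, tzinfo


@dataclass(frozen=True)
class DayFrame:
    """Hypothetical half-open daily window [floor, ceiling)."""
    floor: datetime
    ceiling: datetime

    @classmethod
    def create(cls, moment: datetime, tzone: tzinfo = timezone.utc) -> "DayFrame":
        # Convert to the target zone first, then truncate to midnight.
        local = moment.astimezone(tzone)
        floor = local.replace(hour=0, minute=0, second=0, microsecond=0)
        return cls(floor=floor, ceiling=floor + timedelta(days=1))

    def previous(self, count: int = 1) -> "DayFrame":
        start = self.floor - timedelta(days=count)
        return DayFrame(floor=start, ceiling=start + timedelta(days=1))


moment = datetime(2026, 6, 1, 15, 0, 0, tzinfo=timezone.utc)
frame = DayFrame.create(moment)
print(frame.floor)              # 2026-06-01 00:00:00+00:00
print(frame.ceiling)            # 2026-06-02 00:00:00+00:00
print(frame.previous(7).floor)  # 2026-05-25 00:00:00+00:00
```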

Performance & Nested Profiling

Profile code blocks hierarchically using PerfTimer or the @timed decorator.

import kosmos.time as kt
import time

@kt.timed(name="Fetch API Data")
def fetch_api(ptimer=None):
    time.sleep(0.1)

@kt.timed(name="Process DB Records")
def process_db(ptimer=None):
    # Pass ptimer to record nested timer calls
    time.sleep(0.05)
    fetch_api(ptimer=ptimer)

# Run operations inside a parent timer scope
with kt.PerfTimer("Main Flow", verbose=True) as root_timer:
    process_db(ptimer=root_timer)
    
# Prints output similar to:
# Main Flow -> 1 times in 150.00 ms
# └── Process DB Records -> 1 times in 150.00 ms (100.0%)
# |  └── Fetch API Data -> 1 times in 100.00 ms (66.7%)
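
For intuition about how nested timings accumulate, the sketch below builds a small timing tree with a context manager. TimerNode and its report format are hypothetical and much simpler than PerfTimer; in particular they do not compute percentages.

```python
import time
from contextlib import contextmanager


class TimerNode:
    """Hypothetical node in a timing tree; not the library's PerfTimer."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.elapsed = 0.0
        self.calls = 0
        self.children = {}

    @contextmanager
    def child(self, name: str):
        # Reuse the node on repeated calls so elapsed time and calls accumulate.
        node = self.children.setdefault(name, TimerNode(name))
        start = time.perf_counter()
        try:
            yield node
        finally:
            node.elapsed += time.perf_counter() - start
            node.calls += 1

    def report(self, depth: int = 0) -> list:
        indent = "  " * depth
        lines = [f"{indent}{self.name} -> {self.calls} times in {self.elapsed * 1000:.2f} ms"]
        for node in self.children.values():
            lines.extend(node.report(depth + 1))
        return lines


tree = TimerNode("root")  # untimed container for top-level scopes
with tree.child("Main Flow") as main_flow:
    with main_flow.child("Process DB Records") as db_timer:
        time.sleep(0.01)
        with db_timer.child("Fetch API Data"):
            time.sleep(0.02)

print("\n".join(main_flow.report()))
```

Because each child is timed inside its parent's scope, a parent's elapsed time always includes the time spent in its children, which is what makes the percentages in the output above meaningful.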