Python

Creating a Custom Adapter for Judoscale's Python Library Metrics

The Judoscale’s Python library comes with support for Celery and RQ out of the box. If your project uses a different background job processing library, you can create your own custom adapter for it by following this guide.

Create a Custom Worker Adapter

For this example, we’re going to create an adapter for a “file system” queue. Pending jobs are stored as files in tmp/queues/[queue-name].

🚨 Warning

Seriously, this example is a terrible idea. 🙈

Metrics Collector

The “metrics collector” is the core piece of adapter machinery. It is responsible for collecting the metrics (surprise!) from your job backend.

A metrics collector is a protocol that specifies two aspects:

  • a should_collect property, which returns a bool; and
  • a .collect method, which returns List[Metric].

Additionally, you may want to implement the adapter_config property if you choose to inherit from JobMetricsCollector.

Here’s what our example metrics collector looks like:

from pathlib
from datetime
from judoscale.core.metrics_collectors
from judoscale.core.metrics

DEFAULTS = {
    "ENABLED": True,
    "MAX_QUEUES": 20,
    "QUEUES": [],
    "TRACK_BUSY_JOBS": False,
}

class FileSystemQueueMetricsCollector(JobMetricsCollector):
    def __init__(self, config: Config):
        super().__init__(config=config)
        # Override the defaults with any config values provided by the user
        self.config["FSQUEUE"] = {**DEFAULTS, **self.config.get("FSQUEUE", {})}

    @property
    def should_collect(self) -> bool:
        # A sensible default is provided in `JobMetricsCollector.should_collect`,
        # but you can override it here if needed.
        return True

    def collect(self) -> List[Metric]:
        metrics = []

        if not self.should_collect:
            return metrics

        for queue_path in Path("/tmp/queues").glob("*"):
            queue_name = queue_path.stem

            # Queue time is based on the oldest job in the queue
            job_enqueued_times = [
                path.stat().st_mtime for path in queue_path.glob("*")
            ]

            if not job_enqueued_times:
                continue

            oldest_job_ts = min(job_enqueued_times)

            # This is the important part!
            metrics.append(Metric.for_queue(queue_name, oldest_job_ts))

        # Also very important to return the metrics
        return metrics

This is a contrived example, but check out the metrics collectors for Celery and RQ for more realistic examples.

Adding the Adapter

The final step is to tell the Judoscale metrics reporter about the new adapter:

from judoscale.core.adapter
from judoscale.core.config
from judoscale.core.reporter

collector = FileSystemQueueMetricsCollector(config=judoconfig)
adapter = Adapter(
    "judoscale-filesystem",
    adapter_info=AdapterInfo(platform_version="0.0.1"),
    metrics_collector=collector,
)
reporter.add_adapter(adapter)