Judoscale's Python library ships with support for Celery and RQ out of the box. If your project uses a different background job library, you can write a custom adapter for it by following this guide.
Create a Custom Worker Adapter
For this example, we're going to create an adapter for a "file system" queue. Pending jobs are stored as files in /tmp/queues/[queue-name].
Please don't use this example
Seriously, this example is a terrible idea. 🙈
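To make the setup concrete, here is a minimal sketch of how a job might land in such a queue. The `enqueue` helper, the JSON payload, and the `.json` file names are assumptions made for illustration; they are not part of Judoscale or any real job library. The only thing the adapter relies on is that each pending job is a file whose modification time reflects when it was enqueued.

```python
import json
import tempfile
import uuid
from pathlib import Path


def enqueue(queue_root: Path, queue_name: str, payload: dict) -> Path:
    """Hypothetical helper: store one pending job as a file in its queue directory."""
    queue_dir = queue_root / queue_name
    queue_dir.mkdir(parents=True, exist_ok=True)

    # A random file name avoids collisions; the file's mtime records enqueue time.
    job_path = queue_dir / f"{uuid.uuid4().hex}.json"
    job_path.write_text(json.dumps(payload))
    return job_path


# Use a throwaway directory here instead of the real queue location.
queue_root = Path(tempfile.mkdtemp())
job_path = enqueue(queue_root, "default", {"task": "send_email", "user_id": 42})
pending = list((queue_root / "default").glob("*"))
```

A worker for this queue would presumably delete each file once the job finishes, so whatever remains in the directory is the backlog.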
Metrics Collector
The "metrics collector" is the core piece of adapter machinery. It is responsible for collecting the metrics (surprise!) from your job backend.
A metrics collector follows a protocol with two members:
a should_collect property, which returns a bool; and
a collect method, which returns List[Metric].
Additionally, you may want to implement the adapter_config property if you choose to inherit from JobMetricsCollector.
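The shape of that protocol can be sketched with `typing.Protocol`. This is only an illustration of the two members described above: the `Metric` dataclass is a stand-in with made-up fields, and `MetricsCollector` and `StaticCollector` are hypothetical names, not classes from the Judoscale package.

```python
from dataclasses import dataclass
from typing import List, Protocol, runtime_checkable


@dataclass
class Metric:
    """Stand-in for Judoscale's Metric class (hypothetical fields, illustration only)."""
    queue_name: str
    value: float


@runtime_checkable
class MetricsCollector(Protocol):
    """The two members described above, written as a typing.Protocol."""

    @property
    def should_collect(self) -> bool: ...

    def collect(self) -> List[Metric]: ...


class StaticCollector:
    """Toy collector that satisfies the protocol without inheriting from anything."""

    @property
    def should_collect(self) -> bool:
        return True

    def collect(self) -> List[Metric]:
        return [Metric(queue_name="default", value=1.5)] if self.should_collect else []


collector = StaticCollector()
is_collector = isinstance(collector, MetricsCollector)
metrics = collector.collect()
```

Because the check is structural, any object with these two members qualifies. Inheriting from JobMetricsCollector is a convenience, not a requirement of the protocol itself.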
Here's what our example metrics collector looks like:
from datetime import datetime, timezone
from pathlib import Path
from typing import List

from judoscale.core.config import Config  # assumed location; needed for the type hint
from judoscale.core.metrics_collectors import JobMetricsCollector
from judoscale.core.metrics import Metric

DEFAULTS = {
    "ENABLED": True,
    "MAX_QUEUES": 20,
    "QUEUES": [],
    "TRACK_BUSY_JOBS": False,
}


class FileSystemQueueMetricsCollector(JobMetricsCollector):
    def __init__(self, config: Config):
        super().__init__(config=config)

        # Override the defaults with any config values provided by the user
        self.config["FSQUEUE"] = {**DEFAULTS, **self.config.get("FSQUEUE", {})}

    @property
    def should_collect(self) -> bool:
        # A sensible default is provided in `JobMetricsCollector.should_collect`,
        # but you can override it here if needed.
        return True

    def collect(self) -> List[Metric]:
        metrics = []

        if not self.should_collect:
            return metrics

        for queue_path in Path("/tmp/queues").glob("*"):
            queue_name = queue_path.stem

            # Queue time is based on the oldest job in the queue
            job_enqueued_times = [
                path.stat().st_mtime for path in queue_path.glob("*")
            ]

            # Skip empty queues: there is no oldest job to report
            if not job_enqueued_times:
                continue

            oldest_job_ts = min(job_enqueued_times)

            # This is the important part!
            metrics.append(Metric.for_queue(queue_name, oldest_job_ts))

        # Also very important to return the metrics
        return metrics
This is a contrived example, but check out the metrics collectors for Celery and RQ for more realistic examples.
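The core of the collector above, finding the oldest pending job per queue, can be tried without Judoscale installed. The following is a rough standalone sketch using only the standard library. The `oldest_job_timestamps` function is a hypothetical name, and the final subtraction is only there to show what the timestamp represents; it makes no claim about how Judoscale computes queue time internally. It uses `queue_path.name` rather than `.stem` so that a queue directory with a dot in its name keeps its full name.

```python
import os
import tempfile
import time
from pathlib import Path
from typing import Dict


def oldest_job_timestamps(queue_root: Path) -> Dict[str, float]:
    """Map each non-empty queue directory to the mtime of its oldest job file."""
    oldest = {}
    for queue_path in queue_root.glob("*"):
        if not queue_path.is_dir():
            continue
        mtimes = [path.stat().st_mtime for path in queue_path.glob("*")]
        if mtimes:  # empty queues have nothing to report
            oldest[queue_path.name] = min(mtimes)
    return oldest


# Build a throwaway queue with two jobs: one enqueued 30s ago, one 5s ago.
queue_root = Path(tempfile.mkdtemp())
(queue_root / "default").mkdir()
(queue_root / "empty").mkdir()
now = time.time()
for name, age in [("job-a", 30), ("job-b", 5)]:
    job = queue_root / "default" / name
    job.write_text("{}")
    os.utime(job, (now - age, now - age))  # backdate atime and mtime

oldest = oldest_job_timestamps(queue_root)
latency_seconds = now - oldest["default"]  # age of the oldest pending job
```

Only the oldest file matters here: the newer job does not change the result, and the empty queue is left out entirely, mirroring the `continue` in the collector.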
Adding the Adapter
The final step is to tell the Judoscale metrics reporter about the new adapter:
from judoscale.core.adapter import Adapter, AdapterInfo
from judoscale.core.config import config as judoconfig
from judoscale.core.reporter import reporter
collector = FileSystemQueueMetricsCollector(config=judoconfig)
adapter = Adapter(
    "judoscale-filesystem",
    adapter_info=AdapterInfo(platform_version="0.0.1"),
    metrics_collector=collector,
)
reporter.add_adapter(adapter)