Judoscale’s Python library supports Celery and RQ out of the box. If your project uses a different background job processing library, you can create a custom adapter for it by following this guide.
Create a Custom Worker Adapter
For this example, we’re going to create an adapter for a “file system” queue. Pending jobs are stored as files in tmp/queues/[queue-name].
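To make the layout concrete, here is a minimal sketch of how a producer might enqueue a job under this scheme. The `enqueue` helper, the JSON payload format, and the UUID-based file names are assumptions made for illustration; the guide itself does not specify them.

```python
import json
import uuid
from pathlib import Path


def enqueue(queue_name: str, payload: dict, root: Path = Path("tmp/queues")) -> Path:
    """Write one pending job as a file in <root>/<queue_name>/ (hypothetical helper)."""
    queue_dir = root / queue_name
    queue_dir.mkdir(parents=True, exist_ok=True)

    # A random file name avoids collisions between concurrent producers.
    job_path = queue_dir / f"{uuid.uuid4().hex}.json"
    job_path.write_text(json.dumps(payload))
    return job_path
```

With this layout, each job file's modification time doubles as its enqueue time, which is the value the metrics collector reads.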
🚨 Warning
Seriously, this example is a terrible idea. 🙈
Metrics Collector
The “metrics collector” is the core piece of adapter machinery. It is responsible for collecting the metrics (surprise!) from your job backend.
A metrics collector is a protocol that specifies two aspects:
- a should_collect property, which returns a bool; and
- a collect method, which returns List[Metric].
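The two requirements can be written down as a `typing.Protocol`. This is an illustrative sketch only: `MetricsCollectorLike`, `NullCollector`, and the stand-in `Metric` dataclass are hypothetical names, not Judoscale's actual definitions.

```python
from dataclasses import dataclass
from typing import List, Protocol, runtime_checkable


@dataclass
class Metric:
    """Stand-in for Judoscale's Metric class (these fields are assumptions)."""

    queue_name: str
    value: float


@runtime_checkable
class MetricsCollectorLike(Protocol):
    """Structural description of the two members a collector must provide."""

    @property
    def should_collect(self) -> bool: ...

    def collect(self) -> List[Metric]: ...


class NullCollector:
    """Smallest object that satisfies the protocol: always on, reports nothing."""

    @property
    def should_collect(self) -> bool:
        return True

    def collect(self) -> List[Metric]:
        return []
```

Because the protocol is structural, `NullCollector` satisfies it without inheriting from anything; any object with both members would do.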
Additionally, you may want to implement the adapter_config property if you choose to inherit from JobMetricsCollector.
Here’s what our example metrics collector looks like:
from pathlib import Path
from typing import List

from judoscale.core.config import Config
from judoscale.core.metrics import Metric
from judoscale.core.metrics_collectors import JobMetricsCollector

DEFAULTS = {
    "ENABLED": True,
    "MAX_QUEUES": 20,
    "QUEUES": [],
    "TRACK_BUSY_JOBS": False,
}


class FileSystemQueueMetricsCollector(JobMetricsCollector):
    def __init__(self, config: Config):
        super().__init__(config=config)

        # Override the defaults with any config values provided by the user
        self.config["FSQUEUE"] = {**DEFAULTS, **self.config.get("FSQUEUE", {})}

    @property
    def should_collect(self) -> bool:
        # A sensible default is provided in `JobMetricsCollector.should_collect`,
        # but you can override it here if needed.
        return True

    def collect(self) -> List[Metric]:
        metrics = []

        if not self.should_collect:
            return metrics

        # Each entry under /tmp/queues is treated as one queue
        for queue_path in Path("/tmp/queues").glob("*"):
            queue_name = queue_path.stem

            # Queue time is based on the oldest job in the queue
            job_enqueued_times = [
                path.stat().st_mtime for path in queue_path.glob("*")
            ]

            # Skip empty queues: there is no oldest job to measure
            if not job_enqueued_times:
                continue

            oldest_job_ts = min(job_enqueued_times)

            # This is the important part!
            metrics.append(Metric.for_queue(queue_name, oldest_job_ts))

        # Also very important to return the metrics
        return metrics
This is a contrived example, but check out the metrics collectors for Celery and RQ for more realistic examples.
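Before wiring a collector into Judoscale, it can help to check the oldest-job logic on its own. The sketch below follows the same idea as the loop in collect, using only the standard library. `oldest_job_timestamps` is a hypothetical helper, not part of Judoscale, and it returns raw timestamps instead of Metric objects.

```python
from pathlib import Path
from typing import Dict


def oldest_job_timestamps(root: Path) -> Dict[str, float]:
    """Map each non-empty queue directory under `root` to its oldest job file's mtime."""
    oldest: Dict[str, float] = {}
    for queue_path in root.glob("*"):
        # Ignore stray files that sit next to the queue directories.
        if not queue_path.is_dir():
            continue

        mtimes = [job.stat().st_mtime for job in queue_path.glob("*")]
        if mtimes:  # empty queues report nothing, as in the collector
            oldest[queue_path.name] = min(mtimes)
    return oldest
```

Subtracting one of these timestamps from `time.time()` gives the age in seconds of the oldest pending job in that queue.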
Adding the Adapter
The final step is to tell the Judoscale metrics reporter about the new adapter:
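from judoscale.core.adapter import Adapter, AdapterInfo
from judoscale.core.config import config as judoconfig
from judoscale.core.reporter import reporter

# Build the collector from the shared Judoscale config
collector = FileSystemQueueMetricsCollector(config=judoconfig)

adapter = Adapter(
    "judoscale-filesystem",
    adapter_info=AdapterInfo(platform_version="0.0.1"),
    metrics_collector=collector,
)

# Register the adapter so the reporter includes its metrics
reporter.add_adapter(adapter)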