diff --git a/agent/cli.py b/agent/cli.py index 2383de7d..4eee3d3d 100644 --- a/agent/cli.py +++ b/agent/cli.py @@ -72,10 +72,11 @@ def ping_server(password: str): @click.option("--user", default="frappe") @click.option("--db-port", default=3306) @click.option("--workers", required=True, type=int) +@click.option("--job-timeout", required=False, type=int, default=4 * 3600) @click.option("--proxy-ip", required=False, type=str, default=None) @click.option("--sentry-dsn", required=False, type=str) @click.option("--press-url", required=False, type=str) -def config(name, user, workers, proxy_ip=None, sentry_dsn=None, press_url=None, db_port=3306): +def config(name, user, workers, job_timeout, proxy_ip=None, sentry_dsn=None, press_url=None, db_port=3306): config = { "benches_directory": f"/home/{user}/benches", "name": name, @@ -88,6 +89,7 @@ def config(name, user, workers, proxy_ip=None, sentry_dsn=None, press_url=None, "web_port": 25052, "press_url": "https://frappecloud.com", "db_port": db_port, + "job_timeout": job_timeout, } if press_url: config["press_url"] = press_url @@ -123,6 +125,12 @@ def sentry(sentry_dsn): Server().setup_sentry(sentry_dsn) +@setup.command() +@click.option("--job-timeout", required=True) +def job_timeout(job_timeout): + Server().setup_job_timeout(job_timeout) + + @setup.command() def supervisor(): Server().setup_supervisor() diff --git a/agent/job.py b/agent/job.py index fe1d39f5..61e8c5f3 100644 --- a/agent/job.py +++ b/agent/job.py @@ -36,7 +36,7 @@ except ImportError: pass - +DEFAULT_TIMEOUT = 4 * 3600 agent_database = SqliteDatabase( "jobs.sqlite3", timeout=15, @@ -176,10 +176,13 @@ def wrapper(wrapped, instance: Base, args, kwargs): return wrapper -def job(name: str, priority="default", on_success=None, on_failure=None): +def job(name: str, priority="default", timeout=None, on_success=None, on_failure=None): @wrapt.decorator def wrapper(wrapped, instance: Base, args, kwargs): + from flask import has_request_context, request + from agent.base import AgentException + from agent.server import Server if get_current_job(connection=connection()): instance.job_record.start() @@ -195,12 +198,20 @@ def wrapper(wrapped, instance: Base, args, kwargs): instance.job_record.success(result) return result agent_job_id = get_agent_job_id() + agent_job_timeout = None + if has_request_context() and request and request.is_json: + agent_job_timeout = request.json.get("agent_job_timeout", None) instance.job_record.enqueue(name, wrapped, args, kwargs, agent_job_id) + final_timeout = ( + agent_job_timeout or timeout or Server().config.get("job_timeout", None) or DEFAULT_TIMEOUT + ) + if not 0 <= final_timeout <= 24 * 3600: + final_timeout = DEFAULT_TIMEOUT queue(priority).enqueue_call( wrapped, args=args, kwargs=kwargs, - timeout=4 * 3600, + timeout=final_timeout, result_ttl=24 * 3600, job_id=str(instance.job_record.model.id), on_success=on_success or callback, diff --git a/agent/pages/deactivated.html b/agent/pages/deactivated.html index d65f4a3a..aacb9f3f 100644 --- a/agent/pages/deactivated.html +++ b/agent/pages/deactivated.html @@ -117,7 +117,7 @@