diff --git a/agent/callbacks.py b/agent/callbacks.py index f3500825..bd1b8d1e 100644 --- a/agent/callbacks.py +++ b/agent/callbacks.py @@ -1,8 +1,51 @@ +import json + import requests def callback(job, connection, result, *args, **kwargs): from agent.server import Server - press_url = Server().press_url - requests.post(url=f"{press_url}/api/method/press.api.callbacks.callback", data={"job_id": job.id}) + server = Server() + press_url = server.press_url + + path = "/api/method/press.api.callbacks.callback" + data = {"job_id": job.id} + token = server.config["agent_token"] + + requests.post( + url=f"{press_url}{path}", + data=data, + headers={"X-Agent-Token": token}, + ) + + +def update_callback(job): + from agent.server import Server + + server = Server() + press_url = server.press_url + + job_string = json.dumps(job, default=str) + + path = "/api/method/press.api.callbacks.update_job" + + data = { + "job": job_string, + "server": server.name, + } + + token = server.config["agent_token"] + + try: + response = requests.post( + url=f"{press_url}{path}", + data=data, + headers={"X-Agent-Token": token}, + timeout=10, + ) + + return response.ok + + except requests.RequestException: + return False diff --git a/agent/cli.py b/agent/cli.py index 4eee3d3d..2c1ccd3e 100644 --- a/agent/cli.py +++ b/agent/cli.py @@ -76,7 +76,18 @@ def ping_server(password: str): @click.option("--proxy-ip", required=False, type=str, default=None) @click.option("--sentry-dsn", required=False, type=str) @click.option("--press-url", required=False, type=str) -def config(name, user, workers, job_timeout, proxy_ip=None, sentry_dsn=None, press_url=None, db_port=3306): +@click.option("--agent-token", required=False, type=str) +def config( + name, + user, + workers, + job_timeout, + proxy_ip=None, + sentry_dsn=None, + press_url=None, + db_port=3306, + agent_token=None, +): config = { "benches_directory": f"/home/{user}/benches", "name": name, @@ -91,6 +102,8 @@ def config(name, user, workers, job_timeout, proxy_ip=None, sentry_dsn=None, pre "db_port": db_port, "job_timeout": job_timeout, } + if agent_token: + config["agent_token"] = agent_token if press_url: config["press_url"] = press_url if proxy_ip: @@ -102,6 +115,15 @@ def config(name, user, workers, job_timeout, proxy_ip=None, sentry_dsn=None, pre json.dump(config, f, sort_keys=True, indent=4) +@setup.command() +@click.option("--agent-token", required=True) +def agent_token(agent_token): + server = Server() + config = server.get_config(for_update=True) + config["agent_token"] = agent_token + server.set_config(config, indent=4) + + @setup.command() def pyspy(): privileges_line = "frappe ALL = (root) NOPASSWD: /home/frappe/agent/env/bin/py-spy" diff --git a/agent/job.py b/agent/job.py index 61e8c5f3..8d87c4be 100644 --- a/agent/job.py +++ b/agent/job.py @@ -49,6 +49,38 @@ ) +def update_job(model: Model): + if isinstance(model, StepModel): + model = model.job + + connection().sadd("dirty_jobs", model.id) + + +def get_updated_jobs(): + from agent.web import to_dict + + redis = connection() + res = [] + + with redis.pipeline() as pipe: + pipe.smembers("dirty_jobs") + pipe.delete("dirty_jobs") + + result, _ = pipe.execute() + + job_ids = [int(i) for i in result] + + if not job_ids: + return [] + + for jid in job_ids: + job = JobModel.get(JobModel.id == jid) + temp = to_dict(job) + res.append((temp, job)) + + return res + + def connection(): from agent.server import Server @@ -65,6 +97,8 @@ def save(wrapped, instance: Action, args, kwargs): wrapped(*args, **kwargs) instance.model.save() + update_job(instance.model) + class Action: if TYPE_CHECKING: @@ -158,7 +192,7 @@ def step(name): def wrapper(wrapped, instance: Base, args, kwargs): from agent.base import AgentException - instance.step_record.start(name, instance.job_record.model.id) + instance.step_record.start(name, instance.job_record.model) try: result = wrapped(*args, **kwargs) except AgentException as e: diff --git a/agent/job_update_poll.py b/agent/job_update_poll.py new file mode 100644 index 00000000..b8c9264d --- /dev/null +++ b/agent/job_update_poll.py @@ -0,0 +1,166 @@ +import base64 +import json +import time +from datetime import datetime, timezone + +import requests + +from agent.callbacks import update_callback +from agent.job import get_updated_jobs, update_job + + +def verify_token_expiry(token): + """ + Returns True if token expires in less than 7 days. + Returns False otherwise. + """ + + try: + parts = token.split(".") + + if len(parts) != 3: + return True + + payload_b64 = parts[1] + + # Add required padding + payload_b64 += "=" * (-len(payload_b64) % 4) + + payload_json = base64.urlsafe_b64decode(payload_b64) + payload = json.loads(payload_json) + + exp = payload.get("exp") + + if not exp: + return True + + expiry_time = datetime.fromtimestamp(exp, tz=timezone.utc) + now = datetime.now(timezone.utc) + + remaining = expiry_time - now + + return remaining.total_seconds() < (7 * 24 * 60 * 60) + + except Exception: + return True + + +def get_regenerate_token(): + from agent.server import Server + + server = Server() + press_url = server.press_url + + path = "/api/method/press.api.agent_auth.regenerate_token" + + token = server.config["agent_token"] + + try: + response = requests.post( + f"{press_url}{path}", + headers={"X-Agent-Token": token}, + timeout=30, + ) + + data = response.json() + + return data["message"] + except requests.RequestException: + return False + + +def retry_undelivered(): + from agent.server import Server + + server = Server() + press_url = server.press_url + + path = "/api/method/press.api.callbacks.retry_undelivered" + + token = server.config["agent_token"] + + try: + response = requests.get(url=f"{press_url}{path}", headers={"X-Agent-Token": token}, timeout=10) + + return response.ok + except requests.RequestException: + return False + + +def handle_retry(counter: int) -> int: + """Retry undelivered jobs every 10 seconds.""" + if counter >= 2: + retry_undelivered() + return 0 + + return counter + + +def handle_token_refresh(server, counter: int) -> int: + """Check and refresh token every 5 minutes.""" + if counter >= 60: + token = server.config["agent_token"] + + if verify_token_expiry(token): + new_token = get_regenerate_token() + + if new_token: + from agent.server import Server + + server = Server() + + server.update_config( + { + "agent_token": new_token, + } + ) + + return 0 + + return counter + + +def process_jobs(): + """Process updated jobs.""" + jobs = get_updated_jobs() + + for job_dict, job in jobs: + success = update_callback(job_dict) + + if not success: + update_job(job) + + +def run(): + from agent.server import Server + + server = Server() + + retry_counter = 0 + token_check_counter = 0 + + while True: + if not server.config.get("enable_feature_worker", False): + continue + + try: + retry_counter = handle_retry(retry_counter) + + token_check_counter = handle_token_refresh( + server, + token_check_counter, + ) + + process_jobs() + + except Exception as e: + print(e) + + retry_counter += 1 + token_check_counter += 1 + + time.sleep(5) + + +if __name__ == "__main__": + run() diff --git a/agent/monitor.py b/agent/monitor.py index faefddb8..ce5f2480 100644 --- a/agent/monitor.py +++ b/agent/monitor.py @@ -59,9 +59,13 @@ def discover_targets(self): def fetch_targets(self): press_url = self.config.get("press_url") press_token = self.config.get("press_token") + + path = "/api/method/press.api.monitoring.targets" + data = {"token": press_token} + return requests.post( - f"{press_url}/api/method/press.api.monitoring.targets", - data={"token": press_token}, + f"{press_url}{path}", + data=data, ).json()["message"] def generate_prometheus_sites_config(self, benches): diff --git a/agent/server.py b/agent/server.py index c13c9996..09c1d8e4 100644 --- a/agent/server.py +++ b/agent/server.py @@ -1191,6 +1191,7 @@ def _generate_supervisor_config(self): "directory": self.directory, "user": self.config["user"], "sentry_dsn": self.config.get("sentry_dsn"), + "enable_feature_worker": self.config.get("enable_feature_worker", False), # Default for now "is_standalone": self.config.get("standalone", False), } if self.config.get("name").startswith("n") and not self.config.get("name").startswith("nat"): diff --git a/agent/templates/agent/supervisor.conf.jinja2 b/agent/templates/agent/supervisor.conf.jinja2 index 38d1a4d3..798fc420 100644 --- a/agent/templates/agent/supervisor.conf.jinja2 +++ b/agent/templates/agent/supervisor.conf.jinja2 @@ -31,6 +31,16 @@ stderr_logfile={{ directory }}/logs/worker.error.log user={{ user }} directory={{ directory }} +[program:job_update_poll] +command=bash -c "{{ directory }}/repo/wait-for-it.sh redis://127.0.0.1:{{ redis_port }} && exec {{ directory }}/env/bin/python {{ directory }}/repo/agent/job_update_poll.py" +environment=PYTHONUNBUFFERED=1 +autostart=true +autorestart=true +stdout_logfile={{ directory }}/logs/job_update_poll.log +stderr_logfile={{ directory }}/logs/job_update_poll.error.log +user={{ user }} +directory={{ directory }} + {% if is_proxy_server or is_standalone %} [program:nginx_reload_manager] command=bash -c "{{ directory }}/repo/wait-for-it.sh redis://127.0.0.1:{{ redis_port }} && exec {{directory}}/env/bin/python {{ directory }}/repo/agent/nginx_reload_manager.py" @@ -46,7 +56,7 @@ directory={{ directory }} [group:agent] -{% set programs = "web, redis, worker" %} +{% set programs = "web, redis, worker, job_update_poll" %} {% if is_proxy_server or is_standalone %} {% set programs = programs + ", nginx_reload_manager" %} diff --git a/agent/web.py b/agent/web.py index 7d291ad6..275c40d0 100644 --- a/agent/web.py +++ b/agent/web.py @@ -1963,3 +1963,35 @@ def backup_db(): offsite = data.get("offsite") job = SnapshotRecovery().backup_db(site, database_ip, database_name, mariadb_root_password, offsite) return {"job": job} + + +@application.route("/server/feature/enable", methods=["POST"]) +def enable_feature_worker(): + server = Server() + + server.update_config( + { + "enable_feature_worker": True, + } + ) + + return { + "enabled": True, + "message": "Feature worker enabled", + } + + +@application.route("/server/feature/disable", methods=["POST"]) +def disable_feature_worker(): + server = Server() + + server.update_config( + { + "enable_feature_worker": False, + } + ) + + return { + "enabled": False, + "message": "Feature worker disabled", + } diff --git a/monitor_idle.py b/monitor_idle.py index c1f26d3f..9f1118fe 100755 --- a/monitor_idle.py +++ b/monitor_idle.py @@ -47,10 +47,13 @@ def check_idle_slave(self, bench_path: str) -> bool: def inform_master(self) -> None: """Let the master know of idle benches""" + path = "/api/method/press.api.server.benches_are_idle" + token = self.config["agent_token"] + try: requests.post( - f"{self.config['press_url']}/api/method/press.api.server.benches_are_idle", - data={"server": self.config["name"], "access_token": self.config["access_token"]}, + f"{self.config['press_url']}{path}", + headers={"X-Agent-Token": token}, timeout=10, ) print(f"Informed master at {self.config['press_url']} that benches are idle")