⚝
One Hat Cyber Team
⚝
Your IP:
160.79.111.190
Server IP:
162.254.39.145
Server:
Linux premium289.web-hosting.com 4.18.0-513.11.1.lve.el8.x86_64 #1 SMP Thu Jan 18 16:21:02 UTC 2024 x86_64
Server Software:
LiteSpeed
PHP Version:
8.2.28
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
lib64
/
nagios
/
plugins
/
nccustom
/
View File Name :
check-unexpected-systemd-services.py
#!/usr/libexec/platform-python # -*- coding: utf-8 -*- """ Nagios plugin to check for unexpected systemd unit files. """ import sys import os from pathlib import Path from datetime import datetime, timedelta import argparse from typing import Set, List, Dict, Tuple, Optional # Determine script directory SCRIPT_DIR = Path(__file__).resolve().parent # Nagios exit codes OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 # Constants WHITELIST_FILES: List[Path] = [ SCRIPT_DIR / "systemd_services_whitelist", SCRIPT_DIR / "systemd_targets_whitelist", SCRIPT_DIR / "systemd_scopes_whitelist", ] LOG_PATH = Path("/var/log/unexpected-systemd-services.log") TRANSIENT_DIR = Path("/run/systemd/transient") def log_to_file(message: str, severity: str): timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') with LOG_PATH.open('a') as f: f.write(f"{timestamp} - {severity.upper()} - {message}\n") def parse_args(): p = argparse.ArgumentParser(description="Nagios check for unexpected systemd units") p.add_argument('-p', '--pattern', required=True, help="Comma-separated extensions (e.g. 'service,scope')") p.add_argument('-d', '--dirs-file', required=True, help="File listing directories to scan") p.add_argument('-m', '--minutes', type=int, default=5, help="Transient age threshold in minutes") p.add_argument('-v', '--verbose', action='store_true', help="Enable verbose output") return p.parse_args() def load_whitelist() -> Set[str]: entries: Set[str] = set() for path in WHITELIST_FILES: try: with path.open() as f: for line in f: line = line.strip() if line: entries.add(line) except Exception as e: print(f"CRITICAL: cannot read whitelist {path}: {e}") sys.exit(CRITICAL) if not entries: print("CRITICAL: no whitelist entries loaded") sys.exit(CRITICAL) return entries def scan_dir(directory: Path, exts: List[str]) -> Set[str]: found = set() try: for entry in os.scandir(directory): if entry.is_file() and any(entry.name.endswith(ext) for ext in exts): found.add(entry.name) except OSError as e: print(f"CRITICAL: cannot scan {directory}: {e}") sys.exit(CRITICAL) return found def handle_transient(pattern_exts: List[str], allowed: Set[str], age_th: timedelta) -> Tuple[Set[str], List[Tuple[str, str]]]: """ Return (skipped_set, critical_list) from the transient directory. """ skipped: Set[str] = set() critical: List[Tuple[str, str]] = [] # list of (name, age_str) now = datetime.now() threshold_min = int(age_th.total_seconds() // 60) try: for entry in os.scandir(TRANSIENT_DIR): if not entry.is_file(): continue name = entry.name if name in allowed or not any(name.endswith(ext) for ext in pattern_exts): continue try: created = datetime.fromtimestamp(entry.stat().st_ctime) except Exception: created = now age = now - created age_str = str(age).split('.')[0] if age < age_th: skipped.add(name) log_to_file(f"File {TRANSIENT_DIR}/{name} age={age_str} < {threshold_min}m", 'WARNING') else: critical.append((name, age_str)) # log_to_file(f"File {TRANSIENT_DIR}/{name} age={age_str} > {threshold_min}m", 'CRITICAL') except OSError as e: print(f"CRITICAL: cannot scan transient dir {TRANSIENT_DIR}: {e}") sys.exit(CRITICAL) return skipped, critical def main(): args = parse_args() # prepare log try: LOG_PATH.parent.mkdir(parents=True, exist_ok=True) LOG_PATH.touch(exist_ok=True) os.chmod(str(LOG_PATH), 0o600) except Exception as e: print(f"CRITICAL: log setup failed: {e}") sys.exit(CRITICAL) allowed = load_whitelist() exts = ['.' + p.strip() for p in args.pattern.split(',')] age_th = timedelta(minutes=args.minutes) # read dirs, strip trailing slashes dirs_file = Path(args.dirs_file) if not dirs_file.is_absolute(): dirs_file = SCRIPT_DIR / dirs_file try: with dirs_file.open() as f: dirs = [line.strip().rstrip('/') for line in f if line.strip()] except Exception as e: print(f"CRITICAL: cannot read dirs file: {e}") sys.exit(CRITICAL) unexpected: Dict[Path, List[Tuple[str, Optional[str]]]] = {} for d in dirs: dpath = Path(d) if not dpath.is_absolute(): dpath = SCRIPT_DIR / dpath dpath = dpath.resolve() services = scan_dir(dpath, exts) bad = services - allowed details: List[Tuple[str, Optional[str]]] = [] if dpath == TRANSIENT_DIR: skipped, critical_list = handle_transient(exts, allowed, age_th) bad.difference_update(skipped) for name, age_str in critical_list: if name in bad: details.append((name, age_str)) else: for name in sorted(bad): details.append((name, None)) log_to_file(f"DIR={dpath} OBJECT={name}", 'CRITICAL') if details: unexpected[dpath] = details if unexpected: print("CRITICAL: Unexpected systemd units found") for d, items in unexpected.items(): formatted = [f"'{name}'" for name, _ in items] print(f"Directory: {d}") print(f"Units: [{', '.join(formatted)}]") sys.exit(CRITICAL) else: print("OK: No unexpected systemd units detected") sys.exit(OK) if __name__ == '__main__': main()