# -*- coding: utf-8 -*-
from setuphelpers import *
def install():
pass
def audit():
root_scheduled_tasks = get_scheduled_tasks()
microsoft_scheduled_tasks = get_scheduled_tasks(location="Microsoft", recursive=True)
enabled_scheduled_tasks = get_scheduled_tasks(enabled=True, recursive=True)
enabled_scheduled_tasks = {key: value for key, value in enabled_scheduled_tasks.items() if key not in microsoft_scheduled_tasks}
all_scheduled_tasks = get_scheduled_tasks(recursive=True)
others_scheduled_tasks = {key: value for key, value in all_scheduled_tasks.items() if key not in microsoft_scheduled_tasks}
others_scheduled_tasks = {key: value for key, value in others_scheduled_tasks.items() if key not in root_scheduled_tasks}
audit_scheduled_tasks_dict = {
"root": root_scheduled_tasks,
"enabled": enabled_scheduled_tasks,
"others": others_scheduled_tasks,
"microsoft": microsoft_scheduled_tasks,
}
WAPT.write_audit_data_if_changed("audit-scheduled-tasks", "audit-scheduled-tasks", audit_scheduled_tasks_dict)
# WAPT.write_audit_data_if_changed("audit-scheduled-tasks", "all-scheduled-tasks", root_scheduled_tasks)
# WAPT.write_audit_data_if_changed("audit-scheduled-tasks", "enabled-scheduled-tasks", enabled_scheduled_tasks)
print('You can now check this host Audit Data: "audit-scheduled-tasks"')
return "OK"
import os
import re
import sys
import shutil
import tempfile
import win32com.client
def win32com_ensure_dispatch_patch(prog_id):
global win32com
try:
return win32com.client.gencache.EnsureDispatch(prog_id)
except AttributeError:
# Remove cache and try again.
MODULE_LIST = [m.__name__ for m in sys.modules.values()]
for module in MODULE_LIST:
if re.match(r"win32com\.gen_py\..+", module):
del sys.modules[module]
shutil.rmtree(os.path.join(tempfile.gettempdir(), "gen_py"))
import win32com.client
return win32com.client.gencache.EnsureDispatch(prog_id)
def get_scheduled_tasks(location="\\", enabled=None, recursive=False):
"""
Retrieve information about scheduled tasks from the Windows Task Scheduler.
Args:
location (str): The location of the tasks in the Task Scheduler hierarchy. Default is root ("\\").
enabled (bool): If specified, filter tasks based on their enabled status.
recursive (bool): If True, recursively retrieve tasks from subfolders; otherwise, only
retrieve tasks from the specified location.
Returns:
dict: A dictionary containing information about scheduled tasks.
The keys are task paths, and the values are dictionaries with task information,
including 'Name', 'Enabled', 'LastRunTime', 'LastTaskResult', 'NextRunTime',
'NumberOfMissedRuns', 'State', and 'Path'.
"""
# Ensure that location starts with "\\"
location = "\\" + location.lstrip("\\")
scheduler = win32com_ensure_dispatch_patch("Schedule.Service")
scheduler.Connect()
root_folder = scheduler.GetFolder(location)
dict_tasks = get_all_tasks_in_folder(root_folder, enabled)
if recursive:
collect_subfolder_tasks(root_folder, dict_tasks, enabled)
return dict_tasks
def collect_subfolder_tasks(folder, dict_tasks, enabled):
subfolders = folder.GetFolders(1) # 1 means include subfolders
for subfolder in subfolders:
subfolder_tasks = get_all_tasks_in_folder(subfolder, enabled)
dict_tasks.update(subfolder_tasks)
collect_subfolder_tasks(subfolder, dict_tasks, enabled)
def get_all_tasks_in_folder(folder, enabled=None):
tasks = {}
colTasks = folder.GetTasks(1)
for task in colTasks:
if enabled is not None and task.Enabled != enabled:
continue
tasks[task.Path] = {
"Name": task.Name,
"Enabled": task.Enabled,
"LastRunTime": task.LastRunTime,
"LastTaskResult": task.LastTaskResult,
"NextRunTime": task.NextRunTime,
"NumberOfMissedRuns": task.NumberOfMissedRuns,
"State": task.State,
"Path": task.Path,
}
return tasks