
Mount Network Share
Paquet d’installation silencieuse pour Mount Network Share
1-13
- package: tis-mount-network-share
- name: Mount Network Share
- version: 1-13
- maintainer: WAPT Team,Tranquil IT,Flavien SCHELFAUT,Simon FONTENEAU
- editor: WAPT
- target_os: all
- architecture: all
- signature_date:
- size: 24.34 Ko
package : tis-mount-network-share
version : 1-13
architecture : all
section : base
priority : optional
name : Mount Network Share
categories :
maintainer : WAPT Team,Tranquil IT,Flavien SCHELFAUT,Simon FONTENEAU
description : A package that lets you set up network shares on all platforms.
depends :
conflicts :
maturity : PROD
locale :
target_os : all
min_wapt_version : 2.6
sources :
installed_size :
impacted_process :
description_fr : Un paquet qui permet de monter des partages réseaux sur toutes les plateformes.
description_pl : Pakiet do konfigurowania udziałów sieciowych na wszystkich platformach.
description_de : Ein Paket, mit dem man Netzwerkfreigaben auf allen Plattformen einrichten kann.
description_es : Un paquete para configurar recursos compartidos de red en todas las plataformas.
description_pt : Um pacote para configurar partilhas de rede em todas as plataformas.
description_it : Un pacchetto per l'impostazione delle condivisioni di rete su tutte le piattaforme.
description_nl : Een pakket om netwerkshares in te stellen op alle platformen.
description_ru : Пакет для настройки общих сетевых ресурсов на всех платформах.
audit_schedule :
editor : WAPT
keywords :
licence :
homepage :
package_uuid : f08c7369-d72e-4173-b888-e5c46e9e5393
valid_from :
valid_until :
forced_install_on :
changelog :
min_os_version :
max_os_version :
icon_sha256sum : 25b0beebb5a383774223652d265c32e809a14cbe789e6ffc140a1e9e43591361
signer : Tranquil IT
signer_fingerprint: 8c5127a75392be9cc9afd0dbae1222a673072c308c14d88ab246e23832e8c6bb
signature_date : 2025-03-21T13:13:55.000000
signed_attributes : package,version,architecture,section,priority,name,categories,maintainer,description,depends,conflicts,maturity,locale,target_os,min_wapt_version,sources,installed_size,impacted_process,description_fr,description_pl,description_de,description_es,description_pt,description_it,description_nl,description_ru,audit_schedule,editor,keywords,licence,homepage,package_uuid,valid_from,valid_until,forced_install_on,changelog,min_os_version,max_os_version,icon_sha256sum,signer,signer_fingerprint,signature_date,signed_attributes
signature : s4NTLu2CyWNFzY3dc1/aS+qMICqzsvFwmmFRii5QM9jeOdEU7sMfK9NDJq14tyYzMT3Le+BG2spfBSDm+wMNluanErWvaczh8myimD39UFDfIyN5/Bbc2uz87i7sTCZ5cQR2FxB3jouZlWOqCcfyywJdwkuYc6uGVfeSRbRX4rfIwrHhy/Xr/h70vpcMTf75cWXLt8adqk485SP8fJ4vc7gVmTKtBWZ5Q104qpHV6VT/xCwtx7l+mnPlFjfoycAgXT8ChCnbeyudPBcX3xpxXeMMI5o2uodMZ+6rJqKdF8qL9KNO3GJBkfbkxVvUvAwfNPkRdVYHsTJmxhOnbllGnQ==
# -*- coding: utf-8 -*-
from setuphelpers import *
from waptutils import WAPT_VERSION_FULL
import pyldap
import os, json
from typing import List
from dataclasses import dataclass
import platform
import xml.etree.ElementTree as ET
###
# DO NOT TOUCH THESE VARIABLES
###
# Module-level settings shared by the functions and classes of this package.
# Autostart entry whose presence install() checks on Debian/RedHat-based systems.
wapt_session_setup = "/etc/xdg/autostart/tis-waptsessionsetup.desktop"
# "gtk-3.0" directory under application_data() and the "bookmarks" file inside it.
gnome_config_dir = makepath(application_data(), "gtk-3.0")
bookmarks_gnome_file = makepath(gnome_config_dir, "bookmarks")
# JSON file that records, per user name, the bookmark lines added by this package.
bookmarks_wapt_file = makepath(application_data(), 'wapt', 'bookmarks.json')
# System-wide smbnetfs configuration, copied into the user's ".smb" folder when missing.
default_conf_smbnetfs = "/etc/smbnetfs.conf"
# Samba configuration path (not referenced elsewhere in the visible code).
default_conf_samba = "/etc/samba/smb.conf"
# Lower-cased current user name; used in LDAP filters and as key in the bookmarks JSON.
current_username = get_current_user().lower()
# Computer name up to the first dot, suffixed with "$"; used as sAMAccountName in LDAP filters.
current_computer = f"{get_computername().split('.')[0]}$"
def install():
    """
    Check the prerequisites of the package and install the Linux dependencies.

    ``error()`` is called when the running WAPT version is lower than
    2.6.0.16881, or when the machine is Debian/RedHat-based and the
    waptagent-gui autostart entry (``wapt_session_setup``) is missing.
    On Debian-based and RedHat-based systems the smbnetfs, Kerberos and
    smbclient packages are then installed.
    """
    # Only the part of the full version string before the first '-' is compared.
    if Version(WAPT_VERSION_FULL.split('-')[0]) < Version("2.6.0.16881"):
        # The backslash is escaped explicitly: "\!" is an invalid escape sequence.
        error("\\!/ This package requires at least wapt version 2.6.0.16881 or higher \\!/")

    if (is_debian_based() or is_redhat_based()) and not isfile(wapt_session_setup):
        error("\\!/ This package requires the installation of waptagent-gui to be fully functional \\!/")

    # Installing necessary dependencies
    if is_debian_based():
        install_apt('smbnetfs krb5-user smbclient')
    elif is_redhat_based():
        # NOTE(review): 'krb5-user' and 'smbclient' look like Debian package names;
        # confirm they resolve on RedHat-based distributions.
        install_rpm('smbnetfs krb5-user smbclient')
def session_setup():
    """
    Mount the network shares of the current user when the session starts.

    The domain name is resolved first (``error()`` is called when it cannot be
    found), the network drives published through GPO are converted to
    ``Share`` objects, previously mounted shares are cleaned up and the new
    ones are mounted.

    :return: the string "RERUN".
    """
    domain_name = get_domain_name()
    if not domain_name:
        # The backslash is escaped explicitly: "\!" is an invalid escape sequence.
        error('\\!/ Unable to retrieve domain name \\!/')

    network_share = NetworkShare()

    # Scan the domain sysvol to find the network drive GPO
    ge = GpoEngine(domain_name)
    network_share.shares = convert_gpo_network_drive_to_share(ge.get_network_drive())

    # Alternative to GpoEngine, but must be defined manually. An example can be found below
    # ldap = LdapManager(domain_name)
    # if ldap.user_is_member_of_groups(['Administrators']):
    #     network_share.add('S', "Sysvol Another", 'fschelfaut.lan', 'sysvol/.stfolder')
    #     network_share.add('E', "Sysvol", 'fschelfaut.lan', 'sysvol')
    # if ldap.user_is_member_of_groups(['grp_bitwarden']):
    #     network_share.add('P', "Partage ISO", 'srvtemplates.ad.tranquil.it', 'iso')
    #     network_share.add('S', "Partage TEST", 'srvtemplates.ad.tranquil.it', 'test$')
    #     network_share.add('A', "Partage TEST2", 'srvtemplates.ad.tranquil.it', 'test2$')
    # if ldap.computer_is_member_of_groups(['test_wapt_profile', 'testimbrication2']):
    #     network_share.add('C', "Partage TEST2 Computer", 'srvtemplates.ad.tranquil.it', 'test2$')
    # if ldap.user_is_in_OU('OU=srp_hard,OU=users,OU=tranquilit,DC=ad,DC=tranquil,DC=it'):
    #     network_share.add('B', "Partage TEST2 User", 'srvtemplates.ad.tranquil.it', 'test2$')

    # Cleanup share before mounting new one
    network_share.cleanup()

    # Mount all share to the current user
    network_share.mount()
    return "RERUN"
def is_debian_based():
    """Tell whether the marker file /etc/debian_version is present."""
    debian_marker = '/etc/debian_version'
    return isfile(debian_marker)
def is_redhat_based():
    """Tell whether the marker file /etc/redhat-release is present."""
    redhat_marker = '/etc/redhat-release'
    return isfile(redhat_marker)
def is_linux():
    """Tell whether ``platform.system()`` reports 'Linux'."""
    system_name = platform.system()
    return system_name == 'Linux'
def is_windows():
    """Tell whether ``platform.system()`` reports 'Windows'."""
    system_name = platform.system()
    return system_name == 'Windows'
def is_darwin():
    """Tell whether ``platform.system()`` reports 'Darwin'."""
    system_name = platform.system()
    return system_name == 'Darwin'
def get_domain_name():
    """
    Return the lower-cased domain name of the current session, or None.

    - Windows: value returned by ``get_domain_fromregistry()``.
    - Linux: text after the last '@' of the "Default principal" line printed by ``klist``.
    - macOS: text after the last '@' of the "principal" field of ``klist --json``.

    Any failure while querying (command error, missing line, missing key...)
    results in None; so does an unrecognised platform.
    """
    try:
        if is_windows():
            return get_domain_fromregistry().lower()
        if is_linux():
            # e.g. "Default principal: user@REALM" -> "realm"
            return next(
                line.split(':')[-1].split('@')[-1].lower()
                for line in run('klist').splitlines()
                if line.startswith('Default principal')
            )
        if is_darwin():
            return json.loads(run('klist --json'))['principal'].split('@')[-1].lower()
    except Exception:
        # Best effort: the caller treats None as "domain not found".
        # Exception (not a bare except) so KeyboardInterrupt/SystemExit still propagate.
        return None
    return None
def read_txt_file(file: str) -> list:
    """
    Return the lines of a text file, without their line endings.

    :param file: path of the file to read.
    :return: list of lines, or an empty list when ``isfile(file)`` is false.
    """
    if not isfile(file):
        return []
    with open(file) as handle:
        content = handle.read()
    return content.splitlines()
def write_txt_file(file: str, data: str, mode: str = 'w', endline: str = "\n") -> None:
    """
    Write ``data`` followed by ``endline`` to a text file.

    :param file: path of the file to write.
    :param data: text to write.
    :param mode: mode passed to ``open`` ('w' overwrites, 'a' appends).
    :param endline: text appended right after ``data``.
    """
    with open(file=file, mode=mode) as handle:
        payload = data + endline
        handle.write(payload)
def save_bookmark_added_by_wapt(bookmark_line):
    """
    Record a bookmark line for the current user in the WAPT bookmarks JSON file.

    The file maps a user name to the list of bookmark lines added for that user.
    A line that is already recorded is not added again.

    :param bookmark_line: bookmark line to record.
    """
    if isfile(bookmarks_wapt_file):
        tracked = json_load_file(bookmarks_wapt_file)
    else:
        tracked = {}

    lines_for_user = tracked.setdefault(current_username, [])
    if bookmark_line in lines_for_user:
        return

    lines_for_user.append(bookmark_line)
    json_write_file(bookmarks_wapt_file, tracked)
def add_gnome_bookmark(bookmark_uri: str, bookmark_name: str = ""):
    """
    Append a bookmark to the GNOME bookmarks file unless it is already listed.

    A newly added line is also recorded through ``save_bookmark_added_by_wapt``.

    :param bookmark_uri: URI of the bookmark (e.g., "file:///usr/share/doc/").
    :param bookmark_name: optional display name (e.g., "Documentation").
    """
    entry = f"{bookmark_uri} {bookmark_name}".strip()
    # The messages show the name when there is one, the whole line otherwise.
    shown_name = bookmark_name if bookmark_name else entry

    mkdirs(gnome_config_dir)

    if entry in read_txt_file(bookmarks_gnome_file):
        print(f"[!] Bookmark already exists: {shown_name}")
        return

    write_txt_file(bookmarks_gnome_file, entry, mode='a')
    save_bookmark_added_by_wapt(entry)
    print(f"[+] Bookmark added: {shown_name}")
def remove_gnome_bookmark():
    """
    Remove from the GNOME bookmarks file the lines recorded for the current user.

    Does nothing (besides a message) when the WAPT bookmarks JSON file is
    missing, and nothing at all when no line is recorded for the user.
    """
    if not isfile(bookmarks_wapt_file):
        print("[!] No WAPT bookmarks file found. Nothing to remove.")
        return

    tracked = json_load_file(bookmarks_wapt_file)
    gnome_lines = read_txt_file(bookmarks_gnome_file)
    own_lines = tracked.get(current_username, [])

    if own_lines:
        for recorded_line in own_lines:
            # Drop the first matching line only, as list.remove does.
            if recorded_line in gnome_lines:
                gnome_lines.remove(recorded_line)
        remaining_text = '\n'.join(gnome_lines)
        write_txt_file(bookmarks_gnome_file, remaining_text)
        print(f"[+] Successfully removed bookmark(s) added by WAPT for '{current_username}'.")
def convert_gpo_network_drive_to_share(network_drive_dict):
    r"""
    Convert the drives returned by ``GpoEngine.get_network_drive()`` to ``Share`` objects.

    Each value of the mapping is a dict of drive properties holding at least
    'letter' and normally 'path' (UNC form ``\\server\share[\sub\dir]``) and
    'label'. Properties that are empty in the GPO are absent from the dict,
    so 'label' defaults to an empty string.

    The part after the server is normalised to forward slashes, which is the
    form the share managers of this file work with. Entries whose path has no
    server or no share part are skipped with a message.

    :param network_drive_dict: mapping letter -> drive properties.
    :return: list of ``Share``.
    """
    shares = []
    for drive in network_drive_dict.values():
        unc_path = drive.get('path', '')
        server, _, share_path = unc_path.removeprefix(r'\\').partition('\\')
        share_path = share_path.replace('\\', '/').strip('/')
        if not server or not share_path:
            print(f"Skipping network drive '{drive.get('letter', '')}': invalid path '{unc_path}'")
            continue
        shares.append(Share(drive['letter'], drive.get('label', ''), server, share_path))
    return shares
@dataclass
class Share:
    """Description of one network share to map for the user."""
    # Single-character drive letter (e.g. 'P'); also used as the symlink name on Linux.
    letter: str
    # Display label (drive label on Windows, bookmark name on Linux).
    name: str
    # Host name of the file server.
    server: str
    # Share name, optionally followed by sub-folders.
    path: str
class LdapManager:
    """
    Manages the connection and LDAP queries (Active Directory).

    On creation the client binds with Kerberos, then loads the directory
    entries of the current user and of the current computer (looked up by
    sAMAccountName) and derives their DN and parent container.
    """

    def __init__(self, domain_name):
        """
        :param domain_name: name of the Active Directory domain to query.
        """
        self.domain_ad = domain_name
        self.client = pyldap.PyLdapClient(domain_name=self.domain_ad)
        result = self.client.bind_sasl_kerberos()
        if not result[0]:
            error('Failed connect to active directory')
        self.base_domain_name = self.client.default_dn()
        self.all_user_info = self.__get_all_info_current_user()
        self.all_computer_info = self.__get_all_info_current_computer()
        # The first key of each search result is used as the object's DN.
        self.user_DN = next(iter(self.all_user_info))
        self.user_OU = self.__get_OU_from_DN(self.user_DN)
        self.computer_DN = next(iter(self.all_computer_info))
        self.computer_OU = self.__get_OU_from_DN(self.computer_DN)

    def __get_OU_from_DN(self, dn: str):
        """
        Returns the DN without its first component (the parent container).
        """
        return ','.join(dn.split(',')[1:])

    def __get_all_info_current_user(self):
        """
        Retrieves all AD parameters of the current user.
        """
        return self.client.search_all(self.base_domain_name, f'(sAMAccountName={current_username})')

    def __get_all_info_current_computer(self):
        """
        Retrieves all AD parameters of the current computer.
        """
        return self.client.search_all(self.base_domain_name, f'(sAMAccountName={current_computer})')

    def __is_in_OU(self, object_dn: str, OU: str) -> bool:
        """
        Checks if the object's DN ends with the specified OU.

        Both sides are lower-cased so the comparison is case-insensitive.
        """
        return object_dn.lower().endswith(OU.lower())

    def user_is_in_OU(self, OU: str) -> bool:
        """
        Checks if the user is in the specified OU.
        """
        return self.__is_in_OU(self.user_DN, OU)

    def user_is_member_of_groups(self, groups: List[str]) -> bool:
        """
        Checks if the current user belongs to every group of the provided list.

        The groups reported by ``get_authorized_user_groups`` for the requested
        names must be exactly the requested names.
        """
        user_groups = self.client.get_authorized_user_groups(current_username, groups)
        return set(user_groups if user_groups else []) == set(groups)

    def computer_is_member_of_groups(self, groups: List[str]) -> bool:
        """
        Checks if the current computer belongs to every group of the provided list.

        Group names are matched against the CN of the groups found.
        """
        # This current method not works, mormot need to fix that
        # computer_groups = self.client.get_authorized_user_groups(current_computer, groups)
        # return computer_groups == groups
        group_filters = ''.join(f'(sAMAccountName={group})' for group in groups)
        # Among the requested groups, keep those that contain the computer
        # (rule 1.2.840.113556.1.4.1941 presumably also follows nested groups -
        # confirm against the Active Directory documentation).
        ldap_filter = (
            f"(&(member:1.2.840.113556.1.4.1941:={self.computer_DN})"
            f"(|{group_filters}))"
        )
        self.client.search(self.base_domain_name, False, ldap_filter, ["distinguishedName"])
        response = self.client.search_result()
        computer_groups = [
            dn.split(',')[0].removeprefix('CN=')
            for entry in response
            for attr in entry.object_attributes
            for dn in attr.values
        ]
        return set(computer_groups) == set(groups)

    def computer_is_in_OU(self, OU: str) -> bool:
        """
        Checks if the computer is in the specified OU.
        """
        return self.__is_in_OU(self.computer_DN, OU)

    def print_all_info_current_user(self):
        """
        Prints all AD parameters of the current user.
        """
        print("AD user infos:", json.dumps(self.all_user_info, indent=4))

    def print_all_info_current_computer(self):
        """
        Prints all AD parameters of the current computer.
        """
        print("AD computer infos:", json.dumps(self.all_computer_info, indent=4))
class NetworkShare():
    """
    Holds the list of shares of the user and delegates mounting/unmounting
    to the share manager that matches the machine type.
    """

    def __init__(self):
        home_dir = user_home_directory()
        self.linux_mountdir = makepath(home_dir, ".mountpoint")
        self.linux_network_drive_dir = makepath(home_dir, "network_drive")
        self.shares: List[Share] = []

        # Pick the manager for the platform; no manager is set on other systems.
        on_supported_linux = is_debian_based() or is_redhat_based()
        if on_supported_linux:
            self.share_manager = SmbNetFsManager(self.linux_mountdir, self.linux_network_drive_dir)
        elif is_windows():
            self.share_manager = WindowsNetworkShare()
        elif is_darwin():
            self.share_manager = DarwinNetworkShare()

    def add(self, letter, name, server, path):
        """
        Register a share to mount.

        The share is ignored (with a message) when its letter, or its
        (server, path) couple, is already registered.

        :raises ValueError: when the letter is not exactly one character, when
            server or path is empty, or when server or path contains a backslash.
        """
        if not letter or len(letter) > 1:
            raise ValueError("The share letter must be a single character (e.g., 'P').")
        if not server:
            raise ValueError("The server name must not be empty.")
        if not path:
            raise ValueError("The share path must not be empty.")
        if '\\' in server or '\\' in path:
            raise ValueError("The share server and path must not contains backslash ('\\') but slash ('/') instead !")

        # Check if the letter is already in use
        if any(known.letter == letter for known in self.shares):
            print(f"The share '{letter}' already exists. Add operation canceled.")
            return

        # Check if couple (server, path) already in use
        if any(known.server == server and known.path == path for known in self.shares):
            print(f"The share '{server}/{path}' already exists. Add operation canceled.")
            return

        self.shares.append(Share(letter, name, server, path))
        print(f"Share '{letter}' to '{server}/{path}' successfully added.")

    def mount(self):
        """
        Ask the share manager to mount the registered shares, if there are any.
        """
        if self.shares:
            self.share_manager.mount_shares(self.shares)
        else:
            print('No shares to mount for the current user')

    def cleanup(self):
        """
        Ask the share manager to unmount the registered shares.
        """
        self.share_manager.umount_shares(self.shares)
class WindowsNetworkShare():
    """
    Maps and unmaps network drives on Windows through the WScript.Network COM
    object, and sets the drive label through the user's registry.
    """

    # Key (under HKEY_CURRENT_USER) holding one sub-key per network mount point.
    # Raw string: the backslashes are literal, not escape sequences.
    _MOUNTPOINTS_KEY = r'SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\MountPoints2'

    def __init__(self):
        # Imported here so the module can still be loaded on other platforms.
        import win32com.client
        self.WshNetwork = win32com.client.Dispatch("WScript.Network")

    @staticmethod
    def _mountpoint_key_name(share: "Share") -> str:
        """Return the MountPoints2 sub-key name of a share: '##server#share#sub'."""
        return f"##{share.server}#{share.path.replace('/', '#')}"

    def get_mapped_drives(self) -> "List[Share]":
        """
        Retrieved mounts Windows network share

        EnumNetworkDrives is read as consecutive (local name, remote path)
        pairs. The returned shares have an empty name and a '/'-separated path.
        """
        drives = self.WshNetwork.EnumNetworkDrives()
        mapped_drives = []
        for i in range(0, drives.Count(), 2):
            splited_path = drives.Item(i + 1).removeprefix(os.sep * 2).split(os.sep)
            letter = drives.Item(i).replace(':', '')
            server, path = splited_path[0], '/'.join(splited_path[1:])
            mapped_drives.append(Share(letter, "", server, path))
        return mapped_drives

    def is_mapped(self, share: "Share"):
        """
        Tell whether a network drive is currently mapped on the letter of ``share``.

        Letters are compared for equality, ignoring case. An entry without a
        letter never matches.
        """
        wanted = share.letter.upper()
        return any(d.letter.upper() == wanted for d in self.get_mapped_drives())

    def rename_mapped_drive(self, share: "Share"):
        """
        Store ``share.name`` as '_LabelFromReg' in the mount point key of the share.
        """
        key_path = makepath(self._MOUNTPOINTS_KEY, self._mountpoint_key_name(share))
        with reg_openkey_noredir(HKEY_CURRENT_USER, key_path, sam=KEY_WRITE, create_if_missing=True) as key:
            reg_setvalue(key, '_LabelFromReg', share.name, REG_SZ)

    def remove_named_drive(self, share: "Share"):
        """
        Delete the mount point key of the share when it exists.
        """
        key_name = self._mountpoint_key_name(share)
        if reg_key_exists(HKEY_CURRENT_USER, makepath(self._MOUNTPOINTS_KEY, key_name)):
            registry_deletekey(HKEY_CURRENT_USER, self._MOUNTPOINTS_KEY, key_name)

    def mount_shares(self, shares: "List[Share]"):
        """
        Mounts Windows network share

        A failure on one share is reported and does not stop the others.
        """
        for share in shares:
            try:
                network_path = rf"\\{share.server}\{share.path.replace('/', os.sep)}"
                self.WshNetwork.MapNetworkDrive(f"{share.letter}:", network_path)
                self.rename_mapped_drive(share)
                print(f"Successfully mapped {share.letter}: ({network_path})")
            except Exception as e:
                print(f"Unable to mount {share.letter}: {e}")

    def umount_shares(self, shares: "List[Share]"):
        """
        Unmounts Windows network share

        Only letters that are currently mapped are removed. A failure on one
        share is reported and does not stop the others.
        """
        for share in shares:
            if not self.is_mapped(share):
                continue
            try:
                self.WshNetwork.RemoveNetworkDrive(f"{share.letter}:", True)
                self.remove_named_drive(share)
                print(f"Removed mapped drive {share.letter}: done")
            except Exception as e:
                print(f"Unable to remove mapped {share.letter}: {e}")
class DarwinNetworkShare():
    """
    Placeholder share manager for macOS: mounting is not implemented.

    The methods accept the same ``shares`` argument as the other share
    managers so that ``NetworkShare`` can call them the same way.
    """

    def __init__(self):
        pass

    def mount_shares(self, shares=None):
        """Does nothing for now. ``shares`` is accepted and ignored."""
        pass

    def umount_shares(self, shares=None):
        """Does nothing for now. ``shares`` is accepted and ignored."""
        pass
class SmbNetFsManager():
    """
    Manages the mounting of network drives on Linux machines with smbnetfs.

    smbnetfs is mounted on ``mountdir``; every share is then exposed as a
    symbolic link named after its letter inside ``network_drive_dir`` and
    bookmarked for GNOME.
    """

    def __init__(self, mountdir, network_drive_dir):
        """
        :param mountdir: directory on which smbnetfs is mounted.
        :param network_drive_dir: directory receiving one symlink per share.
        """
        self.mountdir = mountdir
        self.network_drive_dir = network_drive_dir
        self.user_smb_folder = makepath(user_home_directory(), ".smb")
        self.user_conf_smbnetfs = makepath(self.user_smb_folder, "smbnetfs.conf")
        self.user_host_smbnetfs = makepath(self.user_smb_folder, "wapt-smbnetfs.host")
        self.all_fqdn_servers = list()
        self.mounted_servers = list()
        self.__setup_smbnetfs_config()

    def __setup_smbnetfs_config(self):
        """
        Sets up the configuration for smbnetfs in the user's home directory.

        The default configuration is copied when the user has none, and an
        'include' line for the WAPT host file is appended when missing.
        """
        line_to_add = f'include \t\t"{os.path.basename(self.user_host_smbnetfs)}"'
        mkdirs(self.user_smb_folder)
        if not isfile(self.user_conf_smbnetfs):
            filecopyto(default_conf_smbnetfs, self.user_smb_folder)
        lines = read_txt_file(self.user_conf_smbnetfs)
        if line_to_add not in lines:
            write_txt_file(self.user_conf_smbnetfs, line_to_add, mode='a')

    def __setup_smbnetfs_host(self):
        """
        Writes the servers configuration for smbnetfs.

        One "host <server> visible=true" line per server; the file is then
        restricted to its owner (mode 0600).
        """
        conf = "".join(f"host {server} visible=true\n" for server in self.all_fqdn_servers)
        write_txt_file(self.user_host_smbnetfs, conf)
        os.chmod(self.user_host_smbnetfs, 0o600)

    def __is_smbnetfs_mounted(self):
        """
        Checks if smbnetfs is already mounted.

        Looks in /etc/mtab for a line starting with "smbnetfs <mountdir>".
        """
        prefix = f'smbnetfs {self.mountdir}'
        return any(line.startswith(prefix) for line in read_txt_file('/etc/mtab'))

    def __refresh_mounted_servers(self):
        """
        Rebuilds ``mounted_servers`` from the content of the mount directory.

        A requested server that is not listed there is reported as failed.
        """
        visible_servers = os.listdir(self.mountdir)
        # Start from scratch so servers of a previous call are not kept.
        self.mounted_servers = []
        for server_to_mount in self.all_fqdn_servers:
            if server_to_mount in visible_servers:
                self.mounted_servers.append(server_to_mount)
            else:
                print(f"\\!/ Failed to mount smbnetfs share for '{server_to_mount}' server \\!/")

    def __mount_smbnetfs(self):
        """
        Mounts smbnetfs
        """
        mkdirs(self.mountdir)
        run(f'smbnetfs {self.mountdir}')
        self.__refresh_mounted_servers()

    def mount_shares(self, shares: "List[Share]"):
        """
        Mounts the specified shares, creating symbolic links.

        Bookmarks previously added for the user are removed first. A share
        whose server is not visible through smbnetfs is skipped with a message.
        """
        self.all_fqdn_servers = list({share.server for share in shares})
        self.__setup_smbnetfs_host()

        if self.__is_smbnetfs_mounted():
            # Already mounted: the list of usable servers still has to be
            # detected, otherwise every share would be reported as failed.
            self.__refresh_mounted_servers()
        else:
            self.__mount_smbnetfs()

        mkdirs(self.network_drive_dir)
        remove_gnome_bookmark()

        for share in shares:
            if share.server not in self.mounted_servers:
                print(f"'\\!/ Failed to create share '{share.letter}' because smbnetfs cannot mount '{share.server}' \\!/'")
                continue
            share_mountdir = makepath(self.mountdir, share.server, share.path)
            share_destination = makepath(self.network_drive_dir, share.letter)
            # ensure the symlink not exist
            if os.path.islink(share_destination):
                remove_file(share_destination)
            os.symlink(share_mountdir, share_destination)
            add_gnome_bookmark(f"file:///{share_destination}", share.name)

        if self.mounted_servers:
            mapped_letters = [share.letter for share in shares if share.server in self.mounted_servers]
            print(f"Successfully mapped: {', '.join(mapped_letters)}")
        else:
            print("No network shares mapped...")

    def umount_shares(self, shares: "List[Share]"):
        """
        Unmounts smbnetfs if mounted and removes the network drive directory.

        ``shares`` is not used: everything is unmounted at once.
        """
        if self.__is_smbnetfs_mounted():
            run(f"fusermount -u {self.mountdir}")
        # Cleanup the folder
        if isdir(self.network_drive_dir):
            remove_tree(self.network_drive_dir)
class GpoEngine:
    """
    Finds the network drives that the Group Policy Objects (GPO) of the
    Active Directory domain assign to the current user.

    The GPO linked to the containers of the user (and to the client site) are
    listed through LDAP, their "Drives.xml" preference file is read from the
    sysvol share of a domain controller, and the item-level targeting filters
    of each drive are evaluated for the current user and computer.
    """

    # Targeting filter XML tag -> name of the method that evaluates it.
    # Tags that are not listed (other than FilterCollection) evaluate to False.
    _FILTER_HANDLERS = {
        'FilterDomain': 'filter_domain',
        'FilterComputer': 'filter_computer',
        'FilterUser': 'filter_user',
        'FilterGroup': 'filter_group',
    }

    def __init__(self, domain_name):
        """
        :param domain_name: name of the Active Directory domain to query.
        """
        self.domain_ad = domain_name
        self.site = pyldap.cldap_get_domain_info(domain_name=self.domain_ad)['client_site']
        self.client = pyldap.PyLdapClient(domain_name=self.domain_ad)
        result = self.client.bind_sasl_kerberos()
        if not result[0]:
            error('Failed connect to active directory')
        self.base_domain_name = self.client.default_dn()

        # Information about the user
        user_result = self.client.search_all(self.base_domain_name, f"(sAMAccountName={current_username})", ["primaryGroupID", "objectSid"])
        self.dn_user = next(iter(user_result))
        user_values = next(iter(user_result.values()))
        self.primary_group_id = user_values['primaryGroupID'][0]
        self.object_sid_user = user_values['objectSid'][0]

        # Containers of the user: every suffix of its DN, from the DN itself
        # down to its last component.
        parent_ou_split = self.dn_user.split(',')
        self.list_ou = [','.join(parent_ou_split[i:]) for i in range(len(parent_ou_split))]

        # Domain controller and NetBIOS name
        self.controller = pyldap.cldap_get_ldap_controller(domain_name=self.domain_ad).split(':')[0]
        self.netbios_name = self._get_netbios_name()

    def _get_netbios_name(self):
        """
        Return the NetBIOS domain name, read from the servicePrincipalName
        values of the domain controller account, or None when not found.

        The value kept is the last '/'-separated part of the first SPN that
        has more than two parts and whose last part contains no dot.
        """
        search_filter = f"(sAMAccountName={self.controller.split('.')[0]}$)"
        attributes = ['servicePrincipalName']
        service_principal_names = next(iter(self.client.search_all(self.base_domain_name, search_filter, attributes).values())).get('servicePrincipalName')
        return next((u.split('/')[-1] for u in service_principal_names if len(u.split('/')) > 2 and '.' not in str(u.split('/')[-1])), None)

    def apply_gpo(self, cn_guid):
        """
        Return the drives defined by the Drives.xml file of a GPO.

        :param cn_guid: DN of the GPO; its first component holds the GUID.
        :return: dict letter -> drive properties (see ``_parse_drives_xml_with_targeting``).
        """
        guid = cn_guid.split('=')[1].split(',')[0]
        drive_file = rf"\\{self.controller}\sysvol\{self.domain_ad}\Policies\{guid}\User\Preferences\Drives\Drives.xml"
        return self._parse_drives_xml_with_targeting(drive_file)

    def _filter_sid(self, sidtest):
        """
        Tell whether ``sidtest`` is the SID of the current user.
        """
        if not sidtest:
            return False
        if sidtest == self.object_sid_user:
            return True
        return bool(self.client.search_all(self.base_domain_name, f"(&(sAMAccountName={current_username})(objectsid={sidtest}))", ['objectsid']))

    def filter_domain(self, attrs):
        """FilterDomain: user context and name equal to the NetBIOS domain name."""
        return attrs['userContext'] == "1" and self.netbios_name == attrs['name']

    def filter_computer(self, attrs):
        """FilterComputer: compare the computer name (NETBIOS or DNS form) with the filter name."""
        if attrs['type'] == "NETBIOS":
            return get_hostname().split('.')[0] == attrs['name'].lower()
        if attrs['type'] == "DNS":
            return get_fqdn() == attrs['name'].lower()
        return False

    def filter_user(self, attrs):
        """FilterUser: the filter SID is the SID of the current user."""
        return self._filter_sid(attrs.get('sid'))

    def filter_group(self, attrs):
        """
        FilterGroup: the current user is a member of the group with the filter SID.

        Local groups and filters not in user context are never matched.
        """
        sidtest = attrs.get('sid')
        if not sidtest or attrs['localGroup'] == '1' or attrs['userContext'] != '1':
            return False
        # The primary group is identified by the user's SID prefix + primaryGroupID.
        if sidtest.startswith(self.object_sid_user.rsplit('-', 1)[0]) and str(sidtest.split('-')[-1]) == str(self.primary_group_id):
            return True
        if attrs['primaryGroup'] == '1':
            return False
        return bool(self.client.search_all(self.base_domain_name, f"(&(member:1.2.840.113556.1.4.1941:={self.dn_user})(objectsid={sidtest}))", ['objectsid']))

    def _read_xml_file(self, xml_file_path):
        r"""
        Return the text of an XML file given by its UNC path.

        - Windows: the path is opened directly.
        - Linux: the file is fetched with ``smbclient`` (Kerberos) and the
          output is kept from the first line starting with '<?xml'.
        - macOS: sysvol is mounted under /private/tmp and the file is read there.

        :raises FileNotFoundError: on Linux, when the smbclient output holds no XML prolog.
        """
        if is_windows():
            with open(xml_file_path, encoding='utf-8') as file:
                return file.read()

        # ['', '', server, share, 'rest\of\the\path']
        path_parts = xml_file_path.split('\\', 4)

        if is_linux():
            server, share = path_parts[2], path_parts[3]
            subpath, file = path_parts[4].rsplit('\\', 1)
            result = run(f'smbclient -N --use-kerberos=desired //{server}/{share} -D "{subpath}" -c "get {file} /dev/stdout" --realm={self.domain_ad.upper()}').rsplit('getting file ', 1)[0]
            # Find the index of the first line beginning with '<?xml'
            lines = result.splitlines()
            start_of_xml = next((i for i, line in enumerate(lines) if line.strip().startswith('<?xml')), None)
            if start_of_xml is None:
                # Reported like a missing file so the caller's handler deals with it.
                raise FileNotFoundError(f"No XML content found in {xml_file_path}")
            return "\n".join(lines[start_of_xml:])

        if is_darwin():
            # TODO: Umount the share at the end
            tmp_sysvol_mountdir = f"/private/tmp/sysvol_{current_username}"
            if not isdir(tmp_sysvol_mountdir):
                mkdirs(tmp_sysvol_mountdir)
                run(f'mount_smbfs //{self.controller}/sysvol {tmp_sysvol_mountdir}')
            converted_path = '/'.join(path_parts[-1].split('\\'))
            with open(f"{tmp_sysvol_mountdir}/{converted_path}", encoding='utf-8') as file:
                return file.read()

    def evaluate_filters(self, filters_element):
        """
        Evaluate a sequence of targeting filters, left to right.

        :param filters_element: iterable of filter elements (a <Filters> or
            <FilterCollection> element, or a list of elements).
        :return: result of the last evaluation, or None when there is no filter.
        """
        last_condition_result = None
        last_operator = "AND"
        for filter_element in filters_element:
            operator = filter_element.attrib["bool"]
            # For the first filter the "previous result" is the neutral value
            # of its operator: False for OR, True otherwise.
            if last_condition_result is None:
                last_condition_result = operator != "OR"

            # The previous result is passed so the operator can be applied.
            result_filter = self.evaluate_filter(filter_element, last_condition_result)

            # Odd, but this appears to be how the targeting engine behaves:
            # a true result followed by two consecutive OR stops the evaluation.
            if last_condition_result and last_operator == "OR" and operator == "OR":
                return True

            # Keep the results for the next filter
            last_condition_result = result_filter
            last_operator = operator
        return last_condition_result

    def evaluate_filter(self, filter_element, last_condition_result):
        """
        Evaluate one filter and combine it with the previous result.

        :param filter_element: filter element; its tag selects the handler and
            its 'bool' / 'not' attributes give the operator and the negation.
        :param last_condition_result: result accumulated so far.
        :return: combined result; False for an unsupported filter type.
        """
        type_filter = filter_element.tag
        handler_name = self._FILTER_HANDLERS.get(type_filter)
        # Only supported filters are evaluated, anything else counts as "no".
        if type_filter != 'FilterCollection' and handler_name is None:
            return False

        operator = filter_element.attrib["bool"]
        # With OR and an already true result there is nothing to test.
        if operator == "OR" and last_condition_result:
            return True

        if type_filter == "FilterCollection":
            result_filter = self.evaluate_filters(filter_element.findall("*"))
        else:
            result_filter = getattr(self, handler_name)(filter_element.attrib)

        # Negate when requested
        if filter_element.attrib["not"] == '1':
            result_filter = not result_filter

        if operator == "OR":
            return result_filter or last_condition_result
        if operator == "AND":
            return result_filter and last_condition_result

    def _parse_drives_xml_with_targeting(self, xml_file_path):
        """
        Read a Drives.xml file and return the drives that apply.

        Disabled drives, drives without properties, drives rejected by their
        targeting filters and drives without a letter are left out.

        :return: dict letter -> dict of the non-empty properties among
            'path', 'useLetter', 'label', 'action', 'letter'. Empty dict when
            the file is missing or is not valid XML.
        """
        try:
            xml_data = self._read_xml_file(xml_file_path)
            root = ET.fromstring(xml_data)
            share_paths = {}
            for drive in root.findall("Drive"):
                if drive.attrib.get('disabled') == "1":
                    continue
                properties = drive.find("Properties")
                if properties is None:
                    continue
                props = {key: properties.get(key) for key in ['path', 'useLetter', 'label', 'action', 'letter'] if properties.get(key)}
                filters = drive.find("Filters")
                # Explicit None/length test: the truth value of an Element is deprecated.
                if filters is not None and len(filters) > 0 and not self.evaluate_filters(filters):
                    continue
                if "letter" in props:
                    share_paths[props["letter"]] = props
            return share_paths
        except (FileNotFoundError, ET.ParseError) as e:
            print(f"Erreur : {e}")
            return {}

    def check_acl_gpo(self, data):
        """
        Tell whether the security descriptor of a GPO lets it apply to the user.

        Only the ACE mentioning the right edacfd8f-ffb3-11d1-b41d-00a0c968f939
        are considered (presumably the "Apply Group Policy" extended right -
        confirm against the Active Directory schema documentation).
        A matching deny ACE (OD) wins over any allow ACE (OA).

        :param data: security descriptor as an SDDL-like string.
        :return: True when an allow ACE matches the user and no deny ACE does.
        """
        apply_right = ';edacfd8f-ffb3-11d1-b41d-00a0c968f939;'
        # (ACE type, SID) of every ACE carrying the right, in order.
        aces = []
        for chunk in data.split('('):
            if apply_right in chunk:
                fields = chunk.split(';')
                aces.append((fields[0], fields[5].split(')')[0]))

        # SIDs that stand for the user: the user itself, its primary group,
        # "AU", and below every listed group the user belongs to.
        group_primary = self.object_sid_user.rsplit('-', 1)[0] + '-' + str(self.primary_group_id)
        user_sids = {group_primary, "AU", self.object_sid_user}

        candidate_sids = list(dict.fromkeys(sid for _, sid in aces))
        if candidate_sids:
            filter_ldap = "(&(|%s)(member:1.2.840.113556.1.4.1941:=%s))" % (''.join('(objectsid=' + sid + ')' for sid in candidate_sids), self.dn_user)
            result_ldap = self.client.search_all(self.base_domain_name, filter_ldap, ['objectSid'])
            for entry in result_ldap:
                user_sids.add(result_ldap[entry]['objectSid'][0])

        if any(ace_type == 'OD' and sid in user_sids for ace_type, sid in aces):
            return False
        return any(ace_type == 'OA' and sid in user_sids for ace_type, sid in aces)

    def get_network_drive(self):
        """
        Return the network drives resulting from the GPO that apply to the user.

        :return: dict letter -> drive properties; a GPO processed later
            overrides the letters defined by a GPO processed earlier.
        """
        dict_network_drive = {}
        dict_ou_result = {}
        dict_gpo_value = {}

        # Look up every container of the user and the GPO linked to it
        for ou in self.list_ou:
            dict_ou_result[ou] = self.client.search_all(
                self.base_domain_name,
                f"(&(distinguishedName={ou})(gPLink=*)(!(gPOptions=2))(!(gPOptions=3)))",
                ['gPOptions', 'gPLink'],
            ).get(ou)

        # The GPO links of the client site are stored under the last (shortest)
        # container, which is the first one processed below.
        if self.site:
            rsite = self.client.search_all(f'CN=Configuration,{self.base_domain_name}', f'(&(name={self.site})(objectClass=site)(gPLink=*))')
            if rsite:
                dict_ou_result[self.list_ou[-1]] = list(rsite.values())[0]

        # First container (closest to the user) with Block Inheritance
        block_inheritance = next(
            (
                ou.lower() for ou in self.list_ou
                if dict_ou_result.get(ou) and
                dict_ou_result[ou].get('gPOptions', [''])[0] == 1
            ), None
        )

        def process_gpo_links(ou, apply_forced=False):
            """Apply the GPO linked to ``ou``: the normal links, or only the enforced ones."""
            if not dict_ou_result.get(ou):
                return
            for u in dict_ou_result[ou]['gPLink'][0].split('['):
                u = u.strip(' ]')
                if not u:
                    continue
                # A link is read as "<scheme>//<GPO DN>;<flag>"
                gpo_param = u.split('//')[1].rsplit(';', 1)
                gpo_distinguished_name = gpo_param[0]
                gpo_param_flag = gpo_param[1]

                # Each GPO is fetched once and kept for the second pass
                if u not in dict_gpo_value:
                    gpo_value = list(self.client.search_all(
                        self.base_domain_name,
                        f"(distinguishedName={gpo_distinguished_name})",
                        ['nTSecurityDescriptor', 'gPCUserExtensionNames', 'flags']
                    ).values())[0]
                    dict_gpo_value[u] = gpo_value
                else:
                    gpo_value = dict_gpo_value[u]

                ## 0 No flag set (default configuration).
                ## 1 The GPO is disabled for users.
                ## 2 The GPO is disabled for computers.
                ## 3 The GPO is disabled for both users and computers.
                ## 4 The GPO is a starter GPO (preconfigured GPO template).
                if gpo_value['flags'][0] in [1, 3]:
                    continue

                # Nothing to do when it is not a network drive GPO
                if '{2EA1A81B-48E5-45E9-8BB7-A6E3AC170006}' not in str(gpo_value.get('gPCUserExtensionNames', '')):
                    continue

                # 0: "enforced" unchecked, link enabled
                # 1: "enforced" unchecked, link disabled
                # 2: "enforced" checked, link enabled
                # 3: "enforced" checked, link disabled
                if (gpo_param_flag == "0" and not apply_forced) or (gpo_param_flag == "2" and apply_forced):
                    if self.check_acl_gpo(gpo_value['nTSecurityDescriptor'][0]):
                        dict_network_drive.update(self.apply_gpo(gpo_param[0]))

        # First pass: regular GPO, from the farthest container to the closest,
        # skipping what is above a container with Block Inheritance
        for ou in reversed(self.list_ou):
            if block_inheritance and not ou.lower().endswith(block_inheritance):
                continue
            process_gpo_links(ou, apply_forced=False)

        # Second pass: enforced GPO
        for ou in reversed(self.list_ou):
            process_gpo_links(ou, apply_forced=True)

        return dict_network_drive
6948aa6302ee0320ea4ec3b93152e44c01b012d0e19a69b0138f43271d28bdaa : .gitignore
38d056ab130f7bf7c481c12636a4e9959de36561d3dfcbe54c6e3571bc0c1dc3 : WAPT/certificate.crt
3a0fe7ecef060808ffea7128145be18b67cdb92b119edd223eb6b893cb3c99c3 : WAPT/control
25b0beebb5a383774223652d265c32e809a14cbe789e6ffc140a1e9e43591361 : WAPT/icon.png
6e63ea2c27883cf1d8ec9c7295488870940ffee9ddc0a2c909933a2b674c880a : luti.json
6013f9a157a267904caa151fcf5c2d1e99c02dcf89b41cf00ba5380f80cc48ee : setup.py
cb320c6b6e395d57ef245c74b9953505ee99d0b1cbe4335c335e2115d8c5b0ca : testgpo-printer.py
c3990a40f3a61a132a8f7c731130a5e204d3973d6d19cac114235a9da4d60f7a : testgpo.py