tis-mount-network-share icon

Mount Network Share

Paquet d’installation silencieuse pour Mount Network Share

1-13

  • package: tis-mount-network-share
  • name: Mount Network Share
  • version: 1-13
  • maintainer: WAPT Team,Tranquil IT,Flavien SCHELFAUT,Simon FONTENEAU
  • editor: WAPT
  • target_os: all
  • architecture: all
  • signature_date:
  • size: 24.34 Ko

package           : tis-mount-network-share
version           : 1-13
architecture      : all
section           : base
priority          : optional
name              : Mount Network Share
categories        : 
maintainer        : WAPT Team,Tranquil IT,Flavien SCHELFAUT,Simon FONTENEAU
description       : A package that lets you set up network shares on all platforms.
depends           : 
conflicts         : 
maturity          : PROD
locale            : 
target_os         : all
min_wapt_version  : 2.6
sources           : 
installed_size    : 
impacted_process  : 
description_fr    : Un paquet qui permet de monter des partages réseaux sur toutes les plateformes.
description_pl    : Pakiet do konfigurowania udziałów sieciowych na wszystkich platformach.
description_de    : Ein Paket, mit dem man Netzwerkfreigaben auf allen Plattformen einrichten kann.
description_es    : Un paquete para configurar recursos compartidos de red en todas las plataformas.
description_pt    : Um pacote para configurar partilhas de rede em todas as plataformas.
description_it    : Un pacchetto per l'impostazione delle condivisioni di rete su tutte le piattaforme.
description_nl    : Een pakket om netwerkshares in te stellen op alle platformen.
description_ru    : Пакет для настройки общих сетевых ресурсов на всех платформах.
audit_schedule    : 
editor            : WAPT
keywords          : 
licence           : 
homepage          : 
package_uuid      : f08c7369-d72e-4173-b888-e5c46e9e5393
valid_from        : 
valid_until       : 
forced_install_on : 
changelog         : 
min_os_version    : 
max_os_version    : 
icon_sha256sum    : 25b0beebb5a383774223652d265c32e809a14cbe789e6ffc140a1e9e43591361
signer            : Tranquil IT
signer_fingerprint: 8c5127a75392be9cc9afd0dbae1222a673072c308c14d88ab246e23832e8c6bb
signature_date    : 2025-03-21T13:13:55.000000
signed_attributes : package,version,architecture,section,priority,name,categories,maintainer,description,depends,conflicts,maturity,locale,target_os,min_wapt_version,sources,installed_size,impacted_process,description_fr,description_pl,description_de,description_es,description_pt,description_it,description_nl,description_ru,audit_schedule,editor,keywords,licence,homepage,package_uuid,valid_from,valid_until,forced_install_on,changelog,min_os_version,max_os_version,icon_sha256sum,signer,signer_fingerprint,signature_date,signed_attributes
signature         : s4NTLu2CyWNFzY3dc1/aS+qMICqzsvFwmmFRii5QM9jeOdEU7sMfK9NDJq14tyYzMT3Le+BG2spfBSDm+wMNluanErWvaczh8myimD39UFDfIyN5/Bbc2uz87i7sTCZ5cQR2FxB3jouZlWOqCcfyywJdwkuYc6uGVfeSRbRX4rfIwrHhy/Xr/h70vpcMTf75cWXLt8adqk485SP8fJ4vc7gVmTKtBWZ5Q104qpHV6VT/xCwtx7l+mnPlFjfoycAgXT8ChCnbeyudPBcX3xpxXeMMI5o2uodMZ+6rJqKdF8qL9KNO3GJBkfbkxVvUvAwfNPkRdVYHsTJmxhOnbllGnQ==

# -*- coding: utf-8 -*-
from setuphelpers import *
from waptutils import WAPT_VERSION_FULL
import pyldap
import os, json
from typing import List
from dataclasses import dataclass
import platform
import xml.etree.ElementTree as ET

###
# DO NOT TOUCH THESE VARIABLES
###
wapt_session_setup = "/etc/xdg/autostart/tis-waptsessionsetup.desktop"
gnome_config_dir = makepath(application_data(), "gtk-3.0")
bookmarks_gnome_file = makepath(gnome_config_dir, "bookmarks")
bookmarks_wapt_file = makepath(application_data(), 'wapt', 'bookmarks.json')

default_conf_smbnetfs = "/etc/smbnetfs.conf"
default_conf_samba = "/etc/samba/smb.conf"

current_username = get_current_user().lower()
current_computer = f"{get_computername().split('.')[0]}$"

def install():

    if Version(WAPT_VERSION_FULL.split('-')[0]) < Version("2.6.0.16881"):
        error("\!/ This package requires at least wapt version 2.6.0.16881 or higher \!/")

    if (is_debian_based() or is_redhat_based()) and not isfile(wapt_session_setup):
        error("\!/ This package requires the installation of waptagent-gui to be fully functional \!/")

    # Installing necessary dependencies
    if is_debian_based():
        install_apt('smbnetfs krb5-user smbclient')
    elif is_redhat_based():
        install_rpm('smbnetfs krb5-user smbclient')

def session_setup():

    domain_name = get_domain_name()
    if not domain_name:
        error('\!/ Unable to retrieve domain name \!/')

    network_share = NetworkShare()

    # Scan the domain sysvol to find the network drive GPO 
    ge = GpoEngine(domain_name)
    network_share.shares = convert_gpo_network_drive_to_share(ge.get_network_drive())

    # Alternative to GpoEngine, but must be defined manually. An example can be found below
    # ldap = LdapManager(domain_name)
    # if ldap.user_is_member_of_groups(['Administrators']):
    #     network_share.add('S', "Sysvol Another", 'fschelfaut.lan', 'sysvol/.stfolder')
    #     network_share.add('E', "Sysvol", 'fschelfaut.lan', 'sysvol')

    # if ldap.user_is_member_of_groups(['grp_bitwarden']):
    #     network_share.add('P', "Partage ISO", 'srvtemplates.ad.tranquil.it', 'iso')
    #     network_share.add('S', "Partage TEST", 'srvtemplates.ad.tranquil.it', 'test$')
    #     network_share.add('A', "Partage TEST2", 'srvtemplates.ad.tranquil.it', 'test2$')

    # if ldap.computer_is_member_of_groups(['test_wapt_profile', 'testimbrication2']):
    #     network_share.add('C', "Partage TEST2 Computer", 'srvtemplates.ad.tranquil.it', 'test2$')

    # if ldap.user_is_in_OU('OU=srp_hard,OU=users,OU=tranquilit,DC=ad,DC=tranquil,DC=it'):
    #     network_share.add('B', "Partage TEST2 User", 'srvtemplates.ad.tranquil.it', 'test2$')
    
    # Cleanup share before mounting new one 
    network_share.cleanup()

    # Mount all share to the current user  
    network_share.mount()

    return "RERUN"

def is_debian_based():
    return isfile('/etc/debian_version')

def is_redhat_based():
    return isfile('/etc/redhat-release')

def is_linux():
    return platform.system() == 'Linux'

def is_windows():
    return platform.system() == 'Windows'

def is_darwin():
    return platform.system() == 'Darwin'

def get_domain_name():
    try:
        if is_windows():
            return get_domain_fromregistry().lower()
        elif is_linux():
            return next(l.split(':')[-1].split('@')[-1].lower() for l in run('klist').splitlines() if l.startswith('Default principal'))
        elif is_darwin():
            return json.loads(run('klist --json'))['principal'].split('@')[-1].lower()
    except:
        return None

def read_txt_file(file: str) -> list:
    if isfile(file):
        with open(file) as f:
            return f.read().splitlines()
    return [] 

def write_txt_file(file: str, data: str, mode: str = 'w', endline: str = "\n") -> None:
    with open(file=file, mode=mode) as f:
        f.write(data + endline)

def save_bookmark_added_by_wapt(bookmark_line):
    bookmarks_data = {}
    if isfile(bookmarks_wapt_file):
        bookmarks_data = json_load_file(bookmarks_wapt_file)

    user_bookmark = bookmarks_data.setdefault(current_username, [])
    if bookmark_line not in user_bookmark:
        user_bookmark.append(bookmark_line)

    json_write_file(bookmarks_wapt_file, bookmarks_data)

def add_gnome_bookmark(bookmark_uri: str, bookmark_name: str = ""):
    """
    Add a GNOME bookmark for the current user in ~/.config/gtk-3.0/bookmarks.
    
    :param bookmark_uri: The URI of the bookmark (e.g., "file:///usr/share/doc/" or "ftp://ftp.gnome.org/").
    :param bookmark_name: (Optional) A user-friendly name for the bookmark (e.g., "Documentation").
    """

    bookmark_line = f"{bookmark_uri} {bookmark_name}".strip()

    mkdirs(gnome_config_dir)

    # Add the bookmark only if it doesn't exist
    if bookmark_line not in read_txt_file(bookmarks_gnome_file):

        write_txt_file(bookmarks_gnome_file, bookmark_line, mode='a')
        save_bookmark_added_by_wapt(bookmark_line)
    
        print(f"[+] Bookmark added: {bookmark_name if bookmark_name else bookmark_line}")
    else:
        print(f"[!] Bookmark already exists: {bookmark_name if bookmark_name else bookmark_line}")
        
def remove_gnome_bookmark():

    if not isfile(bookmarks_wapt_file):
        print("[!] No WAPT bookmarks file found. Nothing to remove.")
        return
    
    bookmarks_wapt = json_load_file(bookmarks_wapt_file)
    bookmarks_gnome = read_txt_file(bookmarks_gnome_file)
    user_bookmark = bookmarks_wapt.get(current_username, [])

    if not user_bookmark:
        return
    
    for bookmark in user_bookmark:
        if bookmark in bookmarks_gnome:
            bookmarks_gnome.remove(bookmark)
    
    write_txt_file(bookmarks_gnome_file, '\n'.join(bookmarks_gnome))
    print(f"[+] Successfully removed bookmark(s) added by WAPT for '{current_username}'.")

def convert_gpo_network_drive_to_share(network_drive_dict):

    return [
        Share(share['letter'], share['label'], *share['path'].removeprefix(r'\\').split('\\', 1))
        for share in network_drive_dict.values()
    ]

@dataclass
class Share:
    letter: str
    name: str
    server: str
    path: str

class LdapManager:
    """
    Manages the connection and LDAP queries (Active Directory).
    """

    def __init__(self, domain_name):
        self.domain_ad = domain_name
        self.client = pyldap.PyLdapClient(domain_name=self.domain_ad)

        result = self.client.bind_sasl_kerberos()
        if not result[0]:
            error('Failed connect to active directory')

        self.base_domain_name = self.client.default_dn()

        self.all_user_info = self.__get_all_info_current_user()
        self.all_computer_info = self.__get_all_info_current_computer()

        self.user_DN = next(iter(self.all_user_info))
        self.user_OU = self.__get_OU_from_DN(self.user_DN)

        self.computer_DN = next(iter(self.all_computer_info))
        self.computer_OU = self.__get_OU_from_DN(self.computer_DN)

    def __get_OU_from_DN(self, dn: str):
        return ','.join(dn.split(',')[1:])

    def __get_all_info_current_user(self):
        """
        Retrieves all AD parameters of the current user.
        """

        return self.client.search_all(self.base_domain_name, f'(sAMAccountName={current_username})')

    def __get_all_info_current_computer(self):
        """
        Retrieves all AD parameters of the current computer.
        """

        return self.client.search_all(self.base_domain_name, f'(sAMAccountName={current_computer})')
    
    def __is_in_OU(self, object_dn: str, OU: str) -> bool:
        """
        Checks if the object's DN ends with the specified OU.
        """
        return object_dn.endswith(OU.lower())

    def user_is_in_OU(self, OU: str) -> bool:
        """
        Checks if the user is in the specified OU.
        """
        return self.__is_in_OU(self.user_DN, OU)

    def user_is_member_of_groups(self, groups: List[str]) -> bool:
        """
        Checks if the given user belongs exactly to the provided list of groups.
        """

        user_groups = self.client.get_authorized_user_groups(current_username, groups)
        return set(user_groups if user_groups else [] ) == set(groups)

    def computer_is_member_of_groups(self, groups: List[str]) -> bool:
        """
        Checks if the given computer belongs exactly to the provided list of groups.
        """

        # This current method not works, mormot need to fix that 
        # computer_groups = self.client.get_authorized_user_groups(current_computer, groups)
        # return computer_groups == groups

        group_filters = ''.join(f'(sAMAccountName={group})' for group in groups)
        ldap_filter = (
            f"(&(member:1.2.840.113556.1.4.1941:={self.computer_DN})"
            f"(|{group_filters}))"
        )

        self.client.search(self.base_domain_name, False, ldap_filter, ["distinguishedName"])
        response = self.client.search_result()

        computer_groups = [
            dn.split(',')[0].removeprefix('CN=')
            for entry in response
            for attr in entry.object_attributes
            for dn in attr.values
        ]

        return set(computer_groups) == set(groups)
    
    def computer_is_in_OU(self, OU: str) -> bool:
        """
        Checks if the computer is in the specified OU.
        """
        return self.__is_in_OU(self.computer_DN, OU)

    def print_all_info_current_user(self):
        """
        Prints all AD parameters of the current user.
        """

        print("AD user infos:", json.dumps(self.all_user_info, indent=4))

    def print_all_info_current_computer(self):
        """
        Prints all AD parameters of the current computer.
        """

        print("AD user infos:", json.dumps(self.all_computer_info, indent=4))

class NetworkShare():
    """
    Manages the mounting of network drives in the user's session depending on the type of machine.
    """

    def __init__(self):

        self.linux_mountdir = makepath(user_home_directory(), ".mountpoint")
        self.linux_network_drive_dir = makepath(user_home_directory(), "network_drive")
        self.shares: List[Share] = []

        if is_debian_based() or is_redhat_based():
            self.share_manager = SmbNetFsManager(self.linux_mountdir, self.linux_network_drive_dir)
        elif is_windows():
            self.share_manager = WindowsNetworkShare()
        elif is_darwin():
            self.share_manager = DarwinNetworkShare()


    def add(self, letter, name, server, path):
        """
        Adds a network share to the list of shares to be mounted, avoiding duplicates of 
        letter or (server, path).
        """

        if not letter or len(letter) > 1:
            raise ValueError("The share letter must be a single character (e.g., 'P').")

        if not server:
            raise ValueError("The server name must not be empty.")

        if not path:
            raise ValueError("The share path must not be empty.")
        
        if any('\\' in s for s in [server, path]):
            raise ValueError("The share server and path must not contains backslash ('\\') but slash ('/') instead !")

        # Check if the letter is already in use
        for s in self.shares:
            if s.letter == letter:
                print(f"The share '{letter}' already exists. Add operation canceled.")
                return

        # Check if couple (server, path) already in use
        for s in self.shares:
            if s.server == server and s.path == path:
                print(f"The share '{server}/{path}' already exists. Add operation canceled.")
                return

        new_share = Share(letter, name, server, path)
        self.shares.append(new_share)
        print(f"Share '{letter}' to '{server}/{path}' successfully added.")

    def mount(self):
        """
        Mounts the shares using the share manager.
        """
        if not self.shares:
            print('No shares to mount for the current user'); return
        
        self.share_manager.mount_shares(self.shares)

    def cleanup(self):
        """
        Cleans up/unmounts the shares.
        """

        self.share_manager.umount_shares(self.shares)

class WindowsNetworkShare():

    def __init__(self):
        import win32com.client
        self.WshNetwork = win32com.client.Dispatch("WScript.Network")

    def get_mapped_drives(self) -> List[Share]:
        """
        Retrieved mounts Windows network share
        """

        drives = self.WshNetwork.EnumNetworkDrives()
        mapped_drives = []
        for i in range(0, drives.Count(), 2):
            splited_path = drives.Item(i + 1).removeprefix(os.sep * 2).split(os.sep)
            letter = drives.Item(i).replace(':', '')
            server, path = splited_path[0], '/'.join(splited_path[1:])
            mapped_drives.append(Share(letter, "", server, path))
        return mapped_drives

    def is_mapped(self, share: Share):

        return any(d.letter in share.letter for d in self.get_mapped_drives())

    def rename_mapped_drive(self, share: Share):
        
        key_name = f"##{share.server}#{share.path.replace('/', '#')}"
        reg_path = 'SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\MountPoints2'
        with reg_openkey_noredir(HKEY_CURRENT_USER, makepath(reg_path, key_name), sam=KEY_WRITE, create_if_missing=True) as key:
            reg_setvalue(key, '_LabelFromReg', share.name, REG_SZ)

    def remove_named_drive(self, share: Share):

        key_name = f"##{share.server}#{share.path.replace('/', '#')}"
        reg_path = 'SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\MountPoints2'
        if reg_key_exists(HKEY_CURRENT_USER, makepath(reg_path, key_name)):
            registry_deletekey(HKEY_CURRENT_USER, 'SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\MountPoints2', key_name)

    def mount_shares(self, shares: List[Share]):
        """
        Mounts Windows network share
        """

        for share in shares:
            try:
                network_path = rf"\\{share.server}\{share.path.replace('/', os.sep)}"
                self.WshNetwork.MapNetworkDrive(f"{share.letter}:", network_path)
                self.rename_mapped_drive(share)
                print(f"Successfully mapped {share.letter}: ({network_path})")
            except Exception as e:
                print(f"Unable to mount {share.letter}: {e}")

    def umount_shares(self, shares: List[Share]):
        """
        Unmounts Windows network share
        """

        for share in shares:
            if not self.is_mapped(share):
                continue
            try:
                self.WshNetwork.RemoveNetworkDrive(f"{share.letter}:", True)
                self.remove_named_drive(share)
                print(f"Removed mapped drive {share.letter}: done")
            except Exception as e:
                print(f"Unable to remove mapped {share.letter}: {e}")

class DarwinNetworkShare():

    def __init__(self):
        pass

    def mount_shares(self):
        pass

    def umount_shares(self):
        pass

class SmbNetFsManager():
    """
    Manages the mounting of network drives on Debian-based machines.
    """

    def __init__(self, mountdir, network_drive_dir):
        
        self.mountdir = mountdir
        self.network_drive_dir = network_drive_dir

        self.user_smb_folder = makepath(user_home_directory(), ".smb")
        self.user_conf_smbnetfs = makepath(self.user_smb_folder, "smbnetfs.conf")
        self.user_host_smbnetfs = makepath(self.user_smb_folder, "wapt-smbnetfs.host")

        self.all_fqdn_servers = list()
        self.mounted_servers = list()

        self.__setup_smbnetfs_config()

    def __setup_smbnetfs_config(self):
        """
        Sets up the configuration for smbnetfs in the user's home directory.
        """

        line_to_add = f'include \t\t"{os.path.basename(self.user_host_smbnetfs)}"'
        mkdirs(self.user_smb_folder)

        if not isfile(self.user_conf_smbnetfs):
            filecopyto(default_conf_smbnetfs, self.user_smb_folder)

        lines = read_txt_file(self.user_conf_smbnetfs)
        if line_to_add not in lines:
            write_txt_file(self.user_conf_smbnetfs, line_to_add, mode='a')

    def __setup_smbnetfs_host(self):
        """
        Writes the servers configuration for smbnetfs.
        """

        conf = str()
        for server in self.all_fqdn_servers:
            conf += f"host {server} visible=true\n"

        write_txt_file(self.user_host_smbnetfs, conf)
        os.chmod(self.user_host_smbnetfs, 0o600)

    def __is_smbnetfs_mounted(self):
        """
        Checks if smbnetfs is already mounted.
        """

        return any(line for line in read_txt_file('/etc/mtab') if line.startswith(f'smbnetfs {self.mountdir}'))

    def __mount_smbnetfs(self):
        """
        Mounts smbnetfs
        """

        mkdirs(self.mountdir)
        run(f'smbnetfs {self.mountdir}')
        
        mounted_servers = os.listdir(self.mountdir)
        for server_to_mount in self.all_fqdn_servers:
            if server_to_mount not in mounted_servers:
                print(f"\!/ Failed to mount smbnetfs share for '{server_to_mount}' server \!/")
            else:
                self.mounted_servers.append(server_to_mount)

    def mount_shares(self, shares: List[Share]):
        """
        Mounts the specified shares, creating symbolic links.
        """

        self.all_fqdn_servers = list({share.server for share in shares})
        self.__setup_smbnetfs_host()

        if not self.__is_smbnetfs_mounted():
            self.__mount_smbnetfs()

        mkdirs(self.network_drive_dir)
        remove_gnome_bookmark()

        for share in shares:
            
            if share.server not in self.mounted_servers:
                print(f"'\!/ Failed to create share '{share.letter}' because smbnetfs cannot mount '{share.server}' \!/'")
                continue

            share_mountdir = makepath(self.mountdir, share.server, share.path)
            share_destination = makepath(self.network_drive_dir, share.letter)

            # ensure the symlink not exist  
            if os.path.islink(share_destination):
                remove_file(share_destination)

            os.symlink(share_mountdir, share_destination)
            add_gnome_bookmark(f"file:///{share_destination}", share.name)

        if self.mounted_servers:
            print(f"Successfully mapped: {', '.join([share.letter for share in shares if share.server in self.mounted_servers])}")
        else:
            print(f"No network shares mapped...")

    def umount_shares(self, shares: List[Share]):
        """
        Unmounts smbnetfs if mounted and removes the network drive directory.
        """

        if self.__is_smbnetfs_mounted():
            run(f"fusermount -u {self.mountdir}")

        # Cleanup the folder
        if isdir(self.network_drive_dir):
            remove_tree(self.network_drive_dir)

class GpoEngine:
    def __init__(self, domain_name):
        self.domain_ad = domain_name
        self.site = pyldap.cldap_get_domain_info(domain_name=self.domain_ad)['client_site']

        self.client = pyldap.PyLdapClient(domain_name=self.domain_ad)
        result = self.client.bind_sasl_kerberos()
        if not result[0]:
            error('Failed connect to active directory')

        self.base_domain_name = self.client.default_dn()

        # Récupération des informations de l'utilisateur
        user_result = self.client.search_all(self.base_domain_name, f"(sAMAccountName={current_username})", ["primaryGroupID", "objectSid"])
        self.dn_user = next(iter(user_result))
        user_values = next(iter(user_result.values()))
        self.primary_group_id = user_values['primaryGroupID'][0]
        self.object_sid_user = user_values['objectSid'][0]

        # Récupération des unités organisationnelles de l'utilisateur
        result = self.client.search_all(self.base_domain_name, f"(sAMAccountName={current_username})")
        parent_ou_split = list(result)[0].split(',')
        self.list_ou = [','.join(parent_ou_split[i:]) for i in range(len(parent_ou_split))]

        # Détermination du contrôleur de domaine et du nom NetBIOS
        self.controller = pyldap.cldap_get_ldap_controller(domain_name=self.domain_ad).split(':')[0]
        self.netbios_name = self._get_netbios_name()

    def _get_netbios_name(self):
        # Recherche du nom NetBIOS du contrôleur
        search_filter = f"(sAMAccountName={self.controller.split('.')[0]}$)"
        attributes = ['servicePrincipalName']
        service_principal_names = next(iter(self.client.search_all(self.base_domain_name, search_filter, attributes).values())).get('servicePrincipalName')
        return next((u.split('/')[-1] for u in service_principal_names if len(u.split('/')) > 2 and '.' not in str(u.split('/')[-1])), None)
    
    def apply_gpo(self, cn_guid):
        guid = cn_guid.split('=')[1].split(',')[0]
        drive_file = rf"\\{self.controller}\sysvol\{self.domain_ad}\Policies\{guid}\User\Preferences\Drives\Drives.xml"
        return self._parse_drives_xml_with_targeting(drive_file)
    
    def _filter_sid(self, sidtest):
        # Vérification de l'appartenance à un SID
        if not sidtest:
            return False
        if sidtest == self.object_sid_user:
            return True
        return bool(self.client.search_all(self.base_domain_name, f"(&(sAMAccountName={current_username})(objectsid={sidtest}))", ['objectsid']))
    
    def filter_domain(self, attrs):
        # Vérification du contexte de l'utilisateur
        return attrs['userContext'] == "1" and self.netbios_name == attrs['name']

    def filter_computer(self, attrs):
        # Vérification du contexte machine (NETBIOS ou DNS)
        if attrs['type'] == "NETBIOS":
            return get_hostname().split('.')[0] == attrs['name'].lower()
        if attrs['type'] == "DNS":
            return get_fqdn() == attrs['name'].lower()
        return False

    def filter_user(self, attrs):
        return self._filter_sid(attrs.get('sid'))
    
    def filter_group(self, attrs):
        # Vérification de l'appartenance à un groupe
        sidtest = attrs.get('sid')
        if not sidtest or attrs['localGroup'] == '1' or attrs['userContext'] != '1':
            return False
        
        if sidtest.startswith(self.object_sid_user.rsplit('-', 1)[0]) and str(sidtest.split('-')[-1]) == str(self.primary_group_id):
            return True
        
        if attrs['primaryGroup'] == '1':
            return False

        return bool(self.client.search_all(self.base_domain_name, f"(&(member:1.2.840.113556.1.4.1941:={self.dn_user})(objectsid={sidtest}))", ['objectsid']))

    def _read_xml_file(self, xml_file_path):
        # Lecture du fichier XML
        if is_windows():
            with open(xml_file_path, encoding='utf-8') as file:
                return file.read()
        
        path_parts = xml_file_path.split('\\', 4)

        if is_linux():
            server, share, subpath, file = path_parts[2], path_parts[3], path_parts[4].rsplit('\\', 1)[0], path_parts[4].rsplit('\\', 1)[1]
            result = run(f'smbclient -N --use-kerberos=desired //{server}/{share} -D "{subpath}" -c "get {file} /dev/stdout" --realm={self.domain_ad.upper()}').rsplit('getting file ', 1)[0]
            # Find the index of the first line beginning with '<?xml'
            lines = result.splitlines()
            start_of_xml = next(i for i, line in enumerate(lines) if line.strip().startswith('<?xml'))
            return "\n".join(lines[start_of_xml:])
        
        if is_darwin():
            # TODO: Umount the share at the end
            tmp_sysvol_mountdir = f"/private/tmp/sysvol_{current_username}"
            if not isdir(tmp_sysvol_mountdir):
                mkdirs(tmp_sysvol_mountdir)
            run(f'mount_smbfs //{self.controller}/sysvol {tmp_sysvol_mountdir}')

            converted_path = '/'.join(path_parts[-1].split('\\'))
            with open(f"{tmp_sysvol_mountdir}/{converted_path}", encoding='utf-8') as file:
                return file.read()
         
    def evaluate_filters(self, filters_element):

        last_condition_result = None
        last_operator = "AND"

        for filter_element in filters_element:
            #Pour le premier check le last result est True si condition AND, Sinon False
            if last_condition_result == None:
                if filter_element.attrib["bool"] == "OR":
                    last_condition_result = False
                else:
                    last_condition_result = True
            ###################################################

            #On évalue le filter et on passe le resulat précédent pour renvoyer le bon résultat en fonction de l'operateur
            result_filter = self.evaluate_filter(filter_element,last_condition_result)

            # comportement étrange mais visiblement le moteur de ciblage fonctionne comme ceci actuellement
            # Si le dernier résultat est True et que le dernier Operateur a True et l'operateur actuelle est a True alors on s'arrête
            if last_condition_result and last_operator == "OR" and  filter_element.attrib["bool"] == "OR":
                return True
            ####################################################################

            # on conserve les resultat pour le prochaine check
            last_condition_result = result_filter
            last_operator = filter_element.attrib["bool"]

        return last_condition_result
    
    def evaluate_filter(self, filter_element, last_condition_result):
        #on prend que les filtre pris en charge sinon on dit que le résultat c'est non
        type_filter = filter_element.tag
        if (type_filter != 'FilterCollection') and (not type_filter in dir(self)):
            return False

        operator = filter_element.attrib["bool"]

        # si la condition c'est OR et que les dernier check on a résultat a True alors pas besoin de tester
        if operator == "OR" and last_condition_result:
            return True

        #on appel la fonction associter au filter
        if type_filter == "FilterCollection":
            result_filter = self.evaluate_filters(filter_element.findall("*"))
        else:
            result_filter = getattr(self,type_filter)(filter_element.attrib)

        #on vérifie si la condition attendu doit être inversé
        if filter_element.attrib["not"] == '1':
            result_filter = not result_filter

        #On renvoie les resultat en calculant avec les last result
        if operator == "OR":
            return result_filter or last_condition_result

        if operator == "AND":
            return result_filter and last_condition_result

    def _parse_drives_xml_with_targeting(self, xml_file_path):
        # Analyse et extraction des informations des lecteurs réseau
        try:
            xml_data = self._read_xml_file(xml_file_path)
            root = ET.fromstring(xml_data)
            share_paths = {}
            
            for drive in root.findall("Drive"):
                if drive.attrib.get('disabled') == "1":
                    continue
                properties = drive.find("Properties")
                if properties is None:
                    continue
                props = {key: properties.get(key) for key in ['path', 'useLetter', 'label', 'action', 'letter'] if properties.get(key)}
                if drive.find("Filters") and not self.evaluate_filters(drive.find("Filters")):
                    continue
                if "letter" in props:
                    share_paths[props["letter"]] = props
                    
            return share_paths
        except (FileNotFoundError, ET.ParseError) as e:
            print(f"Erreur : {e}")
            return {}
    
    def check_acl_gpo(self, data):
        datasplit = data.split('(')

        group_primary =  self.object_sid_user.rsplit('-',1)[0] + '-' + str(self.primary_group_id)

        all_sid = {}
        for p in datasplit:
            if ';edacfd8f-ffb3-11d1-b41d-00a0c968f939;' in p:
                sid_test = p.split(';')[5].split(')')[0]
                all_sid[sid_test] = None

        all_user_sid_member = {group_primary:None,"AU":None}

        if all_sid:
            filter_ldap = "(&(|%s)(member:1.2.840.113556.1.4.1941:=%s))" %  (''.join(['(objectsid=' + u + ')' for u in all_sid]), self.dn_user)
            result_ldap = self.client.search_all(self.base_domain_name, filter_ldap,['objectSid'])
            for entry in result_ldap:
                all_user_sid_member[result_ldap[entry]['objectSid'][0]] = None

        for p in datasplit:
            if ';edacfd8f-ffb3-11d1-b41d-00a0c968f939;' in p:
                if not p.split(';')[0] == 'OD':
                    continue
                sid_test = p.split(';')[5].split(')')[0]
                if sid_test in all_user_sid_member:
                    return False

        for p in datasplit:
            if ';edacfd8f-ffb3-11d1-b41d-00a0c968f939;' in p:
                if not p.split(';')[0] == 'OA':
                    continue
                sid_test = p.split(';')[5].split(')')[0]
                if sid_test in all_user_sid_member:
                    return True

        return False

    def get_network_drive(self):
        """ Récupère les lecteurs réseaux en fonction des GPO appliquées. """

        dict_network_drive = {}
        dict_ou_result = {}
        dict_gpo_value = {}

        # Recherche de toutes les OU et récupération des GPO associées
        for ou in self.list_ou:
            dict_ou_result[ou] = self.client.search_all(self.base_domain_name,
                f"(&(distinguishedName={ou})(gPLink=*)(!(gPOptions=2))(!(gPOptions=3)))",
                ['gPOptions','gPLink']
            ).get(ou)

        if self.site :
            rsite = self.client.search_all(f'CN=Configuration,{self.base_domain_name}', f'(&(name={self.site})(objectClass=site)(gPLink=*))')
            if rsite:
                dict_ou_result[self.list_ou[-1]] = list(rsite.values())[0]

        # Détection d'une OU avec Block Inheritance
        block_inheritance = next(
            (
                ou.lower() for ou in self.list_ou
                if dict_ou_result.get(ou) and
                dict_ou_result[ou].get('gPOptions', [''])[0] == 1
            ), None
        )

        def process_gpo_links(ou, apply_forced=False):

            if not dict_ou_result.get(ou):
                return

            for u in dict_ou_result[ou]['gPLink'][0].split('['):
                u = u.strip(' ]')
                if not u:
                    continue
                
                gpo_param = u.split('//')[1].rsplit(';', 1)
                gpo_distinguished_name = gpo_param[0]
                gpo_param_flag = gpo_param[1]
                
                if u not in dict_gpo_value:
                    gpo_value = list(self.client.search_all(
                        self.base_domain_name,
                        f"(distinguishedName={gpo_distinguished_name})",
                        ['nTSecurityDescriptor', 'gPCUserExtensionNames', 'flags']
                    ).values())[0]
                    dict_gpo_value[u] = gpo_value
                else:
                    gpo_value = dict_gpo_value[u]

                ## 0 Aucun drapeau actif (configuration par défaut).
                ## 1 La GPO est désactivée côté utilisateur.
                ## 2 La GPO est désactivée côté ordinateur.
                ## 3 La GPO est désactivée à la fois pour les utilisateurs et ordinateurs.
                ## 4 Indique que la GPO est une stratégie de groupe starter (modèle de GPO préconfiguré).
                if gpo_value['flags'][0] in [1, 3]:
                    continue
                    
                # si ce n'est pas une gpo network drive on fait rien
                if '{2EA1A81B-48E5-45E9-8BB7-A6E3AC170006}' not in str(gpo_value.get('gPCUserExtensionNames', '')):
                    continue

                #0 case appliqué décocher, lien activé cocher
                #1 case appliqué décocher, lien activé décocher
                #2 case appliqué cocher, lien activé cocher
                #3 case appliqué cocher, lien activé décocher
                if (gpo_param_flag == "0" and not apply_forced) or (gpo_param_flag == "2" and apply_forced):
                    if self.check_acl_gpo(gpo_value['nTSecurityDescriptor'][0]):
                        dict_network_drive.update(self.apply_gpo(gpo_param[0]))

        # Première passe : récupération des GPO appliquées
        for ou in reversed(self.list_ou):
            if block_inheritance and not ou.lower().endswith(block_inheritance):
                continue
            process_gpo_links(ou, apply_forced=False)

        # Deuxième passe : application des GPO forcées
        for ou in reversed(self.list_ou):
            process_gpo_links(ou, apply_forced=True)

        return dict_network_drive

6948aa6302ee0320ea4ec3b93152e44c01b012d0e19a69b0138f43271d28bdaa : .gitignore
38d056ab130f7bf7c481c12636a4e9959de36561d3dfcbe54c6e3571bc0c1dc3 : WAPT/certificate.crt
3a0fe7ecef060808ffea7128145be18b67cdb92b119edd223eb6b893cb3c99c3 : WAPT/control
25b0beebb5a383774223652d265c32e809a14cbe789e6ffc140a1e9e43591361 : WAPT/icon.png
6e63ea2c27883cf1d8ec9c7295488870940ffee9ddc0a2c909933a2b674c880a : luti.json
6013f9a157a267904caa151fcf5c2d1e99c02dcf89b41cf00ba5380f80cc48ee : setup.py
cb320c6b6e395d57ef245c74b9953505ee99d0b1cbe4335c335e2115d8c5b0ca : testgpo-printer.py
c3990a40f3a61a132a8f7c731130a5e204d3973d6d19cac114235a9da4d60f7a : testgpo.py