tis-mount-network-share icon

Mount Network Share

Silent install package for Mount Network Share

2-8

  • package: tis-mount-network-share
  • name: Mount Network Share
  • version: 2-8
  • maintainer: WAPT Team,Tranquil IT,Flavien SCHELFAUT,Simon FONTENEAU
  • editor: WAPT
  • target_os: all
  • architecture: all
  • signature_date:
  • size: 26.06 Ko

package           : tis-mount-network-share
version           : 2-8
architecture      : all
section           : base
priority          : optional
name              : Mount Network Share
categories        : 
maintainer        : WAPT Team,Tranquil IT,Flavien SCHELFAUT,Simon FONTENEAU
description       : A package that lets you set up network shares on all platforms.
depends           : 
conflicts         : 
maturity          : PROD
locale            : 
target_os         : all
min_wapt_version  : 2.6
sources           : 
installed_size    : 
impacted_process  : 
description_fr    : Un paquet qui permet de monter des partages réseaux sur toutes les plateformes.
description_pl    : Pakiet do konfigurowania udziałów sieciowych na wszystkich platformach.
description_de    : Ein Paket, mit dem man Netzwerkfreigaben auf allen Plattformen einrichten kann.
description_es    : Un paquete para configurar recursos compartidos de red en todas las plataformas.
description_pt    : Um pacote para configurar partilhas de rede em todas as plataformas.
description_it    : Un pacchetto per l'impostazione delle condivisioni di rete su tutte le piattaforme.
description_nl    : Een pakket om netwerkshares in te stellen op alle platformen.
description_ru    : Пакет для настройки общих сетевых ресурсов на всех платформах.
audit_schedule    : 
editor            : WAPT
keywords          : 
licence           : 
homepage          : 
package_uuid      : 807cfe60-bb32-45cf-81c3-7bc40ea536b9
valid_from        : 
valid_until       : 
forced_install_on : 
changelog         : 
min_os_version    : 
max_os_version    : 
icon_sha256sum    : 25b0beebb5a383774223652d265c32e809a14cbe789e6ffc140a1e9e43591361
signer            : Tranquil IT
signer_fingerprint: 8c5127a75392be9cc9afd0dbae1222a673072c308c14d88ab246e23832e8c6bb
signature_date    : 2025-05-14T11:41:50.000000
signed_attributes : package,version,architecture,section,priority,name,categories,maintainer,description,depends,conflicts,maturity,locale,target_os,min_wapt_version,sources,installed_size,impacted_process,description_fr,description_pl,description_de,description_es,description_pt,description_it,description_nl,description_ru,audit_schedule,editor,keywords,licence,homepage,package_uuid,valid_from,valid_until,forced_install_on,changelog,min_os_version,max_os_version,icon_sha256sum,signer,signer_fingerprint,signature_date,signed_attributes
signature         : LjD19+3aCx62khGR6rQmDsKCe8tUdE9zUWOFpjkOptjzWnaMy7+DtwHmpQ/7Hg5EmIyxOOm49rq+JZ30cZ7HNHwMt2yjwPdQoft9hvR/pZBI38Sh9ZGaRP7s0bm2YlxmG8DT3NeyhZrFp4Ew7MyFVkeoqm+4luTAy7D4o4T9drJhn69jSFqtRXDqzyGA889z4u7R1JomQXdgoTJKQ3unLRvwOorrsCHzsTsVSqBWp7l+UbarqWsdJ2E4aCMsXTAiHIQSfH6NmV3ZJ1fhf8QrIJiDxmL5DP3FtikRudaV7Ze/j1fkmZGpJbTE5V7Zi0qhFlC+s4spYit0msuR42QLNg==

# -*- coding: utf-8 -*-
from setuphelpers import *
from waptutils import WAPT_VERSION_FULL
import pyldap
import os, json
from typing import List
from dataclasses import dataclass
import platform
import xml.etree.ElementTree as ET

###
# DO NOT TOUCH THESE VARIABLES
###
# Autostart entry whose presence install() uses to check that waptagent-gui is installed
wapt_session_setup = "/etc/xdg/autostart/tis-waptsessionsetup.desktop"
# Per-user data directory; KdeBookmark looks for user-places.xbel in it
local_share_dir = makepath(user_home_directory(), ".local", "share")

# JSON file listing the bookmarks created by this package, so that only those get removed later
bookmarks_wapt_file = makepath(application_data(), 'wapt', 'bookmarks.json')

# System-wide smbnetfs configuration, copied into the user's ~/.smb folder when the user has none
default_conf_smbnetfs = "/etc/smbnetfs.conf"
# NOTE(review): not referenced in the visible part of this file
default_conf_samba = "/etc/samba/smb.conf"

# Lower-cased login of the current user, used as sAMAccountName in the LDAP filters
current_username = get_current_user().lower()
# Short host name (text before the first '.') followed by '$', used as the computer's sAMAccountName
current_computer = f"{get_computername().split('.')[0]}$"

def install():
    """
    Check the prerequisites and install the system packages used to mount shares.

    Aborts through ``error()`` when the WAPT agent is older than 2.6.0.16881 or,
    on Debian/RedHat based hosts, when the waptagent-gui autostart entry
    (``wapt_session_setup``) is missing. On other platforms nothing is installed.
    """

    # Only the part of WAPT_VERSION_FULL located before the first '-' is compared
    if Version(WAPT_VERSION_FULL.split('-')[0]) < Version("2.6.0.16881"):
        # "\\!/" keeps the exact same message while avoiding the invalid "\!" escape sequence
        error("\\!/ This package requires at least wapt version 2.6.0.16881 or higher \\!/")

    if (is_debian_based() or is_redhat_based()) and not isfile(wapt_session_setup):
        error("\\!/ This package requires the installation of waptagent-gui to be fully functional \\!/")

    # Installing necessary dependencies
    if is_debian_based():
        install_apt('smbnetfs krb5-user smbclient')
    elif is_redhat_based():
        # NOTE(review): 'krb5-user' and 'smbclient' are Debian package names; RedHat-like
        # distributions usually ship them as 'krb5-workstation' and 'samba-client' - confirm
        install_rpm('smbnetfs krb5-user smbclient')

def session_setup():
    """
    Mount the network shares of the current user.

    The drive mappings are read from the domain GPOs through ``GpoEngine``,
    converted to ``Share`` objects, then the existing mounts are cleaned up and
    the shares are mounted again.

    Aborts through ``error()`` when the domain name cannot be determined.

    :return: the "RERUN" status (presumably so that WAPT replays this setup
             at every session - confirm against the WAPT documentation).
    """

    domain_name = get_domain_name()
    if not domain_name:
        # "\\!/" keeps the exact same message while avoiding the invalid "\!" escape sequence
        error('\\!/ Unable to retrieve domain name \\!/')

    network_share = NetworkShare()

    # Scan the domain sysvol to find the network drive GPO
    ge = GpoEngine(domain_name)
    network_share.shares = convert_gpo_network_drive_to_share(ge.get_network_drive())

    # Alternative to GpoEngine, but must be defined manually. An example can be found below
    # ldap = LdapManager(domain_name)
    # if ldap.user_is_member_of_groups(['Administrators']):
    #     network_share.add('S', "Sysvol Another", 'fschelfaut.lan', 'sysvol/.stfolder')
    #     network_share.add('E', "Sysvol", 'fschelfaut.lan', 'sysvol')

    # if ldap.user_is_member_of_groups(['grp_bitwarden']):
    #     network_share.add('P', "Partage ISO", 'srvtemplates.ad.tranquil.it', 'iso')
    #     network_share.add('S', "Partage TEST", 'srvtemplates.ad.tranquil.it', 'test$')
    #     network_share.add('A', "Partage TEST2", 'srvtemplates.ad.tranquil.it', 'test2$')

    # if ldap.computer_is_member_of_groups(['test_wapt_profile', 'testimbrication2']):
    #     network_share.add('C', "Partage TEST2 Computer", 'srvtemplates.ad.tranquil.it', 'test2$')

    # if ldap.user_is_in_OU('OU=srp_hard,OU=users,OU=tranquilit,DC=ad,DC=tranquil,DC=it'):
    #     network_share.add('B', "Partage TEST2 User", 'srvtemplates.ad.tranquil.it', 'test2$')

    # Cleanup share before mounting new one
    network_share.cleanup()

    # Mount all share to the current user
    network_share.mount()

    return "RERUN"

def session_cleanup():
    """
    Unmount/clean up what the share manager of the current platform set up.

    :return: the "OK" status.
    """

    # Currently doesn't work because session_setup returns RERUN
    # So WAPT never knows the package is installed

    NetworkShare().cleanup()
    return "OK"

def is_debian_based():
    """Tell whether the marker file /etc/debian_version exists on this host."""
    debian_marker = '/etc/debian_version'
    return isfile(debian_marker)

def is_redhat_based():
    """Tell whether the marker file /etc/redhat-release exists on this host."""
    redhat_marker = '/etc/redhat-release'
    return isfile(redhat_marker)

def is_linux():
    """Tell whether platform.system() reports 'Linux'."""
    system_name = platform.system()
    return system_name == 'Linux'

def is_windows():
    """Tell whether platform.system() reports 'Windows'."""
    system_name = platform.system()
    return system_name == 'Windows'

def is_darwin():
    """Tell whether platform.system() reports 'Darwin' (macOS)."""
    system_name = platform.system()
    return system_name == 'Darwin'

def get_domain_name():
    """
    Return the lower-cased domain name of the current session.

    * Windows: value returned by ``get_domain_fromregistry()``.
    * Linux: text following the last '@' of the "Default principal" line printed by ``klist``.
    * macOS: text following the last '@' of the ``principal`` field of ``klist --json``.

    :return: the domain name, or None when it cannot be determined
             (lookup failure or platform not handled).
    """
    try:
        if is_windows():
            return get_domain_fromregistry().lower()
        if is_linux():
            return next(
                line.split(':')[-1].split('@')[-1].lower()
                for line in run('klist').splitlines()
                if line.startswith('Default principal')
            )
        if is_darwin():
            return json.loads(run('klist --json'))['principal'].split('@')[-1].lower()
    except Exception:
        # Best effort: any lookup failure (no ticket, command missing, unexpected output...)
        # means "no domain". Unlike a bare ``except:``, KeyboardInterrupt and SystemExit
        # are no longer swallowed.
        return None
    return None

def read_txt_file(file: str) -> list:
    """
    Return the lines of a text file, without their line endings.

    The file is decoded as UTF-8 regardless of the session locale, so that
    entries containing non-ASCII characters (e.g. bookmark labels) are read
    the same way in every environment.

    :param file: path of the file to read.
    :return: list of lines, or an empty list when *file* is not an existing regular file.
    """
    # os.path.isfile is used directly (``os`` is imported at the top of the file)
    if not os.path.isfile(file):
        return []
    with open(file, encoding='utf-8') as f:
        return f.read().splitlines()

def write_txt_file(file: str, data: str, mode: str = 'w', endline: str = "\n") -> None:
    """
    Write *data* followed by *endline* to a text file.

    The file is encoded as UTF-8 regardless of the session locale, so that
    non-ASCII content (e.g. bookmark labels) is always written the same way.

    :param file: path of the file to write.
    :param data: text to write.
    :param mode: text mode passed to ``open`` ('w' to overwrite, 'a' to append).
    :param endline: text appended after *data* (a newline by default).
    """
    with open(file=file, mode=mode, encoding='utf-8') as f:
        f.write(data + endline)

def get_bookmark_by_wapt():
    """
    Load the list of bookmarks recorded in ``bookmarks_wapt_file``.

    :return: the list stored in the JSON file, or an empty list when the file
             does not exist or does not contain a list.
    """
    if not isfile(bookmarks_wapt_file):
        return []

    bookmarks_data = json_load_file(bookmarks_wapt_file)

    # To handle old case. Before it was a dict
    return bookmarks_data if isinstance(bookmarks_data, list) else []

def save_bookmark_added_by_wapt(bookmark_dict):
    """
    Record *bookmark_dict* in ``bookmarks_wapt_file`` unless it is already listed.

    The JSON file is rewritten in every case.
    """
    tracked = get_bookmark_by_wapt()

    already_tracked = bookmark_dict in tracked
    if not already_tracked:
        tracked.append(bookmark_dict)

    json_write_file(bookmarks_wapt_file, tracked)
    
def remove_bookmark_added_by_wapt(bookmark_dict):
    """
    Drop *bookmark_dict* from ``bookmarks_wapt_file`` when it is listed there.

    The JSON file is rewritten in every case.
    """
    tracked = get_bookmark_by_wapt()

    try:
        tracked.remove(bookmark_dict)
    except ValueError:
        # Not recorded: nothing to drop, the list is written back unchanged
        pass

    json_write_file(bookmarks_wapt_file, tracked)

def convert_gpo_network_drive_to_share(network_drive_dict):
    """
    Convert the network drives described by the GPO into ``Share`` objects.

    Each value of *network_drive_dict* must provide 'letter' and 'path' (a UNC
    path such as ``\\\\server\\share\\folder``) and may provide 'label'; the last
    component of the path is used as the label when it is missing.

    The UNC path is split into the server and the path below it. Backslashes
    of that path are converted to '/', which is the separator the rest of the
    file expects in ``Share.path`` (see ``NetworkShare.add``).

    :param network_drive_dict: mapping whose values describe one drive each.
    :return: list of ``Share``.
    """
    shares = []
    for drive in network_drive_dict.values():
        unc_path = drive['path']
        label = drive.get('label', unc_path.rsplit('\\', 1)[-1])

        # '\\server\share\folder' -> ['server', 'share/folder']
        location = [
            part.replace('\\', '/')
            for part in unc_path.removeprefix(r'\\').split('\\', 1)
        ]
        shares.append(Share(drive['letter'], label, *location))

    return shares

@dataclass
class Share:
    """Description of one network share to expose to the user."""
    # Single character identifying the share: mapped as "<letter>:" on Windows,
    # used as the name of the symlink created by SmbNetFsManager on Linux
    letter: str
    # Display name: bookmark label on Linux, drive label on Windows
    name: str
    # Name of the server hosting the share
    server: str
    # Path of the share on the server; NetworkShare.add requires '/' as separator
    path: str

class GnomeBookmark:
    """
    Manage the ``gtk-3.0/bookmarks`` file of the current user.

    The file holds one ``file://<path> <label>`` entry per line. Every bookmark
    added here is also recorded with ``save_bookmark_added_by_wapt`` so that
    ``remove_all`` only deletes the entries created by this package.
    """

    def __init__(self):

        self.config_dir = makepath(application_data(), "gtk-3.0")

        if not isdir(self.config_dir):
            mkdirs(self.config_dir)

        self.bookmarks_file = makepath(self.config_dir, "bookmarks")

    @staticmethod
    def _bookmark_line(bookmark_uri: str, bookmark_name: str) -> str:
        """Build the line stored in the bookmarks file for a path and its label."""
        return f"file://{bookmark_uri} {bookmark_name}".strip()

    def _write_bookmarks(self, lines: list) -> None:
        """Rewrite the bookmarks file with *lines*, one entry per line."""
        # write_txt_file appends the end of line itself; it is dropped for an
        # empty list so that no blank entry is left in the file.
        write_txt_file(self.bookmarks_file, '\n'.join(lines), endline='\n' if lines else '')

    def add(self, bookmark_uri: str, bookmark_name: str):
        """
        Add a GNOME bookmark for the current user

        :param bookmark_uri: Local path of the bookmark, "file://" is prepended to it (e.g., "/usr/share/doc/").
        :param bookmark_name: A name for the bookmark (e.g., "Documentation").
        """

        bookmark_line = self._bookmark_line(bookmark_uri, bookmark_name)

        # Add the bookmark only if it doesn't exist
        if bookmark_line not in read_txt_file(self.bookmarks_file):
            write_txt_file(self.bookmarks_file, bookmark_line, mode='a')
            save_bookmark_added_by_wapt({'uri': bookmark_uri, 'name': bookmark_name})

            print(f"[+] GNOME - Bookmark added: {bookmark_name if bookmark_name else bookmark_line}")
        else:
            print(f"[!] GNOME - Bookmark already exists: {bookmark_name if bookmark_name else bookmark_line}")

    def remove(self, bookmark_uri: str, bookmark_name: str):
        """
        Remove a GNOME bookmark for the current user

        :param bookmark_uri: Local path of the bookmark, as given to ``add`` (e.g., "/usr/share/doc/").
        :param bookmark_name: A name for the bookmark (e.g., "Documentation").
        """

        bookmark_line = self._bookmark_line(bookmark_uri, bookmark_name)
        bookmarks = read_txt_file(self.bookmarks_file)

        # Remove the bookmark only if it exist
        if bookmark_line in bookmarks:
            bookmarks.remove(bookmark_line)
            # The remaining lines are joined before being written: write_txt_file expects a string
            self._write_bookmarks(bookmarks)
            remove_bookmark_added_by_wapt({'uri': bookmark_uri, 'name': bookmark_name})

            print(f"[+] GNOME - Bookmark removed: {bookmark_name if bookmark_name else bookmark_line}")
        else:
            print(f"[!] GNOME - Bookmark already removed: {bookmark_name if bookmark_name else bookmark_line}")

    def remove_all(self):
        """
        Remove from the GNOME bookmarks every entry recorded as added by WAPT.

        The order of the remaining bookmarks is preserved. When at least one
        entry is removed, the list of WAPT bookmarks is reset.
        """

        wapt_bookmarks = get_bookmark_by_wapt()
        if not wapt_bookmarks:
            print("[!] No WAPT bookmarks found. Nothing to remove.")
            return

        # Same line format as add(), so that entries with an empty name match too
        wapt_lines = {self._bookmark_line(bookmark['uri'], bookmark['name']) for bookmark in wapt_bookmarks}

        gnome_bookmarks = read_txt_file(self.bookmarks_file)
        remaining_bookmarks = [line for line in gnome_bookmarks if line not in wapt_lines]

        if len(remaining_bookmarks) != len(gnome_bookmarks):
            self._write_bookmarks(remaining_bookmarks)
            # None of the recorded bookmarks is left in the GNOME file at this point
            json_write_file(bookmarks_wapt_file, [])
            print(f"[+] GNOME - Successfully removed bookmark(s) added by WAPT for '{current_username}'.")
        else:
            print("[*] GNOME - No matching WAPT bookmarks found in GNOME bookmarks. Nothing changed.")

class KdeBookmark:
    """
    Manage the ``user-places.xbel`` bookmarks file of the current user.

    Every bookmark added here is also recorded with
    ``save_bookmark_added_by_wapt`` so that ``remove_all`` only deletes the
    entries created by this package.
    """

    # Namespace of the <bookmark:icon> element
    _BOOKMARK_NS = 'http://www.freedesktop.org/standards/desktop-bookmarks'

    def __init__(self):

        # Keep the usual prefixes when the file is written back
        ET.register_namespace('bookmark', self._BOOKMARK_NS)
        ET.register_namespace('mime',     'http://www.freedesktop.org/standards/shared-mime-info')
        ET.register_namespace('kdepriv',   'http://www.kde.org/kdepriv')

        self.bookmarks_file = makepath(local_share_dir, "user-places.xbel")

    @staticmethod
    def next_bookmark_id(existing_ids, timestamp):
        """
        Return a bookmark ID "<timestamp>/<n>" not yet used for this timestamp.

        :param existing_ids: IDs already present in the file ("<timestamp>/<n>" or "<timestamp>").
        :param timestamp: timestamp used as prefix of the new ID.
        :return: "<timestamp>/<n+1>" where n is the highest suffix found for this
                 timestamp (an ID without suffix counts as 1), or "<timestamp>/1".
        """

        base = str(timestamp)
        suffixes = []
        for existing_id in existing_ids:
            prefix, separator, suffix = existing_id.partition('/')
            if prefix == base:
                suffixes.append(int(suffix) if separator else 1)

        return f"{base}/{max(suffixes, default=0) + 1}"

    @staticmethod
    def _title_of(bookmark):
        """Return the text of the <title> child of *bookmark*, or None when it has no <title>."""
        title = bookmark.find("title")
        return title.text if title is not None else None

    def add(self, bookmark_uri: str, bookmark_name: str):
        """
        Add a KDE bookmark for the current user

        Nothing is done when the bookmarks file does not exist.

        :param bookmark_uri: Local path of the bookmark, "file://" is prepended to it (e.g., "/usr/share/doc/").
        :param bookmark_name: A user-friendly name for the bookmark (e.g., "Documentation").
        """
        # Local import: ``time`` is not explicitly imported at the top of the file
        import time

        if not isfile(self.bookmarks_file):
            return

        full_bookmark_uri = f"file://{bookmark_uri}"

        tree = ET.parse(self.bookmarks_file)
        root = tree.getroot()

        # Check if bookmark already added
        for bm in root.findall("bookmark"):
            if bm.get("href") == full_bookmark_uri and self._title_of(bm) == bookmark_name:
                print(f"[!] KDE - Bookmark already exists: {bookmark_name if bookmark_name else full_bookmark_uri}")
                return

        bookmark = ET.Element("bookmark", href=full_bookmark_uri)

        title = ET.SubElement(bookmark, "title")
        title.text = bookmark_name

        info = ET.SubElement(bookmark, "info")

        metadata_fd = ET.SubElement(info, "metadata", owner="http://freedesktop.org")
        # Namespace-qualified tag: it is written as <bookmark:icon> and the
        # serializer declares the prefix itself, even when no other element of
        # the file uses that namespace.
        ET.SubElement(metadata_fd, f"{{{self._BOOKMARK_NS}}}icon", name="folder")

        metadata_kde = ET.SubElement(info, "metadata", owner="http://www.kde.org")
        timestamp = int(time.time())

        existing_ids = [id_elem.text for id_elem in root.findall('.//bookmark/info/metadata/ID') if id_elem.text]
        new_id = self.next_bookmark_id(existing_ids, timestamp)

        ET.SubElement(metadata_kde, "ID").text = new_id

        root.append(bookmark)
        ET.indent(tree)

        tree.write(self.bookmarks_file)

        save_bookmark_added_by_wapt({'uri': bookmark_uri, 'name': bookmark_name})

        print(f"[+] KDE - Bookmark added: {bookmark_name if bookmark_name else full_bookmark_uri}")

    def remove(self, bookmark_uri: str, bookmark_name: str):
        """
        Remove a KDE bookmark for the current user

        :param bookmark_uri: Local path of the bookmark to remove, as given to ``add`` (e.g., "/usr/share/doc/").
        :param bookmark_name: A user-friendly name for the bookmark (e.g., "Documentation").
        """
        if not isfile(self.bookmarks_file):
            print(f"[!] KDE - Bookmark file not found: {self.bookmarks_file}")
            return

        full_bookmark_uri = f"file://{bookmark_uri}"

        tree = ET.parse(self.bookmarks_file)
        root = tree.getroot()

        # Look for the exact bookmark element (same target and same title)
        for bm in root.findall("bookmark"):
            if bm.get("href") == full_bookmark_uri and self._title_of(bm) == bookmark_name:
                root.remove(bm)

                ET.indent(tree)
                tree.write(self.bookmarks_file)

                remove_bookmark_added_by_wapt({'uri': bookmark_uri, 'name': bookmark_name})

                print(f"[+] KDE - Bookmark removed: {bookmark_name if bookmark_name else full_bookmark_uri}")
                return

        print(f"[!] KDE - Bookmark not found: {bookmark_name if bookmark_name else full_bookmark_uri}")

    def remove_all(self):
        """
        Remove from the KDE bookmarks every entry recorded as added by WAPT.

        The recorded bookmarks that were actually removed from the file are
        also dropped from the list of WAPT bookmarks.
        """

        wapt_bookmarks = get_bookmark_by_wapt()
        if not wapt_bookmarks:
            print("[!] No WAPT bookmarks found. Nothing to remove.")
            return

        if not isfile(self.bookmarks_file):
            print(f"[!] KDE - Bookmark file not found : {self.bookmarks_file}")
            return

        tree = ET.parse(self.bookmarks_file)
        root = tree.getroot()

        wapt_ids = {(f"file://{bm['uri']}", bm['name']) for bm in wapt_bookmarks}

        # (href, title) pairs removed from the file; a set so duplicates count once
        removed = set()
        # findall returns a list, so removing elements while looping is safe
        for bm in root.findall("bookmark"):
            key = (bm.get("href"), self._title_of(bm))
            if key in wapt_ids:
                root.remove(bm)
                removed.add(key)

        if removed:
            ET.indent(tree)
            tree.write(self.bookmarks_file)

            updated_wapt = [bm for bm in wapt_bookmarks if (f"file://{bm['uri']}", bm['name']) not in removed]
            json_write_file(bookmarks_wapt_file, updated_wapt)

            print(f"[+] KDE - Successfully removed bookmark(s) added by WAPT for '{current_username}'.")
        else:
            print("[*] KDE - No matching WAPT bookmarks found in KDE bookmarks. Nothing changed.")

class LinuxBookmarkManager:
    """
    Front-end for the desktop bookmarks on Linux.

    The desktop environment is detected once with LinuxDesktop.detect(); the
    add/remove requests are then forwarded to the GNOME or KDE implementation.
    On any other desktop a message is printed and nothing is done.
    """

    def __init__(self):
        self.desktop = LinuxDesktop().detect()

        self.gnome_bookmark = GnomeBookmark()
        self.kde_bookmark = KdeBookmark()

    def _backend(self):
        """Return the bookmark implementation of the detected desktop, or None when there is none."""
        if self.desktop == "GNOME":
            return self.gnome_bookmark
        if self.desktop == "KDE":
            return self.kde_bookmark
        return None

    def add(self, uri: str, name: str) -> None:
        """Add the bookmark *name* pointing to *uri* on the detected desktop."""
        backend = self._backend()
        if backend is None:
            print(f"Adding bookmarks not supported on {self.desktop}")
            return
        backend.add(uri, name)

    def remove(self, uri: str, name: str) -> None:
        """Remove the bookmark *name* pointing to *uri* on the detected desktop."""
        backend = self._backend()
        if backend is None:
            print(f"Removing bookmarks not supported on {self.desktop}")
            return
        backend.remove(uri, name)

    def remove_all(self) -> None:
        """Remove every bookmark added by WAPT on the detected desktop."""
        backend = self._backend()
        if backend is None:
            print(f"Removing bookmarks not supported on {self.desktop}")
            return
        backend.remove_all()

class LinuxDesktop:
    """Detect the running desktop environment from the process list."""

    # Desktop name -> extended regular expression looked up in the output of `ps -e`
    _desktops = {
        "GNOME": r"gnome-session|cinnamon$",
        "KDE": r"kded[0-9]$",
        "UNITY": r"unity-panel",
        "XFCE": r"xfce[0-9]-session",
        "MATE": r"mate-panel$",
        "LXDE": r"lxsession$"
    }

    # TODO: change that with a subprocess to avoid the log warning
    def _is_running(self, pattern):
        """Run `ps -e | grep -E <pattern>` and return the result of run_notfatal."""
        command = f"ps -e | grep -E '{pattern}'"
        return run_notfatal(command)

    def detect(self):
        """
        Return the name of the first desktop of ``_desktops`` whose pattern is
        found (return code 0), or "UNKNOWN" when none matches.
        """
        running_desktops = (
            desktop_name
            for desktop_name, process_pattern in self._desktops.items()
            if not self._is_running(process_pattern).returncode
        )
        return next(running_desktops, "UNKNOWN")

class LdapManager:
    """
    Manages the connection and LDAP queries (Active Directory).

    On creation the client binds with SASL/Kerberos and the directory entries
    of the current user and of the current computer are loaded.
    """

    def __init__(self, domain_name):
        """
        :param domain_name: name of the Active Directory domain to query.

        Aborts through ``error()`` when the Kerberos bind fails.
        """
        self.domain_ad = domain_name
        self.client = pyldap.PyLdapClient(domain_name=self.domain_ad)

        result = self.client.bind_sasl_kerberos()
        if not result[0]:
            error('Failed connect to active directory')

        self.base_domain_name = self.client.default_dn()

        self.all_user_info = self.__get_all_info_current_user()
        self.all_computer_info = self.__get_all_info_current_computer()

        # The search results are iterated as mappings: their first key is used as the DN.
        # NOTE(review): an empty result (account not found) raises StopIteration here.
        self.user_DN = next(iter(self.all_user_info))
        self.user_OU = self.__get_OU_from_DN(self.user_DN)

        self.computer_DN = next(iter(self.all_computer_info))
        self.computer_OU = self.__get_OU_from_DN(self.computer_DN)

    def __get_OU_from_DN(self, dn: str):
        """Return *dn* without its first component (the container of the object)."""
        return ','.join(dn.split(',')[1:])

    def __get_all_info_current_user(self):
        """
        Retrieves all AD parameters of the current user.
        """

        return self.client.search_all(self.base_domain_name, f'(sAMAccountName={current_username})')

    def __get_all_info_current_computer(self):
        """
        Retrieves all AD parameters of the current computer.
        """

        return self.client.search_all(self.base_domain_name, f'(sAMAccountName={current_computer})')

    def __is_in_OU(self, object_dn: str, OU: str) -> bool:
        """
        Checks if the object's DN ends with the specified OU, ignoring case.
        """
        # Both sides are lower-cased so that a mixed-case DN still matches
        return object_dn.lower().endswith(OU.lower())

    def user_is_in_OU(self, OU: str) -> bool:
        """
        Checks if the user is in the specified OU (directly or in a sub-OU).
        """
        return self.__is_in_OU(self.user_DN, OU)

    def user_is_member_of_groups(self, groups: List[str]) -> bool:
        """
        Checks if the current user belongs to every group of the provided list.

        The groups returned by ``get_authorized_user_groups`` for *groups* are
        compared, as a set, with the requested ones.
        """

        user_groups = self.client.get_authorized_user_groups(current_username, groups)
        return set(user_groups if user_groups else []) == set(groups)

    def computer_is_member_of_groups(self, groups: List[str]) -> bool:
        """
        Checks if the current computer belongs to every group of the provided list.

        Nested memberships are followed (matching rule 1.2.840.113556.1.4.1941).
        """

        # This current method not works, mormot need to fix that
        # computer_groups = self.client.get_authorized_user_groups(current_computer, groups)
        # return computer_groups == groups

        group_filters = ''.join(f'(sAMAccountName={group})' for group in groups)
        ldap_filter = (
            f"(&(member:1.2.840.113556.1.4.1941:={self.computer_DN})"
            f"(|{group_filters}))"
        )

        self.client.search(self.base_domain_name, False, ldap_filter, ["distinguishedName"])
        response = self.client.search_result()

        # The group name is taken from the first component (CN=...) of each returned DN.
        # NOTE(review): the comparison below is case-sensitive and assumes the CN of a
        # group equals the sAMAccountName given by the caller - confirm.
        computer_groups = [
            dn.split(',')[0].removeprefix('CN=')
            for entry in response
            for attr in entry.object_attributes
            for dn in attr.values
        ]

        return set(computer_groups) == set(groups)

    def computer_is_in_OU(self, OU: str) -> bool:
        """
        Checks if the computer is in the specified OU (directly or in a sub-OU).
        """
        return self.__is_in_OU(self.computer_DN, OU)

    def print_all_info_current_user(self):
        """
        Prints all AD parameters of the current user.
        """

        print("AD user infos:", json.dumps(self.all_user_info, indent=4))

    def print_all_info_current_computer(self):
        """
        Prints all AD parameters of the current computer.
        """

        print("AD computer infos:", json.dumps(self.all_computer_info, indent=4))

class NetworkShare():
    """
    Manages the mounting of network drives in the user's session depending on the type of machine.

    The shares to handle are kept in ``self.shares``; the platform specific
    work is delegated to ``self.share_manager`` (None when the platform is not
    supported).
    """

    def __init__(self):

        self.linux_mountdir = makepath(user_home_directory(), ".mountpoint")
        self.linux_network_drive_dir = makepath(user_home_directory(), "network_drive")
        self.shares: List[Share] = []

        if is_debian_based() or is_redhat_based():
            self.share_manager = SmbNetFsManager(self.linux_mountdir, self.linux_network_drive_dir)
        elif is_windows():
            self.share_manager = WindowsNetworkShare()
        elif is_darwin():
            self.share_manager = DarwinNetworkShare()
        else:
            # Always define the attribute, so that mount()/cleanup() can report
            # the unsupported platform instead of failing with an AttributeError
            self.share_manager = None

    def _require_share_manager(self):
        """Return the share manager, aborting through ``error()`` when the platform has none."""
        if self.share_manager is None:
            error("Mounting network shares is not supported on this platform")
        return self.share_manager

    def add(self, letter, name, server, path):
        """
        Adds a network share to the list of shares to be mounted, avoiding duplicates of
        letter or (server, path).

        :param letter: single character identifying the share (e.g., 'P').
        :param name: display name of the share.
        :param server: name of the server hosting the share.
        :param path: path of the share on the server, with '/' as separator.
        :raises ValueError: when letter is not a single character, when server or
                            path is empty, or when server or path contains a backslash.
        """

        if not letter or len(letter) > 1:
            raise ValueError("The share letter must be a single character (e.g., 'P').")

        if not server:
            raise ValueError("The server name must not be empty.")

        if not path:
            raise ValueError("The share path must not be empty.")

        if any('\\' in s for s in [server, path]):
            raise ValueError("The share server and path must not contains backslash ('\\') but slash ('/') instead !")

        # Check if the letter is already in use
        if any(s.letter == letter for s in self.shares):
            print(f"The share '{letter}' already exists. Add operation canceled.")
            return

        # Check if couple (server, path) already in use
        if any(s.server == server and s.path == path for s in self.shares):
            print(f"The share '{server}/{path}' already exists. Add operation canceled.")
            return

        self.shares.append(Share(letter, name, server, path))
        print(f"Share '{letter}' to '{server}/{path}' successfully added.")

    def mount(self):
        """
        Mounts the shares using the share manager.
        """

        self._require_share_manager().mount_shares(self.shares)

    def cleanup(self):
        """
        Cleans up/unmounts the shares.
        """

        self._require_share_manager().umount_shares(self.shares)

class WindowsNetworkShare():
    """
    Maps and unmaps network drives on Windows through the WScript.Network COM object.
    """

    # Per-user registry key holding one sub-key per mounted network location
    _MOUNTPOINTS_KEY = r'SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\MountPoints2'

    def __init__(self):
        import win32com.client
        self.WshNetwork = win32com.client.Dispatch("WScript.Network")

    @staticmethod
    def _registry_key_name(share: Share) -> str:
        """Return the name of the MountPoints2 sub-key of *share* ('##server#path#to#share')."""
        return f"##{share.server}#{share.path.replace('/', '#')}"

    def get_mapped_drives(self) -> List[Share]:
        """
        Retrieves the network drives currently mapped, as ``Share`` objects with an empty name.
        """

        drives = self.WshNetwork.EnumNetworkDrives()
        mapped_drives = []
        # Items go by pairs: local name (e.g. 'P:') then remote UNC path
        for i in range(0, drives.Count(), 2):
            splited_path = drives.Item(i + 1).removeprefix(os.sep * 2).split(os.sep)
            letter = drives.Item(i).replace(':', '')
            server, path = splited_path[0], '/'.join(splited_path[1:])
            mapped_drives.append(Share(letter, "", server, path))
        return mapped_drives

    def is_mapped(self, share: Share):
        """
        Tells whether the letter of *share* is currently mapped (case-insensitive).
        """

        # Strict comparison: with a substring test, a connection without drive
        # letter (empty local name) would match every share.
        wanted_letter = share.letter.upper()
        return any(d.letter.upper() == wanted_letter for d in self.get_mapped_drives())

    def rename_mapped_drive(self, share: Share):
        """
        Sets the label of the mapped drive to ``share.name`` (value '_LabelFromReg' in the registry).
        """

        key_path = makepath(self._MOUNTPOINTS_KEY, self._registry_key_name(share))
        with reg_openkey_noredir(HKEY_CURRENT_USER, key_path, sam=KEY_WRITE, create_if_missing=True) as key:
            reg_setvalue(key, '_LabelFromReg', share.name, REG_SZ)

    def remove_named_drive(self, share: Share):
        """
        Deletes the registry key holding the label of *share*, when it exists.
        """

        key_name = self._registry_key_name(share)
        if reg_key_exists(HKEY_CURRENT_USER, makepath(self._MOUNTPOINTS_KEY, key_name)):
            registry_deletekey(HKEY_CURRENT_USER, self._MOUNTPOINTS_KEY, key_name)

    def mount_shares(self, shares: List[Share]):
        """
        Mounts Windows network share

        A failure on one share is printed and does not prevent the next ones from being mapped.
        """

        for share in shares:
            try:
                network_path = rf"\\{share.server}\{share.path.replace('/', os.sep)}"
                self.WshNetwork.MapNetworkDrive(f"{share.letter}:", network_path)
                self.rename_mapped_drive(share)
                print(f"Successfully mapped {share.letter}: ({network_path})")
            except Exception as e:
                print(f"Unable to mount {share.letter}: {e}")

    def umount_shares(self, shares: List[Share]):
        """
        Unmounts Windows network share

        Only the shares whose letter is currently mapped are processed; a failure
        on one share is printed and does not prevent the next ones from being removed.
        """

        for share in shares:
            if not self.is_mapped(share):
                continue
            try:
                self.WshNetwork.RemoveNetworkDrive(f"{share.letter}:", True)
                self.remove_named_drive(share)
                print(f"Removed mapped drive {share.letter}: done")
            except Exception as e:
                print(f"Unable to remove mapped {share.letter}: {e}")

class DarwinNetworkShare():
    """
    Placeholder share manager for macOS: mounting and unmounting are not implemented yet.

    The methods accept the list of shares, like the other share managers, so
    that NetworkShare.mount() and NetworkShare.cleanup() can call them.
    """

    def __init__(self):
        pass

    def mount_shares(self, shares=None):
        """Does nothing (not implemented). *shares* is accepted and ignored."""
        pass

    def umount_shares(self, shares=None):
        """Does nothing (not implemented). *shares* is accepted and ignored."""
        pass

class SmbNetFsManager():
    """
    Manages the mounting of network drives on Linux machines with smbnetfs.

    smbnetfs is mounted on ``mountdir``; every share is then exposed as a
    symbolic link named after its letter in ``network_drive_dir`` and
    bookmarked in the file manager of the user.
    """

    def __init__(self, mountdir, network_drive_dir):
        """
        :param mountdir: directory on which smbnetfs is mounted.
        :param network_drive_dir: directory receiving one symbolic link per share.
        """

        self.mountdir = mountdir
        self.network_drive_dir = network_drive_dir

        self.user_smb_folder = makepath(user_home_directory(), ".smb")
        self.user_conf_smbnetfs = makepath(self.user_smb_folder, "smbnetfs.conf")
        self.user_host_smbnetfs = makepath(self.user_smb_folder, "wapt-smbnetfs.host")

        self.all_fqdn_servers = list()
        self.mounted_servers = list()

        self.bookmark_manager = LinuxBookmarkManager()

        self.__setup_smbnetfs_config()

    def __setup_smbnetfs_config(self):
        """
        Sets up the configuration for smbnetfs in the user's home directory.

        The default configuration is copied when the user has none, and an
        include of the hosts file written by this package is appended once.
        """

        line_to_add = f'include \t\t"{os.path.basename(self.user_host_smbnetfs)}"'
        mkdirs(self.user_smb_folder)

        if not isfile(self.user_conf_smbnetfs):
            filecopyto(default_conf_smbnetfs, self.user_smb_folder)

        lines = read_txt_file(self.user_conf_smbnetfs)
        if line_to_add not in lines:
            write_txt_file(self.user_conf_smbnetfs, line_to_add, mode='a')

    def __setup_smbnetfs_host(self):
        """
        Writes the servers configuration for smbnetfs (readable by the user only).
        """

        conf = ''.join(f"host {server} visible=true\n" for server in self.all_fqdn_servers)

        write_txt_file(self.user_host_smbnetfs, conf)
        os.chmod(self.user_host_smbnetfs, 0o600)

    def __is_smbnetfs_mounted(self):
        """
        Checks if smbnetfs is already mounted on the mount directory (according to /etc/mtab).
        """

        mtab_prefix = f'smbnetfs {self.mountdir}'
        return any(line.startswith(mtab_prefix) for line in read_txt_file('/etc/mtab'))

    def __mount_smbnetfs(self):
        """
        Mounts smbnetfs on the mount directory.
        """

        mkdirs(self.mountdir)
        run(f'smbnetfs {self.mountdir}')

    def __refresh_mounted_servers(self):
        """
        Rebuilds ``mounted_servers`` with the wanted servers visible in the mount directory.
        """

        visible_servers = os.listdir(self.mountdir)

        # Start from an empty list so that repeated calls do not accumulate entries
        self.mounted_servers = []
        for server_to_mount in self.all_fqdn_servers:
            if server_to_mount not in visible_servers:
                print(f"\\!/ Failed to mount smbnetfs share for '{server_to_mount}' server \\!/")
            else:
                self.mounted_servers.append(server_to_mount)

    def mount_shares(self, shares: List[Share]):
        """
        Mounts the specified shares, creating symbolic links.

        :param shares: shares to expose; those whose server is not visible
                       through smbnetfs are reported and skipped.
        """

        # sorted() only makes the generated hosts file deterministic
        self.all_fqdn_servers = sorted({share.server for share in shares})
        self.__setup_smbnetfs_host()

        if not self.__is_smbnetfs_mounted():
            self.__mount_smbnetfs()

        # Done even when smbnetfs was already mounted, otherwise every share
        # would be rejected below because mounted_servers would stay empty
        self.__refresh_mounted_servers()

        mkdirs(self.network_drive_dir)

        for share in shares:

            if share.server not in self.mounted_servers:
                print(f"'\\!/ Failed to create share '{share.letter}' because smbnetfs cannot mount '{share.server}' \\!/'")
                continue

            share_mountdir = makepath(self.mountdir, share.server, share.path)
            share_destination = makepath(self.network_drive_dir, share.letter)

            # ensure the symlink not exist
            if os.path.islink(share_destination):
                remove_file(share_destination)

            os.symlink(share_mountdir, share_destination)

            self.bookmark_manager.add(share_destination, share.name)

        if self.mounted_servers:
            print(f"Successfully mapped: {', '.join([share.letter for share in shares if share.server in self.mounted_servers])}")
        else:
            print("No network shares mapped...")

    def umount_shares(self, shares: List[Share]):
        """
        Unmounts smbnetfs if mounted, removes the working directories and the bookmarks added by WAPT.

        :param shares: unused here, everything set up by this manager is removed.
        """

        if self.__is_smbnetfs_mounted():
            run(f"fusermount -u {self.mountdir}")

        # Cleanup the folder
        if isdir(self.network_drive_dir) and isdir(self.mountdir):
            remove_tree(self.network_drive_dir)

            if os.path.ismount(self.mountdir):
                # Safety net: never delete recursively through a live mount,
                # this would remove the content of the remote shares
                print(f"\\!/ '{self.mountdir}' is still mounted, it is not removed \\!/")
            else:
                remove_tree(self.mountdir)

        self.bookmark_manager.remove_all()

class GpoEngine:
    def __init__(self, domain_name):
        """
        Connect to Active Directory and collect the current user's context.

        Sets ``domain_ad``, ``site``, ``client``, ``base_domain_name``,
        ``dn_user``, ``primary_group_id``, ``object_sid_user``, ``list_ou``,
        ``controller`` and ``netbios_name``.

        Args:
            domain_name: Name of the Active Directory domain to query.
        """
        self.domain_ad = domain_name
        self.site = pyldap.cldap_get_domain_info(domain_name=self.domain_ad)['client_site']

        self.client = pyldap.PyLdapClient(domain_name=self.domain_ad)
        result = self.client.bind_sasl_kerberos()
        if not result[0]:
            error('Failed connect to active directory')

        self.base_domain_name = self.client.default_dn()

        # Look up the current user (the result is keyed by distinguished name).
        user_result = self.client.search_all(self.base_domain_name, f"(sAMAccountName={current_username})", ["primaryGroupID", "objectSid"])
        if not user_result:
            # NOTE(review): error() is defined elsewhere and presumably aborts - confirm.
            error(f"User '{current_username}' not found in active directory")
        self.dn_user, user_values = next(iter(user_result.items()))
        self.primary_group_id = user_values['primaryGroupID'][0]
        self.object_sid_user = user_values['objectSid'][0]

        # Every suffix of the user's DN, from the user entry itself up to the
        # last component.  The DN found above is reused instead of running the
        # same search a second time.
        parent_ou_split = self.dn_user.split(',')
        self.list_ou = [','.join(parent_ou_split[i:]) for i in range(len(parent_ou_split))]

        # Domain controller host (any ":port" suffix dropped) and NetBIOS name.
        self.controller = pyldap.cldap_get_ldap_controller(domain_name=self.domain_ad).split(':')[0]
        self.netbios_name = self._get_netbios_name()

    def _get_netbios_name(self):
        # Recherche du nom NetBIOS du contrôleur
        search_filter = f"(sAMAccountName={self.controller.split('.')[0]}$)"
        attributes = ['servicePrincipalName']
        service_principal_names = next(iter(self.client.search_all(self.base_domain_name, search_filter, attributes).values())).get('servicePrincipalName')
        return next((u.split('/')[-1] for u in service_principal_names if len(u.split('/')) > 2 and '.' not in str(u.split('/')[-1])), None)
    
    def apply_gpo(self, cn_guid):
        guid = cn_guid.split('=')[1].split(',')[0]
        drive_file = rf"\\{self.controller}\sysvol\{self.domain_ad}\Policies\{guid}\User\Preferences\Drives\Drives.xml"
        return self._parse_drives_xml_with_targeting(drive_file)
    
    def _filter_sid(self, sidtest):
        # Vérification de l'appartenance à un SID
        if not sidtest:
            return False
        if sidtest == self.object_sid_user:
            return True
        return bool(self.client.search_all(self.base_domain_name, f"(&(sAMAccountName={current_username})(objectsid={sidtest}))", ['objectsid']))
    
    def filter_domain(self, attrs):
        # Vérification du contexte de l'utilisateur
        return attrs['userContext'] == "1" and self.netbios_name == attrs['name']

    def filter_computer(self, attrs):
        # Vérification du contexte machine (NETBIOS ou DNS)
        if attrs['type'] == "NETBIOS":
            return get_hostname().split('.')[0] == attrs['name'].lower()
        if attrs['type'] == "DNS":
            return get_fqdn() == attrs['name'].lower()
        return False

    def filter_user(self, attrs):
        return self._filter_sid(attrs.get('sid'))
    
    def filter_group(self, attrs):
        # Vérification de l'appartenance à un groupe
        sidtest = attrs.get('sid')
        if not sidtest or attrs['localGroup'] == '1' or attrs['userContext'] != '1':
            return False
        
        if sidtest.startswith(self.object_sid_user.rsplit('-', 1)[0]) and str(sidtest.split('-')[-1]) == str(self.primary_group_id):
            return True
        
        if attrs['primaryGroup'] == '1':
            return False

        return bool(self.client.search_all(self.base_domain_name, f"(&(member:1.2.840.113556.1.4.1941:={self.dn_user})(objectsid={sidtest}))", ['objectsid']))

    def _read_xml_file(self, xml_file_path):
        """
        Return the text of the XML file designated by a UNC path.

        On Windows the path is opened directly.  On Linux the file is fetched
        with ``smbclient`` and everything before the XML declaration is
        dropped.  On macOS the controller's ``sysvol`` share is mounted under
        ``/private/tmp`` and the file is read from there.  On any other
        platform nothing is returned.

        Args:
            xml_file_path: Path of the form ``//server/share/dir/file`` written
                with backslashes.

        Raises:
            FileNotFoundError: On Linux, when the ``smbclient`` output contains
                no XML declaration.
        """
        if is_windows():
            with open(xml_file_path, encoding='utf-8') as file:
                return file.read()

        # ['', '', server, share, remainder of the path]
        path_parts = xml_file_path.split('\\', 4)

        if is_linux():
            server, share = path_parts[2], path_parts[3]
            subpath, file = path_parts[4].rsplit('\\', 1)
            result = run(f'smbclient -N --use-kerberos=desired //{server}/{share} -D "{subpath}" -c "get {file} /dev/stdout" --realm={self.domain_ad.upper()}').rsplit('getting file ', 1)[0]
            # Skip whatever precedes the first line beginning with '<?xml'.
            lines = result.splitlines()
            start_of_xml = next((i for i, line in enumerate(lines) if line.strip().startswith('<?xml')), None)
            if start_of_xml is None:
                # Raised as FileNotFoundError so that the parsing method, which
                # already handles that exception, reports it and carries on.
                raise FileNotFoundError(f"No XML content returned by smbclient for {xml_file_path}")
            return "\n".join(lines[start_of_xml:])

        if is_darwin():
            # TODO: Umount the share at the end
            tmp_sysvol_mountdir = f"/private/tmp/sysvol_{current_username}"
            if not isdir(tmp_sysvol_mountdir):
                mkdirs(tmp_sysvol_mountdir)
            # This method runs once per GPO: do not mount again on top of a
            # share that a previous call left mounted.
            if not os.path.ismount(tmp_sysvol_mountdir):
                run(f'mount_smbfs //{self.controller}/sysvol {tmp_sysvol_mountdir}')

            converted_path = '/'.join(path_parts[-1].split('\\'))
            with open(f"{tmp_sysvol_mountdir}/{converted_path}", encoding='utf-8') as file:
                return file.read()
         
    def evaluate_filters(self, filters_element):

        last_condition_result = None
        last_operator = "AND"

        for filter_element in filters_element:
            #Pour le premier check le last result est True si condition AND, Sinon False
            if last_condition_result is None:
                if filter_element.attrib["bool"] == "OR":
                    last_condition_result = False
                else:
                    last_condition_result = True
            ###################################################

            #On évalue le filter et on passe le resulat précédent pour renvoyer le bon résultat en fonction de l'operateur
            result_filter = self.evaluate_filter(filter_element,last_condition_result)

            # comportement étrange mais visiblement le moteur de ciblage fonctionne comme ceci actuellement
            # Si le dernier résultat est True et que le dernier Operateur a True et l'operateur actuelle est a True alors on s'arrête
            if last_condition_result and last_operator == "OR" and  filter_element.attrib["bool"] == "OR":
                return True
            ####################################################################

            # on conserve les resultat pour le prochaine check
            last_condition_result = result_filter
            last_operator = filter_element.attrib["bool"]

        return last_condition_result
    
    def evaluate_filter(self, filter_element, last_condition_result):
        #on prend que les filtre pris en charge sinon on dit que le résultat c'est non
        type_filter = filter_element.tag
        if (type_filter != 'FilterCollection') and (not type_filter in dir(self)):
            return False

        operator = filter_element.attrib["bool"]

        # si la condition c'est OR et que les dernier check on a résultat a True alors pas besoin de tester
        if operator == "OR" and last_condition_result:
            return True

        #on appel la fonction associter au filter
        if type_filter == "FilterCollection":
            result_filter = self.evaluate_filters(filter_element.findall("*"))
        else:
            result_filter = getattr(self,type_filter)(filter_element.attrib)

        #on vérifie si la condition attendu doit être inversé
        if filter_element.attrib["not"] == '1':
            result_filter = not result_filter

        #On renvoie les resultat en calculant avec les last result
        if operator == "OR":
            return result_filter or last_condition_result

        if operator == "AND":
            return result_filter and last_condition_result

    def _parse_drives_xml_with_targeting(self, xml_file_path):
        # Analyse et extraction des informations des lecteurs réseau
        try:
            xml_data = self._read_xml_file(xml_file_path)
            root = ET.fromstring(xml_data)
            share_paths = {}
            
            for drive in root.findall("Drive"):
                if drive.attrib.get('disabled') == "1":
                    continue
                properties = drive.find("Properties")
                if properties is None:
                    continue
                props = {key: properties.get(key) for key in ['path', 'useLetter', 'label', 'action', 'letter'] if properties.get(key)}
                if drive.find("Filters") and not self.evaluate_filters(drive.find("Filters")):
                    continue
                if "letter" in props:
                    share_paths[props["letter"]] = props
                    
            return share_paths
        except (FileNotFoundError, ET.ParseError) as e:
            print(f"Erreur : {e}")
            return {}
    
    def check_acl_gpo(self, data):
        datasplit = data.split('(')

        group_primary =  self.object_sid_user.rsplit('-',1)[0] + '-' + str(self.primary_group_id)

        all_sid = {}
        for p in datasplit:
            if ';edacfd8f-ffb3-11d1-b41d-00a0c968f939;' in p:
                sid_test = p.split(';')[5].split(')')[0]
                all_sid[sid_test] = None

        all_user_sid_member = {group_primary:None,"AU":None}

        if all_sid:
            filter_ldap = "(&(|%s)(member:1.2.840.113556.1.4.1941:=%s))" %  (''.join(['(objectsid=' + u + ')' for u in all_sid]), self.dn_user)
            result_ldap = self.client.search_all(self.base_domain_name, filter_ldap,['objectSid'])
            for entry in result_ldap:
                all_user_sid_member[result_ldap[entry]['objectSid'][0]] = None

        for p in datasplit:
            if ';edacfd8f-ffb3-11d1-b41d-00a0c968f939;' in p:
                if not p.split(';')[0] == 'OD':
                    continue
                sid_test = p.split(';')[5].split(')')[0]
                if sid_test in all_user_sid_member:
                    return False

        for p in datasplit:
            if ';edacfd8f-ffb3-11d1-b41d-00a0c968f939;' in p:
                if not p.split(';')[0] == 'OA':
                    continue
                sid_test = p.split(';')[5].split(')')[0]
                if sid_test in all_user_sid_member:
                    return True

        return False

    def get_network_drive(self):
        """ Récupère les lecteurs réseaux en fonction des GPO appliquées. """

        dict_network_drive = {}
        dict_ou_result = {}
        dict_gpo_value = {}

        # Recherche de toutes les OU et récupération des GPO associées
        for ou in self.list_ou:
            dict_ou_result[ou] = self.client.search_all(self.base_domain_name,
                f"(&(distinguishedName={ou})(gPLink=*)(!(gPOptions=2))(!(gPOptions=3)))",
                ['gPOptions','gPLink']
            ).get(ou)

        if self.site :
            rsite = self.client.search_all(f'CN=Configuration,{self.base_domain_name}', f'(&(name={self.site})(objectClass=site)(gPLink=*))')
            if rsite:
                dict_ou_result[self.list_ou[-1]] = list(rsite.values())[0]

        # Détection d'une OU avec Block Inheritance
        block_inheritance = next(
            (
                ou.lower() for ou in self.list_ou
                if dict_ou_result.get(ou) and
                dict_ou_result[ou].get('gPOptions', [''])[0] == 1
            ), None
        )

        def process_gpo_links(ou, apply_forced=False):

            if not dict_ou_result.get(ou):
                return

            for u in dict_ou_result[ou]['gPLink'][0].split('['):
                u = u.strip(' ]')
                if not u:
                    continue
                
                gpo_param = u.split('//')[1].rsplit(';', 1)
                gpo_distinguished_name = gpo_param[0]
                gpo_param_flag = gpo_param[1]
                
                if u not in dict_gpo_value:
                    gpo_value = list(self.client.search_all(
                        self.base_domain_name,
                        f"(distinguishedName={gpo_distinguished_name})",
                        ['nTSecurityDescriptor', 'gPCUserExtensionNames', 'flags']
                    ).values())[0]
                    dict_gpo_value[u] = gpo_value
                else:
                    gpo_value = dict_gpo_value[u]

                ## 0 Aucun drapeau actif (configuration par défaut).
                ## 1 La GPO est désactivée côté utilisateur.
                ## 2 La GPO est désactivée côté ordinateur.
                ## 3 La GPO est désactivée à la fois pour les utilisateurs et ordinateurs.
                ## 4 Indique que la GPO est une stratégie de groupe starter (modèle de GPO préconfiguré).
                if gpo_value['flags'][0] in [1, 3]:
                    continue
                    
                # si ce n'est pas une gpo network drive on fait rien
                if '{2EA1A81B-48E5-45E9-8BB7-A6E3AC170006}' not in str(gpo_value.get('gPCUserExtensionNames', '')):
                    continue

                #0 case appliqué décocher, lien activé cocher
                #1 case appliqué décocher, lien activé décocher
                #2 case appliqué cocher, lien activé cocher
                #3 case appliqué cocher, lien activé décocher
                if (gpo_param_flag == "0" and not apply_forced) or (gpo_param_flag == "2" and apply_forced):
                    if self.check_acl_gpo(gpo_value['nTSecurityDescriptor'][0]):
                        dict_network_drive.update(self.apply_gpo(gpo_param[0]))

        # Première passe : récupération des GPO appliquées
        for ou in reversed(self.list_ou):
            if block_inheritance and not ou.lower().endswith(block_inheritance):
                continue
            process_gpo_links(ou, apply_forced=False)

        # Deuxième passe : application des GPO forcées
        for ou in reversed(self.list_ou):
            process_gpo_links(ou, apply_forced=True)

        return dict_network_drive

6948aa6302ee0320ea4ec3b93152e44c01b012d0e19a69b0138f43271d28bdaa : .gitignore
38d056ab130f7bf7c481c12636a4e9959de36561d3dfcbe54c6e3571bc0c1dc3 : WAPT/certificate.crt
f86351b87231871e76d6c843209cd3c3e31b9622825cc9a83fe882a412fca2b4 : WAPT/control
25b0beebb5a383774223652d265c32e809a14cbe789e6ffc140a1e9e43591361 : WAPT/icon.png
8bc45f16943dc2ebd4a698e0dda180a572bf06c82aaf460098598a2bf54774a8 : luti.json
a499e197928246b4ed228d62ffe13d904359d8bc84e795402e64353bfa92762e : setup.py
cb320c6b6e395d57ef245c74b9953505ee99d0b1cbe4335c335e2115d8c5b0ca : testgpo-printer.py
c3990a40f3a61a132a8f7c731130a5e204d3973d6d19cac114235a9da4d60f7a : testgpo.py