tis-cyberwatch-plugin-export-to-cyberwatch-airgap
Silent install package for tis-cyberwatch-plugin-export-to-cyberwatch-airgap
15-21
- package: tis-cyberwatch-plugin-export-to-cyberwatch-airgap
- version: 15-21
- maintainer: sfonteneau
- locale: all
- target_os: all
- architecture: all
- signature_date:
- size: 12.81 Ko
package : tis-cyberwatch-plugin-export-to-cyberwatch-airgap
version : 15-21
architecture : all
section : base
priority : optional
name :
categories :
maintainer : sfonteneau
description : Package for tis-cyberwatch-plugin
depends :
conflicts :
maturity : PROD
locale : all
target_os : all
min_wapt_version : 2.0
sources :
installed_size :
impacted_process :
description_fr : Paquet pour tis-cyberwatch-plugin
description_pl : Pakiet dla tis-cyberwatch-plugin
description_de : Paket für tis-cyberwatch-plugin
description_es : Paquete para tis-cyberwatch-plugin
description_pt : Pacote para tis-cyberwatch-plugin
description_it : Pacchetto per tis-cyberwatch-plugin
description_nl : Pakket voor tis-cyberwatch-plugin
description_ru : Пакет для tis-cyberwatch-plugin
audit_schedule : 1h
editor :
keywords :
licence :
homepage :
package_uuid : 437beb3e-a3f5-4892-8105-e562f59f6a08
valid_from :
valid_until :
forced_install_on :
changelog :
min_os_version :
max_os_version :
icon_sha256sum : 84c8d943064b7613cd3ed456ddd81ab07c487931ae81fbbe029c670255d379d2
signer : Tranquil IT
signer_fingerprint: 8c5127a75392be9cc9afd0dbae1222a673072c308c14d88ab246e23832e8c6bb
signature_date : 2025-10-07T13:05:12.000000
signed_attributes : package,version,architecture,section,priority,name,categories,maintainer,description,depends,conflicts,maturity,locale,target_os,min_wapt_version,sources,installed_size,impacted_process,description_fr,description_pl,description_de,description_es,description_pt,description_it,description_nl,description_ru,audit_schedule,editor,keywords,licence,homepage,package_uuid,valid_from,valid_until,forced_install_on,changelog,min_os_version,max_os_version,icon_sha256sum,signer,signer_fingerprint,signature_date,signed_attributes
signature : hAl2UUHdub31zAKGie4s2P8DUyqQUxj1FtzTFsujmkJLOZFolnTFzn1pg2I0wqzKALEzjihtZ2eRSxTQGDgBzt3GPJeXnYzsWB0cQFK0O0aK8qQ9jeKqodkPSMTrT+8SD73E5QfNZU2xPhbk52w1FNxT81mjYZxH14r/wIYEwkPzx7xHu86OoEn014/ws+C06TXFZwNFFi/Bosf6seUqX7Wtu3wKSWTAq9iqu8GvtZSQD9+G0zePXoiAHHNgc7cQi7oYWp7HHgWEKwIm/a3+V61ZIC6l+8K7RxKy0Afatofj7U7v9DbtRVH2prkwPC0/7QamAfrsRMW3hHJQhIh93g==
# -*- coding: utf-8 -*-
from setuphelpers import *
from configparser import ConfigParser
from common import WaptServer
from waptutils import get_verify_cert
from common import get_requests_client_cert_session
import traceback
import platform
import waptlicences
import concurrent.futures
# Number of worker threads passed to the ThreadPoolExecutor that audit() uses
# to export hosts in parallel.
simultaneous_maximum_send=10
# Maps the "architecture" value found in a host's host_capabilities to the
# value written on the ARCH: line of the export (unknown values fall back to "").
dict_arch = {"x64": "AMD64", "x86": ""}
def audit():
    """Export every host known to the WAPT server to Cyberwatch.

    Reads the WAPT credentials from ``wapt_api.ini`` and the Cyberwatch
    settings from ``cyberwatch_api.ini`` (both located in
    ``WAPT.private_dir``), logs in to the WAPT server, lists the hosts and
    submits one ``export_to_cyerwatch`` call per host to a thread pool of
    ``simultaneous_maximum_send`` workers. The WAPT server session is always
    closed afterwards, even when the export raises. Finally, if the matching
    ``<prefix>-cyberwatch-plugin-import-from-cyberwatch`` package is
    installed, its audit is forced.

    Returns:
        ``"ERROR"`` if the shared error flag handed to the workers was set,
        ``"OK"`` otherwise.

    Raises:
        Exception: if the login returned a cookie list containing no cookie
            for the WAPT server domain.
    """
    wapt_conf = ConfigParser()
    wapt_conf.read(makepath(WAPT.private_dir, "wapt_api.ini"))
    username_wapt = wapt_conf.get("wapt", "username")
    password_wapt = wapt_conf.get("wapt", "password")

    login = waptlicences.waptserver_login(WAPT.config_filename, username_wapt, password_wapt)

    # One-element list used as a mutable flag shared with the worker threads.
    inerror = [False]
    try:
        server_url = WAPT.waptserver.server_url

        cookies = login['session_cookies']
        if 'session' in cookies:
            session_cookie = cookies['session']
            session_cookie['Name'] = 'session'
        else:
            # The cookies come as a list: keep the one issued for the server.
            # (The original code read the URL from an undefined `self` here.)
            server_domain = server_url.split('://')[-1]
            matching = [c for c in cookies if c['Domain'] == server_domain]
            if not matching:
                raise Exception("No WAPT session cookie found for domain %s" % server_domain)
            session_cookie = matching[0]

        sessionwapt = get_requests_client_cert_session(
            server_url,
            cert=(
                login['client_certificate'],
                login['client_private_key'],
                login['client_private_key_password'],
            ),
            verify=WAPT.waptserver.verify_cert,
        )
        sessionwapt.cookies.set(
            session_cookie['Name'],
            session_cookie['Value'],
            domain=session_cookie['Domain'],
        )
        sessionwapt.verify = WAPT.waptserver.verify_cert

        cyb_conf = ConfigParser()
        cyb_conf.read(makepath(WAPT.private_dir, "cyberwatch_api.ini"))
        if cyb_conf.has_option("cyberwatch", 'verify_cert'):
            verify_cert_cyb = get_verify_cert(cyb_conf.get("cyberwatch", 'verify_cert'))
        else:
            verify_cert_cyb = True

        client = Cyberwatch_Pyhelper()
        client.api_url = cyb_conf.get("cyberwatch", "url")
        client.api_key = cyb_conf.get("cyberwatch", "api_key")
        client.api_secret = cyb_conf.get("cyberwatch", "secret_key")
        client.verify_ssl = verify_cert_cyb

        hosts = json.loads(
            sessionwapt.get("%s/api/v3/hosts?&limit=1000000" % server_url).content
        )["result"]

        with concurrent.futures.ThreadPoolExecutor(simultaneous_maximum_send) as executor:
            futures = [
                executor.submit(
                    export_to_cyerwatch,
                    uuidwapt=host["uuid"],
                    sessionwapt=sessionwapt,
                    CLIENT=client,
                    WAPT=WAPT,
                    inerror=inerror,
                )
                for host in hosts
            ]
            for future in concurrent.futures.as_completed(futures):
                # Re-raises anything a worker did not handle itself.
                future.result()
    finally:
        # Always release the server session opened by waptserver_login().
        waptlicences.waptserver_logout(WAPT.config_filename)

    prefix = control.package.split('-')[0]
    import_package = '%s-cyberwatch-plugin-import-from-cyberwatch' % prefix
    if WAPT.is_installed(import_package):
        WAPT.audit(import_package, force=True)

    return "ERROR" if inerror[0] else "OK"
def export_to_cyerwatch(uuidwapt=None, sessionwapt=None, CLIENT=None, WAPT=None, inerror=None):
    """Build the Cyberwatch air-gap scan output for one WAPT host and upload it.

    The host inventory, its installed Windows updates and its installed
    software are fetched from the WAPT server through ``sessionwapt``; the
    resulting text is POSTed to ``/api/v2/cbw_scans/scripts`` through
    ``CLIENT``. Hosts without ``host_capabilities`` or without the
    ``"windows"`` tag are skipped silently.

    Args:
        uuidwapt: UUID of the host on the WAPT server.
        sessionwapt: object whose ``get(url)`` returns a response exposing
            ``.content`` (JSON text).
        CLIENT: object whose ``request(**kwargs)`` returns an iterator of
            responses (``Cyberwatch_Pyhelper.request`` is a generator).
        WAPT: object exposing ``waptserver.server_url``.
        inerror: optional one-element list used as a shared error flag;
            ``inerror[0]`` is set to ``True`` when the export fails.

    Returns:
        None. Failures are printed with their traceback and reported only
        through ``inerror``; no exception escapes.
    """
    try:
        server_url = WAPT.waptserver.server_url
        pc = json.loads(sessionwapt.get('%s/api/v3/hosts?columns=dmi,wmi,host_info,host_metrics,waptwua_status,wuauserv_status,host_capabilities,wapt_status&uuid=%s' % (server_url, uuidwapt)).content)["result"][0]

        # Only Windows hosts are exported.
        capabilities = pc.get("host_capabilities")
        if not capabilities or "windows" not in capabilities.get("tags", []):
            return

        # Replace missing / null sections by empty dicts so lookups below are safe.
        for key in ("wmi", "dmi", "host_info", "host_metrics", "wuauserv_status", "host_capabilities"):
            if not pc.get(key):
                pc[key] = {}
        host_info = pc["host_info"]
        dmi = pc["dmi"]

        hostname = str(pc.get("computer_name", ''))
        arch = dict_arch.get(pc["host_capabilities"].get("architecture", ''), '')
        os_build = pc["host_capabilities"].get("os_version", '') or ''
        win10 = os_build.startswith("10.0.")
        if win10 and host_info.get("windows_version_ubr", ""):
            # Drop the "10.0." prefix and append the update build revision;
            # str() guards against the revision being stored as a number.
            os_build = os_build[5:] + "." + str(host_info["windows_version_ubr"])

        operating_system = pc["wmi"].get("Win32_OperatingSystem", {})
        os_prettyname = operating_system.get("Caption", '')
        OperatingSystemSKU = operating_system.get("OperatingSystemSKU", '')
        os_version_prettyname = host_info.get("windows_version_prettyname")
        if not os_version_prettyname:
            os_version_prettyname = host_info.get("windows_version_releaseid")
        wua_agent_version = host_info.get("wua_agent_version", '')
        pending_reboot_reasons = pc["wuauserv_status"].get("reboot_needed", '')
        boot_time = pc["host_metrics"].get("last_bootup_time", '')
        list_ip = "\n".join("IP:" + ip for ip in host_info.get("connected_ips", []))

        # Installed Windows updates; known_kb is used to avoid reporting the
        # same KB again from the installed software list.
        list_kb = []
        known_kb = set()
        updates = json.loads(sessionwapt.get("%s/api/v3/host_data?field=wsusupdates&uuid=%s" % (server_url, pc["uuid"])).content)["result"]
        for kb in updates:
            if not kb["local_status_installed"] or not kb["kbids"]:
                continue
            if win10:
                list_kb.append("KB:%s" % (kb["kbids"][0]))
            else:
                list_kb.append("PACKAGE_WUA:KB%s|%s" % (kb["kbids"][0], kb["update_id"]))
            known_kb.add("KB" + kb["kbids"][0])

        list_soft = []
        softwares = json.loads(sessionwapt.get("%s/api/v3/host_data?field=installed_softwares&uuid=%s" % (server_url, pc["uuid"])).content)["result"]
        for soft in softwares:
            if soft["name"].startswith("KB"):
                # Entries named after a KB are updates, never applications.
                name_kb = soft["name"].split(" ")[0]
                if name_kb not in known_kb:
                    list_kb.append("PACKAGE:" + name_kb)
                continue
            list_soft.append("WINDOWS_APPLICATION:%s|%s" % (soft["name"], soft["version"]))
        list_soft_content = "\n".join(list_soft)
        list_kb_content = "\n".join(list_kb)

        # These DMI sections may come as a list; keep the first entry only.
        for entry in ("Processor_Information", "Base_Board_Information"):
            value = dmi.get(entry, {})
            if isinstance(value, list):
                value = value[0] if value else {}
            dmi[entry] = value
        bios = dmi.get('BIOS_Information', {})
        board = dmi['Base_Board_Information']
        cpu = dmi['Processor_Information']

        try:
            list_service = "\n".join(
                "SERVICE:" + service + "|" + host_info["list_services"][service]["startup"]
                for service in host_info.get("list_services", {})
            )
        except Exception:
            # Best effort: a malformed service inventory must not block the export.
            list_service = ""

        list_port = {}
        for sock in host_info.get("listening_sockets", []):
            # NOTE(review): "::" is skipped here; presumably "::1" (IPv6
            # loopback) was intended - confirm before changing.
            if sock["local_ip"] in ["127.0.0.1", "::"]:
                continue
            list_port[sock["local_port"]] = "TCP" if sock["type"] == "SOCK_STREAM" else "UDP"
        content_port = "\n".join("%s:%s" % (proto, port) for port, proto in list_port.items())

        lines = [
            f"HOSTNAME:{hostname}",
            f"ARCH:{arch}",
            f"OS_BUILD:{os_build}",
            f"OS_PRETTYNAME:{os_prettyname}",
            f"OS_VERSION:{os_version_prettyname}",
            f"OS_EDITION:SKU {OperatingSystemSKU}",
            f"WUAVERSION:{wua_agent_version}",
            f"{list_kb_content}",
            f"{list_soft_content}",
            f"WUAVERSION:{wua_agent_version}",
            f"REBOOT:{pending_reboot_reasons}",
            f"BOOT_TIME:{boot_time}",
            f"{list_ip}",
            f"META:bios-version|{bios.get('Version', '')}",
            f"META:bios-release-date|{bios.get('Release_Date', '')}",
            f"META:bios-vendor|{bios.get('Vendor', '')}",
            f"META:baseboard-product|{board.get('Product_Name', '')}",
            f"META:baseboard-version|{board.get('Version', '')}",
            f"META:processor-model-name|{cpu.get('Version')}",
            f"META:processor-socket|{cpu.get('Socket_Designation')}",
            f"META:processor-description|{host_info.get('cpu_identifier', '')}",
            f"META:baseboard-serialnumber|{board.get('Serial_Number')}",
            f"META:system-domain|{host_info.get('domain_name', '')}",
            f"META:system-model|{host_info.get('system_productname', '')}",
            f"META:system-manufacturer|{host_info.get('system_manufacturer', '')}",
            f"META:system-ram|{pc['host_metrics'].get('physical_memory', '')}",
            f"META:system-uuid|{dmi.get('System_Information', {}).get('UUID', '')}",
            f"META:uuid-wapt|{uuidwapt}",
            f"META:product-key|{host_info.get('windows_product_infos', {}).get('product_key', '')}",
            f"{list_service}",
            f"{content_port}",
        ]
        content = "\n".join(lines) + "\n"

        data = {"output": content, "groups": "upload_by_wapt"}
        # request() is a generator: nothing is sent until it is advanced.
        # next() performs exactly one POST and does not follow pagination.
        response = next(
            iter(CLIENT.request(method="POST", endpoint="/api/v2/cbw_scans/scripts", body_params=data)),
            None,
        )
        if response is not None:
            # Turn an HTTP error status into a reported failure.
            response.raise_for_status()
    except Exception:
        # Mutate the shared flag in place; rebinding the name would only
        # change this function's local variable.
        if inerror is not None:
            inerror[0] = True
        print('----------------------------')
        print(uuidwapt)
        print(str(traceback.format_exc()))
def install():
    """Seed the WAPT private directory with the default API settings files.

    For each of ``cyberwatch_api.ini`` and ``wapt_api.ini``, the file shipped
    with the package is copied to ``WAPT.private_dir`` unless a file with
    that name already exists there.
    """
    for ini_name in ("cyberwatch_api.ini", "wapt_api.ini"):
        destination = makepath(WAPT.private_dir, ini_name)
        if isfile(destination):
            # An existing configuration file is left untouched.
            continue
        filecopyto(ini_name, destination)
import os
import json
from configparser import ConfigParser
from typing import Generator
import requests
class Cyberwatch_Pyhelper():
    """Minimal client for the Cyberwatch HTTP API using basic authentication.

    Set ``api_url``, ``api_key`` and ``api_secret`` (and optionally
    ``verify_ssl``) on the instance, then call ``request``.
    """

    def __init__(self):
        self.api_url = None
        self.api_key = None
        self.api_secret = None
        # Passed as ``verify`` to requests; defaults to verifying certificates
        # so request() works even when the caller never sets it.
        self.verify_ssl = True

    def clear_endpoint(f):
        """
        Decorator that takes the endpoint that was given by the API user,
        and replaces the {parameter} by the one that was given inside the params or body params dict
        """
        def wrapper(*args, **kwargs):
            endpoint = kwargs.get("endpoint")
            if not isinstance(endpoint, str):
                raise Exception("The type of endpoint parameter should be str")
            # Names found between "{" and "}" in the endpoint.
            parameters = [parameter.split("}")[0] for parameter in endpoint.split("{")[1:]]
            for parameter in parameters:
                parameter_value = (
                    (kwargs.get("params") or {}).get(parameter)
                    or (kwargs.get("body_params") or {}).get(parameter)
                )
                endpoint = endpoint.replace("{" + parameter + "}", str(parameter_value))
                # Deleting the 'parameter' from the kwargs arguments if it exists
                for value in kwargs.values():
                    if isinstance(value, dict):
                        value.pop(parameter, "")
            kwargs.update({"endpoint": endpoint})
            return f(*args, **kwargs)
        return wrapper

    # Annotations are quoted so they are not evaluated at definition time.
    def __basic_auth(self) -> "requests.auth.HTTPBasicAuth":
        """
        Private method returning a BasicAuth
        """
        return requests.auth.HTTPBasicAuth(self.api_key, self.api_secret)

    @clear_endpoint
    def request(self, **kwargs) -> "Generator[requests.models.Response, None, None]":
        """
        Only accessible method, handles every step of the API call.

        This is a generator: no HTTP request is sent until it is iterated.
        It yields one response per page, following the "next" link of each
        response until there is none.

        Keyword arguments:
            method: HTTP method name (str, required).
            endpoint: path appended to ``api_url``; ``{name}`` placeholders
                are filled from ``params`` / ``body_params``.
            params: optional dict of query parameters.
            body_params: optional dict sent as a JSON body.
            timeout: optional int, seconds (defaults to 10).

        Raises:
            Exception: if ``method`` is not a str or ``timeout`` is not an int.
        """
        if not isinstance(kwargs.get("method"), str):
            raise Exception("The type of method parameter should be str")
        if kwargs.get("timeout") and not isinstance(kwargs.get("timeout"), int):
            raise Exception("The type of timeout parameter should be int")
        method = str(kwargs.get("method")).upper()
        timeout = kwargs.get("timeout") or 10
        params = kwargs.get("params") or {}
        body_params = json.dumps(kwargs.get("body_params")) if kwargs.get("body_params") else {}
        headers = {'Content-type': 'application/json'}
        url = self.api_url + kwargs.get("endpoint")
        while url:
            response = requests.request(
                method=method,
                url=url,
                headers=headers,
                auth=self.__basic_auth(),
                params=params,
                data=body_params,
                timeout=timeout,
                verify=self.verify_ssl
            )
            url = response.links["next"]["url"] if "next" in response.links else None
            yield response
80a99d6468013de9f7445c0f40a6e4539ae2bd31b5fde69e9786f0daf3fef83c : .gitignore
38d056ab130f7bf7c481c12636a4e9959de36561d3dfcbe54c6e3571bc0c1dc3 : WAPT/certificate.crt
d6b0171504fc59eafbb816039569967d6790cad66d82f45e759879363f7f1cc9 : WAPT/control
84c8d943064b7613cd3ed456ddd81ab07c487931ae81fbbe029c670255d379d2 : WAPT/icon.png
ec0a3f3c5123020aa38c06da50ba806fb2dbee917bb6bac26f30353f8ec01c04 : cyberwatch_api.ini
3500a40a12025e9f0e366cc7acbc40a09ec4aa38132293e848f4d210d01b26fb : luti.json
6045a3d71595caa8c0c778aba928d4703de545b83fb2eb36628b370855a78442 : setup.py
cb302f842cf1695f553389a436898f9e580d7d866a6726789dca1994a55e9c3c : wapt_api.ini