tis-cyberwatch-plugin-export-to-cyberwatch-airgap
11-13
Package for tis-cyberwatch-plugin
1917 downloads
See build result See VirusTotal scan
control
package : tis-cyberwatch-plugin-export-to-cyberwatch-airgap
version : 11-13
architecture : all
section : base
priority : optional
name :
categories :
maintainer : sfonteneau
description : Package for tis-cyberwatch-plugin
depends :
conflicts :
maturity : PROD
locale : all
target_os : all
min_wapt_version : 2.0
sources :
installed_size :
impacted_process :
description_fr : Paquet pour tis-cyberwatch-plugin
description_pl : Pakiet dla tis-cyberwatch-plugin
description_de : Paket für tis-cyberwatch-plugin
description_es : Paquete para tis-cyberwatch-plugin
description_pt : Pacote para tis-cyberwatch-plugin
description_it : Pacchetto per tis-cyberwatch-plugin
description_nl : Pakket voor tis-cyberwatch-plugin
description_ru : Пакет для tis-cyberwatch-plugin
audit_schedule : 1h
editor :
keywords :
licence :
homepage :
package_uuid : edeb3b70-0d89-4a79-90c7-6293fdde1c53
valid_from :
valid_until :
forced_install_on :
changelog :
min_os_version :
max_os_version :
icon_sha256sum : 84c8d943064b7613cd3ed456ddd81ab07c487931ae81fbbe029c670255d379d2
signer : Tranquil IT
signer_fingerprint: 8c5127a75392be9cc9afd0dbae1222a673072c308c14d88ab246e23832e8c6bb
signature : Q5NNtFIBGUmCMweB/kcM8Lvt448b07xXoO8yP38wipJM8gUrW/YRZefswHMPqQ93Xb1D9oBFaWfomvuKfQFKNB5YWbmdxpVIXuyVh6PHsRXlJbZVQzcs2s3WpK2Zr32eCnfqpSB3KB0Ji95wfXVBdObnApWYz5UuuqsBLfee1JiBBRYOCJdrk9nTEZkQBdVKtwYFirEt92Omd3a2kCbOveIDz91Ytexcf42Vi7p97cqeO135RxYE6pxGVD/V3Zb+qQIPhg/GbOdtmvEMBqwofs0Nu9X8NIQ2IyYMOVGKRVf/QQL8EKjgKqmGlI36cGf8ZfZN166bdlQ1sxtb1vC+WA==
signature_date : 2024-07-21T09:00:08.491756
signed_attributes : package,version,architecture,section,priority,name,categories,maintainer,description,depends,conflicts,maturity,locale,target_os,min_wapt_version,sources,installed_size,impacted_process,description_fr,description_pl,description_de,description_es,description_pt,description_it,description_nl,description_ru,audit_schedule,editor,keywords,licence,homepage,package_uuid,valid_from,valid_until,forced_install_on,changelog,min_os_version,max_os_version,icon_sha256sum,signer,signer_fingerprint,signature_date,signed_attributes
Setup.py
# -*- coding: utf-8 -*-
from setuphelpers import *
from configparser import ConfigParser
from common import WaptServer
from waptutils import get_verify_cert
import traceback
import platform
def audit():
CONFWAPT = ConfigParser()
CONFWAPT.read(makepath(WAPT.private_dir, "wapt_api.ini"))
username_wapt = CONFWAPT.get("wapt", "username")
password_wapt = CONFWAPT.get("wapt", "password")
srvwapt_url = CONFWAPT.get("wapt", "url")
WAPT.waptserver.post('api/v3/login' ,data=json.dumps({'user': username_wapt, 'password': password_wapt}))
# certifi import with system_redirection but install run without system_redirection so copy cacert.pem
if platform.system() == 'Windows':
if isfile(r'C:\Windows\SysWOW64\config\systemprofile\AppData\Local\.certifi\cacert.pem'):
mkdirs(r'C:\WINDOWS\system32\config\systemprofile\AppData\Local\.certifi')
filecopyto(r'C:\Windows\SysWOW64\config\systemprofile\AppData\Local\.certifi\cacert.pem',r'C:\WINDOWS\system32\config\systemprofile\AppData\Local\.certifi\cacert.pem')
CONFCYB = ConfigParser()
CONFCYB.read(makepath(WAPT.private_dir, "cyberwatch_api.ini"))
if CONFCYB.has_option("cyberwatch", 'verify_cert'):
verify_cert_cyb = get_verify_cert(CONFCYB.get("cyberwatch", 'verify_cert'))
else:
verify_cert_cyb = True
CLIENT = CBWApi(CONFCYB.get("cyberwatch", "url"), CONFCYB.get("cyberwatch", "api_key"), CONFCYB.get("cyberwatch", "secret_key"), verify_ssl=verify_cert_cyb)
dict_arch = {"x64": "AMD64", "x86": ""}
list_pc_wapt = WAPT.waptserver.get(
"api/v3/hosts?&limit=1000000"
)["result"]
inerror =False
for pctest in list_pc_wapt:
uuidwapt = pctest["uuid"]
try :
pc = WAPT.waptserver.get('api/v3/hosts?columns=dmi,wmi,host_info,host_metrics,waptwua_status,wuauserv_status,host_capabilities,wapt_status&uuid=%s' % uuidwapt)["result"][0]
if pc.get("host_capabilities"):
if not "windows" in pc.get("host_capabilities", {}).get("tags", []):
continue
else:
continue
list_key = ["wmi","dmi","host_info","host_metrics","wuauserv_status","host_capabilities"]
for k in list_key:
if not pc.get(k,{}):
pc[k]={}
hostname = str(pc.get("computer_name",''))
arch = dict_arch[pc.get("host_capabilities",{}).get("architecture",'')]
os_build = pc.get("host_capabilities",{}).get("os_version",'')
win10 = False
if os_build.startswith("10.0."):
win10 = True
if pc["host_info"].get("windows_version_ubr", ""):
os_build = str(os_build[5:] + "." + pc["host_info"]["windows_version_ubr"])
os_prettyname = pc.get("wmi",{}).get("Win32_OperatingSystem",{}).get("Caption",'')
OperatingSystemSKU = pc.get("wmi",{}).get("Win32_OperatingSystem",{}).get("OperatingSystemSKU",'')
os_version_prettyname = pc["host_info"].get("windows_version_prettyname")
if not os_version_prettyname:
os_version_prettyname = pc["host_info"].get("windows_version_releaseid")
wua_agent_version = pc["host_info"].get("wua_agent_version",'')
pending_reboot_reasons = pc.get("wuauserv_status",{}).get("reboot_needed",'')
boot_time = pc.get("host_metrics",{}).get("last_bootup_time",'')
list_ip = "\n".join(["IP:" + ip for ip in pc["host_info"].get("connected_ips",[])])
list_kb = []
dict_kb = {}
for kb in WAPT.waptserver.get("api/v3/host_data?field=wsusupdates&uuid=%s" % pc["uuid"])["result"]:
if not kb["local_status_installed"]:
continue
if win10:
list_kb.append("PACKAGE:KB%s" % (kb["kbids"][0]))
else:
list_kb.append("PACKAGE_WUA:KB%s|%s" % (kb["kbids"][0], kb["update_id"]))
dict_kb["KB" + kb["kbids"][0]] = None
list_soft = []
for soft in WAPT.waptserver.get("api/v3/host_data?field=installed_softwares&uuid=%s" % pc["uuid"])["result"]:
if soft["name"].startswith("KB"):
name_kb = soft["name"].split(" ")[0]
if not name_kb in dict_kb:
list_kb.append("PACKAGE:" + name_kb)
continue
list_soft.append("APPLICATION:%s|%s" % (soft["name"], soft["version"]))
list_soft_content = "\n".join(list_soft)
list_kb_content = "\n".join(list_kb)
list_list_to_dict = ["Processor_Information", "Base_Board_Information"]
for entry in list_list_to_dict:
if not entry in pc["dmi"]:
pc["dmi"][entry] = {}
if type(pc["dmi"][entry]) is list:
pc["dmi"][entry] = pc["dmi"][entry][0]
try:
list_service = "\n".join(
"SERVICE:" + service + "|" + pc["host_info"]["list_services"][service]["startup"]
for service in pc["host_info"].get("list_services", {})
)
except:
list_service = ""
list_port = {}
for p in pc["host_info"].get("listening_sockets", []):
if p["local_ip"] in ["127.0.0.1", "::"]:
continue
if p["type"] == "SOCK_STREAM":
list_port[p["local_port"]] = "TCP"
else:
list_port[p["local_port"]] = "UDP"
content_port = []
for p in list_port:
content_port.append("%s:%s" % (list_port[p], p))
content_port = "\n".join(content_port)
content = f"""HOSTNAME:{hostname}
ARCH:{arch}
OS_BUILD:{os_build}
OS_PRETTYNAME:{os_prettyname}
OS_VERSION:{os_version_prettyname}
OS_EDITION:SKU {OperatingSystemSKU}
WUAVERSION:{wua_agent_version}
{list_kb_content}
{list_soft_content}
WUAVERSION:{wua_agent_version}
REBOOT:{pending_reboot_reasons}
BOOT_TIME:{boot_time}
{list_ip}
META:bios-version|{pc.get('dmi',{}).get('BIOS_Information',{}).get('Version','')}
META:bios-release-date|{pc.get('dmi',{}).get('BIOS_Information',{}).get('Release_Date','')}
META:bios-vendor|{pc['dmi'].get('BIOS_Information',{}).get('Vendor','')}
META:baseboard-product|{pc['dmi'].get('Base_Board_Information',{}).get('Product_Name','')}
META:baseboard-version|{pc['dmi'].get('Base_Board_Information',{}).get('Version','')}
META:processor-model-name|{pc['dmi']['Processor_Information'].get('Version')}
META:processor-socket|{pc['dmi'].get('Processor_Information',{}).get('Socket_Designation')}
META:processor-description|{pc['host_info'].get('cpu_identifier','')}
META:baseboard-serialnumber|{pc['dmi'].get('Base_Board_Information',{}).get('Serial_Number')}
META:system-domain|{pc['host_info'].get('domain_name','')}
META:system-model|{pc['host_info'].get('system_productname','')}
META:system-manufacturer|{pc['host_info'].get('system_manufacturer','')}
META:system-ram|{pc['host_metrics'].get('physical_memory','')}
META:system-uuid|{pc['dmi'].get('System_Information',{}).get('UUID','')}
META:uuid-wapt|{uuidwapt}
META:product-key|{pc['host_info'].get('windows_product_infos',{}).get('product_key','')}
{list_service}
{content_port}
"""
data = {"output": content, "groups": "upload_by_wapt"}
CLIENT.upload_airgapped_results(data)
except Exception as e :
inerror = True
print('----------------------------')
print(uuidwapt)
print(str(traceback.format_exc()))
prefix = control.package.split('-')[0]
if WAPT.is_installed('%s-cyberwatch-plugin-import-from-cyberwatch' % prefix):
WAPT.audit('%s-cyberwatch-plugin-import-from-cyberwatch' % prefix,force=True)
if inerror :
return "ERROR"
else:
return "OK"
def install():
if not isfile(makepath(WAPT.private_dir, "cyberwatch_api.ini")):
filecopyto("cyberwatch_api.ini", makepath(WAPT.private_dir, "cyberwatch_api.ini"))
if not isfile(makepath(WAPT.private_dir, "wapt_api.ini")):
filecopyto("wapt_api.ini", makepath(WAPT.private_dir, "wapt_api.ini"))
# Cyberwatch python lib
import json
import datetime
import base64
import hmac
from hashlib import sha256
from requests.auth import AuthBase
import logging
import functools
import requests
from urllib.parse import urlparse, urljoin
from requests.exceptions import ProxyError, SSLError, RetryError, InvalidHeader, MissingSchema
from urllib3.exceptions import NewConnectionError, MaxRetryError
ROUTE_AIRGAPPED_SCRIPTS = "/api/v2/cbw_scans/scripts"
SIGNATURE_HEADER = "CyberWatch APIAuth-HMAC-SHA256"
SIGNATURE_HTTP_HEADER = "Authorization"
TIMESTAMP_HTTP_HEADER = "Date"
SIGNATURE_DELIM = ":"
CONTENT_TYPE_HEADER = "Content-Type"
JSON_CONTENT_TYPE = "application/json"
class CBWApi: # pylint: disable=R0904
"""Class used to communicate with the CBW API"""
def __init__(
self,
api_url=None,
api_key=None,
secret_key=None,
verify_ssl=False,
):
self.verify_ssl = verify_ssl
self.logger = logging.getLogger(self.__class__.__name__)
try:
self.api_url = api_url
self.api_key = api_key
self.secret_key = secret_key
except KeyError as error:
error("CBWApi is missing one of its argument. You can use the environnement variable %s.")
def _build_route(self, params):
return functools.reduce(lambda base_url, arg: urljoin(f"{base_url}/", arg), [self.api_url] + params)
def _request(self, verb, payloads, body_params=None, params=None):
route = self._build_route(payloads)
if body_params is not None:
body_params = json.dumps(body_params)
try:
return requests.request(verb, route, data=body_params, params=params, auth=CBWAuth(self.api_key, self.secret_key), verify=self.verify_ssl)
except (ConnectionError, ProxyError, SSLError, NewConnectionError, RetryError, InvalidHeader, MaxRetryError):
self.logger.exception("An error occurred when requesting {}".format(route))
sys.exit(-1)
except MissingSchema:
self.logger.error("An error occurred, please check your API_URL.")
sys.exit(-1)
def upload_airgapped_results(self, content):
"""POST request to /api/v2/cbw_scans/scripts to upload air gapped scanning script result"""
response = self._request("POST", [ROUTE_AIRGAPPED_SCRIPTS], content)
if response.status_code != 201:
logging.error("Error::{}".format(response.text))
return None
return response
class CBWAuth(AuthBase):
"""Used to make the authentication for the API requests"""
def __init__(self, api_key, secret_key):
self.api_key = api_key
self.secret_key = secret_key
self.raw_data = ""
self.hash_data = ""
self.type_data = ""
def __call__(self, request):
self._encode(request)
return request
def _encode(self, request):
timestamp = datetime.datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")
# get data
self.raw_data = request.body if request.body else ""
self.type_data = JSON_CONTENT_TYPE if self.raw_data else ""
# add headers
if self.raw_data:
self._add_content_type(request)
request.headers[TIMESTAMP_HTTP_HEADER] = timestamp
self._add_signature(request, timestamp)
def _add_signature(self, request, timestamp):
method = request.method
path = request.path_url
signature = self._sign(method, timestamp, path).decode("utf-8")
request.headers[SIGNATURE_HTTP_HEADER] = "{0} {1}:{2}".format(SIGNATURE_HEADER, self.api_key, signature)
def _add_content_type(self, request):
request.headers[CONTENT_TYPE_HEADER] = self.type_data
def _sign(self, method, timestamp, path):
# Build the message to sign
message = ",".join([method, self.type_data, self.hash_data, path, timestamp])
# Create the signature
digest = hmac.new(bytes(self.secret_key, "utf8"), bytes(message, "utf8"), sha256).digest()
return base64.b64encode(digest).strip()
290172c3678ccad765c524714ec1a5ff8af0e33101047dedd28d1e6e2b664444 : setup.py
cb302f842cf1695f553389a436898f9e580d7d866a6726789dca1994a55e9c3c : wapt_api.ini
ec0a3f3c5123020aa38c06da50ba806fb2dbee917bb6bac26f30353f8ec01c04 : cyberwatch_api.ini
84c8d943064b7613cd3ed456ddd81ab07c487931ae81fbbe029c670255d379d2 : WAPT/icon.png
a5a97261381e1d0ad46ee15916abec9c2631d0201f5cc50ceb0197a165a0bbbf : WAPT/certificate.crt
2ca57813b4d3860630b21d1438abaa52cd67221617db91561015260ee67525f9 : luti.json
80a99d6468013de9f7445c0f40a6e4539ae2bd31b5fde69e9786f0daf3fef83c : .gitignore
a92ff8151cebbd7b16d6b99a2038a2ce1a9dcea18813b2e0a5663f8cbacfbe5a : WAPT/control