
tis-cyberwatch-plugin-import-from-cyberwatch
Paquet d’installation silencieuse pour tis-cyberwatch-plugin-import-from-cyberwatch
12-17
- package: tis-cyberwatch-plugin-import-from-cyberwatch
- version: 12-17
- maintainer: sfonteneau
- locale: all
- target_os: all
- architecture: all
- signature_date:
- size: 12.62 Ko
package : tis-cyberwatch-plugin-import-from-cyberwatch
version : 12-17
architecture : all
section : base
priority : optional
name :
categories :
maintainer : sfonteneau
description : Package for tis-cyberwatch-plugin
depends :
conflicts :
maturity : PROD
locale : all
target_os : all
min_wapt_version : 2.0
sources :
installed_size :
impacted_process :
description_fr : Paquet pour tis-cyberwatch-plugin
description_pl : Pakiet dla tis-cyberwatch-plugin
description_de : Paket für tis-cyberwatch-plugin
description_es : Paquete para tis-cyberwatch-plugin
description_pt : Pacote para tis-cyberwatch-plugin
description_it : Pacchetto per tis-cyberwatch-plugin
description_nl : Pakket voor tis-cyberwatch-plugin
description_ru : Пакет для tis-cyberwatch-plugin
audit_schedule : 1h
editor :
keywords :
licence :
homepage :
package_uuid : c987eb99-f7b3-41de-a411-8d88072b3f0d
valid_from :
valid_until :
forced_install_on :
changelog :
min_os_version :
max_os_version :
icon_sha256sum : 84c8d943064b7613cd3ed456ddd81ab07c487931ae81fbbe029c670255d379d2
signer : Tranquil IT
signer_fingerprint: 8c5127a75392be9cc9afd0dbae1222a673072c308c14d88ab246e23832e8c6bb
signature_date : 2025-02-14T13:00:11.000000
signed_attributes : package,version,architecture,section,priority,name,categories,maintainer,description,depends,conflicts,maturity,locale,target_os,min_wapt_version,sources,installed_size,impacted_process,description_fr,description_pl,description_de,description_es,description_pt,description_it,description_nl,description_ru,audit_schedule,editor,keywords,licence,homepage,package_uuid,valid_from,valid_until,forced_install_on,changelog,min_os_version,max_os_version,icon_sha256sum,signer,signer_fingerprint,signature_date,signed_attributes
signature : mOoJJT1G9SgB10MQFY6pP7d3Og0WmwqKhF1aWn5YLg/7dlYntv1YGbteuL4xDmYnIurIZdlsHKBjCLXafYOlG5abVk8jIVsDwrsV/LSvrsdYDNHkEq7b1pGLEd2tV7GEtPbw46PpzvpPmFGI75HnquKZZ2sTbh8ue8YS/SZdk2XGpNMN/w1UTZBCcdIvTyYw9R/IN5HshwXrPPWOq8izelBQaBonhvGyoGmE5MXETqo3JqIUy5Cg66HdaACAL2y9Vkw0FpEmTl6k5tgB1Kl4Ie3vFVOJlyImlMyUddBb50lEK6IjLhQT86Lz4Xme4YADboSikyFH5cZMtvfLz+0yKA==
# -*- coding: utf-8 -*-
from setuphelpers import *
from configparser import ConfigParser
from waptutils import get_verify_cert
from common import get_requests_client_cert_session
import platform
import waptlicences
import json
import concurrent.futures
# Size of the thread pool used by audit() to process hosts concurrently.
simultaneous_maximum_send=10
def audit():
    """Copy Cyberwatch vulnerability data into the WAPT server for every host.

    Reads the WAPT credentials from ``wapt_api.ini`` and the Cyberwatch API
    settings from ``cyberwatch_api.ini`` (both located in ``WAPT.private_dir``),
    logs in to the WAPT server, lists its hosts and runs
    ``import_from_cyberwatch`` once per host in a thread pool of
    ``simultaneous_maximum_send`` workers.

    Returns:
        "OK" when no error was recorded, "ERROR" otherwise (the recorded
        messages are printed before returning).
    """
    conf_wapt = ConfigParser()
    conf_wapt.read(makepath(WAPT.private_dir, "wapt_api.ini"))
    username_wapt = conf_wapt.get("wapt", "username")
    password_wapt = conf_wapt.get("wapt", "password")

    list_error = []
    login = waptlicences.waptserver_login(WAPT.config_filename, username_wapt, password_wapt)
    try:
        # The login result exposes the cookies either keyed by name or as a
        # list of cookie dicts; handle both shapes.
        cookies = login['session_cookies']
        if 'session' not in cookies:
            server_domain = WAPT.waptserver.server_url.split('://')[-1]
            session_cookie = [c for c in cookies if c['Domain'] == server_domain][0]
        else:
            session_cookie = cookies['session']
            session_cookie['Name'] = 'session'

        sessionwapt = get_requests_client_cert_session(
            WAPT.waptserver.server_url,
            cert=(login['client_certificate'], login['client_private_key'], login['client_private_key_password']),
            verify=WAPT.waptserver.verify_cert,
        )
        sessionwapt.cookies.set(session_cookie['Name'], session_cookie['Value'], domain=session_cookie['Domain'])
        sessionwapt.verify = WAPT.waptserver.verify_cert

        conf_cyb = ConfigParser()
        conf_cyb.read(makepath(WAPT.private_dir, "cyberwatch_api.ini"))
        cyberwatch_url = conf_cyb.get("cyberwatch", "url")
        if conf_cyb.has_option("cyberwatch", 'verify_cert'):
            verify_cert_cyb = get_verify_cert(conf_cyb.get("cyberwatch", 'verify_cert'))
        else:
            verify_cert_cyb = True
        CLIENT = CBWApi(
            cyberwatch_url,
            conf_cyb.get("cyberwatch", "api_key"),
            conf_cyb.get("cyberwatch", "secret_key"),
            verify_ssl=verify_cert_cyb,
        )

        # Map lower-cased computer names to their WAPT host UUID.
        hosts = json.loads(sessionwapt.get("%s/api/v3/hosts?&limit=1000000" % WAPT.waptserver.server_url).content)["result"]
        dict_hostname_uuid = {str(pc["computer_name"]).lower(): pc["uuid"] for pc in hosts}

        # One job per host; every job shares the same session, client and
        # error list.
        list_run = [
            {
                "sessionwapt": sessionwapt,
                "CLIENT": CLIENT,
                "cyberwatch_url": cyberwatch_url,
                "dict_hostname_uuid": {hostname: uuid},
                "WAPT": WAPT,
                "hostname": hostname,
                "list_error": list_error,
            }
            for hostname, uuid in dict_hostname_uuid.items()
        ]

        with concurrent.futures.ThreadPoolExecutor(simultaneous_maximum_send) as executor:
            futures = {executor.submit(import_from_cyberwatch, **job): job for job in list_run}
            for future in concurrent.futures.as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    # A failure on one host is recorded like the other
                    # per-host errors instead of aborting the whole audit.
                    list_error.append("Error %s %s" % (futures[future]["hostname"], str(e)))
    finally:
        # Always close the server session, even when something above failed.
        waptlicences.waptserver_logout(WAPT.config_filename)

    if list_error:
        print(" \n".join(list_error))
        return "ERROR"
    return "OK"
def import_from_cyberwatch(sessionwapt=None,CLIENT=None,cyberwatch_url=None,dict_hostname_uuid=None,WAPT=None,hostname=None,list_error=None):
    """Fetch one host's Cyberwatch data and upload it as WAPT audit data.

    Args:
        sessionwapt: session object used to POST to the WAPT server.
        CLIENT: Cyberwatch client exposing ``servers()`` and ``_request()``.
        cyberwatch_url: URL stored in the uploaded record under ``cyberwatch_url``.
        dict_hostname_uuid: mapping of lower-cased hostname to WAPT host UUID.
        WAPT: object whose ``waptserver.server_url`` is the WAPT server base URL.
        hostname: hostname used to filter the Cyberwatch server list.
        list_error: shared list to which error messages are appended.

    Returns:
        None. Nothing is uploaded when Cyberwatch returns no server whose
        hostname is a key of ``dict_hostname_uuid`` or when fetching fails.
    """
    resultcyb = CLIENT.servers(params={'hostname': hostname})
    if not resultcyb:
        return

    # The filter may return several servers; take the first one whose
    # hostname is actually known to WAPT rather than blindly the first result.
    entry = next((e for e in resultcyb if str(e.hostname).lower() in dict_hostname_uuid), None)
    if entry is None:
        return
    host_key = str(entry.hostname).lower()

    waptdata = []
    try:
        resultpc = json.loads(CLIENT._request("GET", ["/api/v3/vulnerabilities/servers", str(entry.id)]).content.decode("utf-8"))

        # Products that have at least one update tied to a CVE announcement.
        products = {
            patch.get("target", {}).get("product", "")
            for patch in resultpc["updates"]
            if patch.get("target", {}) and patch.get("cve_announcements", [])
        }
        resultpc['cyberwatch_url'] = cyberwatch_url
        resultpc["list_patch"] = sorted(products)

        # Convert lists to dictionaries keyed by identifier for easy searching
        # in the console.
        resultpc["updates"] = list_to_dict(newkey="id", listconvert=resultpc["updates"])
        resultpc["security_issues"] = list_to_dict(newkey="id", listconvert=resultpc["security_issues"])
        resultpc["groups"] = list_to_dict(newkey="name", listconvert=resultpc["groups"])
        resultpc["cve_announcements"] = list_to_dict(
            newkey="cve_code", listconvert=[cve for cve in resultpc["cve_announcements"] if cve["active"]]
        )
        resultpc["compliance_repositories"] = list_to_dict(newkey="id", listconvert=resultpc["compliance_repositories"])

        # Single timestamp so the expiration is exactly 1440 minutes after
        # value_date.
        now = datetime.datetime.utcnow()
        waptdata.append(
            {
                "host_id": dict_hostname_uuid[host_key],
                "value_id": int(time.monotonic() * 1000),
                "value_date": datetime2isodate(now),
                "value_section": "cyberwatch",
                "value_key": "cyberwatch",
                "value": resultpc,
                "expiration_date": datetime2isodate(now + datetime.timedelta(minutes=1440)),
            }
        )
        print("Get %s from cyberwatch" % host_key)
        # NOTE(review): presumably a minimal sleep to yield to other worker threads.
        time.sleep(0.0000001)
    except Exception as e:
        list_error.append("Error %s %s" % (host_key, str(e)))
        # Nothing valid to upload for this host.
        return

    response = sessionwapt.post("%s/api/v3/update_hosts_audit_data" % WAPT.waptserver.server_url, data=json.dumps(waptdata))
    # Report a rejected upload instead of silently ignoring it.
    if not response.ok:
        list_error.append("Error %s %s" % (host_key, "upload to WAPT server failed (HTTP %s)" % response.status_code))
def install():
    """Copy the packaged API ini files into WAPT's private directory.

    A file that already exists at the destination is left untouched.
    """
    for ini_name in ("cyberwatch_api.ini", "wapt_api.ini"):
        destination = makepath(WAPT.private_dir, ini_name)
        if isfile(destination):
            continue
        filecopyto(ini_name, destination)
# Cyberwatch python lib
import base64
import datetime
import functools
import hmac
import json
import logging
import sys
import time
from collections import namedtuple
from hashlib import sha256
from urllib.parse import urlparse, urljoin
from urllib.parse import parse_qs

import requests
from requests.auth import AuthBase
from requests.exceptions import ProxyError, SSLError, RetryError, InvalidHeader, MissingSchema
from urllib3.exceptions import NewConnectionError, MaxRetryError
# API route requested by CBWApi.servers().
ROUTE_SERVERS = "/api/v3/vulnerabilities/servers"
# Scheme name placed before "<api_key>:<signature>" in the signature header.
SIGNATURE_HEADER = "CyberWatch APIAuth-HMAC-SHA256"
# Name of the HTTP header that carries the signature.
SIGNATURE_HTTP_HEADER = "Authorization"
# Name of the HTTP header that carries the request timestamp.
TIMESTAMP_HTTP_HEADER = "Date"
# Not referenced in the visible code; presumably the key/signature separator.
SIGNATURE_DELIM = ":"
# Name of the HTTP header set when the request has a body.
CONTENT_TYPE_HEADER = "Content-Type"
# Content type declared (and signed) for requests that have a body.
JSON_CONTENT_TYPE = "application/json"
class CBWApi:  # pylint: disable=R0904
    """Class used to communicate with the CBW API"""

    def __init__(
        self,
        api_url=None,
        api_key=None,
        secret_key=None,
        verify_ssl=False,
    ):
        """Store the connection settings.

        Args:
            api_url: base URL of the API; routes are joined onto it.
            api_key: key passed to ``CBWAuth`` to sign requests.
            secret_key: secret passed to ``CBWAuth`` to sign requests.
            verify_ssl: forwarded as ``verify`` to ``requests.request``.
        """
        self.verify_ssl = verify_ssl
        self.logger = logging.getLogger(self.__class__.__name__)
        self.api_url = api_url
        self.api_key = api_key
        self.secret_key = secret_key

    def _build_route(self, params):
        """Join ``self.api_url`` and the path segments in ``params`` into one URL."""
        return functools.reduce(lambda base_url, arg: urljoin(f"{base_url}/", arg), [self.api_url] + params)

    def _cbw_parser(self, response):
        """Parse the response text of an API request

        Every JSON object is turned into a ``cbw_object`` namedtuple whose
        fields are the object's keys. Exits the process on ``TypeError``.
        """
        try:
            result = json.loads(response.text, object_hook=lambda d: namedtuple("cbw_object", d.keys())(*d.values()))
            return result
        except TypeError:
            self.logger.error("An error occurred while parsing response")
            sys.exit(-1)

    def _request(self, verb, payloads, body_params=None, params=None):
        """Send a signed HTTP request and return the ``requests`` response.

        Args:
            verb: HTTP method name.
            payloads: list of path segments appended to ``self.api_url``.
            body_params: optional object sent JSON-encoded as the request body.
            params: optional query-string parameters.

        The listed connection-level errors are logged and terminate the
        process with exit status -1.
        """
        route = self._build_route(payloads)
        if body_params is not None:
            body_params = json.dumps(body_params)
        try:
            return requests.request(verb, route, data=body_params, params=params, auth=CBWAuth(self.api_key, self.secret_key), verify=self.verify_ssl)
        # NOTE(review): ``ConnectionError`` below is the builtin one (no other
        # name is imported in the visible file), not requests' own
        # ConnectionError - confirm this is intended.
        except (ConnectionError, ProxyError, SSLError, NewConnectionError, RetryError, InvalidHeader, MaxRetryError):
            self.logger.exception("An error occurred when requesting {}".format(route))
            sys.exit(-1)
        except MissingSchema:
            self.logger.error("An error occurred, please check your API_URL.")
            sys.exit(-1)

    def _get_pages(self, verb, route, params):
        """Get one or more pages for a method using api v3 pagination

        ``params`` is passed to ``_request`` as its third positional argument,
        i.e. as the JSON-encoded request body. When ``params`` contains
        ``page`` only that page is fetched and its parsed content returned;
        otherwise every page is followed through the response's ``next`` link
        and the items are concatenated.

        Returns:
            The parsed result, or None when a response is not OK.
        """
        # Work on a copy so the caller's dict is never modified.
        params = dict(params) if params else {}
        params.setdefault("per_page", 100)

        if "page" in params:
            response = self._request(verb, route, params)
            if not response.ok:
                self.logger.error("Error::{}".format(response.text))
                return None
            return self._cbw_parser(response)

        response_list = []
        while True:
            response = self._request(verb, route, params)
            # Checked on every page, not just the first one, so an error
            # payload is never merged into the results.
            if not response.ok:
                self.logger.error("Error::{}".format(response.text))
                return None
            response_list.extend(self._cbw_parser(response))
            if "next" not in response.links:
                break
            next_url = urlparse(response.links["next"]["url"])
            params["page"] = parse_qs(next_url.query)["page"][0]
        return response_list

    def servers(self, params=None):
        """GET request to ``ROUTE_SERVERS`` to get all servers"""
        response = self._get_pages("GET", [ROUTE_SERVERS], params)
        return response
class CBWAuth(AuthBase):
    """Used to make the authentication for the API requests"""

    def __init__(self, api_key, secret_key):
        """Store the key pair used to sign requests."""
        self.api_key = api_key
        self.secret_key = secret_key
        self.raw_data = ""
        # Never assigned elsewhere in this class: the signed message always
        # carries an empty hash field.
        self.hash_data = ""
        self.type_data = ""

    def __call__(self, request):
        """Add the timestamp and signature headers to ``request`` and return it."""
        self._encode(request)
        return request

    def _encode(self, request):
        """Set the timestamp, content-type (when there is a body) and signature headers."""
        # Timezone-aware replacement for the deprecated utcnow(); the
        # formatted string is identical.
        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")

        # get data
        self.raw_data = request.body if request.body else ""
        self.type_data = JSON_CONTENT_TYPE if self.raw_data else ""

        # add headers
        if self.raw_data:
            self._add_content_type(request)
        request.headers[TIMESTAMP_HTTP_HEADER] = timestamp
        self._add_signature(request, timestamp)

    def _add_signature(self, request, timestamp):
        """Set the signature header to "<scheme> <api_key>:<signature>"."""
        method = request.method
        path = request.path_url
        signature = self._sign(method, timestamp, path).decode("utf-8")
        request.headers[SIGNATURE_HTTP_HEADER] = "{0} {1}:{2}".format(SIGNATURE_HEADER, self.api_key, signature)

    def _add_content_type(self, request):
        """Set the content-type header to the type recorded in ``self.type_data``."""
        request.headers[CONTENT_TYPE_HEADER] = self.type_data

    def _sign(self, method, timestamp, path):
        """Return the base64-encoded HMAC-SHA256 signature (bytes) of the request."""
        # Build the message to sign
        message = ",".join([method, self.type_data, self.hash_data, path, timestamp])

        # Create the signature
        digest = hmac.new(bytes(self.secret_key, "utf8"), bytes(message, "utf8"), sha256).digest()
        return base64.b64encode(digest).strip()
def list_to_dict(newkey=None, listconvert=None):
    """Index a list of mappings by one of their keys.

    Args:
        newkey: key looked up in every item; its value becomes the dict key.
        listconvert: iterable of mappings; ``None`` or empty gives ``{}``.

    Returns:
        A dict mapping ``item[newkey]`` to ``item``. When several items share
        the same value, the last one wins.

    Raises:
        KeyError: if an item has no ``newkey`` entry.
    """
    # Treat a missing list (the default, or a JSON null) as empty instead of
    # failing on iteration.
    if not listconvert:
        return {}
    return {item[newkey]: item for item in listconvert}
80a99d6468013de9f7445c0f40a6e4539ae2bd31b5fde69e9786f0daf3fef83c : .gitignore
38d056ab130f7bf7c481c12636a4e9959de36561d3dfcbe54c6e3571bc0c1dc3 : WAPT/certificate.crt
a67a4b828f9fbf4b816b2f6b86c0fe04d12319e9f36f34dc5971aa478a8371a4 : WAPT/control
84c8d943064b7613cd3ed456ddd81ab07c487931ae81fbbe029c670255d379d2 : WAPT/icon.png
e5cb0662e2b27d07eec7a643f4cdaffef8d6dcf15164d9d121826cb12f7623f1 : cyberwatch_api.ini
f772f8e5e534f019c13d7e3d02344531ccc405fa4f6f462a4462977c84ffdb96 : luti.json
d5f63f1085e2bfef1fcdfd99572be416f4391e5db1037df9c82249763506aeaf : setup.py
cb302f842cf1695f553389a436898f9e580d7d866a6726789dca1994a55e9c3c : wapt_api.ini