pyoaev.helpers

Module Contents

pyoaev.helpers.TRUTHY: List[str] = ['yes', 'true', 'True']
pyoaev.helpers.FALSY: List[str] = ['no', 'false', 'False']
pyoaev.helpers.data_to_temp_file(data)
pyoaev.helpers.is_memory_certificate(certificate)
pyoaev.helpers.ssl_cert_chain(ssl_context, cert_data, key_data, passphrase)
pyoaev.helpers.ssl_verify_locations(ssl_context, certdata)
pyoaev.helpers.create_mq_ssl_context(config) ssl.SSLContext
class pyoaev.helpers.ListenQueue(config: Dict, injector_config, logger, callback)

Bases: threading.Thread

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

pika_credentials = None
pika_parameters = None
pika_connection = None
channel = None
callback
config
logger
host
vhost
use_ssl
port
user
password
queue_name
exit_event
thread = None
_process_message(channel, method, properties, body) None

process a message from the rabbit queue

Parameters:
  • channel (callable) – channel instance

  • method (callable) – message methods

  • properties (str) – unused

  • body (str or bytes or bytearray) – message body (data)

_data_handler(json_data) None
run() None

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

stop()
class pyoaev.helpers.PingAlive(api, config, logger, ping_type)

Bases: pyoaev.utils.PingAlive

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

class pyoaev.helpers.OpenAEVConfigHelper(base_path, variables: Dict | None, config_obj: pyoaev.configuration.Configuration)
static from_configuration_object(config: pyoaev.configuration.Configuration)
get_config_obj() pyoaev.configuration.Configuration
get_conf(variable, is_number=None, default=None, required=None)
to_configuration()
class pyoaev.helpers.OpenAEVCollectorHelper(config: OpenAEVConfigHelper, icon, collector_type=None, security_platform_type=None, connect_run_and_terminate: bool = False)
__daemon
logger_class
collector_logger
api
config_helper
config
schedule(message_callback, delay)
class pyoaev.helpers.OpenAEVInjectorHelper(config: OpenAEVConfigHelper, icon)
api
config
logger_class
injector_logger
injector_config
connect_run_and_terminate = False
scheduler
listen_queue = None
listen(message_callback: Callable[[Dict], None]) None
class pyoaev.helpers.OpenAEVDetectionHelper(logger, relevant_signatures_types)
logger
relevant_signatures_types
match_alert_element_fuzzy(signature_value, alert_values, fuzzy_scoring)
match_alert_elements(signatures, alert_data)
_match_alert_elements_original(signatures, alert_data)
_match_alert_elements_for_command_line(signatures, alert_data)
_decode_value(signature_value)
pyoaev.helpers._is_base64_encoded(str_maybe_base64)