pyoaev.helpers
Module Contents
- pyoaev.helpers.TRUTHY: List[str] = ['yes', 'true', 'True']
- pyoaev.helpers.FALSY: List[str] = ['no', 'false', 'False']
- pyoaev.helpers.data_to_temp_file(data)
- pyoaev.helpers.is_memory_certificate(certificate)
- pyoaev.helpers.ssl_cert_chain(ssl_context, cert_data, key_data, passphrase)
- pyoaev.helpers.ssl_verify_locations(ssl_context, certdata)
- pyoaev.helpers.create_mq_ssl_context(config) ssl.SSLContext
- class pyoaev.helpers.ListenQueue(config: Dict, injector_config, logger, callback)
Bases:
threading.ThreadA class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
- pika_credentials = None
- pika_parameters = None
- pika_connection = None
- channel = None
- callback
- config
- logger
- host
- vhost
- use_ssl
- port
- user
- password
- queue_name
- exit_event
- thread = None
- _process_message(channel, method, properties, body) None
process a message from the rabbit queue
- Parameters:
channel (callable) – channel instance
method (callable) – message methods
properties (str) – unused
body (str or bytes or bytearray) – message body (data)
- _data_handler(json_data) None
- run() None
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- stop()
- class pyoaev.helpers.PingAlive(api, config, logger, ping_type)
Bases:
pyoaev.utils.PingAliveA class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
- class pyoaev.helpers.OpenAEVConfigHelper(base_path, variables: Dict | None, config_obj: pyoaev.configuration.Configuration)
- static from_configuration_object(config: pyoaev.configuration.Configuration)
- get_config_obj() pyoaev.configuration.Configuration
- get_conf(variable, is_number=None, default=None, required=None)
- to_configuration()
- class pyoaev.helpers.OpenAEVCollectorHelper(config: OpenAEVConfigHelper, icon, collector_type=None, security_platform_type=None, connect_run_and_terminate: bool = False)
- __daemon
- logger_class
- collector_logger
- api
- config_helper
- config
- schedule(message_callback, delay)
- class pyoaev.helpers.OpenAEVInjectorHelper(config: OpenAEVConfigHelper, icon)
- api
- config
- logger_class
- injector_logger
- injector_config
- connect_run_and_terminate = False
- scheduler
- listen_queue = None
- listen(message_callback: Callable[[Dict], None]) None
- class pyoaev.helpers.OpenAEVDetectionHelper(logger, relevant_signatures_types)
- logger
- relevant_signatures_types
- match_alert_element_fuzzy(signature_value, alert_values, fuzzy_scoring)
- match_alert_elements(signatures, alert_data)
- _match_alert_elements_original(signatures, alert_data)
- _match_alert_elements_for_command_line(signatures, alert_data)
- _decode_value(signature_value)
- pyoaev.helpers._is_base64_encoded(str_maybe_base64)