pyoaev.helpers
==============

.. py:module:: pyoaev.helpers

Module Contents
---------------

.. py:data:: TRUTHY
   :type: List[str]
   :value: ['yes', 'true', 'True']

.. py:data:: FALSY
   :type: List[str]
   :value: ['no', 'false', 'False']

.. py:function:: data_to_temp_file(data)

.. py:function:: is_memory_certificate(certificate)

.. py:function:: ssl_cert_chain(ssl_context, cert_data, key_data, passphrase)

.. py:function:: ssl_verify_locations(ssl_context, certdata)

.. py:function:: create_mq_ssl_context(config) -> ssl.SSLContext

.. py:class:: ListenQueue(config: Dict, injector_config, logger, callback)

   Bases: :py:obj:`threading.Thread`

   A class that represents a thread of control.

   This class can be safely subclassed in a limited fashion. There are two ways
   to specify the activity: by passing a callable object to the constructor, or
   by overriding the run() method in a subclass.

   .. py:attribute:: pika_credentials
      :value: None

   .. py:attribute:: pika_parameters
      :value: None

   .. py:attribute:: pika_connection
      :value: None

   .. py:attribute:: channel
      :value: None

   .. py:attribute:: callback

   .. py:attribute:: config

   .. py:attribute:: logger

   .. py:attribute:: host

   .. py:attribute:: vhost

   .. py:attribute:: use_ssl

   .. py:attribute:: port

   .. py:attribute:: user

   .. py:attribute:: password

   .. py:attribute:: queue_name

   .. py:attribute:: exit_event

   .. py:attribute:: thread
      :value: None

   .. py:method:: _process_message(channel, method, properties, body) -> None

      process a message from the rabbit queue

      :param channel: channel instance
      :type channel: callable
      :param method: message methods
      :type method: callable
      :param properties: unused
      :type properties: str
      :param body: message body (data)
      :type body: str or bytes or bytearray

   .. py:method:: _data_handler(json_data) -> None

   .. py:method:: run() -> None

      Method representing the thread's activity.

      You may override this method in a subclass. The standard run() method
      invokes the callable object passed to the object's constructor as the
      target argument, if any, with sequential and keyword arguments taken
      from the args and kwargs arguments, respectively.

   .. py:method:: stop()

.. py:class:: PingAlive(api, config, logger, ping_type)

   Bases: :py:obj:`pyoaev.utils.PingAlive`

   A class that represents a thread of control.

   This class can be safely subclassed in a limited fashion. There are two ways
   to specify the activity: by passing a callable object to the constructor, or
   by overriding the run() method in a subclass.

.. py:class:: OpenAEVConfigHelper(base_path, variables: Dict | None, config_obj: pyoaev.configuration.Configuration)

   .. py:method:: from_configuration_object(config: pyoaev.configuration.Configuration)
      :staticmethod:

   .. py:method:: get_config_obj() -> pyoaev.configuration.Configuration

   .. py:method:: get_conf(variable, is_number=None, default=None, required=None)

   .. py:method:: to_configuration()

.. py:class:: OpenAEVCollectorHelper(config: OpenAEVConfigHelper, icon, collector_type=None, security_platform_type=None, connect_run_and_terminate: bool = False)

   .. py:attribute:: __daemon

   .. py:attribute:: logger_class

   .. py:attribute:: collector_logger

   .. py:attribute:: api

   .. py:attribute:: config_helper

   .. py:attribute:: config

   .. py:method:: schedule(message_callback, delay)

.. py:class:: OpenAEVInjectorHelper(config: OpenAEVConfigHelper, icon)

   .. py:attribute:: api

   .. py:attribute:: config

   .. py:attribute:: logger_class

   .. py:attribute:: injector_logger

   .. py:attribute:: injector_config

   .. py:attribute:: connect_run_and_terminate
      :value: False

   .. py:attribute:: scheduler

   .. py:attribute:: listen_queue
      :value: None

   .. py:method:: listen(message_callback: Callable[[Dict], None]) -> None

.. py:class:: OpenAEVDetectionHelper(logger, relevant_signatures_types)

   .. py:attribute:: logger

   .. py:attribute:: relevant_signatures_types

   .. py:method:: match_alert_element_fuzzy(signature_value, alert_values, fuzzy_scoring)

   .. py:method:: match_alert_elements(signatures, alert_data)

   .. py:method:: _match_alert_elements_original(signatures, alert_data)

   .. py:method:: _match_alert_elements_for_command_line(signatures, alert_data)

   .. py:method:: _decode_value(signature_value)

.. py:function:: _is_base64_encoded(str_maybe_base64)