OXIESEC PANEL
- Current Dir:
/
/
opt
/
alt
/
python311
/
lib
/
python3.11
/
site-packages
/
tests
/
unit
Server IP: 2a02:4780:11:1084:0:327f:3464:10
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
05/08/2024 06:43:38 PM
rwxr-xr-x
📄
__init__.py
212 bytes
05/08/2024 06:43:38 PM
rw-r--r--
📁
__pycache__
-
05/08/2024 06:43:38 PM
rwxr-xr-x
📄
helper.py
371 bytes
05/08/2024 06:43:38 PM
rw-r--r--
📄
test_ExcludeFilter.py
1.17 KB
05/08/2024 06:43:38 PM
rw-r--r--
📄
test_GELFRabbitHandler.py
1.21 KB
05/08/2024 06:43:38 PM
rw-r--r--
📄
test_chunking.py
4.96 KB
05/08/2024 06:43:38 PM
rw-r--r--
📄
test_handler.py
7.72 KB
05/08/2024 06:43:38 PM
rw-r--r--
Editing: test_handler.py
Close
#!/usr/bin/env python # -*- coding: utf-8 -*- """pytests for the formatting and construction of GELF logs by the graypy logging handlers .. note:: These tests mock sending to Graylog and do not require an active graylog instance to operate. """ import datetime import json import logging import socket import sys import zlib import mock import pytest from graypy.handler import BaseGELFHandler, GELFHTTPHandler, GELFTLSHandler from tests.helper import handler, logger, formatted_logger from tests.unit.helper import MOCK_LOG_RECORD, MOCK_LOG_RECORD_NAME UNICODE_REPLACEMENT = u"\ufffd" class TestClass(object): def __repr__(self): return "<TestClass>" @pytest.yield_fixture def mock_send(handler): try: with mock.patch.object(handler, "send") as mock_send: yield mock_send except Exception: with mock.patch.object(handler, "emit") as mock_send: yield mock_send def get_mock_send_arg(mock_send): assert mock_send.call_args_list != [] [[[arg], _]] = mock_send.call_args_list # TODO: this is inaccurate solution for mocking non-send handlers if isinstance(arg, logging.LogRecord): return json.loads( BaseGELFHandler(compress=False).makePickle(arg).decode("utf-8") ) try: return json.loads(zlib.decompress(arg).decode("utf-8")) except zlib.error: # we have a uncompress message try: return json.loads(arg.decode("utf-8")) except Exception: # that is null terminated return json.loads(arg[:-1].decode("utf-8")) @pytest.mark.parametrize( "message,expected", [ (u"\u20AC", u"\u20AC"), (u"\u20AC".encode("utf-8"), u"\u20AC"), (b"\xc3", UNICODE_REPLACEMENT), (["a", b"\xc3"], ["a", UNICODE_REPLACEMENT]), ], ) def test_pack(message, expected): assert expected == json.loads( BaseGELFHandler._pack_gelf_dict(message).decode("utf-8") ) def test_manual_exc_info_handler(logger, mock_send): """Check that a the ``full_message`` traceback info is passed when the ``exc_info=1`` flag is given within a log message""" try: raise SyntaxError("Syntax error") except SyntaxError: logger.error("Failed", exc_info=1) arg = get_mock_send_arg(mock_send) assert "Failed" == arg["short_message"] assert arg["full_message"].startswith("Traceback (most recent call last):") # GELFHTTPHandler mocking does not complete the stacktrace # thus a missing \n assert arg["full_message"].endswith("SyntaxError: Syntax error") or arg[ "full_message" ].endswith("SyntaxError: Syntax error\n") def test_normal_exception_handler(logger, mock_send): try: raise SyntaxError("Syntax error") except SyntaxError: logger.exception("Failed") arg = get_mock_send_arg(mock_send) assert "Failed" == arg["short_message"] assert arg["full_message"].startswith("Traceback (most recent call last):") # GELFHTTPHandler mocking does not complete the stacktrace # thus a missing \n assert arg["full_message"].endswith("SyntaxError: Syntax error") or arg[ "full_message" ].endswith("SyntaxError: Syntax error\n") def test_unicode(logger, mock_send): logger.error(u"Mensaje de registro espa\xf1ol") arg = get_mock_send_arg(mock_send) assert u"Mensaje de registro espa\xf1ol" == arg["short_message"] @pytest.mark.skipif(sys.version_info[0] >= 3, reason="python2 only") def test_broken_unicode_python2(logger, mock_send): # py3 record.getMessage() returns a binary string here # which is safely converted to unicode during the sanitization # process logger.error(b"Broken \xde log message") decoded = get_mock_send_arg(mock_send) assert u"Broken %s log message" % UNICODE_REPLACEMENT == decoded["short_message"] @pytest.mark.skipif(sys.version_info[0] < 3, reason="python3 only") def test_broken_unicode_python3(logger, mock_send): # py3 record.getMessage() returns somewhat broken "b"foo"" if the # message string is not a string, but a binary object: b"foo" logger.error(b"Broken \xde log message") decoded = get_mock_send_arg(mock_send) assert "b'Broken \\xde log message'" == decoded["short_message"] def test_extra_field(logger, mock_send): logger.error("Log message", extra={"foo": "bar"}) decoded = get_mock_send_arg(mock_send) assert "Log message" == decoded["short_message"] assert "bar" == decoded["_foo"] def test_list(logger, mock_send): logger.error("Log message", extra={"foo": ["bar", "baz"]}) decoded = get_mock_send_arg(mock_send) assert "Log message" == decoded["short_message"] assert ["bar", "baz"] == decoded["_foo"] def test_arbitrary_object(logger, mock_send): logger.error("Log message", extra={"foo": TestClass()}) decoded = get_mock_send_arg(mock_send) assert "Log message" == decoded["short_message"] assert "<TestClass>" == decoded["_foo"] def test_message_to_pickle_serializes_datetime_objects_instead_of_blindly_repring_them( logger, mock_send ): timestamp = datetime.datetime(2001, 2, 3, 4, 5, 6, 7) logger.error("Log message", extra={"ts": timestamp}) decoded = get_mock_send_arg(mock_send) assert "datetime.datetime" not in decoded["_ts"] assert timestamp.isoformat() == decoded["_ts"] def test_status_field_issue(logger, mock_send): logger.error("Log message", extra={"status": "OK"}) decoded = get_mock_send_arg(mock_send) assert "Log message" == decoded["short_message"] assert "OK" == decoded["_status"] def test_add_level_name(): gelf_dict = dict() BaseGELFHandler._add_level_names(gelf_dict, MOCK_LOG_RECORD) assert "INFO" == gelf_dict["level_name"] def test_resolve_host(): """Test all posible resolutions of :meth:`BaseGELFHandler._resolve_host`""" assert socket.gethostname() == BaseGELFHandler._resolve_host(False, None) assert socket.getfqdn() == BaseGELFHandler._resolve_host(True, None) assert socket.getfqdn() == BaseGELFHandler._resolve_host(True, "localhost") assert "localhost" == BaseGELFHandler._resolve_host(False, "localhost") assert "" == BaseGELFHandler._resolve_host(False, "") def test_set_custom_facility(): gelf_dict = dict() facility = "test facility" BaseGELFHandler._set_custom_facility(gelf_dict, facility, MOCK_LOG_RECORD) assert MOCK_LOG_RECORD_NAME == gelf_dict["_logger"] assert "test facility" == gelf_dict["facility"] def test_formatted_logger(formatted_logger, mock_send): """Test the ability to set and modify the graypy handler's :class:`logging.Formatter` and have the resultant ``short_message`` be formatted by the set :class:`logging.Formatter`""" for handler in formatted_logger.handlers: if isinstance(handler, GELFHTTPHandler): pytest.skip("formatting not mocked for GELFHTTPHandler") formatted_logger.error("Log message") decoded = get_mock_send_arg(mock_send) assert "ERROR : Log message" == decoded["short_message"] def test_invalid_fqdn_localhost(): """Test constructing :class:`graypy.handler.BaseGELFHandler` with specifying conflicting arguments ``fqdn`` and ``localname``""" with pytest.raises(ValueError): BaseGELFHandler(fqdn=True, localname="localhost") def test_invalid_ca_certs(): """Test constructing :class:`graypy.handler.GELFTLSHandler` with incorrect arguments specifying server ca cert verification""" with pytest.raises(ValueError): GELFTLSHandler("127.0.0.1", validate=True) def test_invalid_client_certs(): """Test constructing :class:`graypy.handler.GELFTLSHandler` with incorrect arguments specifying client cert/key verification""" with pytest.raises(ValueError): # missing client cert GELFTLSHandler("127.0.0.1", keyfile="/dev/null")