# Edit File: malware_response.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.

You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""

import asyncio
import json
import logging
import os
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Iterable, List
from urllib.parse import quote_from_bytes, urljoin
from urllib.request import Request, urlopen

from defence360agent import utils
from defence360agent.api import inactivity
from defence360agent.contracts.config import Core
from defence360agent.contracts.config import Malware as Config
from defence360agent.contracts.license import LicenseCLN, LicenseError
from defence360agent.internals.iaid import (
    IAIDTokenError,
    IndependentAgentIDAPI,
)
from imav.contracts.config import MalwareTune

logger = logging.getLogger(__name__)

# Path of the bundled curl binary; a different package directory is used when
# the OS release info carries the DEBIAN flag.
_CURL = (
    "/opt/alt/curlssl/usr/bin/curl"
    if utils.OsReleaseInfo.id_like() & utils.OsReleaseInfo.DEBIAN
    else "/opt/alt/curlssl11/usr/bin/curl"
)

# Service location; every part can be overridden through the environment.
_API_BASE_URL = os.getenv("I360_MRS_API_BASE_URL", Core.API_BASE_URL)
_ENDPOINT_UPLOAD = os.getenv("I360_MRS_ENDPOINT_UPLOAD", "api/v1/upload")
_ENDPOINT_CHECK = os.getenv(
    "I360_MRS_ENDPOINT_CHECK", "api/v1/check-known-hashes"
)

# Upper bound (seconds) handed to curl for a single upload; defaults to 1 hour.
_POST_FILE_TIMEOUT = int(os.getenv("IMUNIFY360_POST_FILE_TIMEOUT", 60 * 60))

SUBMIT_TIMEOUT = 5  # seconds

FALSE_NEGATIVE = "false_negative"
FALSE_POSITIVE = "false_positive" UNKNOWN_REASON = "unknown" DEFAULT_CHUNK_SIZE = 1000 class UploadFailure(Exception): """Base for upload failures""" class ClientUploadError(UploadFailure): """A client error during file upload""" class ConnectionError(ClientUploadError): """A connection error during file upload""" class TimeoutError(ClientUploadError): """A timeout during file upload""" class FileTooLargeError(ClientUploadError): """File is too large to be uploaded""" class UploadFileResponseError(ClientUploadError): def __init__(self, response): super().__init__() self.response = response def __str__(self): return f"failed with response: {self.response}" class MalwareHitPath(os.PathLike): """ Wrapper that is used to send a file whose original contents may be located in a different path. """ def __init__(self, content_path: str, real_path: str | None = None): self._content_path = content_path self._real_path = real_path @property def content_path(self): return self._content_path def __eq__(self, other): if isinstance(other, MalwareHitPath): return ( self._content_path == other._content_path and self._real_path == other._real_path ) return self._content_path == other def __str__(self): if self._real_path is not None: return self._real_path return self.content_path def __repr__(self): return self.__str__() def __fspath__(self): return self.__str__() def _token_to_headers(): token = LicenseCLN.get_token() headers = { "I360-Id": token["id"], "I360-Limit": token["limit"], "I360-Status": token["status"], "I360-Token-Expire-Utc": token["token_expire_utc"], "I360-Token-Created-Utc": token["token_created_utc"], "I360-Sign": token["sign"], } headers = {key: str(value) for key, value in headers.items()} return headers async def _post_file( file: str | MalwareHitPath, url, headers=None, timeout=None ): """ Post *file* as multipart/form-data to *url* with given HTTP *headers*. Return server response as bytes (http body). Raise TimeoutError on timeout. 
Raise ConnectionError if failed to connect to host. Raise ClientUploadError on error. """ if headers is None: headers = {} headers_args = [ b"-H%s: %s" % (header.encode("ascii"), value.encode("latin-1")) for header, value in headers.items() ] content_path = file if isinstance(file, MalwareHitPath): content_path = file.content_path quoted_full_path = quote_from_bytes(os.fsencode(file), safe="").encode( "ascii" ) cmd = ( [os.fsencode(_CURL)] + headers_args + [b"--max-time", str(timeout).encode("ascii")] * (timeout is not None) + [ b"--form", # https://curl.haxx.se/docs/knownbugs.html#multipart_formposts_file_name_en b'file=@"%s";filename="%s"' % ( # escape backslash, double quotes os.fsencode(content_path) .replace(b"\\", b"\\\\") .replace(b'"', b'\\"'), quoted_full_path, ), b"--fail", # disable progress meter b"--silent", b"--show-error", url.encode("ascii"), ] ) rc, out, err = await utils.run(cmd) if rc != 0: if rc == 28: raise TimeoutError("Upload timed out") elif rc == 26: raise ClientUploadError(file) else: def _safe_text(value): if isinstance(value, bytes): return repr(value)[2:-1] # strip b'' quotes return repr(value) safe_msg = ("Failed to post file to {url}: {err}").format( url=_safe_text(url), err=_safe_text(err), ) if rc == 7: raise ConnectionError(safe_msg) else: raise ClientUploadError(safe_msg) return out async def upload_file( file: str | MalwareHitPath, upload_reason=UNKNOWN_REASON ): """ Upload a file to Malware Response Service. 
:param file: path to file :param upload_reason: one of 'unknown', 'false_positive', 'false_negative' :return: dict representing json response :raises LicenseError: """ if not LicenseCLN.is_valid(): raise LicenseError( "File uploading to Malware Responce Serivce " "requires a valid license" ) content_path = ( file.content_path if isinstance(file, MalwareHitPath) else file ) file_size = os.path.getsize(content_path) if file_size > Config.MAX_MRS_UPLOAD_FILE: raise FileTooLargeError( "File {} is {} bytes, files larger than {} bytes " "are not allowed.".format( file, file_size, Config.MAX_MRS_UPLOAD_FILE ) ) url = urljoin(_API_BASE_URL, _ENDPOINT_UPLOAD) headers = { **_token_to_headers(), "I360-Upload-Reason": upload_reason, } response_body = await _post_file( file, url, headers, timeout=_POST_FILE_TIMEOUT ) result = json.loads(response_body.decode()) logger.info( "Uploaded file %r to the Malware Response Service with reason: %s." " More" " info:" " https://blog.imunify360.com/malware-protection-powered-by-imunify-cloudav", file, upload_reason, ) if result.get("status") != "ok": raise UploadFileResponseError(result) return result async def notify_after_timeout( future: asyncio.Future, timeout: float ) -> tuple[bool, asyncio.Future]: """ Wait for the future to complete for the specified timeout. Returns (timed_out, shielded_future) where: - timed_out: True if timeout occurred, False if future completed - shielded_future: The shielded future that can be awaited later Does not cancel the future on timeout. """ shielded = asyncio.shield(future) try: await asyncio.wait_for(shielded, timeout) return False, shielded except asyncio.TimeoutError: logger.warning("Upload task timed out. 
Will continue in background.") shielded.add_done_callback( lambda fut: utils.log_future_errors( fut, logger.warning, "Background upload task failed after timeout", ) ) return True, shielded async def _run_retries_in_background(file, upload_reason, initial_error): """Helper coroutine to run the retry loop in the background.""" logger.warning( "Initial upload failed for %s, reason: %s. Starting retry mechanism in" " background.", file, initial_error, ) inactivity.track.start("mrs-background-upload") try: # Check if file still exists before starting retries content_path = ( file.content_path if isinstance(file, MalwareHitPath) else file ) if not os.path.exists(content_path): logger.info( "File %s no longer exists. Cancelling background upload" " retries.", file, ) return delays = [0.5, 2.5, 6, 15, 40, 100, 200] max_tries = len(delays) + 1 error = initial_error for i, pause in enumerate(delays, start=1): logger.warning( "Background retry attempt %d/%d for file %s. Retrying in %s" " seconds", i, max_tries, file, pause, ) await asyncio.sleep(pause) # Check if file still exists before each retry attempt if not os.path.exists(content_path): logger.info( "File %s was deleted during retry. Cancelling background" " upload.", file, ) return error = await _try_upload( file, raise_errors=False, upload_reason=upload_reason ) if not error: logger.info( "Background upload for %s succeeded on retry.", file ) return if error: logger.error( "Background upload for %s failed after all retries. 
Final" " error: %s", file, error, ) finally: inactivity.track.stop("mrs-background-upload") async def upload_with_retries( file: str | MalwareHitPath, upload_reason=UNKNOWN_REASON, notify_timeout=None, ): """ :param file: File to upload :param upload_reason: Reason for upload :param notify_timeout: Time in seconds after which to notify as pending but continue upload :raises LicenseError, ClientUploadError, ConnectionError, FileTooLargeError, """ content_path = ( file.content_path if isinstance(file, MalwareHitPath) else file ) if not os.path.exists(content_path): raise ClientUploadError(f"File {content_path} does not exist.") inactivity.track.start("mrs-upload") try: error = await _try_upload( file, raise_errors=False, upload_reason=upload_reason, notify_timeout=notify_timeout, ) except asyncio.CancelledError: raise finally: inactivity.track.stop("mrs-upload") if isinstance(error, ConnectionError): logger.warning(f"Connection error for {file}, retrying.") inactivity.track.start("mrs-upload-retry") try: error = await _try_upload( file, raise_errors=False, upload_reason=upload_reason, notify_timeout=notify_timeout, ) except asyncio.CancelledError: raise finally: inactivity.track.stop("mrs-upload-retry") if not error: return None if isinstance(error, TimeoutError): raise error if isinstance(error, FileTooLargeError): logger.warning("File %s is too big. Will not retry.", file) raise error if isinstance(error, (ConnectionError, ClientUploadError)): loop = asyncio.get_event_loop() loop.create_task( _run_retries_in_background(file, upload_reason, error) ) raise TimeoutError( "Initial connection failed. Retrying in background." ) if error: raise error return None async def _try_upload( file: str | MalwareHitPath, raise_errors, *, upload_reason=UNKNOWN_REASON, notify_timeout=None, ): """Return error instead of raising it unless *raise_errors* is true. 
:param notify_timeout: If set, returns TimeoutError after this many seconds but continues upload :raises LicenseError: :raises ClientUploadError, TimeoutError, ConnectionError, FileTooLargeError: if raise_errors is True """ try: if notify_timeout is not None: upload_task = asyncio.create_task( upload_file(file, upload_reason=upload_reason) ) timed_out, shielded = await notify_after_timeout( upload_task, notify_timeout ) if timed_out: error = TimeoutError("Upload notification timeout") error.args = ("Upload notification timeout",) return error await shielded else: await upload_file(file, upload_reason=upload_reason) except ( ClientUploadError, ConnectionError, FileTooLargeError, TimeoutError, ) as e: logger.debug("Failed to upload file %s. Error: %s", file, e) if raise_errors: raise return e return None @dataclass class HitInfo: file: str hash: str async def check_known_hashes( loop: asyncio.AbstractEventLoop, hashes: Iterable[str], upload_reason=UNKNOWN_REASON, chunk_size=DEFAULT_CHUNK_SIZE, ) -> AsyncGenerator[List[str], None]: hashes = list(hashes) # Usually, this condition is true only in unit and rpm-tests if MalwareTune.NO_CHECK_KNOWN_HASHES: logger.error("NO_CHECK_KNOWN_HASHES is enabled, skipping check") yield hashes return try: token = await IndependentAgentIDAPI.get_token() except IAIDTokenError as e: logger.warning("Failed to acquire IAID token: %s", e) return url = urljoin(_API_BASE_URL, _ENDPOINT_CHECK) headers = { "X-Auth": token, "I360-Upload-Reason": upload_reason, "Content-Type": "application/json", } chunks = utils.split_for_chunk(hashes, chunk_size) with utils.timeit("Check known hashes", log=logger.info): for chunk in chunks: request = {"hashes": chunk} with utils.timeit(f"Check {len(chunk)} hashes", log=logger.info): try: result = await _do_request( loop, Request( url, data=json.dumps(request).encode(), headers=headers, method="POST", ), ) yield result["unknown_hashes"] except Exception as e: logger.warning("Failed to check known hashes: %s", e) 
@utils.retry_on(
    Exception,
    on_error=utils.backoff_sleep,
    timeout=utils.HTTP_REQUEST_RETRY_TIMEOUT,
)
async def _do_request(
    loop: asyncio.AbstractEventLoop, request: Request
) -> dict[str, Any]:
    """Run the blocking HTTP *request* in *loop*'s default executor.

    Wrapped by ``utils.retry_on`` for any ``Exception``, using
    ``utils.backoff_sleep`` and ``utils.HTTP_REQUEST_RETRY_TIMEOUT``.
    """
    pending = loop.run_in_executor(None, _do_request_sync, request)
    return await pending


def _do_request_sync(request: Request) -> dict[str, Any]:
    """Send *request* synchronously and return the decoded JSON body.

    :raises Exception: when the response status is anything other than 200
    """
    logger.info("Requesting %s", request.full_url)
    with urlopen(request, timeout=Core.DEFAULT_SOCKET_TIMEOUT) as response:
        status = response.status
        if status == 200:
            body = response.read()
            return json.loads(body.decode())
        logger.warning("HTTP response status code is %s", status)
        raise Exception(f"status code is {status}")