Detection Modules
Customize underlying rules, load custom YARA rules or set static language lists.
deconvolute.scanners
deconvolute.scanners.content.signature
deconvolute.scanners.content.signature.engine
SignatureScanner Objects
class SignatureScanner(BaseScanner)Scans for known threats, adversarial patterns, and PII using signatures.
This scanner performs high-performance pattern matching against text content. It is designed to be the primary defense layer for scanning document ingestion pipelines (RAG) or validating raw user inputs.
By default, it uses the SDK's bundled security baselines in 'rules/', which covers common jailbreaks and injection attempts. Users can override this by providing a path to their own custom YARA rules file.
Attributes:
local_rules_pathPath - The file path to the compiled or source YARA rules._rulesyara.Rules - The compiled YARA rules object (C-extension)._executorThreadPoolExecutor - Thread pool for offloading blocking match operations.
__init__
def __init__(rules_path: str | Path | None = None)Initialize the SignatureScanner with a specific rule set.
The scanner compiles the rules immediately upon initialization. This is a blocking operation designed to fail fast if the rule file is missing or malformed.
Arguments:
rules_path- Optional path to a file (.yar) OR a directory of files. If None, loads the SDK's internal 'rules/' directory.
Raises:
ConfigurationError- If the rule file does not exist or contains syntax errors that prevent compilation.
check
def check(content: str, **kwargs: Any) -> SecurityResultSynchronously scans the provided content against the loaded signature rules.
This method performs a blocking scan. While underlying YARA is highly optimized, scanning very large documents may block the execution thread briefly.
Arguments:
content- The string content to scan.**kwargs- Additional arguments (unused, kept for interface compatibility).
Returns:
SecurityResult- A structured result containing:- status (SecurityStatus): UNSAFE if any rule matched, else SAFE.
- metadata (dict): specific matches, tags, and match count.
a_check
async def a_check(content: str, **kwargs: Any) -> SecurityResultAsync version.
deconvolute.scanners.content
deconvolute.scanners.content.language.models
LanguageSecurityResult Objects
class LanguageSecurityResult(SecurityResult)Result model for Language Consistency Checks.
deconvolute.scanners.content.language
deconvolute.scanners.content.language.engine
LanguageScanner Objects
class LanguageScanner(BaseScanner)Scans for language inconsistencies to prevent Payload Splitting attacks.
This scanner can verify if the output language matches:
- A specific allow-list (Policy Mode).
- The language of the input prompt (Correspondence Mode).
Requires 'lingua-language-detector' installed. https://github.com/pemistahl/lingua-py
__init__
def __init__(allowed_languages: list[str] | None = None,
languages_to_load: list[str] | None = None)Arguments:
-
allowed_languages- List of ISO 639-1 codes (e.g. ['en', 'fr']). If None, no static policy is enforced (useful if you only want to check input-output correspondence). -
languages_to_load- Performance optimization. List of ISO codes to load into memory.- If None (Default): Loads ALL languages (~100MB). Maximum accuracy.
- If provided: Only loads these models. Much lighter, but cannot detect languages outside this list.
check
def check(content: str, **kwargs: Any) -> LanguageSecurityResultChecks if content matches allowed languages or reference text.
Arguments:
content- The text to analyze (usually LLM output). **kwargs:reference_textstr - If provided, 'content' must match the language of this text (usually User Input).
a_check
async def a_check(content: str, **kwargs: Any) -> LanguageSecurityResultAsync version of check() using a thread pool.
deconvolute.scanners.integrity.canary.models
CanarySecurityResult Objects
class CanarySecurityResult(SecurityResult)Result model specific to the Canary Jailbreak Detection module.
Attributes:
token_foundstr | None - The specific canary token string found in the LLM output, if any.
deconvolute.scanners.integrity.canary
deconvolute.scanners.integrity.canary.generator
generate_raw_token
def generate_raw_token(length: int = 16, prefix: str = "dcv-") -> strGenerates a cryptographically secure random token.
Arguments:
length- The length of the random suffix (default 16).prefix- Static prefix to ensure regex uniqueness (default "dcv-").
Returns:
Token string (e.g. "dcv-8f7a2b91...")
deconvolute.scanners.integrity.canary.engine
CanaryScanner Objects
class CanaryScanner(BaseScanner)Jailbreak Detection via Instructional Adherence Checks (Integrity Canary).
Detects if the System Prompt has been overridden by verifying if the model still follows mandatory output instructions.
Attributes:
token_length- The length of the canary token. Defaults to 16.
inject
def inject(prompt: str) -> tuple[str, str]Injects the integrity check into the prompt.
Arguments:
prompt- The string containing the prompt.
Returns:
tuple[str, str]:
- The prompt with the added Canary Token and surrounding instructions.
- The full token string (to pass to check/clean later).
check
def check(content: str, **kwargs: Any) -> CanarySecurityResultVerifies if the content string contains the mandatory integrity token.
Arguments:
content- The string output from the LLM.**kwargs- Must contain 'token' (the string returned by inject).
Returns:
CanarySecurityResult- The detection result.
Raises:
ValueError- If 'token' is missing from kwargs.
clean
def clean(content: str, token: str) -> strRemoves the integrity token from the content to prevent user confusion.
Arguments:
content- The string containing the token string.token- The full token string.
Returns:
The cleaned response string.
a_check
async def a_check(content: str, **kwargs: Any) -> CanarySecurityResultAsync version of check() using a thread pool.
a_clean
async def a_clean(content: str, token: str) -> strAsync version of clean() using a thread pool.
deconvolute.scanners.integrity
deconvolute.scanners.base
BaseScanner Objects
class BaseScanner(ABC)Abstract Base Class for all security scanners.
check
@abstractmethod
def check(content: str, **kwargs: Any) -> SecurityResultAnalyzes the provided content for threats.
Arguments:
content- The text (prompt or response) to analyze.**kwargs- Additional context.
Returns:
SecurityResult- The assessment of the content.
a_check
@abstractmethod
async def a_check(content: str, **kwargs: Any) -> SecurityResultAsync version of check.