
Modules

Agent Modules

create_agent

Create and return a configured Agent.

Initializes a strands Agent with the provided model, system prompt, and tools.

Parameters:
- system_prompt: Instructional prompt guiding the Agent.
- model: BedrockModel or OllamaModel powering the Agent. Defaults to the module-level bedrock_model when omitted.
- tools: Optional list of tool names, specs, or tool objects to register.

Returns:
- Agent: The initialized Agent instance.

Raises:
- MissingArgumentError: If system_prompt is not provided.

Example:

agent = create_agent(
    model=bedrock_model,
    system_prompt=SYSTEM_PROMPT,
)
Source code in src/doc_redaction/agent.py
def create_agent(
    system_prompt: str,
    model: BedrockModel | OllamaModel | None = bedrock_model,
    tools: list[str | dict[str, str] | Any] | None = None,
) -> Agent:
    """
    Create and return a configured Agent.

    Initializes a strands Agent with the provided model, system prompt, and tools.

    Parameters:
    - system_prompt: Instructional prompt guiding the Agent.
    - model: BedrockModel or OllamaModel powering the Agent. Defaults to the module-level bedrock_model when omitted.
    - tools: Optional list of tool names, specs, or tool objects to register.

    Returns:
    - Agent: The initialized Agent instance.

    Raises:
    - MissingArgumentError: If system_prompt is not provided.

    Example:
    ```python
    agent = create_agent(
        model=bedrock_model,
        system_prompt=SYSTEM_PROMPT,
    )
    ```
    """
    if not model:
        model = bedrock_model
    if not system_prompt:
        raise MissingArgumentError("system_prompt")

    agent: Agent = Agent(
        model=model,
        system_prompt=system_prompt,
        tools=tools,
    )

    logger.info("Agent created successfully.")

    return agent
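
A sketch of creating an agent with tools registered. It assumes create_agent is importable from doc_redaction.agent and that file_read and file_write are the same tool objects used in the workflow module below; the import path shown for them is illustrative and may differ in your project.

```python
# Illustrative tool imports; adjust to wherever file_read / file_write live in your setup.
from strands_tools import file_read, file_write

from doc_redaction.agent import create_agent

# No model argument is passed, so the module-level bedrock_model default is used.
agent = create_agent(
    system_prompt="You convert PDF contracts to clean markdown.",
    tools=[file_read, file_write],
)

result = agent("Convert data/contract/example.pdf to markdown and save it next to the source file.")
print(result.stop_reason)
```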

Agentic Workflow Modules

run_doc_processing_wf

Source code in src/doc_redaction/workflow.py
def run_doc_processing_wf(key: str = "spielbank_rocketbase_vertrag"):
    if not isinstance(key, str) or not key:
        raise InvalidDocumentKeyError()

    DOC_KEY: str = key

    # Step 1: Convert input contract from PDF to markdown format
    multimodal_agent: Agent = create_agent(
        system_prompt=MULTIMODAL_SYSTEM_PROMPT,
        tools=[file_read, file_write],
    )

    CONVERT_IN: str = f"{DIR}{PREFIX['contract']}{DOC_KEY}{FORMAT['pdf']}"
    CONVERT_OUT: str = f"{DIR}{PREFIX['markdown']}{DOC_KEY}{FORMAT['md']}"
    CONVERT_USER_PROMPT: str = f"""
    Convert the following document to markdown: {CONVERT_IN}. Save the result to {CONVERT_OUT}.
    """

    convert_result = multimodal_agent(CONVERT_USER_PROMPT)
    logger.info(f"Conversion stopping reason: {convert_result.stop_reason}")
    logger.info(f"Conversion accumulated usage: {convert_result.metrics.accumulated_usage}")
    logger.info(f"Saved conversion result in {CONVERT_OUT}")

    # Step 2: Detect sensitive information
    DETECT_OUT: str = f"{DIR}{PREFIX['confidential']}{DOC_KEY}{FORMAT['json']}"
    DETECT_USER_PROMPT: str = f"""
    Analyze the following document: {convert_result.metrics.tool_metrics["file_write"].tool["input"]["content"]}.
    Detect sensitive data. Save the result to {DETECT_OUT}.

    IMPORTANT:
    - Only include information you find in the text.
    - Do not add any information on your own.
    - If you do not find any information for a specific field, return an empty list for that field.
    """

    detector_agent: Agent = create_agent(
        system_prompt=DETECTION_SYSTEM_PROMPT,
        tools=[detect_sensitive_data, omit_empty_keys, file_write],
    )

    detector_result = detector_agent.structured_output(
        output_model=SensitiveData,
        prompt=DETECT_USER_PROMPT,
    ).model_dump_json(indent=2)

    save_as_json(data=detector_result, filename=DETECT_OUT)
    logger.info(f"Saved detection result in {DETECT_OUT}")

    # Step 3: Redact sensitive information
    REDACT_OUT: str = f"{DIR}{PREFIX['redact']}{DOC_KEY}{FORMAT['md']}"

    REDACTED_USER_PROMPT: str = f"""
    Analyze the following document: {convert_result.metrics.tool_metrics["file_write"].tool["input"]["content"]}.
    Redact all information provided in {detector_result} except for the document_analysis field.
    Save the result to {REDACT_OUT}.
    """

    redact_agent: Agent = create_agent(
        system_prompt=REDACTED_SYSTEM_PROMPT,
        tools=[file_write, redact_sensitive_data],
    )

    res_redacted = redact_agent(REDACTED_USER_PROMPT)
    logger.info(f"Conversion stopping reason: {res_redacted.stop_reason}")
    logger.info(f"Conversion accumulated usage: {res_redacted.metrics.accumulated_usage}")
    logger.info(f"Saved redaction result in {REDACT_OUT}")

    return res_redacted
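
run_doc_processing_wf chains three steps for a single document key: PDF-to-markdown conversion, sensitive-data detection with structured output, and redaction. A minimal sketch of invoking it, assuming the package is importable as doc_redaction and the input PDF exists under the configured DIR and PREFIX paths:

```python
from doc_redaction.workflow import run_doc_processing_wf

# The key selects the input contract PDF and names the markdown, JSON, and
# redacted outputs derived from it.
result = run_doc_processing_wf(key="spielbank_rocketbase_vertrag")

print(result.stop_reason)
print(result.metrics.accumulated_usage)
```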

Output Modules

SensitiveData

Bases: BaseModel

Represents sensitive data detected in a document.

Source code in src/doc_redaction/output.py
class SensitiveData(BaseModel):
    """Represents sensitive data detected in a document."""

    document_analysis: DocumentAnalysis = Field(..., description="Metadata about the analyzed document.")
    parties: list[Party] = Field(..., description="Information about the parties involved in the contract.")
    representative: list[Representative] = Field(..., description="Information about the representatives of the parties.")
    contract_terms: ContractTerms = Field(..., description="Key terms and conditions of the contract.")
    risk_assessment: RiskAssessment = Field(..., description="Risk assessment of the contract.")
    data_protection_compliance: DataProtectionCompliance = Field(..., description="Data protection compliance details.")
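
A sketch of validating a detection result back into the model, assuming Pydantic v2 (model_validate_json / model_dump_json) and a JSON file written by the detection step; the path is illustrative.

```python
from pathlib import Path

from doc_redaction.output import SensitiveData

# Illustrative path; the workflow writes this file in its detection step.
payload = Path("data/confidential/spielbank_rocketbase_vertrag.json").read_text()

# Raises a pydantic.ValidationError if required nested fields are missing.
data = SensitiveData.model_validate_json(payload)
print(data.document_analysis)
print(data.model_dump_json(indent=2))
```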

Tool Modules

detect_sensitive_data

Tool for detecting sensitive data in markdown documents.

detect_sensitive_data(markdown_content)

Detects and extracts sensitive information from markdown documents.

Source code in src/doc_redaction/tool/detect_sensitive_data.py
@tool
def detect_sensitive_data(markdown_content: str) -> dict[str, list[str]]:
    """Detects and extracts sensitive information from markdown documents."""

    text = remove_markdown_formatting(markdown_content)
    results: dict[str, list[str]] = {}

    # Mapping of result key -> regex list or single regex
    pattern_mapping = {
        "email_addresses": [EMAIL_RE],
        "phone_numbers": PHONE_REGEXES,
        "credit_card_numbers": [CC_RE],
        "iban_numbers": [IBAN_RE],
        "account_numbers": [ACCOUNT_RE],
        "addresses": ADDRESS_REGEXES,
        "people_names": [NAME_RE],
        "currency_amounts": CURRENCY_REGEXES,
        "percentages": PERCENTAGE_REGEXES,
        "numbers": NUMBER_REGEXES,
    }

    for key, regexes in pattern_mapping.items():
        matches = set()
        for regex in regexes:
            for match in regex.findall(text):
                # Additional filters for some types
                if key == "phone_numbers" and len(re.sub(r"[^\d]", "", match)) < 7:
                    continue
                if key == "credit_card_numbers":
                    digits = re.sub(r"[^\d]", "", match)
                    if not (13 <= len(digits) <= 19):
                        continue
                if key == "people_names" and match in COMMON_NON_NAMES:
                    continue
                matches.add(match)
        if matches:
            results[key] = list(matches)

    # Add German & English number words
    numbers_set = set(results.get("numbers", []))
    for word in text.lower().split():
        clean_word = word.strip(".,;:!?")
        if clean_word in GERMAN_NUMBER_WORDS or clean_word in ENGLISH_NUMBER_WORDS:
            numbers_set.add(clean_word)
    if numbers_set:
        results["numbers"] = list(numbers_set)

    return results
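
A sketch of calling the detector on a small markdown snippet. It assumes the @tool-decorated function can still be invoked directly as a plain Python function; the exact keys returned depend on which regexes match.

```python
sample = """
# Kontakt
E-Mail: max.mustermann@example.com
IBAN: DE89 3704 0044 0532 0130 00
Telefon: +49 30 12345678
"""

found = detect_sensitive_data(sample)
# Expected shape (illustrative): {"email_addresses": [...], "iban_numbers": [...], "phone_numbers": [...]}
print(found)
```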

remove_markdown_formatting(markdown_text)

Remove markdown formatting for cleaner analysis.

Source code in src/doc_redaction/tool/detect_sensitive_data.py
def remove_markdown_formatting(markdown_text: str) -> str:
    """Remove markdown formatting for cleaner analysis."""
    patterns = [
        (r"^#{1,6}\s+", ""),  # headers
        (r"\*\*(.+?)\*\*", r"\1"),  # bold
        (r"\*(.+?)\*", r"\1"),  # italic
        (r"__(.+?)__", r"\1"),  # underline
        (r"_(.+?)_", r"\1"),
        (r"`(.+?)`", r"\1"),  # inline code
        (r"```.*?```", "", re.DOTALL),  # code blocks
        (r"\[(.+?)\]\(.+?\)", r"\1"),  # links
        (r"!\[.*?\]\(.+?\)", ""),  # images
        (r"^---+$", "", re.MULTILINE),  # hr
        (r"^\s*[-*+]\s+", "", re.MULTILINE),  # unordered lists
        (r"^\s*\d+\.\s+", "", re.MULTILINE),  # ordered lists
        (r"^>\s+", "", re.MULTILINE),  # blockquotes
    ]
    text = markdown_text

    for pat in patterns:
        # Some patterns carry a third element with regex flags (e.g. re.DOTALL); apply it when present.
        flags = pat[2] if len(pat) == 3 else 0
        text = re.sub(pat[0], pat[1], text, flags=flags)

    text = re.sub(r"\n\s*\n", "\n\n", text)  # normalize spacing
    return re.sub(r"[ \t]+", " ", text).strip()
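
For example, a quick call on a small markdown fragment (output shown approximately, since exact whitespace depends on the patterns above):

```python
md = "## Heading\n**Bold** text with a [link](https://example.com) and `inline code`."
print(remove_markdown_formatting(md))
# Roughly: "Heading\nBold text with a link and inline code."
```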

redact_sensitive_data

Tool for redacting sensitive information from markdown documents.

apply_redactions(content, rules, redaction_symbol, preserve_structure)

Apply redaction rules to the content based on user specifications.

Source code in src/doc_redaction/tool/redact_sensitive_data.py
def apply_redactions(content: str, rules: str, redaction_symbol: str, preserve_structure: bool) -> str:
    """
    Apply redaction rules to the content based on user specifications.
    """
    rules_lower = rules.lower()

    # Common patterns
    patterns = {
        "email": (
            [r"email", r"e-mail", "@"],
            r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
        ),
        "phone": (
            [r"phone", r"telephone", r"number"],
            r"(\+?1[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})",
        ),
        "ssn": (
            [r"ssn", r"social security", r"social"],
            r"\b\d{3}-?\d{2}-?\d{4}\b",
        ),
        "credit_card": (
            [r"credit card", r"card number", r"credit"],
            r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
        ),
        "zip_code": (
            [r"zip code", r"postal code", r"zip"],
            r"\b\d{5}(?:-\d{4})?\b",
        ),
        "ip_address": (
            [r"ip address", r"ip"],
            r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b",
        ),
        "url": (
            [r"url", r"link", r"website"],
            r'https?://[^\s<>"{}|\\^`\[\]]+',
        ),
        "date": (
            [r"date", r"birthday", r"birth"],
            r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b",
        ),
        "name": (
            [r"name", r"person", r"individual"],
            r"\b[A-Z][a-z]+ [A-Z][a-z]+\b",
        ),
        "address": (
            [r"address", r"street", r"location"],
            r"\d+\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Drive|Dr|Lane|Ln|Boulevard|Blvd)",
        ),
    }

    def should_apply(keywords: list[str]) -> bool:
        return any(term in rules_lower for term in keywords)

    redacted_content = content
    for _, (keywords, pattern) in patterns.items():
        if should_apply(keywords):
            redacted_content = redact_pattern(redacted_content, pattern, redaction_symbol, preserve_structure)

    # Handle custom terms
    for term in extract_custom_terms(rules):
        if term and len(term) > 2:
            pattern = re.escape(term)
            redacted_content = redact_pattern(redacted_content, pattern, redaction_symbol, preserve_structure, case_insensitive=True)

    return redacted_content
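
A sketch of applying the rule matcher to a short snippet; the rules string only needs to contain one of the keywords listed above for the corresponding pattern to fire.

```python
text = "Contact John Smith at john@example.com or +1 555-123-4567."

redacted = apply_redactions(
    content=text,
    rules="redact all email addresses and phone numbers",
    redaction_symbol="[REDACTED]",
    preserve_structure=False,
)
print(redacted)
# -> "Contact John Smith at [REDACTED] or [REDACTED]."
# The name is left intact because the rules contain none of the name keywords.
```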

extract_custom_terms(rules)

Extract potential custom terms to redact from the rules text. This is a simple approach - could be enhanced with NLP.

Source code in src/doc_redaction/tool/redact_sensitive_data.py
def extract_custom_terms(rules: str) -> list[str]:
    """
    Extract potential custom terms to redact from the rules text.
    This is a simple approach - could be enhanced with NLP.
    """
    # Look for quoted terms or terms after "redact"
    custom_terms = []

    # Extract quoted terms
    quoted_terms = re.findall(r"['\"]([^'\"]+)['\"]", rules)
    custom_terms.extend(quoted_terms)

    # Extract terms after "redact" or "remove"
    redact_terms = re.findall(r"(?:redact|remove|hide)\s+(?:all\s+)?([a-zA-Z\s]+?)(?:\s+(?:and|or|from)|$)", rules, re.IGNORECASE)
    for term in redact_terms:
        term = term.strip()
        if term and not any(common in term.lower() for common in ["email", "phone", "number", "address", "name", "ssn", "credit", "card"]):
            custom_terms.append(term)

    return custom_terms
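
For example (quoted terms are collected first, then unquoted terms following redact/remove/hide, minus the built-in categories):

```python
rules = 'remove all internal budget figures and redact "Rocketbase GmbH"'
print(extract_custom_terms(rules))
# -> ['Rocketbase GmbH', 'internal budget figures']
```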

redact_pattern(content, pattern, redaction_symbol, preserve_structure, case_insensitive=False)

Redact matches of a specific pattern in the content.

Source code in src/doc_redaction/tool/redact_sensitive_data.py
def redact_pattern(content: str, pattern: str, redaction_symbol: str, preserve_structure: bool, case_insensitive: bool = False) -> str:
    """
    Redact matches of a specific pattern in the content.
    """
    flags = re.IGNORECASE if case_insensitive else 0

    def replace_match(match):
        matched_text = match.group(0)
        if preserve_structure:
            # Replace each character with a redaction character, preserving spaces and structure
            redaction_char = "█"
            return re.sub(r"\S", redaction_char, matched_text)
        else:
            return redaction_symbol

    return re.sub(pattern, replace_match, content, flags=flags)
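
A quick illustration of the two replacement modes:

```python
text = "Card: 4111 1111 1111 1111"
cc_pattern = r"\b(?:\d{4}[-\s]?){3}\d{4}\b"

# Replace the whole match with the redaction symbol.
print(redact_pattern(text, cc_pattern, "[REDACTED]", preserve_structure=False))
# -> "Card: [REDACTED]"

# Mask every non-whitespace character to keep the original layout.
print(redact_pattern(text, cc_pattern, "[REDACTED]", preserve_structure=True))
# -> "Card: ████ ████ ████ ████"
```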

redact_sensitive_data(tool, **kwargs)

Redact sensitive information from markdown documents based on user specifications.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tool | ToolUse | The tool use object containing tool execution details | required |
| **kwargs | Any | Additional arguments passed to the tool | {} |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ToolResult | ToolResult | The redacted markdown document |

Source code in src/doc_redaction/tool/redact_sensitive_data.py
def redact_sensitive_data(tool: ToolUse, **kwargs: Any) -> ToolResult:
    """
    Redact sensitive information from markdown documents based on user specifications.

    Args:
        tool: The tool use object containing tool execution details
        **kwargs: Additional arguments passed to the tool

    Returns:
        ToolResult: The redacted markdown document
    """
    try:
        # Extract parameters
        markdown_content = tool.get("input", {}).get("markdown_content", "")
        redaction_rules = tool.get("input", {}).get("redaction_rules", "")
        redaction_symbol = tool.get("input", {}).get("redaction_symbol", "[REDACTED]")
        preserve_structure = tool.get("input", {}).get("preserve_structure", False)

        if not markdown_content:
            return {
                "toolUseId": tool["toolUseId"],
                "status": "error",
                "content": [{"text": "Error: No markdown content provided"}],
            }

        if not redaction_rules:
            return {
                "toolUseId": tool["toolUseId"],
                "status": "error",
                "content": [{"text": "Error: No redaction rules specified"}],
            }

        # Parse redaction rules and apply redactions
        redacted_content = apply_redactions(markdown_content, redaction_rules, redaction_symbol, preserve_structure)

        return {
            "toolUseId": tool["toolUseId"],
            "status": "success",
            "content": [{"text": f"Successfully redacted sensitive information based on rules: '{redaction_rules}'\n\nRedacted markdown document:\n\n{redacted_content}"}],
        }

    except Exception as e:
        return {
            "toolUseId": tool["toolUseId"],
            "status": "error",
            "content": [{"text": f"Error processing redaction: {e!s}"}],
        }
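
A sketch of calling the handler directly with a ToolUse-shaped dict, mirroring the keys the function reads (toolUseId and input); in normal use the agent runtime builds this object for you.

```python
tool_use = {
    "toolUseId": "example-1",
    "input": {
        "markdown_content": "Contact: max.mustermann@example.com",
        "redaction_rules": "redact all email addresses",
        "redaction_symbol": "[REDACTED]",
        "preserve_structure": False,
    },
}

result = redact_sensitive_data(tool_use)
print(result["status"])              # "success"
print(result["content"][0]["text"])  # summary line plus the redacted document
```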

Data Storage

save_as_json

Write a JSON-formatted string to a file and log the operation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | str | JSON string content to write. | required |
| filename | str | Destination file path. Existing content will be overwritten. | required |

Returns:

| Type | Description |
| --- | --- |
| None | None |

Raises:

| Type | Description |
| --- | --- |
| OSError | If the file cannot be opened or written. |

Logs

INFO: On successful save with the target file path.

Example

save_as_json(res, "data/confidential/rocketbase_aws_agreement_sensitive_structures_v4.json")

Source code in src/doc_redaction/tool/store_data.py
@tool
def save_as_json(data: str, filename: str) -> None:
    """Write a JSON-formatted string to a file and log the operation.

    Parameters:
        data: JSON string content to write.
        filename: Destination file path. Existing content will be overwritten.

    Returns:
        None

    Raises:
        OSError: If the file cannot be opened or written.

    Logs:
        INFO: On successful save with the target file path.

    Example:
        save_as_json(res, "data/confidential/rocketbase_aws_agreement_sensitive_structures_v4.json")
    """
    with open(filename, "w") as f:
        f.write(data)
    logger.info(f"Saved structured output to {filename}")

Other Tools

omit_empty_keys

Parse a JSON object string and return only the items with non-empty values.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| s | str | A JSON-encoded object mapping string keys to lists of strings. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, list[str]] | A dict[str, list[str]] containing only entries whose values are truthy (e.g., non-empty lists). |

Raises:

| Type | Description |
| --- | --- |
| JSONDecodeError | If the input is not valid JSON. |

Source code in src/doc_redaction/tool/tool_utils.py
@tool
def omit_empty_keys(s: str) -> dict[str, list[str]]:
    """
    Parse a JSON object string and return only the items with non-empty values.

    Args:
        s: A JSON-encoded object mapping string keys to lists of strings.

    Returns:
        A dict[str, list[str]] containing only entries whose values are truthy (e.g., non-empty lists).

    Raises:
        json.JSONDecodeError: If the input is not valid JSON.
    """
    return {key: value for key, value in json.loads(s).items() if value}
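
For example, assuming the @tool-decorated function can be invoked directly as a plain Python function:

```python
raw = '{"email_addresses": ["max@example.com"], "phone_numbers": [], "iban_numbers": []}'
print(omit_empty_keys(raw))
# -> {'email_addresses': ['max@example.com']}
```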

Utility Modules

This section includes common utility functions.

save_as_json

Write a JSON-formatted string to a file and log the operation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | str | JSON string content to write. | required |
| filename | str | Destination file path. Existing content will be overwritten. | required |

Returns:

| Type | Description |
| --- | --- |
| None | None |

Raises:

| Type | Description |
| --- | --- |
| OSError | If the file cannot be opened or written. |

Logs

INFO: On successful save with the target file path.

Example

save_as_json(res, "data/confidential/rocketbase_aws_agreement_sensitive_structures.json")

Source code in src/doc_redaction/utils.py
def save_as_json(data: str, filename: str) -> None:
    """Write a JSON-formatted string to a file and log the operation.

    Parameters:
        data: JSON string content to write.
        filename: Destination file path. Existing content will be overwritten.

    Returns:
        None

    Raises:
        OSError: If the file cannot be opened or written.

    Logs:
        INFO: On successful save with the target file path.

    Example:
        save_as_json(res, "data/confidential/rocketbase_aws_agreement_sensitive_structures.json")
    """
    with open(filename, "w") as f:
        f.write(data)
    logger.info(f"Saved structured output to {filename}")

get_pdf_page_count

Return the number of pages in a PDF file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_path | str | Path to the local PDF file. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| int | int | Total number of pages in the PDF. |

Raises:

| Type | Description |
| --- | --- |
| PDFProcessingError | If the file cannot be opened, is not a valid PDF, or the page count cannot be determined. |

Example

page_count = get_pdf_page_count("path/to/file.pdf")
print(f"The document has {page_count} pages.")

Source code in src/doc_redaction/utils.py
def get_pdf_page_count(file_path: str) -> int:
    """
    Return the number of pages in a PDF file.

    Parameters:
        file_path (str): Path to the local PDF file.

    Returns:
        int: Total number of pages in the PDF.

    Raises:
        PDFProcessingError: If the file cannot be opened, is not a valid PDF, or the page count cannot be determined.

    Example:
        page_count = get_pdf_page_count("path/to/file.pdf")
        print(f"The document has {page_count} pages.")
    """
    try:
        with open(file_path, "rb") as file:
            pdf_reader = PyPDF2.PdfReader(file)
            return len(pdf_reader.pages)
    except Exception as e:
        raise PDFProcessingError(file_path, e) from e

get_file_size

Return the size of a file in bytes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_path | str | Path to the file. | required |

Returns:

| Type | Description |
| --- | --- |
| int | File size in bytes. |

Raises:

| Type | Description |
| --- | --- |
| FileNotFoundError | If the file does not exist. |
| OSError | If the size cannot be retrieved due to an OS-related error. |

Example

size = get_file_size("/path/to/file.txt")
print(f"File size: {size} bytes")

Source code in src/doc_redaction/utils.py
def get_file_size(file_path: str) -> int:
    """
    Return the size of a file in bytes.

        Parameters:
            file_path (str): Path to the file.

        Returns:
            int: File size in bytes.

        Raises:
            FileNotFoundError: If the file does not exist.
            OSError: If the size cannot be retrieved due to an OS-related error.

        Example:
            size = get_file_size("/path/to/file.txt")
            print(f"File size: {size} bytes")
    """
    return os.path.getsize(file_path)

InvalidDocumentKeyError

Bases: ValueError

Raised when the provided document key is missing or invalid.

Source code in src/doc_redaction/utils.py
class InvalidDocumentKeyError(ValueError):
    """Raised when the provided document key is missing or invalid."""

    def __init__(self) -> None:
        super().__init__("A document key must be provided as a non-empty string.")

MissingArgumentError

Bases: ValueError

Raised when a required argument is missing.

Source code in src/doc_redaction/utils.py
class MissingArgumentError(ValueError):
    """Raised when a required argument is missing."""

    def __init__(self, argument_name: str) -> None:
        super().__init__(f"{argument_name} must be provided")

PDFProcessingError

Bases: Exception

Raised when PDF processing fails.

Source code in src/doc_redaction/utils.py
class PDFProcessingError(Exception):
    """Raised when PDF processing fails."""

    def __init__(self, file_path: str, e: Exception) -> None:
        super().__init__(f"Could not determine page count for {file_path}: {e}")
        self.file_path = file_path