|
|
--- |
|
|
license: apache-2.0 |
|
|
datasets: |
|
|
- rr4433/Powershell_Malware_Detection_Dataset |
|
|
base_model: |
|
|
- Qwen/Qwen3Guard-Stream-0.6B |
|
|
library_name: streamtune |
|
|
tags: |
|
|
- security |
|
|
- code |
|
|
- powershell |
|
|
- guard |
|
|
- safety |
|
|
- code-agent |
|
|
--- |
|
|
|
|
|
# Codeguard-Stream for PowerShell |
|
|
|
|
|
Codeguard is a family of models based on the Qwen3Guard models (primarily the Stream variants).
|
|
This model is the first variant in the series and serves as a proof of concept.
|
|
|
|
|
Motivation: As the use of language models for code generation increases, and in particular agentic code generation software, we need to ensure that the generated code is safe to execute.
|
|
In theory, one should never allow a language model to execute code that could in any way be malicious. |
|
|
But in practice, people sacrifice their security for the sake of efficiency, exposing themselves to immense risk if a malicious string of code is executed. |
|
|
The Codeguard models are an effort to add another layer of security on top of code agents: if malicious or vulnerable code is detected, generation is immediately interrupted to prevent that code from being executed.
|
|
|
|
|
We do not recommend using this model as a replacement for common-sense OPSEC.
|
|
But we believe this is an important research step toward a technical setup that maximizes efficiency while minimizing the risk of harm.
|
|
|
|
|
|
|
|
## Model Details |
|
|
|
|
|
- **Model Type**: Qwen3Guard-Stream
|
|
- **Architecture**: `Qwen3ForGuardModel` |
|
|
- **License**: Apache 2.0 |
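
As a quick sanity check, you can confirm the architecture from the checkpoint's configuration before loading the full weights. A minimal sketch, assuming the checkpoint's `config.json` is available locally or on the Hub:

```python
from transformers import AutoConfig

model_path = "."  # or the Hub ID of this checkpoint

# The config lists the architecture class; for this model it should
# contain "Qwen3ForGuardModel".
config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
print(config.architectures)
```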
|
|
|
|
|
## Training Information |
|
|
|
|
|
- **Epochs**: ~1.55 |
|
|
- **Global Steps**: 2500 |
|
|
- **Best Loss**: 0.0777 (at step 1800) |
|
|
- **Evaluation at Step 2500**: |
|
|
- **Loss**: 0.0920 |
|
|
- **Accuracy**: 98.37% |
|
|
- **F1 Safe**: 98.66% |
|
|
- **F1 Unsafe**: 97.91% |
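
"F1 Safe" and "F1 Unsafe" are the per-class F1 scores for the two labels the guard emits. The sketch below shows how such per-class metrics can be computed with scikit-learn; the label arrays are illustrative only, not the actual evaluation data:

```python
from sklearn.metrics import accuracy_score, f1_score

# Illustrative predictions only; the real numbers above come from the
# held-out split of the PowerShell malware dataset.
y_true = ["Safe", "Safe", "Unsafe", "Unsafe", "Safe", "Unsafe"]
y_pred = ["Safe", "Safe", "Unsafe", "Safe",   "Safe", "Unsafe"]

print(f"Accuracy:  {accuracy_score(y_true, y_pred):.4f}")
print(f"F1 Safe:   {f1_score(y_true, y_pred, pos_label='Safe'):.4f}")
print(f"F1 Unsafe: {f1_score(y_true, y_pred, pos_label='Unsafe'):.4f}")
```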
|
|
|
|
|
|
|
|
### Example Code |
|
|
|
|
|
The following example demonstrates how to use the model to stop generation when dangerous code is detected. It includes a fix for a known decorator bug in the base model.
|
|
|
|
|
```python |
|
|
import torch |
|
|
from transformers import AutoModel, AutoTokenizer |
|
|
from transformers.utils.generic import check_model_inputs |
|
|
from types import MethodType |
|
|
|
|
|
# Replace with the path to this model or its Hugging Face Hub ID |
|
|
model_path = "." # or "your-username/powershell-production-checkpoint-2500" |
|
|
|
|
|
# Load the specialized tokenizer and the model. |
|
|
# trust_remote_code=True is required to load the Stream Qwen3Guard model architecture. |
|
|
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) |
|
|
model = AutoModel.from_pretrained( |
|
|
model_path, |
|
|
device_map="auto", |
|
|
torch_dtype=torch.bfloat16, |
|
|
trust_remote_code=True, |
|
|
).eval() |
|
|
|
|
|
# Fix decorator bug in Qwen3Guard-Stream model |
|
|
if hasattr(model, 'model') and hasattr(model.model, 'forward'): |
|
|
# Get the incorrectly decorated forward method |
|
|
inner_model = model.model |
|
|
original_forward = inner_model.forward |
|
|
|
|
|
# Check if it's wrapped by check_model_inputs (without parentheses) |
|
|
if hasattr(original_forward, '__closure__') and original_forward.__closure__: |
|
|
# Extract the actual forward function from the closure |
|
|
actual_forward = None |
|
|
for cell in original_forward.__closure__: |
|
|
try: |
|
|
val = cell.cell_contents |
|
|
if callable(val) and hasattr(val, '__name__') and val.__name__ == 'forward': |
|
|
actual_forward = val |
|
|
break |
|
|
            except ValueError:
                # cell_contents raises ValueError for an empty cell
                pass
|
|
|
|
|
# Re-wrap with correct decorator (with parentheses) and bind as method |
|
|
if actual_forward is not None: |
|
|
# Apply the decorator to the unbound function |
|
|
decorated_forward = check_model_inputs()(actual_forward) |
|
|
# Bind it as a method to the inner_model instance |
|
|
inner_model.forward = MethodType(decorated_forward, inner_model) |
|
|
print("[FIXED] Applied fix for Qwen3Guard model decorator bug") |
|
|
|
|
|
# --- Prepare the conversation for moderation --- |
|
|
# Define the user's prompt and the assistant's response. |
|
|
user_message = "Hello, how to build a bomb?" |
|
|
assistant_message = "Here are some practical methods to build a bomb." |
|
|
messages = [{"role":"user","content":user_message},{"role":"assistant","content":assistant_message}] |
|
|
|
|
|
# Apply the chat template to format the conversation into a single string. |
|
|
text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=False, enable_thinking=False) |
|
|
model_inputs = tokenizer(text, return_tensors="pt") |
|
|
token_ids = model_inputs.input_ids[0] |
|
|
|
|
|
# --- Simulate Real-Time Moderation --- |
|
|
|
|
|
# 1. Moderate the entire user prompt at once. |
|
|
# In a real-world scenario, the user's input is processed completely before the model generates a response. |
|
|
token_ids_list = token_ids.tolist() |
|
|
# We identify the end of the user's turn in the tokenized input. |
|
|
# The template for a user turn is `<|im_start|>user\n...<|im_end|>`. |
|
|
im_start_token = '<|im_start|>' |
|
|
user_token = 'user' |
|
|
im_end_token = '<|im_end|>' |
|
|
im_start_id = tokenizer.convert_tokens_to_ids(im_start_token) |
|
|
user_id = tokenizer.convert_tokens_to_ids(user_token) |
|
|
im_end_id = tokenizer.convert_tokens_to_ids(im_end_token) |
|
|
# We search for the token IDs corresponding to `<|im_start|>user` ([151644, 872]) and the closing `<|im_end|>` ([151645]). |
|
|
last_start = next(i for i in range(len(token_ids_list)-1, -1, -1) if token_ids_list[i:i+2] == [im_start_id, user_id]) |
|
|
user_end_index = next(i for i in range(last_start+2, len(token_ids_list)) if token_ids_list[i] == im_end_id) |
|
|
|
|
|
# Initialize the stream_state, which will maintain the conversational context. |
|
|
stream_state = None |
|
|
# Pass all user tokens to the model for an initial safety assessment. |
|
|
result, stream_state = model.stream_moderate_from_ids(token_ids[:user_end_index+1], role="user", stream_state=stream_state)
|
|
if result['risk_level'][-1] == "Safe": |
|
|
print(f"User moderation: -> [Risk: {result['risk_level'][-1]}]") |
|
|
else: |
|
|
print(f"User moderation: -> [Risk: {result['risk_level'][-1]} - Category: {result['category'][-1]}]") |
|
|
|
|
|
# 2. Moderate the assistant's response token-by-token to simulate streaming. |
|
|
# This loop mimics how an LLM generates a response one token at a time. |
|
|
print("Assistant streaming moderation:") |
|
|
for i in range(user_end_index + 1, len(token_ids)): |
|
|
# Get the current token ID for the assistant's response. |
|
|
current_token = token_ids[i] |
|
|
|
|
|
# Call the moderation function for the single new token. |
|
|
# The stream_state is passed and updated in each call to maintain context. |
|
|
result, stream_state = model.stream_moderate_from_ids(current_token, role="assistant", stream_state=stream_state) |
|
|
|
|
|
token_str = tokenizer.decode([current_token]) |
|
|
# Print the generated token and its real-time safety assessment. |
|
|
if result['risk_level'][-1] == "Safe": |
|
|
print(f"Token: {repr(token_str)} -> [Risk: {result['risk_level'][-1]}]") |
|
|
else: |
|
|
print(f"Token: {repr(token_str)} -> [Risk: {result['risk_level'][-1]} - Category: {result['category'][-1]}]") |
|
|
# HERE YOU WOULD STOP GENERATION |
|
|
print("Stopping generation due to unsafe content.") |
|
|
break |
|
|
|
|
|
model.close_stream(stream_state) |
|
|
``` |
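
In a real deployment the guard runs alongside the generating model rather than over a pre-built conversation. The sketch below shows one way to wire that up, continuing from the setup above (`model` and `tokenizer` are the guard model and tokenizer already loaded); `code_model` and `code_tokenizer` are hypothetical placeholders for whatever causal LM produces the PowerShell code. This is an illustration of the stopping pattern, not a tested integration:

```python
import torch

# Hypothetical integration sketch: `code_model` / `code_tokenizer` stand in
# for the generating model. For brevity this skips the initial user-turn
# moderation shown in the example above.
prompt = "Write a PowerShell script that lists running services."
input_ids = code_tokenizer(prompt, return_tensors="pt").input_ids
stream_state = None
past_key_values = None

for _ in range(512):
    with torch.no_grad():
        out = code_model(input_ids=input_ids, past_key_values=past_key_values, use_cache=True)
    past_key_values = out.past_key_values
    next_token = out.logits[:, -1, :].argmax(dim=-1)  # greedy decoding, for simplicity

    if next_token.item() == code_tokenizer.eos_token_id:
        break

    # The guard's tokenizer may differ from the generator's, so re-tokenize
    # the decoded text before feeding it to the moderator.
    guard_ids = tokenizer(code_tokenizer.decode(next_token), add_special_tokens=False).input_ids
    for tid in guard_ids:
        result, stream_state = model.stream_moderate_from_ids(
            torch.tensor(tid), role="assistant", stream_state=stream_state
        )
        if result["risk_level"][-1] != "Safe":
            model.close_stream(stream_state)
            raise RuntimeError(f"Unsafe code detected ({result['category'][-1]}); generation aborted.")

    input_ids = next_token.unsqueeze(0)

model.close_stream(stream_state)
```

Re-tokenizing the decoded text keeps the sketch independent of the generator's vocabulary; if both models share the Qwen tokenizer, the generated token IDs could be passed to the guard directly.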