异常检测示例
异常检测示例演示了如何设置一个异常检测管道,该管道实时从 Pub/Sub 读取文本,然后使用训练好的 HDBSCAN 聚类模型检测异常。
异常检测数据集
此示例使用一个名为 emotion 的数据集,其中包含 20,000 条带有 6 种基本情感的英文推特消息:愤怒、恐惧、快乐、爱、悲伤和惊讶。该数据集有三个拆分:训练(用于训练)、验证和测试(用于性能评估)。因为它包含文本和数据集的类别(类),所以它是一个监督数据集。你可以使用 Hugging Face 数据集页面 访问此数据集。
以下文本显示了来自数据集训练拆分的示例
文本 | 情感类型 |
---|---|
im grabbing a minute to post i feel greedy wrong | 愤怒 |
i am ever feeling nostalgic about the fireplace i will know that it is still on the property | 爱 |
ive been taking or milligrams or times recommended amount and ive fallen asleep a lot faster but i also feel like so funny | 恐惧 |
on a boat trip to denmark | 快乐 |
i feel you know basically like a fake in the realm of science fiction | 悲伤 |
i began having them several times a week feeling tortured by the hallucinations moving people and figures sounds and vibrations | 恐惧 |
异常检测算法
HDBSCAN 是一种聚类算法,它通过将 DBSCAN 转换为层次聚类算法,然后根据聚类的稳定性提取扁平化聚类来扩展 DBSCAN。训练后,如果新数据点是异常值,模型会预测 -1
,否则会预测现有的聚类之一。
导入到 Pub/Sub
将数据导入 Pub/Sub,以便在聚类过程中,模型可以从 Pub/Sub 读取推文。Pub/Sub 是一种用于在应用程序和服务之间交换事件数据的消息传递服务。流式分析和数据集成管道使用 Pub/Sub 来摄取和分发数据。
你可以在 GitHub 中看到将数据导入 Pub/Sub 的完整示例代码
以下图表显示了导入管道的文件结构
write_data_to_pubsub_pipeline/
├── pipeline/
│ ├── __init__.py
│ ├── options.py
│ └── utils.py
├── __init__.py
├── config.py
├── main.py
└── setup.py
pipeline/utils.py
包含加载情感数据集的代码以及两个用于数据转换的 beam.DoFn
。
pipeline/options.py
包含用于配置 Dataflow 管道的管道选项。
config.py
定义了多次使用的变量,如 Google Cloud PROJECT_ID 和 NUM_WORKERS。
setup.py
定义了管道运行所需的软件包和要求。
main.py
包含管道代码和用于运行管道的其他函数。
运行管道
要运行管道,请安装所需的软件包。对于此示例,你需要访问 Google Cloud 项目,并且需要在 config.py
文件中配置 Google Cloud 变量,如 PROJECT_ID
、REGION
、PubSub TOPIC_ID
等。
- 在本地机器上:
python main.py
- 在 GCP 上用于 Dataflow:
python main.py --mode cloud
write_data_to_pubsub_pipeline
包含四个不同的转换
- 使用 Hugging Face 数据集加载情感数据集(为简单起见,我们从三个类别而不是六个类别中获取样本)。
- 将每段文本与一个唯一的标识符 (UID) 关联。
- 将文本转换为 Pub/Sub 预期的格式。
- 将格式化的消息写入 Pub/Sub。
流式数据上的异常检测
将数据导入 Pub/Sub 后,运行异常检测管道。此管道从 Pub/Sub 读取流式消息,使用语言模型将文本转换为嵌入,并将嵌入提供给已训练的聚类模型,以预测消息是否为异常。此管道的一个先决条件是使用数据集的训练拆分训练一个 HDBSCAN 聚类模型。
你可以在 GitHub 中找到异常检测的完整示例代码
以下图表显示了 anomaly_detection 管道的文件结构
anomaly_detection_pipeline/
├── pipeline/
│ ├── __init__.py
│ ├── options.py
│ └── transformations.py
├── __init__.py
├── config.py
├── main.py
└── setup.py
pipeline/transformations.py
包含不同 beam.DoFn
的代码以及管道中使用的其他函数。
pipeline/options.py
包含用于配置 Dataflow 管道的管道选项。
config.py
定义了多次使用的变量,如 Google Cloud PROJECT_ID 和 NUM_WORKERS。
setup.py
定义了管道运行所需的软件包和要求。
main.py
包含管道代码和用于运行管道的其他函数。
运行管道
安装所需的软件包并将数据推送到 Pub/Sub。对于此示例,你需要访问 Google Cloud 项目,并且需要在 config.py
文件中配置 Google Cloud 变量,如 PROJECT_ID
、REGION
、PubSub SUBSCRIPTION_ID
等。
- 在本地机器上:
python main.py
- 在 GCP 上用于 Dataflow:
python main.py --mode cloud
管道包括以下步骤
- 从 Pub/Sub 读取消息。
- 将 Pub/Sub 消息转换为
PCollection
,其中键为 UID,值为推特文本。 - 使用分词器将文本编码为可由 Transformer 读取的令牌 ID 整数。
- 使用 RunInference 从基于 Transformer 的语言模型获取向量嵌入。
- 规范化嵌入。
- 使用 RunInference 从训练好的 HDBSCAN 聚类模型获取异常预测。
- 将预测写入 BigQuery,以便在需要时可以重新训练聚类模型。
- 如果检测到异常,则发送电子邮件警报。
以下代码片段显示了管道的头两个步骤
下一节将介绍以下管道步骤
- 对文本进行分词
- 使用 RunInference 获取嵌入
- 从 HDBSCAN 模型获取预测
从语言模型获取嵌入
为了对文本数据进行聚类,首先需要将文本映射到适合进行统计分析的数值向量。本示例使用名为 sentence-transformers/stsb-distilbert-base/stsb-distilbert-base 的基于 transformer 的语言模型。该模型将句子和段落映射到 768 维的稠密向量空间,你可以将其用于聚类或语义搜索等任务。
由于语言模型需要分词后的输入而不是原始文本,因此首先对文本进行分词。分词是预处理任务,将文本转换为可以输入模型以获取预测的形式。
这里,tokenize_sentence
是一个函数,它接受一个包含文本和 ID 的字典,对文本进行分词,并返回文本和 ID 的元组以及分词后的输出。
Tokenizer = AutoTokenizer.from_pretrained(cfg.TOKENIZER_NAME)
def tokenize_sentence(input_dict):
"""
Takes a dictionary with a text and an id, tokenizes the text, and
returns a tuple of the text and id and the tokenized text
Args:
input_dict: a dictionary with the text and id of the sentence
Returns:
A tuple of the text and id, and a dictionary of the tokens.
"""
text, uid = input_dict["text"], input_dict["id"]
tokens = Tokenizer([text], padding=True, truncation=True, return_tensors="pt")
tokens = {key: torch.squeeze(val) for key, val in tokens.items()}
return (text, uid), tokens
然后将分词后的输出传递给语言模型以获取嵌入。为了从语言模型获取嵌入,我们使用 Apache Beam 中的 RunInference()
。
embedding_model_handler
是我们将 PytorchNoBatchModelHandler
定义为 PytorchModelHandler
的包装器,以将批处理大小限制为 1。
# Can be removed once: https://github.com/apache/beam/issues/21863 is fixed
class PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
"""Wrapper to PytorchModelHandler to limit batch size to 1.
The tokenized strings generated from BertTokenizer may have different
lengths, which doesn't work with torch.stack() in current RunInference
implementation since stack() requires tensors to be the same size.
Restricting max_batch_size to 1 means there is only 1 example per `batch`
in the run_inference() call.
"""
def batch_elements_kwargs(self):
return {"max_batch_size": 1}
由于 DistilBertModel
的 forward()
函数不会返回嵌入,因此我们自定义了模型类 ModelWrapper
来获取向量嵌入。
class ModelWrapper(DistilBertModel):
"""Wrapper to DistilBertModel to get embeddings when calling
forward function."""
def forward(self, **kwargs):
output = super().forward(**kwargs)
sentence_embedding = (
self.mean_pooling(output,
kwargs["attention_mask"]).detach().cpu().numpy())
return sentence_embedding
# Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(self, model_output, attention_mask):
"""
Calculates the mean of token embeddings
Args:
model_output: The output of the model.
attention_mask: This is a tensor that contains 1s for all input tokens and
0s for all padding tokens.
Returns:
The mean of the token embeddings.
"""
token_embeddings = model_output[
0] # First element of model_output contains all token embeddings
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float())
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
input_mask_expanded.sum(1), min=1e-9)
在获取每段 Twitter 文本的嵌入后,需要对嵌入进行归一化,因为训练后的模型需要归一化后的嵌入。
获取预测
然后将归一化的嵌入传递给训练后的 HDBSCAN 模型以获取预测。
其中 clustering_model_handler
是
我们将 CustomSklearnModelHandlerNumpy
定义为 SklearnModelHandlerNumpy
的包装器,以将批处理大小限制为 1,并覆盖 run_inference
函数,以便使用 hdbscan.approximate_predict()
来获取异常预测。
class CustomSklearnModelHandlerNumpy(SklearnModelHandlerNumpy):
# limit batch size to 1 can be removed once: https://github.com/apache/beam/issues/21863 is fixed
def batch_elements_kwargs(self):
"""Limit batch size to 1 for inference"""
return {"max_batch_size": 1}
# run_inference can be removed once: https://github.com/apache/beam/issues/22572 is fixed
def run_inference(self, batch, model, inference_args=None):
"""Runs inferences on a batch of numpy arrays.
Args:
batch: A sequence of examples as numpy arrays. They should
be single examples.
model: A numpy model or pipeline. Must implement predict(X).
Where the parameter X is a numpy array.
inference_args: Any additional arguments for an inference.
Returns:
An Iterable of type PredictionResult.
"""
_validate_inference_args(inference_args)
vectorized_batch = np.vstack(batch)
predictions = hdbscan.approximate_predict(model, vectorized_batch)
return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
获取模型预测后,将 RunInference
的输出解码为字典。接下来,将预测存储在 BigQuery 表中以进行分析,更新 HDBSCAN 模型,并在预测为异常时发送电子邮件警报。
_ = (
predictions
| "Decode Prediction" >> beam.ParDo(DecodePrediction())
| "Write to BQ" >> beam.io.WriteToBigQuery(
table=cfg.TABLE_URI,
schema=cfg.TABLE_SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
))
_ = predictions | "Alert by Email" >> beam.ParDo(TriggerEmailAlert())
最后更新时间:2024/10/31
你是否找到了所有你需要的内容?
所有内容是否有用且清晰?你希望更改任何内容吗?请告诉我们!