Source code for akida_models.detection.map_evaluation

#!/usr/bin/env python
# ******************************************************************************
# Copyright 2017-2018 Fizyr (https://fizyr.com)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
"""
Module used to compute mAP (mean Average Precision) scores for YOLO object detection.
"""

__all__ = ["MapEvaluation"]

import keras
import numpy as np
import tensorflow as tf

from collections import defaultdict
from keras.utils import io_utils
from .processing import decode_output, desize_bboxes, get_affine_transform, preprocess_image
from .box_utils import compute_overlap
from .data_utils import Coord
from tqdm import tqdm


class MapEvaluation(keras.callbacks.Callback):
    """ Evaluate a given dataset using a given model.

    Code originally from https://github.com/fizyr/keras-retinanet.

    Note that mAP is computed for IoU thresholds from 0.5 to 0.95 with a step size of 0.05.

    Scores are obtained by calling :meth:`evaluate_map`. When the object is used as a Keras
    callback, they are also printed and the mean mAP is stored in the epoch logs under the
    ``'map'`` key (see :meth:`on_epoch_end`).

    Args:
        model (keras.Model): model to evaluate.
        val_data (dict): dictionary containing validation data as obtained
            using `preprocess_widerface.py` module. It is iterated sample by sample, each
            sample being read through its 'image' and 'objects' ('bbox', 'label') entries.
        num_valid (int): the length of the validation dataset
        labels (list): list of labels as strings
        anchors (list): list of anchors boxes
        period (int, optional): periodicity the precision is printed,
            defaults to once per epoch. A value of 0 disables the evaluation at epoch end.
            Defaults to 1.
        obj_threshold (float, optional): confidence threshold for a box. Defaults to 0.5.
        nms_threshold (float, optional): non-maximal suppression threshold. Defaults to 0.5.
        max_box_per_image (int, optional): maximum number of detections per
            image, Defaults to 10.
        preserve_aspect_ratio (bool, optional): Whether aspect ratio is preserved
            during resizing. Defaults to False.
        is_keras_model (bool, optional): indicates if the model is a Keras model
            (True) or an Akida model (False). Defaults to True.
        decode_output_fn (Callable, optional): function to decode model's outputs.
            Defaults to :func:`decode_output` (yolo decode output function).
    """

    def __init__(self, model, val_data, num_valid, labels, anchors, period=1, obj_threshold=0.5,
                 nms_threshold=0.5, max_box_per_image=10, preserve_aspect_ratio=False,
                 is_keras_model=True, decode_output_fn=decode_output):
        super().__init__()

        self._model = model
        self._data = val_data
        self._data_len = num_valid
        self._labels = labels
        self._num_classes = len(labels)
        self._anchors = anchors
        self._period = period
        self._obj_threshold = obj_threshold
        self._nms_threshold = nms_threshold
        self._max_box_per_image = max_box_per_image
        self._preserve_aspect_ratio = preserve_aspect_ratio
        self._is_keras_model = is_keras_model
        self._decode_output = decode_output_fn

    def on_epoch_end(self, epoch, logs=None):
        """ Keras callback called at the end of an epoch.

        The evaluation runs every ``period`` epochs and also at the epoch whose (1-based)
        index equals the 'epochs' entry of ``self.params``. Results are printed and the mean
        mAP over all IoU thresholds is stored in ``logs['map']`` when ``logs`` is provided.

        Args:
            epoch (int): index of epoch.
            logs (dict, optional): metric results for this training epoch, and
                for the validation epoch if validation is performed. Validation
                result keys are prefixed with val. For training epoch, the values
                of the Model's metrics are returned.
                Example: {'loss': 0.2, 'acc': 0.7}. Defaults to None.
        """
        epoch += 1
        if self._period == 0:
            return
        # `params` is only consulted when the period does not match; tolerate it being unset
        # (None) so that a manual call outside of a training loop does not crash.
        if (epoch % self._period != 0
                and epoch != (self.params or {}).get('epochs', -1)):
            return

        _map_dict, average_precisions = self.evaluate_map()
        mean_map = sum(_map_dict.values()) / len(_map_dict)

        io_utils.print_msg("")
        io_utils.print_msg('mAP 50: {:.4f}'.format(_map_dict[0.5]))
        io_utils.print_msg('mAP 75: {:.4f}\n'.format(_map_dict[0.75]))
        for label, average_precision in average_precisions.items():
            io_utils.print_msg(self._labels[label] + ' {:.4f}'.format(average_precision))
        io_utils.print_msg('mAP: {:.4f}\n'.format(mean_map))

        # `logs` defaults to None: only record the metric when a dict was actually given
        if logs is not None:
            logs.update({'map': mean_map})

    def evaluate_map(self):
        """ Evaluates current mAP score on the model.

        mAP is computed for IoU thresholds from 0.5 to 0.95 with a step size of 0.05

        Returns:
            tuple: a dictionary containing mAP for each threshold and a
            dictionary of label containing mAP for each class (averaged over the thresholds).
        """
        # Thresholds from 0.5 to 0.95 with a step size of 0.05
        iou_thresholds = np.linspace(0.5, 0.95, num=10)
        total_iterations = len(iou_thresholds)

        # one progress step per sample (predictions), per class (overlaps) and per threshold
        self._pbar = tqdm(total=self._data_len + self._num_classes + total_iterations,
                          leave=False)
        mean_avgs = defaultdict(float)
        map_dict = {}
        try:
            all_detections, all_annotations = self._get_predictions()
            all_overlaps = self._compute_all_overlaps(all_detections, all_annotations)

            for th in iou_thresholds:
                self._pbar.set_description(f"Computing average precisions th = {th:.2f}")
                average_precisions = self._calc_avg_precisions(all_detections,
                                                               all_annotations,
                                                               all_overlaps,
                                                               th)
                _map = sum(average_precisions.values()) / len(average_precisions)
                map_dict[th] = _map
                for label, ap in average_precisions.items():
                    mean_avgs[label] += ap / total_iterations
                self._pbar.update(1)
        finally:
            # always release the progress bar, even if the evaluation fails midway
            self._pbar.close()

        tf.keras.backend.clear_session()
        return map_dict, mean_avgs

    def _load_annotations(self, data):
        """ Builds the ground truth array of one sample.

        Box coordinates read from ``data['objects']['bbox']`` are multiplied by the image
        width/height (taken from ``data['image'].shape``) and cast to integers.

        Args:
            data (dict): sample with an 'image' entry and an 'objects' entry holding
                'bbox' and 'label' tensors.

        Returns:
            np.ndarray: one row per object, laid out as (x1, y1, x2, y2, label).
        """
        objects = data['objects']
        h, w, _ = data['image'].shape
        bbox = objects['bbox'].numpy()
        labels = objects['label'].numpy()

        x1 = (bbox[:, Coord.x1] * w).astype(int)
        y1 = (bbox[:, Coord.y1] * h).astype(int)
        x2 = (bbox[:, Coord.x2] * w).astype(int)
        y2 = (bbox[:, Coord.y2] * h).astype(int)

        return np.column_stack([x1, y1, x2, y2, labels])

    def _get_predictions(self):
        """ Runs the model on every sample and gathers detections and ground truth.

        Relies on the progress bar created by :meth:`evaluate_map` (``self._pbar``).

        Returns:
            tuple: ``(all_detections, all_annotations)``, two nested lists indexed as
            ``[sample][label]``. Detections are sorted by decreasing score and limited to
            ``max_box_per_image`` per sample; annotations hold the 4 box coordinates.
        """
        # gather all detections and annotations
        all_detections = [[None for _ in range(self._num_classes)]
                          for _ in range(self._data_len)]
        all_annotations = [[None for _ in range(self._num_classes)]
                           for _ in range(self._data_len)]

        # the model input shape is the same for every sample: compute it once
        # (Keras shapes carry a leading batch dimension that is dropped here)
        input_shape = self._model.input_shape[1:] if self._is_keras_model \
            else self._model.input_shape

        self._pbar.set_description("Getting predictions")
        for i, data in enumerate(self._data):
            raw_image = data['image']
            raw_height, raw_width, _ = raw_image.shape

            affine_transform = None
            if self._preserve_aspect_ratio:
                center = np.array([raw_width / 2., raw_height / 2.], dtype=np.float32)
                affine_transform = get_affine_transform(center,
                                                        [raw_width, raw_height],
                                                        [input_shape[1], input_shape[0]])
            image = preprocess_image(raw_image.numpy(), input_shape, affine_transform)

            if self._is_keras_model:
                input_image = image[np.newaxis, :]
                output = self._model.predict(input_image, verbose=0)[0]
            else:
                input_image = image[np.newaxis, :].astype(np.uint8)
                potentials = self._model.predict(input_image)[0]
                if self._anchors:
                    # split the channel axis into (anchor, box + objectness + classes)
                    h, w, _ = potentials.shape
                    output = potentials.reshape(
                        (h, w, len(self._anchors), 4 + 1 + self._num_classes))
                else:
                    output = potentials

            pred_boxes = self._decode_output(output, self._anchors, self._num_classes,
                                             self._obj_threshold, self._nms_threshold)

            score = np.array([box.get_score() for box in pred_boxes])
            pred_labels = np.array([box.get_label() for box in pred_boxes])

            if len(pred_boxes) > 0:
                pred_boxes = desize_bboxes(pred_boxes, score, raw_height, raw_width,
                                           input_shape[0], input_shape[1],
                                           self._preserve_aspect_ratio)
            else:
                pred_boxes = np.array([[]])

            # sort the boxes and the labels according to scores
            score_sort = np.argsort(-score)
            pred_labels = pred_labels[score_sort]
            pred_boxes = pred_boxes[score_sort]

            # limit the number of predictions to max_box_per_image based on score
            number_of_predictions = pred_boxes.shape[0]
            if number_of_predictions > self._max_box_per_image:
                pred_labels = pred_labels[:self._max_box_per_image]
                pred_boxes = pred_boxes[:self._max_box_per_image, :]

            # copy detections to all_detections
            for label in range(self._num_classes):
                all_detections[i][label] = pred_boxes[pred_labels == label, :]

            annotations = self._load_annotations(data)

            # copy ground truth to all_annotations
            for label in range(self._num_classes):
                all_annotations[i][label] = annotations[annotations[:, 4] == label, :4].copy()
            self._pbar.update(1)

        return all_detections, all_annotations

    def _compute_all_overlaps(self, all_detections, all_annotations):
        """ Computes the detection/annotation overlaps for every sample and label.

        Args:
            all_detections (list): detections indexed as ``[sample][label]``.
            all_annotations (list): annotations indexed as ``[sample][label]``.

        Returns:
            dict: result of `compute_overlap` keyed by ``(sample index, label)``.
        """
        all_overlaps = {}
        self._pbar.set_description("Computing overlaps")
        for label in range(self._num_classes):
            for i in range(self._data_len):
                all_overlaps[(i, label)] = compute_overlap(all_detections[i][label],
                                                           all_annotations[i][label],
                                                           mode="outer_product",
                                                           box_format="xyxy")
            # the progress bar counts one step per class for this stage
            self._pbar.update(1)
        return all_overlaps

    def _calc_avg_precisions(self, all_detections, all_annotations, all_overlaps,
                             iou_threshold):
        """ Computes the average precision of each class for one IoU threshold.

        A detection is a true positive when its best overlapping annotation reaches
        ``iou_threshold`` and was not already matched by an earlier detection of the same
        sample; otherwise it is a false positive.

        Args:
            all_detections (list): detections indexed as ``[sample][label]``, the score
                being read from column 4 of each row.
            all_annotations (list): annotations indexed as ``[sample][label]``.
            all_overlaps (dict): overlaps keyed by ``(sample index, label)``, indexed as
                ``[detection, annotation]``.
            iou_threshold (float): minimum overlap for a detection to match an annotation.

        Returns:
            dict: average precision per label index (0 for labels without annotations).
        """
        # compute mAP by comparing all detections and all annotations
        average_precisions = {}
        for label in range(self._num_classes):
            # plain lists: appending is O(1), whereas np.append copies the array each time
            false_positives = []
            true_positives = []
            scores = []
            num_annotations = 0.0

            for i in range(self._data_len):
                detections = all_detections[i][label]
                annotations = all_annotations[i][label]
                num_annotations += annotations.shape[0]
                overlaps = all_overlaps[(i, label)]

                num_detections = len(detections)
                if num_detections == 0:
                    continue
                scores.extend(d[4] for d in detections)

                if annotations.shape[0] == 0:
                    # nothing to match against: every detection is a false positive
                    false_positives.extend([1] * num_detections)
                    true_positives.extend([0] * num_detections)
                    continue

                # best annotation of every detection, computed once per sample
                assigned_annotations = np.argmax(overlaps, axis=1)
                detected_annotations = set()
                for idx in range(num_detections):
                    assigned_annotation = int(assigned_annotations[idx])
                    max_overlap = overlaps[idx, assigned_annotation]

                    if (max_overlap >= iou_threshold
                            and assigned_annotation not in detected_annotations):
                        false_positives.append(0)
                        true_positives.append(1)
                        detected_annotations.add(assigned_annotation)
                    else:
                        false_positives.append(1)
                        true_positives.append(0)

            # no annotations -> AP for this class is 0 (is this correct?)
            if num_annotations == 0:
                average_precisions[label] = 0
                continue

            scores = np.asarray(scores, dtype=np.float64)
            false_positives = np.asarray(false_positives, dtype=np.float64)
            true_positives = np.asarray(true_positives, dtype=np.float64)

            # sort by score
            indices = np.argsort(-scores)

            # compute (cumulated) false positives and true positives
            false_positives = np.cumsum(false_positives[indices])
            true_positives = np.cumsum(true_positives[indices])

            # compute recall and precision
            recall = true_positives / num_annotations
            precision = true_positives / np.maximum(
                true_positives + false_positives, np.finfo(np.float64).eps)

            # compute average precision
            average_precisions[label] = self._compute_ap(recall, precision)

        return average_precisions

    @staticmethod
    def _compute_ap(recall, precision):
        """ Compute the average precision, given the recall and precision curves.

        Code originally from https://github.com/rbgirshick/py-faster-rcnn.

        Args:
            recall (list): the recall curve
            precision (list): the precision curve

        Returns:
            The average precision as computed in py-faster-rcnn.
        """
        # correct AP calculation
        # first append sentinel values at the end
        mrec = np.concatenate(([0.], recall, [1.]))
        mpre = np.concatenate(([0.], precision, [0.]))

        # compute the precision envelope: each point takes the maximum of all the
        # precisions located at or after it (running maximum from the right)
        mpre = np.maximum.accumulate(mpre[::-1])[::-1]

        # to calculate area under PR curve, look for points
        # where X axis (recall) changes value
        i = np.where(mrec[1:] != mrec[:-1])[0]

        # and sum (\Delta recall) * prec
        ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1])
        return ap