import cv2
import numpy as np
# np.set_printoptions(threshold=np.inf)
import random
import torch
import torchvision.transforms as transforms
from pathlib import Path
from torch.utils.data import Dataset
from utils.utils import letterbox, augment_hsv, random_perspective
from tqdm.autonotebook import tqdm
import json
import albumentations as A


class BddDataset(Dataset):
    def __init__(self, params, is_train, inputsize=640, transform=None):
        """
        Initialize all attributes of the dataset.
        Inputs:
        -params: configuration parameters
        -is_train(bool): whether this is the train set or not
        -transform: ToTensor and Normalize
        Returns:
        None
        """
        self.single_cls = True  # just detect vehicles
        self.is_train = is_train
        self.params = params
        self.transform = transform
        self.inputsize = inputsize
        self.Tensor = transforms.ToTensor()
        img_root = Path(params.dataset['dataroot'])
        label_root = Path(params.dataset['labelroot'])
        mask_root = Path(params.dataset['maskroot'])
        lane_root = Path(params.dataset['laneroot'])
        if is_train:
            indicator = params.dataset['train_set']
        else:
            indicator = params.dataset['test_set']
        self.img_root = img_root / indicator
        self.label_root = label_root / indicator
        self.mask_root = mask_root / indicator
        self.lane_root = lane_root / indicator
        # self.label_list = self.label_root.iterdir()
        self.mask_list = self.mask_root.iterdir()
        self.data_format = params.dataset['data_format']
        self.scale_factor = params.dataset['scale_factor']
        self.rotation_factor = params.dataset['rot_factor']
        self.flip = params.dataset['flip']
        self.color_rgb = params.dataset['color_rgb']
        self.albumentations_transform = A.Compose([
            A.Blur(p=0.01),
            A.MedianBlur(p=0.01),
            A.ToGray(p=0.01),
            A.CLAHE(p=0.01),
            A.RandomBrightnessContrast(p=0.01),
            A.RandomGamma(p=0.01),
            A.ImageCompression(quality_lower=75, p=0.01)],
            bbox_params=A.BboxParams(format='pascal_voc', label_fields=['class_labels']),
            additional_targets={'mask0': 'mask'})
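        # Each photometric transform above fires independently with p=0.01.
        # bbox_params declares boxes in pascal_voc (pixel x1, y1, x2, y2) format
        # paired with 'class_labels'; additional_targets registers a second mask
        # ('mask0', used for the lane mask in __getitem__) so it receives the
        # same spatial transform as the drivable-area mask.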
        # bdd_labels = {
        #     'unlabeled': 0, 'dynamic': 1, 'ego vehicle': 2, 'ground': 3,
        #     'static': 4, 'parking': 5, 'rail track': 6, 'road': 7,
        #     'sidewalk': 8, 'bridge': 9, 'building': 10, 'fence': 11,
        #     'garage': 12, 'guard rail': 13, 'tunnel': 14, 'wall': 15,
        #     'banner': 16, 'billboard': 17, 'lane divider': 18, 'parking sign': 19,
        #     'pole': 20, 'polegroup': 21, 'street light': 22, 'traffic cone': 23,
        #     'traffic device': 24, 'traffic light': 25, 'traffic sign': 26, 'traffic sign frame': 27,
        #     'terrain': 28, 'vegetation': 29, 'sky': 30, 'person': 31,
        #     'rider': 32, 'bicycle': 33, 'bus': 34, 'car': 35,
        #     'caravan': 36, 'motorcycle': 37, 'trailer': 38, 'train': 39,
        #     'truck': 40
        # }
        self.id_dict = {'person': 0, 'rider': 1, 'car': 2, 'bus': 3, 'truck': 4,
                        'bike': 5, 'motor': 6, 'tl_green': 7, 'tl_red': 8,
                        'tl_yellow': 9, 'tl_none': 10, 'traffic sign': 11, 'train': 12}
        self.id_dict_single = {'car': 0, 'bus': 1, 'truck': 2, 'train': 3}
        # id_dict = {'car': 0, 'bus': 1, 'truck': 2}
        self.shapes = np.array(params.dataset['org_img_size'])
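        # org_img_size is the raw frame size as (height, width); for BDD100K
        # this is (720, 1280), matching the hard-coded 1280/720 factors in the
        # commented-out visualization in _get_db below.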
        self.db = self._get_db()

    def _get_db(self):
        """
        Build the database from the annotation files.
        Inputs:
        Returns:
        gt_db: (list) database [a, b, c, ...]
        a: (dictionary) {'image':, 'information':, ......}
        image: image path
        mask: path of the segmentation label
        label: [cls_id, center_x, center_y, w, h], normalized to [0, 1] by the original image size
        """
        print('building database...')
        gt_db = []
        height, width = self.shapes
        for mask in tqdm(list(self.mask_list)):
            mask_path = str(mask)
            label_path = mask_path.replace(str(self.mask_root), str(self.label_root)).replace(".png", ".json")
            image_path = mask_path.replace(str(self.mask_root), str(self.img_root)).replace(".png", ".jpg")
            lane_path = mask_path.replace(str(self.mask_root), str(self.lane_root))
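            # Each label file is expected to follow the BDD100K per-image JSON
            # layout that the parsing below relies on, roughly:
            #   {"frames": [{"objects": [
            #       {"category": "traffic light",
            #        "attributes": {"trafficLightColor": "green", ...},
            #        "box2d": {"x1": ..., "y1": ..., "x2": ..., "y2": ...}},
            #       ...]}]}
            # Objects without a 'box2d' entry are dropped by select_data().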
            with open(label_path, 'r') as f:
                label = json.load(f)
            data = label['frames'][0]['objects']
            data = self.select_data(data)
            gt = np.zeros((len(data), 5))
            for idx, obj in enumerate(data):
                category = obj['category']
                if category == "traffic light":
                    color = obj['attributes']['trafficLightColor']
                    category = "tl_" + color
                if category in self.id_dict.keys():
                    x1 = float(obj['box2d']['x1'])
                    y1 = float(obj['box2d']['y1'])
                    x2 = float(obj['box2d']['x2'])
                    y2 = float(obj['box2d']['y2'])
                    cls_id = self.id_dict[category]
                    if self.single_cls:
                        cls_id = 0
                    gt[idx][0] = cls_id
                    box = self.convert((width, height), (x1, x2, y1, y2))
                    gt[idx][1:] = list(box)
            rec = [{
                'image': image_path,
                'label': gt,
                'mask': mask_path,
                'lane': lane_path
            }]
            # img = cv2.imread(image_path, cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION | cv2.IMREAD_UNCHANGED)
            # # img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            # for label in gt:
            #     x1 = int((label[1] - label[3] / 2) * 1280)
            #     x2 = int((label[1] + label[3] / 2) * 1280)
            #     y1 = int((label[2] - label[4] / 2) * 720)
            #     y2 = int((label[2] + label[4] / 2) * 720)
            #     img = cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
            # cv2.imwrite('gt/{}'.format(image_path.split('/')[-1]), img)
            gt_db += rec
        print('finished building database')
        return gt_db

    def evaluate(self, params, preds, output_dir):
        """
        Implemented in child datasets.
        """
        raise NotImplementedError

    def __len__(self):
        """
        Number of samples (images) in the dataset.
        """
        return len(self.db)

    def __getitem__(self, idx):
        """
        Get the input and ground truth from the database & apply data augmentation to the input.
        Inputs:
        -idx: the index of the image in self.db (database) (list)
        self.db (list) [a, b, c, ...]
        a: (dictionary) {'image':, 'information':}
        Returns:
        -image: transformed image; first passed through the data augmentation in __getitem__ (type: numpy), then through self.transform
        -target: ground truth (det_gt, seg_gt)
        Functions that may be useful:
        cv2.imread
        cv2.cvtColor(data, cv2.COLOR_BGR2RGB)
        cv2.warpAffine
        """
        data = self.db[idx]
        img = cv2.imread(data["image"], cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        if self.params.num_seg_class == 3:
            seg_label = cv2.imread(data["mask"])
        else:
            seg_label = cv2.imread(data["mask"], 0)
        lane_label = cv2.imread(data["lane"], 0)
        # print(lane_label.shape)
        # print(seg_label.shape)
        resized_shape = self.inputsize
        if isinstance(resized_shape, list):
            resized_shape = max(resized_shape)
        h0, w0 = img.shape[:2]  # orig hw
        r = resized_shape / max(h0, w0)  # resize image to img_size
        if r != 1:  # always resize down; only resize up if training with augmentation
            interp = cv2.INTER_AREA if r < 1 else cv2.INTER_LINEAR
            img = cv2.resize(img, (int(w0 * r), int(h0 * r)), interpolation=interp)
            seg_label = cv2.resize(seg_label, (int(w0 * r), int(h0 * r)), interpolation=interp)
            lane_label = cv2.resize(lane_label, (int(w0 * r), int(h0 * r)), interpolation=interp)
        h, w = img.shape[:2]

        (img, seg_label, lane_label), ratio, pad = letterbox((img, seg_label, lane_label), resized_shape, auto=True,
                                                             scaleup=self.is_train)
        shapes = (h0, w0), ((h / h0, w / w0), pad)  # for COCO mAP rescaling
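        # 'shapes' records everything needed to undo the resize + letterbox at
        # evaluation time: the original (h0, w0), the pre-letterbox scale ratio
        # (h / h0, w / w0), and the letterbox padding 'pad'.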
        # ratio = (w / w0, h / h0)
        # print(resized_shape)
        det_label = data["label"]
        # print(det_label)
        labels = []
        labels_app = np.array([])
        if det_label.size > 0:
            # Normalized xywh to pixel xyxy format
            labels = det_label.copy()
            labels[:, 1] = ratio[0] * w * (det_label[:, 1] - det_label[:, 3] / 2) + pad[0]  # pad width
            labels[:, 2] = ratio[1] * h * (det_label[:, 2] - det_label[:, 4] / 2) + pad[1]  # pad height
            labels[:, 3] = ratio[0] * w * (det_label[:, 1] + det_label[:, 3] / 2) + pad[0]
            labels[:, 4] = ratio[1] * h * (det_label[:, 2] + det_label[:, 4] / 2) + pad[1]
            # print(labels[:, 1:4])
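            # Each stored row is (cls, cx, cy, bw, bh) normalized to [0, 1], so
            # e.g. x1 = ratio_x * w * (cx - bw / 2) + pad_x maps the left edge
            # into pixel coordinates of the letterboxed image; the other three
            # edges follow the same pattern.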
        if self.is_train:
            # albumentations
            try:
                new = self.albumentations_transform(image=img, mask=seg_label, mask0=lane_label,
                                                    bboxes=labels[:, 1:] if len(labels) else labels,
                                                    class_labels=labels[:, 0] if len(labels) else labels)
                img = new['image']
                labels = np.array([[c, *b] for c, b in zip(new['class_labels'], new['bboxes'])]) if len(labels) else labels
                seg_label = new['mask']
                lane_label = new['mask0']
            except ValueError:  # a bbox has width or height == 0
                pass

            # augmentation
            combination = (img, seg_label, lane_label)
            (img, seg_label, lane_label), labels = random_perspective(
                combination=combination,
                targets=labels,
                degrees=self.params.dataset['rot_factor'],
                translate=self.params.dataset['translate'],
                scale=self.params.dataset['scale_factor'],
                shear=self.params.dataset['shear']
            )
            # print(labels.shape)
            augment_hsv(img, hgain=self.params.dataset['hsv_h'], sgain=self.params.dataset['hsv_s'], vgain=self.params.dataset['hsv_v'])
            # img, seg_label, labels = cutout(combination=combination, labels=labels)

            # random left-right flip
            lr_flip = True
            if lr_flip and random.random() < 0.5:
                img = img[:, ::-1, :]
                if len(labels):
                    rows, cols, channels = img.shape
                    x1 = labels[:, 1].copy()
                    x2 = labels[:, 3].copy()
                    x_tmp = x1.copy()
                    labels[:, 1] = cols - x2
                    labels[:, 3] = cols - x_tmp
                # Segmentation
                seg_label = np.fliplr(seg_label)
                lane_label = np.fliplr(lane_label)
                # cv2.imwrite('img0.jpg', img)
                # cv2.imwrite('img1.jpg', seg_label)
                # cv2.imwrite('img2.jpg', lane_label)
                # exit()
                # print(labels)
            # random up-down flip
            ud_flip = False
            if ud_flip and random.random() < 0.5:
                img = np.flipud(img)
                seg_label = np.flipud(seg_label)
                lane_label = np.flipud(lane_label)
                if len(labels):
                    rows, cols, channels = img.shape
                    y1 = labels[:, 2].copy()
                    y2 = labels[:, 4].copy()
                    y_tmp = y1.copy()
                    labels[:, 2] = rows - y2
                    labels[:, 4] = rows - y_tmp
        # for anno in labels:
        #     x1, y1, x2, y2 = [int(x) for x in anno[1:5]]
        #     print(x1, y1, x2, y2)
        #     cv2.rectangle(img, (x1, y1), (x2, y2), (0, 0, 255), 3)
        # cv2.imwrite(data["image"].split("/")[-1], img)
        if len(labels):
            labels_app = np.zeros((len(labels), 5))
            labels_app[:, 0:4] = labels[:, 1:5]
            labels_app[:, 4] = labels[:, 0]

        img = np.ascontiguousarray(img)
        _, seg1 = cv2.threshold(seg_label, 1, 255, cv2.THRESH_BINARY)
        _, lane1 = cv2.threshold(lane_label, 1, 255, cv2.THRESH_BINARY)
        # prefer lane over drivable area where the two overlap
        seg1 = seg1 - (seg1 & lane1)
        union = seg1 | lane1
        # print(union.shape)
        background = 255 - union
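        # Build mutually exclusive binary masks: any non-zero pixel becomes 255,
        # lane pixels are subtracted from the drivable-area mask (lane wins on
        # overlap), and background is the complement of their union. The three
        # masks are stacked below into a 3-channel one-hot target:
        # channel 0 = background, 1 = drivable area, 2 = lane line.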
        # print(img.shape)
        # print(lane1.shape)
        # img_copy = img.copy()
        # img_copy[lane1 == 255] = (0, 255, 0)
        # cv2.imwrite('seg_gt/' + data['image'].split('/')[-1], img_copy)
        # cv2.imwrite('background.jpg', background)
        # cv2.imwrite('{}.jpg'.format(data['image'].split('/')[-1]), img)
        # cv2.imwrite('{}-lane.jpg'.format(data['image'].split('/')[-1]), lane1)
        # cv2.imwrite('{}-seg.jpg'.format(data['image'].split('/')[-1]), seg1)
        seg1 = self.Tensor(seg1)
        lane1 = self.Tensor(lane1)
        background = self.Tensor(background)
        segmentation = torch.cat([background, seg1, lane1], dim=0)
        # print(segmentation.size())
        # print(seg1.shape)
        # for anno in labels_app:
        #     x1, y1, x2, y2 = [int(x) for x in anno[anno != -1][:4]]
        #     cv2.rectangle(img, (x1, y1), (x2, y2), (0, 0, 255), 1)
        # cv2.imwrite(data["image"].split("/")[-1], img)
        img = self.transform(img)
        return img, data["image"], shapes, torch.from_numpy(labels_app), segmentation

    def select_data(self, db):
        """
        Filter out unusable annotations from one image's object list.
        Inputs:
        -db: (list) object annotations for one image
        Returns:
        -remain: (list) objects that have a 2D box (and, in single-class mode,
         belong to one of the vehicle categories)
        """
        remain = []
        for obj in db:
            if 'box2d' in obj.keys():
                if self.single_cls:
                    if obj['category'] in self.id_dict_single.keys():
                        remain.append(obj)
                else:
                    remain.append(obj)
        return remain

    def convert(self, size, box):
        dw = 1. / (size[0])
        dh = 1. / (size[1])
        x = (box[0] + box[1]) / 2.0
        y = (box[2] + box[3]) / 2.0
        w = box[1] - box[0]
        h = box[3] - box[2]
        x = x * dw
        w = w * dw
        y = y * dh
        h = h * dh
        return x, y, w, h
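    # convert() maps a pixel box to normalized (cx, cy, w, h). Note the
    # (x1, x2, y1, y2) argument order used by the caller in _get_db. A worked
    # example with size = (1280, 720) and box = (100, 300, 200, 400):
    #   cx = (100 + 300) / 2 / 1280 = 0.15625
    #   cy = (200 + 400) / 2 / 720  ~ 0.41667
    #   w  = (300 - 100) / 1280     = 0.15625
    #   h  = (400 - 200) / 720      ~ 0.27778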

    @staticmethod
    def collate_fn(batch):
        img, paths, shapes, labels_app, segmentation = zip(*batch)
        filenames = [file.split('/')[-1] for file in paths]
        # print(len(labels_app))
        max_num_annots = max(label.size(0) for label in labels_app)
        if max_num_annots > 0:
            annot_padded = torch.ones((len(labels_app), max_num_annots, 5)) * -1
            for idx, label in enumerate(labels_app):
                if label.size(0) > 0:
                    annot_padded[idx, :label.size(0), :] = label
        else:
            annot_padded = torch.ones((len(labels_app), 1, 5)) * -1
        return {'img': torch.stack(img, 0), 'annot': annot_padded, 'segmentation': torch.stack(segmentation, 0),
                'filenames': filenames, 'shapes': shapes}
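
# A minimal usage sketch (an illustration, not part of the original file):
# wiring the dataset into a DataLoader with the padding collate_fn above.
# 'params' is a hypothetical stand-in for the project's config object; it only
# needs the fields accessed above (params.dataset[...] and params.num_seg_class).
#
# from torch.utils.data import DataLoader
# import torchvision.transforms as transforms
#
# transform = transforms.Compose([
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406],   # assumed ImageNet stats
#                          std=[0.229, 0.224, 0.225]),
# ])
# train_set = BddDataset(params, is_train=True, inputsize=640, transform=transform)
# train_loader = DataLoader(train_set, batch_size=8, shuffle=True, num_workers=4,
#                           collate_fn=BddDataset.collate_fn)
# batch = next(iter(train_loader))
# # batch['img']: (B, 3, H, W) float tensor
# # batch['annot']: (B, max_boxes, 5) as (x1, y1, x2, y2, cls), padded with -1
# # batch['segmentation']: (B, 3, H, W) one-hot masks (background, area, lane)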