#!/usr/bin/python
#
from __future__ import print_function, absolute_import, division

import glob
import json
import os
import struct
import xml.etree.ElementTree as ET
from abc import ABCMeta
from collections import defaultdict
from collections import namedtuple

import numpy as np
from matplotlib import cm
from skimage import io, filters

from datasets.kitti_360.labels import labels, id2label, kittiId2label, name2label

# A point in a polygon
Point = namedtuple('Point', ['x', 'y'])

# maximum number of instances per semantic class:
# a global id is encoded as semanticId * MAX_N + instanceId
MAX_N = 1000


def local2global(semanticId, instanceId):
    globalId = semanticId * MAX_N + instanceId
    if isinstance(globalId, np.ndarray):
        # np.int was removed from recent NumPy releases; the builtin int keeps the old behavior
        return globalId.astype(int)
    else:
        return int(globalId)


def global2local(globalId):
    semanticId = globalId // MAX_N
    instanceId = globalId % MAX_N
    if isinstance(globalId, np.ndarray):
        return semanticId.astype(int), instanceId.astype(int)
    else:
        return int(semanticId), int(instanceId)
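
# Example of the encoding used throughout this file: an object with
# semanticId 26 and instanceId 13 has globalId 26 * 1000 + 13 = 26013,
# and global2local(26013) returns (26, 13). Both helpers also work
# elementwise on NumPy arrays.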

# maps annotationId -> globalId; filled while parsing the 3D bounding boxes
annotation2global = defaultdict()


# Abstract base class for annotation objects
class KITTI360Object:
    __metaclass__ = ABCMeta

    def __init__(self):
        # the label
        self.label = ""

        # colormap
        self.cmap = cm.get_cmap('Set1')
        self.cmap_length = 9

    def getColor(self, idx):
        if idx == 0:
            return np.array([0, 0, 0])
        return np.asarray(self.cmap(idx % self.cmap_length)[:3]) * 255.

    def assignColor(self):
        if self.semanticId >= 0:
            self.semanticColor = id2label[self.semanticId].color
            if self.instanceId > 0:
                self.instanceColor = self.getColor(self.instanceId)
            else:
                self.instanceColor = self.semanticColor


# Class that contains the information of a single annotated object as a 3D bounding box
class KITTI360Bbox3D(KITTI360Object):
    # Constructor
    def __init__(self):
        KITTI360Object.__init__(self)

        # vertices and faces of the 3D bounding box
        self.vertices = []
        self.faces = []
        # the 12 edges of the box, as index pairs into the 8 vertices
        self.lines = [[0, 5], [1, 4], [2, 7], [3, 6],
                      [0, 1], [1, 3], [3, 2], [2, 0],
                      [4, 5], [5, 7], [7, 6], [6, 4]]

        # the ID of the corresponding object
        self.semanticId = -1
        self.instanceId = -1
        self.annotationId = -1

        # the window that contains the bbox
        self.start_frame = -1
        self.end_frame = -1

        # timestamp of the bbox (-1 if static)
        self.timestamp = -1

        # projected vertices
        self.vertices_proj = None
        self.meshes = []

        # name
        self.name = ''

    def __str__(self):
        return self.name

    def generateMeshes(self):
        self.meshes = []
        if self.vertices_proj:
            for fidx in range(self.faces.shape[0]):
                self.meshes.append([Point(self.vertices_proj[0][int(x)], self.vertices_proj[1][int(x)])
                                    for x in self.faces[fidx]])

    def parseOpencvMatrix(self, node):
        rows = int(node.find('rows').text)
        cols = int(node.find('cols').text)
        data = node.find('data').text.split(' ')

        mat = []
        for d in data:
            d = d.replace('\n', '')
            if len(d) < 1:
                continue
            mat.append(float(d))
        mat = np.reshape(mat, [rows, cols])
        return mat
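
    # The XML node is expected to carry an OpenCV-style matrix serialization,
    # roughly of the following shape (values are illustrative only):
    #   <transform>
    #     <rows>4</rows>
    #     <cols>4</cols>
    #     <data>1.0 0.0 0.0 0.0  0.0 1.0 0.0 0.0  ...</data>
    #   </transform>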

    def parseVertices(self, child):
        transform = self.parseOpencvMatrix(child.find('transform'))
        R = transform[:3, :3]
        T = transform[:3, 3]
        vertices = self.parseOpencvMatrix(child.find('vertices'))
        faces = self.parseOpencvMatrix(child.find('faces'))

        vertices = np.matmul(R, vertices.transpose()).transpose() + T
        self.vertices = vertices
        self.faces = faces
        self.R = R
        self.T = T
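
    # The box vertices are stored in a local frame; each vertex row v is mapped
    # through the object's transform as R @ v + T before being kept in
    # self.vertices, with R and T taken from the 4x4 transform matrix above.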

    def parseBbox(self, child):
        semanticIdKITTI = int(child.find('semanticId').text)
        self.semanticId = kittiId2label[semanticIdKITTI].id
        self.instanceId = int(child.find('instanceId').text)
        self.name = kittiId2label[semanticIdKITTI].name

        self.start_frame = int(child.find('start_frame').text)
        self.end_frame = int(child.find('end_frame').text)

        self.timestamp = int(child.find('timestamp').text)

        self.annotationId = int(child.find('index').text) + 1

        global annotation2global
        annotation2global[self.annotationId] = local2global(self.semanticId, self.instanceId)
        self.parseVertices(child)

    def parseStuff(self, child):
        classmap = {'driveway': 'parking', 'ground': 'terrain', 'unknownGround': 'ground',
                    'railtrack': 'rail track', 'bigPole': 'pole', 'unknownObject': 'unknown object',
                    'smallPole': 'smallpole', 'trafficSign': 'traffic sign', 'trashbin': 'trash bin',
                    'guardrail': 'guard rail', 'trafficLight': 'traffic light', 'pedestrian': 'person',
                    'vendingmachine': 'vending machine', 'unknownConstruction': 'unknown construction',
                    'unknownVehicle': 'unknown vehicle'}

        label = child.find('label').text
        if label in classmap.keys():
            label = classmap[label]

        self.start_frame = int(child.find('start_frame').text)
        self.end_frame = int(child.find('end_frame').text)

        self.timestamp = int(child.find('timestamp').text)

        self.semanticId = name2label[label].id
        self.name = label
        self.parseVertices(child)


# Class that contains the information of the point cloud of a single frame
class KITTI360Point3D(KITTI360Object):
    # Constructor
    def __init__(self):
        KITTI360Object.__init__(self)

        self.vertices = []
        self.vertices_proj = None

        # the ID of the corresponding object
        self.semanticId = -1
        self.instanceId = -1
        self.annotationId = -1

        # name
        self.name = ''

        # color
        self.semanticColor = None
        self.instanceColor = None

    def __str__(self):
        return self.name

    def generateMeshes(self):
        pass


# The annotation of a whole image, including semantic and instance labels
class Annotation2D:
    # Constructor
    def __init__(self, colormap='Set1'):
        # the width of the image and thus of the label image
        self.imgWidth = 0
        # the height of the image and thus of the label image
        self.imgHeight = 0

        self.instanceId = None
        self.semanticId = None
        self.instanceImg = None
        self.semanticImg = None

        # savedId = semanticId * N + instanceId
        self.N = 1000

        # colormap
        self.cmap = cm.get_cmap(colormap)
        if colormap == 'Set1':
            self.cmap_length = 9
        else:
            # raising a plain string is invalid in Python 3; use a proper exception
            raise ValueError("Colormap length needs to be specified!")

    def getColor(self, idx):
        if idx == 0:
            return np.array([0, 0, 0])
        return np.asarray(self.cmap(idx % self.cmap_length)[:3]) * 255.

    # Load confidence map
    def loadConfidence(self, imgPath):
        self.confidenceMap = io.imread(imgPath)
        # np.float was removed from recent NumPy releases; use the builtin float
        self.confidenceMap = np.asarray(self.confidenceMap).astype(float) / 255.

    # Load instance id
    def loadInstance(self, imgPath, gtType='instance', toImg=True, contourType='instance', semanticCt=True, instanceCt=True):
        instanceId = io.imread(imgPath)
        self.instanceId = np.asarray(instanceId % self.N)
        self.semanticId = np.asarray(instanceId // self.N)
        if not toImg:
            return

        if gtType == 'semantic':
            self.toSemanticImage()
        elif gtType == 'instance':
            self.toInstanceImage()

        if semanticCt or instanceCt:
            self.getBoundary()

        if gtType == 'semantic' and semanticCt:
            boundaryImg = self.toBoundaryImage(contourType=contourType, instanceOnly=False)
            self.semanticImg = self.semanticImg * (1 - boundaryImg) + \
                np.ones_like(self.semanticImg) * boundaryImg * 255
        if gtType == 'instance' and instanceCt:
            boundaryImg = self.toBoundaryImage(contourType=contourType, instanceOnly=True)
            self.instanceImg = self.instanceImg * (1 - boundaryImg) + \
                np.ones_like(self.instanceImg) * boundaryImg * 255
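
    # Each pixel of the instance ground-truth PNG stores a single integer
    # semanticId * 1000 + instanceId, so a pixel value of 26013 decodes to
    # semanticId 26 and instanceId 13 via the integer division and modulo above.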

    def toSemanticImage(self):
        self.semanticImg = np.zeros((self.semanticId.size, 3))
        for label in labels:
            mask = self.semanticId == label.id
            mask = mask.flatten()
            self.semanticImg[mask] = np.asarray(label.color)
        self.semanticImg = self.semanticImg.reshape(*self.semanticId.shape, 3)

    def toInstanceImage(self):
        self.instanceImg = np.zeros((self.instanceId.size, 3))
        uniqueId = np.unique(self.instanceId)
        for uid in uniqueId:
            mask = self.instanceId == uid
            mask = mask.flatten()
            self.instanceImg[mask] = np.asarray(self.getColor(uid))
        self.instanceImg = self.instanceImg.reshape(*self.instanceId.shape, 3)

    def getBoundary(self):
        # semantic contours
        uniqueId = np.unique(self.semanticId)
        self.semanticContours = {}
        for uid in uniqueId:
            mask = (self.semanticId == uid).astype(np.uint8) * 255
            mask_filter = filters.laplace(mask)
            self.semanticContours[uid] = np.expand_dims(np.abs(mask_filter) > 0, 2)

        # instance contours
        globalId = local2global(self.semanticId, self.instanceId)
        uniqueId = np.unique(globalId)
        self.instanceContours = {}
        for uid in uniqueId:
            mask = (globalId == uid).astype(np.uint8) * 255
            mask_filter = filters.laplace(mask)
            self.instanceContours[uid] = np.expand_dims(np.abs(mask_filter) > 0, 2)

    def toBoundaryImage(self, contourType='instance', instanceOnly=True):
        if contourType == 'semantic':
            contours = self.semanticContours
            assert not instanceOnly
        elif contourType == 'instance':
            contours = self.instanceContours
        else:
            # raising a plain string is invalid in Python 3; use a proper exception
            raise ValueError("Contour type can only be 'semantic' or 'instance'!")

        if not instanceOnly:
            boundaryImg = [contours[k] for k in contours.keys()]
        else:
            boundaryImg = [contours[k] for k in contours.keys() if global2local(k)[1] != 0]
        boundaryImg = np.sum(np.asarray(boundaryImg), axis=0)
        boundaryImg = boundaryImg > 0
        return boundaryImg
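
    # A minimal usage sketch (the file path is hypothetical): load an instance
    # ground-truth image and render it as a color image with contours drawn in white.
    #   ann2d = Annotation2D()
    #   ann2d.loadInstance('instance/0000000000.png', gtType='instance')
    #   colorImg = ann2d.instanceImg  # HxWx3 array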


class Annotation2DInstance:
    def __init__(self, gtPath, cam=0):
        # trace the instances in all images
        self.instanceDict = defaultdict(list)

        # load the cached mapping if it exists
        instanceDictCached = os.path.join(gtPath, 'instanceDict.json')
        print(instanceDictCached)
        if os.path.isfile(instanceDictCached) and os.path.getsize(instanceDictCached) > 0:
            cachedDict = json.load(open(instanceDictCached))
            for k, v in cachedDict.items():
                self.instanceDict[int(k)] = v
            return

        obj = Annotation2D()
        gtPaths = glob.glob(os.path.join(gtPath, 'instance', '*.png'))
        print(f'Found {len(gtPaths)} label images...')
        for i, imgPath in enumerate(gtPaths):
            if i % 1000 == 0:
                print(f'Processed {i}/{len(gtPaths)} label images...')
            obj.loadInstance(imgPath, toImg=False)
            globalId = local2global(obj.semanticId, obj.instanceId)
            globalIdUnique = np.unique(globalId)
            for idx in globalIdUnique:
                self.instanceDict[int(idx)].append(os.path.basename(imgPath))

        json.dump(self.instanceDict, open(instanceDictCached, 'w'))

    # returns the paths that contain the specific instance
    def __call__(self, semanticId, instanceId):
        globalId = local2global(semanticId, instanceId)
        return self.instanceDict[globalId]
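
    # Usage sketch (ids are illustrative): Annotation2DInstance(gtPath)(26, 13)
    # returns the list of label-image filenames in which that instance appears.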


# Container for the KITTI360Bbox3D objects of one sequence
class Annotation3D:
    # Constructor
    def __init__(self, labelDir='', sequence=''):

        labelPath = glob.glob(os.path.join(labelDir, '*', '%s.xml' % sequence))  # train or test
        if len(labelPath) != 1:
            raise RuntimeError('%s does not exist! Please specify KITTI360_DATASET in your environment path.' % labelPath)
        else:
            labelPath = labelPath[0]
            print('Loading %s...' % labelPath)

        self.init_instance(labelPath)

    def init_instance(self, labelPath):
        # load annotation
        tree = ET.parse(labelPath)
        root = tree.getroot()

        self.objects = defaultdict(dict)

        self.num_bbox = 0

        for child in root:
            if child.find('transform') is None:
                continue
            obj = KITTI360Bbox3D()
            obj.parseBbox(child)
            globalId = local2global(obj.semanticId, obj.instanceId)
            self.objects[globalId][obj.timestamp] = obj
            self.num_bbox += 1

        globalIds = np.asarray(list(self.objects.keys()))
        semanticIds, instanceIds = global2local(globalIds)
        for label in labels:
            if label.hasInstances:
                print(f'{label.name:<30}:\t {(semanticIds == label.id).sum()}')
        print(f'Loaded {len(globalIds)} instances')
        print(f'Loaded {self.num_bbox} boxes')

    def __call__(self, semanticId, instanceId, timestamp=None):
        globalId = local2global(semanticId, instanceId)
        if globalId in self.objects.keys():
            # static object
            if len(self.objects[globalId].keys()) == 1:
                if -1 in self.objects[globalId].keys():
                    return self.objects[globalId][-1]
                else:
                    return None
            # dynamic object
            else:
                return self.objects[globalId][timestamp]
        else:
            return None
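
    # Usage sketch (ids are illustrative, annotation3d is an Annotation3D instance):
    # a static object is stored under timestamp -1, a dynamic one under each
    # annotated timestamp.
    #   box = annotation3d(semanticId=26, instanceId=13)                 # static
    #   box = annotation3d(semanticId=26, instanceId=13, timestamp=42)   # dynamic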


class Annotation3DPly:
    # parse fused 3D point cloud
    def __init__(self, labelDir='', sequence='', isLabeled=True, isDynamic=False, showStatic=True):

        if isLabeled and not isDynamic:
            # x y z r g b semanticId instanceId isVisible confidence
            self.fmt = '=fffBBBiiBf'
            self.fmt_len = 28
        elif isLabeled and isDynamic:
            # x y z r g b semanticId instanceId isVisible timestamp confidence
            self.fmt = '=fffBBBiiBif'
            self.fmt_len = 32
        elif not isLabeled and not isDynamic:
            # x y z r g b
            self.fmt = '=fffBBBB'
            self.fmt_len = 16
        else:
            raise RuntimeError('Invalid binary format!')

        # True for training data, False for testing data
        self.isLabeled = isLabeled
        # True for dynamic data, False for static data
        self.isDynamic = isDynamic
        # True for inspecting static data, False for inspecting dynamic data
        self.showStatic = showStatic

        pcdFolder = 'static' if self.showStatic else 'dynamic'
        trainTestDir = 'train' if self.isLabeled else 'test'
        self.pcdFileList = sorted(glob.glob(os.path.join(labelDir, trainTestDir, sequence, pcdFolder, '*.ply')))

        print('Found %d ply files in %s' % (len(self.pcdFileList), sequence))

    def readBinaryPly(self, pcdFile, n_pts=None):
        with open(pcdFile, 'rb') as f:
            plyData = f.readlines()

        headLine = plyData.index(b'end_header\n') + 1
        plyData = plyData[headLine:]
        plyData = b"".join(plyData)

        n_pts_loaded = len(plyData) / self.fmt_len
        # sanity check
        if n_pts:
            assert n_pts_loaded == n_pts
        n_pts_loaded = int(n_pts_loaded)

        data = []
        for i in range(n_pts_loaded):
            pts = struct.unpack(self.fmt, plyData[i * self.fmt_len:(i + 1) * self.fmt_len])
            data.append(pts)
        data = np.asarray(data)

        return data
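
    # Note: an equivalent vectorized read is possible with a packed structured
    # dtype and np.frombuffer (sketch for the static labeled format '=fffBBBiiBf'
    # on a little-endian machine):
    #   dtype = np.dtype([('xyz', '<f4', 3), ('rgb', 'u1', 3), ('semantic', '<i4'),
    #                     ('instance', '<i4'), ('visible', 'u1'), ('confidence', '<f4')])
    #   pts = np.frombuffer(plyData, dtype=dtype)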

    def writeBinaryPly(self, pcdFile, data):
        fmt = '=fffBBBiiB'
        fmt_len = 24
        n_pts = data.shape[0]

        with open(pcdFile, 'wb') as f:
            f.write(b'ply\n')
            f.write(b'format binary_little_endian 1.0\n')
            f.write(b'comment author Yiyi Liao\n')
            f.write(b'element vertex %d\n' % n_pts)
            f.write(b'property float x\n')
            f.write(b'property float y\n')
            f.write(b'property float z\n')
            f.write(b'property uchar red\n')
            f.write(b'property uchar green\n')
            f.write(b'property uchar blue\n')
            f.write(b'property int semantic\n')
            # the original snippet ends here; the remaining header fields and the
            # payload below are reconstructed to match the '=fffBBBiiB' layout above
            f.write(b'property int instance\n')
            f.write(b'property uchar visible\n')
            f.write(b'end_header\n')
            for i in range(n_pts):
                f.write(struct.pack(fmt, data[i, 0], data[i, 1], data[i, 2],
                                    int(data[i, 3]), int(data[i, 4]), int(data[i, 5]),
                                    int(data[i, 6]), int(data[i, 7]), int(data[i, 8])))


class Annotation3DInstance(object):
    instance_id = 0
    labelId = 0
    vert_count = 0
    med_dist = -1
    dist_conf = 0.0

    def __init__(self, mesh_vert_instances, instance_id):
        if instance_id == -1:
            return
        self.instance_id = int(instance_id)
        self.labelId = int(self.get_labelId(instance_id))
        self.vert_count = int(self.get_instance_verts(mesh_vert_instances, instance_id))

    def get_labelId(self, instance_id):
        # instance ids follow the same semanticId * 1000 + instanceId encoding
        return int(instance_id // 1000)

    def get_instance_verts(self, mesh_vert_instances, instance_id):
        return (mesh_vert_instances == instance_id).sum()

    def to_json(self):
        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)

    def to_dict(self):
        # avoid shadowing the builtin `dict`
        d = {}
        d["instance_id"] = self.instance_id
        d["labelId"] = self.labelId
        d["vert_count"] = self.vert_count
        d["med_dist"] = self.med_dist
        d["dist_conf"] = self.dist_conf
        return d

    def from_json(self, data):
        self.instance_id = int(data["instance_id"])
        self.labelId = int(data["labelId"])
        self.vert_count = int(data["vert_count"])
        if "med_dist" in data:
            self.med_dist = float(data["med_dist"])
            self.dist_conf = float(data["dist_conf"])

    def __str__(self):
        return "(" + str(self.instance_id) + ")"


# a dummy example
if __name__ == "__main__":
    ann = Annotation3D()
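    # In practice (the folder and sequence names below are assumptions, not part
    # of this file), the class is constructed with the KITTI-360 3D bounding-box
    # label folder and a sequence name, e.g.:
    #   ann = Annotation3D(labelDir=os.path.join(os.environ['KITTI360_DATASET'], 'data_3d_bboxes'),
    #                      sequence='2013_05_28_drive_0000_sync')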