Spaces:
Sleeping
Sleeping
Yaz Hobooti
Upgrade barcode reader to v2: multi-scale, multi-rotate, tiling, and tight rotated boxes for better recall
80fae67
| # barcode_reader.py — v2 (recall+tight boxes) | |
| from __future__ import annotations | |
| import io, os | |
| from typing import Any, Dict, List, Tuple, Optional | |
| import numpy as np | |
| from PIL import Image | |
| import cv2 | |
| # ---------- Engines ---------- | |
| _HAS_ZXING = False | |
| try: | |
| import zxingcpp # pip install zxing-cpp | |
| _HAS_ZXING = True | |
| except Exception: | |
| zxingcpp = None | |
| _HAS_ZXING = False | |
| _HAS_OCV_BARCODE = hasattr(cv2, "barcode") and hasattr(getattr(cv2, "barcode"), "BarcodeDetector") | |
| # ---------- PDF (PyMuPDF) ---------- | |
| try: | |
| import fitz # PyMuPDF | |
| _HAS_PYMUPDF = True | |
| except Exception: | |
| fitz = None | |
| _HAS_PYMUPDF = False | |
| # ========================= | |
| # Utils | |
| # ========================= | |
def _to_bgr(img: Image.Image) -> np.ndarray:
    """Convert a PIL image into a BGR-ordered numpy array.

    The image is first converted to RGB mode, then the channel order is
    swapped with OpenCV so the result can be fed to cv2 routines.
    """
    rgb = np.array(img.convert("RGB"))
    bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
    return bgr
def _as_gray(arr_bgr: np.ndarray) -> np.ndarray:
    """Return the grayscale conversion (cv2 BGR2GRAY) of a BGR image."""
    gray = cv2.cvtColor(arr_bgr, cv2.COLOR_BGR2GRAY)
    return gray
def _unrotate_points(pts: np.ndarray, rot: int, orig_w: int, orig_h: int) -> np.ndarray:
    """Map (x, y) points found in an ``np.rot90(image, k=rot)`` view back to the
    coordinate frame of the un-rotated image.

    Args:
        pts: N x 2 array of (x, y) coordinates measured in the rotated view.
        rot: Number of 90-degree counter-clockwise turns applied to the image.
        orig_w: Width of the original (un-rotated) image.
        orig_h: Height of the original (un-rotated) image.

    Returns:
        A new N x 2 array; the input is never modified. Any ``rot`` other than
        1, 2 or 3 yields an unchanged copy.
    """
    src = pts.copy()
    if rot not in (1, 2, 3):
        return src

    col, row = src[:, 0], src[:, 1]
    if rot == 1:
        # 90 degrees CCW: the view's rows run along the original x axis, mirrored.
        new_x, new_y = orig_w - row, col
    elif rot == 2:
        # 180 degrees: both axes mirrored.
        new_x, new_y = orig_w - col, orig_h - row
    else:
        # 270 degrees CCW: the view's columns run along the original y axis, mirrored.
        new_x, new_y = row, orig_h - col
    return np.stack([new_x, new_y], axis=1)
def _norm_polygon(pts: Any, w: int, h: int) -> List[List[float]]:
    """Coerce ``pts`` into exactly four (x, y) corners.

    * Four or more points: the first four are kept as-is.
    * One to three points: the axis-aligned bounding box of those points is
      returned (top-left, top-right, bottom-right, bottom-left).
    * Anything that cannot be interpreted as a non-empty list of 2-D points
      (including an empty input): the full ``w`` x ``h`` frame is returned.
    """
    try:
        quad = np.array(pts, dtype=np.float32).reshape(-1, 2)
        if quad.shape[0] < 4:
            # min()/max() raise on an empty array, which routes empty input
            # to the full-frame fallback below.
            lo = quad.min(axis=0)
            hi = quad.max(axis=0)
            quad = np.array(
                [[lo[0], lo[1]], [hi[0], lo[1]], [hi[0], hi[1]], [lo[0], hi[1]]],
                dtype=np.float32,
            )
        else:
            quad = quad[:4]
    except Exception:
        quad = np.array([[0, 0], [w, 0], [w, h], [0, h]], dtype=np.float32)
    return quad.astype(float).tolist()
def _tight_rotated_box(poly: List[List[float]]) -> np.ndarray:
    """Compute the minimum-area rotated rectangle enclosing ``poly``.

    Returns the rectangle's four corners as a 4 x 2 float32 array, as produced
    by ``cv2.boxPoints`` on the result of ``cv2.minAreaRect``.
    """
    # minAreaRect yields (center, (w, h), angle); boxPoints expands it to corners.
    corners = cv2.boxPoints(cv2.minAreaRect(np.array(poly, dtype=np.float32)))
    return corners.astype(np.float32)
def _dedupe(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop repeated detections of the same barcode.

    Two detections are considered the same when they share ``text``, ``type``
    and ``page`` and the IoU of their axis-aligned bounding boxes exceeds 0.7.
    The first occurrence wins and input order is preserved.

    ``page`` is read with ``.get`` so detections that carry no page key still
    compare equal to each other. Including the page prevents an identical code
    printed at the same position on several PDF pages from being collapsed
    into a single hit.

    Args:
        results: Detection dicts with at least ``text``, ``type`` and
            ``polygon`` (list of (x, y) points).

    Returns:
        A new list containing the retained detection dicts (same objects).
    """

    def iou(a: Dict[str, Any], b: Dict[str, Any]) -> float:
        # IoU of the axis-aligned bounding boxes of both polygons.
        a_pts = np.array(a["polygon"], dtype=np.float32)
        b_pts = np.array(b["polygon"], dtype=np.float32)
        a_min, a_max = a_pts.min(axis=0), a_pts.max(axis=0)
        b_min, b_max = b_pts.min(axis=0), b_pts.max(axis=0)
        overlap_wh = np.maximum(np.minimum(a_max, b_max) - np.maximum(a_min, b_min), 0)
        inter = overlap_wh[0] * overlap_wh[1]
        a_area = (a_max - a_min).prod()
        b_area = (b_max - b_min).prod()
        # Floor the union so degenerate (zero-area) boxes do not divide by zero.
        union = max(a_area + b_area - inter, 1e-6)
        return float(inter / union)

    keep: List[Dict[str, Any]] = []
    for r in results:
        is_dup = any(
            r["text"] == k["text"]
            and r["type"] == k["type"]
            and r.get("page") == k.get("page")
            and iou(r, k) > 0.7
            for k in keep
        )
        if not is_dup:
            keep.append(r)
    return keep
def _zxing_hints():
    """Build a zxing-cpp ``DecodeHints`` object, or return ``None`` if unavailable.

    Returns ``None`` when zxing-cpp is not installed, when the installed
    binding does not expose a ``DecodeHints`` class, or when the hints object
    rejects the attributes set here. Previously a missing class raised
    ``AttributeError`` at the call site.

    NOTE(review): the zxing-cpp Python wrapper documents keyword arguments on
    ``read_barcodes`` rather than a ``DecodeHints`` class — confirm against the
    installed version.
    """
    if not _HAS_ZXING:
        return None
    hints_cls = getattr(zxingcpp, "DecodeHints", None)
    if hints_cls is None:
        return None
    try:
        hints = hints_cls()
        hints.try_harder = True   # improves 1D at small sizes
        hints.try_rotate = False  # we rotate ourselves explicitly
    except (AttributeError, TypeError):
        return None
    # Optionally restrict formats if you know your set, e.g. Code128|EAN_13|QR_CODE
    # (hints.formats defaults to any format).
    return hints
| # ========================= | |
| # Candidate generation (recall booster) | |
| # ========================= | |
def _candidate_views(bgr: np.ndarray) -> List[Tuple[np.ndarray, float, int, Tuple[int,int]]]:
    """Build the list of image variants tried by the decoders.

    Each entry is ``(image, scale, rot, (orig_w, orig_h))`` where ``rot`` is the
    ``k`` passed to ``np.rot90`` (counter-clockwise quarter turns) and ``scale``
    is the resize factor applied after rotation. Scales are 1.0 and 1.5, plus
    2.0 when the longer side of the input is below 1400 px.

    For every (rotation, scale) pair three variants are appended, in order:
    the plain image, a sharpened copy, and a CLAHE-equalised grayscale copy
    converted back to 3 channels.

    NOTE(review): every variant is materialised up front in one list, so memory
    use grows with the number of views times the image size.
    """
    height, width = bgr.shape[:2]
    orig_wh = (width, height)
    scales = [1.0, 1.5, 2.0] if max(height, width) < 1400 else [1.0, 1.5]
    # Mild 3x3 sharpening kernel; identical for every view, so built once.
    sharpen_kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]], dtype=np.float32)

    views: List[Tuple[np.ndarray, float, int, Tuple[int,int]]] = []
    for rot in (0, 1, 2, 3):
        rotated = bgr if rot == 0 else np.ascontiguousarray(np.rot90(bgr, k=rot))
        for s in scales:
            if s == 1.0:
                scaled = rotated
            else:
                scaled = cv2.resize(rotated, (0, 0), fx=s, fy=s, interpolation=cv2.INTER_CUBIC)

            sharpened = cv2.filter2D(scaled, -1, sharpen_kernel)
            equalised = cv2.createCLAHE(2.5, (8, 8)).apply(_as_gray(scaled))
            equalised_bgr = cv2.cvtColor(equalised, cv2.COLOR_GRAY2BGR)

            views.extend([
                (scaled, s, rot, orig_wh),
                (sharpened, s, rot, orig_wh),
                (equalised_bgr, s, rot, orig_wh),
            ])
    return views
def _tile_views(bgr: np.ndarray, grid: int = 2) -> List[Tuple[np.ndarray, Tuple[int,int]]]:
    """Split an image into a ``grid`` x ``grid`` set of overlapping tiles.

    Tiles are returned row by row as ``(tile_bgr, (x0, y0))`` where ``(x0, y0)``
    is the tile's top-left corner in the input image. Each tile is a slice of
    ``bgr`` (no copy). Neighbouring tiles overlap by one sixth of the nominal
    tile size on each shared edge.

    The last row and column always extend to the image border, so pixels left
    over by the floor division are not dropped. Tiles that would be empty
    (image smaller than the grid) are skipped, and ``grid < 1`` yields no tiles.
    """
    if grid < 1:
        return []

    height, width = bgr.shape[:2]
    step_x, step_y = width // grid, height // grid
    overlap_x, overlap_y = step_x // 6, step_y // 6
    last = grid - 1

    tiles: List[Tuple[np.ndarray, Tuple[int,int]]] = []
    for iy in range(grid):
        y0 = max(iy * step_y - overlap_y, 0)
        y1 = height if iy == last else min((iy + 1) * step_y + overlap_y, height)
        for ix in range(grid):
            x0 = max(ix * step_x - overlap_x, 0)
            x1 = width if ix == last else min((ix + 1) * step_x + overlap_x, width)
            if x1 <= x0 or y1 <= y0:
                continue  # degenerate tile; nothing to decode
            tiles.append((bgr[y0:y1, x0:x1], (x0, y0)))
    return tiles
| # ========================= | |
| # Decoders | |
| # ========================= | |
def _zxing_corner_points(pos: Any) -> np.ndarray:
    """Extract barcode corners from a zxing-cpp position as an N x 2 float32 array.

    Prefers the named corners (``top_left``, ``top_right``, ``bottom_right``,
    ``bottom_left``); if those are absent the position is iterated as a
    sequence of points. Returns an empty (0, 2) array when neither works.
    """
    corner_names = ("top_left", "top_right", "bottom_right", "bottom_left")
    try:
        if all(hasattr(pos, name) for name in corner_names):
            corners = [getattr(pos, name) for name in corner_names]
        else:
            corners = list(pos)
        return np.array([[float(c.x), float(c.y)] for c in corners], dtype=np.float32)
    except Exception:
        return np.empty((0, 2), dtype=np.float32)


def _zxing_read(img: np.ndarray, hints: Any):
    """Call ``zxingcpp.read_barcodes`` with whichever option style is usable."""
    if hints is not None:
        return zxingcpp.read_barcodes(img, hints=hints)
    try:
        # Rotation is handled by the caller's explicit view generation.
        return zxingcpp.read_barcodes(img, try_rotate=False)
    except TypeError:
        # Binding does not know the keyword; fall back to its defaults.
        return zxingcpp.read_barcodes(img)


def _decode_zxing_with_views(bgr: np.ndarray) -> List[Dict[str, Any]]:
    """Decode barcodes with zxing-cpp, trying each candidate view in turn.

    Views come from ``_candidate_views``. Decoding stops at the first view that
    yields at least one hit. Corner points are scaled and un-rotated back into
    the coordinate frame of ``bgr``; a hit without usable corners gets the full
    image frame as its polygon.

    Returns:
        A list of dicts with keys ``engine`` ("zxingcpp"), ``type``, ``text``
        and ``polygon`` (four [x, y] points). Empty when zxing-cpp is not
        installed or nothing was decoded.
    """
    if not _HAS_ZXING:
        return []

    try:
        hints = _zxing_hints()
    except AttributeError:
        # The installed binding has no hints class; use keyword options instead.
        hints = None

    agg: List[Dict[str, Any]] = []
    for img, scale, rot, (W, H) in _candidate_views(bgr):
        try:
            res = _zxing_read(img, hints)
        except Exception:
            # Best effort: a view the engine cannot process is simply skipped.
            continue

        for r in res or []:
            if not (r and getattr(r, "text", None)):
                continue
            try:
                fmt = getattr(r.format, "name", str(r.format))
            except Exception:
                fmt = str(r.format)

            pts = _zxing_corner_points(getattr(r, "position", None))
            if pts.size:
                # Undo the view's scale first, then its rotation.
                pts = _unrotate_points(pts / float(scale), rot, W, H)
                poly = _norm_polygon(pts, W, H)
            else:
                poly = _norm_polygon([], W, H)

            agg.append({
                "engine": "zxingcpp",
                "type": fmt,
                "text": r.text or "",
                "polygon": poly,
            })
        if agg:  # good enough
            break
    return agg
def _ocv_detect_and_decode(det: Any, gray: np.ndarray):
    """Run the OpenCV barcode detector and return ``(ok, infos, types, corners)``.

    Uses ``detectAndDecodeWithType`` when the detector offers it and the legacy
    ``detectAndDecode`` otherwise. Any result that is not a successful 4-tuple
    is reported as ``(False, (), (), ())``.
    """
    with_type = getattr(det, "detectAndDecodeWithType", None)
    raw = with_type(gray) if callable(with_type) else det.detectAndDecode(gray)
    if not isinstance(raw, (tuple, list)) or len(raw) != 4:
        return False, (), (), ()
    ok, infos, types, corners = raw
    if not ok or infos is None or types is None or corners is None:
        return False, (), (), ()
    return True, infos, types, corners


def _decode_opencv_with_views(bgr: np.ndarray) -> List[Dict[str, Any]]:
    """Decode barcodes with OpenCV's ``BarcodeDetector`` over the candidate views.

    Views come from ``_candidate_views``. Decoding stops at the first view that
    yields at least one hit. Corner points are scaled and un-rotated back into
    the coordinate frame of ``bgr``. A view on which the detector raises is
    skipped, matching the best-effort behaviour of the zxing decoder.

    Returns:
        A list of dicts with keys ``engine`` ("opencv_barcode"), ``type``,
        ``text`` and ``polygon`` (four [x, y] points). Empty when the OpenCV
        barcode module is unavailable or nothing was decoded.
    """
    if not _HAS_OCV_BARCODE:
        return []

    det = cv2.barcode.BarcodeDetector()
    agg: List[Dict[str, Any]] = []
    for img, scale, rot, (W, H) in _candidate_views(bgr):
        try:
            ok, infos, types, corners = _ocv_detect_and_decode(det, _as_gray(img))
        except (cv2.error, ValueError, TypeError):
            continue
        if not ok:
            continue

        for txt, typ, pts in zip(infos, types, corners):
            if not txt:
                continue
            pts = np.array(pts, dtype=np.float32).reshape(-1, 2)
            # Undo the view's scale first, then its rotation.
            pts = _unrotate_points(pts / float(scale), rot, W, H)
            agg.append({
                "engine": "opencv_barcode",
                "type": typ,
                "text": txt,
                "polygon": _norm_polygon(pts, W, H),
            })
        if agg:
            break
    return agg
def _decode_any(bgr: np.ndarray) -> List[Dict[str, Any]]:
    """Decode barcodes using the first strategy that produces a hit.

    Order of attempts: zxing on the whole image, OpenCV on the whole image,
    then — only if both found nothing — each tile of a 2 x 2 grid (zxing first,
    OpenCV as fallback per tile). Polygons from tile hits are shifted by the
    tile's offset so they are expressed in whole-image coordinates.
    """
    for decoder in (_decode_zxing_with_views, _decode_opencv_with_views):
        found = decoder(bgr)
        if found:
            return found

    # Last-ditch: small tiles for tiny/overlapping codes.
    tile_hits: List[Dict[str, Any]] = []
    for tile, (off_x, off_y) in _tile_views(bgr, grid=2):
        found = _decode_zxing_with_views(tile)
        if not found:
            found = _decode_opencv_with_views(tile)
        for hit in found:
            shifted = np.array(hit["polygon"], dtype=np.float32)
            shifted[:, 0] += off_x
            shifted[:, 1] += off_y
            hit["polygon"] = shifted.tolist()
            tile_hits.append(hit)
    return tile_hits
| # ========================= | |
| # Image & PDF readers | |
| # ========================= | |
def _pdf_extract_xobject_images(path: str, page_index: Optional[int] = None) -> List[Tuple[int, np.ndarray]]:
    """Extract the images embedded in a PDF as BGR arrays.

    Args:
        path: Path of the PDF file.
        page_index: Restrict extraction to this page; ``None`` means all pages.

    Returns:
        A list of ``(page_number, bgr_image)`` pairs, or an empty list when
        PyMuPDF is not installed.

    The document is closed even if extraction raises part-way through.
    """
    if not _HAS_PYMUPDF:
        return []

    out: List[Tuple[int, np.ndarray]] = []
    doc = fitz.open(path)
    try:
        pages = range(len(doc)) if page_index is None else [page_index]
        for pno in pages:
            page = doc[pno]
            for info in page.get_images(full=True):
                xref = info[0]  # first field of each image entry is its xref
                pix = fitz.Pixmap(doc, xref)
                if pix.n >= 4:
                    # Four or more components: convert to RGB before PNG encoding.
                    pix = fitz.Pixmap(fitz.csRGB, pix)
                pil = Image.open(io.BytesIO(pix.tobytes("png"))).convert("RGB")
                out.append((pno, _to_bgr(pil)))
    finally:
        doc.close()
    return out
def _pdf_render_page(path: str, page: int, dpi: int) -> np.ndarray:
    """Rasterise one PDF page to a BGR image at the requested resolution.

    Args:
        path: Path of the PDF file.
        page: Zero-based page number.
        dpi: Target resolution; the page is scaled by ``dpi / 72``.

    Returns:
        The rendered page as a BGR array (no alpha channel).

    Raises:
        RuntimeError: PyMuPDF is not installed.
        ValueError: ``page`` is past the end of the document.

    The document is closed on every exit path, including rendering errors.
    """
    if not _HAS_PYMUPDF:
        raise RuntimeError("PyMuPDF not available; cannot rasterize PDF.")

    doc = fitz.open(path)
    try:
        n = len(doc)
        if page >= n:
            raise ValueError(f"Page {page} out of range; PDF has {n} pages.")
        scale = dpi / 72.0
        pix = doc[page].get_pixmap(matrix=fitz.Matrix(scale, scale), alpha=False)
        pil = Image.open(io.BytesIO(pix.tobytes("png"))).convert("RGB")
    finally:
        doc.close()
    return _to_bgr(pil)
def _decode_image_path(path: str) -> List[Dict[str, Any]]:
    """Decode all barcodes found in an image file.

    Every hit is tagged with ``source="image"`` and ``page=0`` before
    duplicates are removed. The image file handle is released as soon as the
    pixel data has been converted to RGB.
    """
    # convert() produces an independent image, so the source can be closed.
    with Image.open(path) as src:
        pil = src.convert("RGB")
    hits = _decode_any(_to_bgr(pil))
    for h in hits:
        h.update({"source": "image", "page": 0})
    return _dedupe(hits)
def _decode_pdf_path(path: str, max_pages: int = 8, raster_dpis: Tuple[int, ...] = (400, 600, 900)) -> List[Dict[str, Any]]:
    """Decode barcodes from a PDF in two passes.

    Pass 1 decodes the images embedded in the PDF; hits are tagged
    ``source="pdf_xobject_image"``. If that pass finds anything, its
    de-duplicated hits are returned immediately.

    Pass 2 rasterises the first ``max_pages`` pages at each DPI in
    ``raster_dpis`` (in order), tags hits ``source="pdf_raster_<dpi>dpi"``, and
    stops after the first DPI that yields a hit. Without PyMuPDF, pass 2
    returns an empty list.

    NOTE(review): ``max_pages`` limits only the raster pass; the embedded-image
    pass reads every page — confirm this is intended.
    """
    embedded_hits: List[Dict[str, Any]] = []
    for page_no, image in _pdf_extract_xobject_images(path):
        for hit in _decode_any(image):
            hit.update({"source": "pdf_xobject_image", "page": page_no})
            embedded_hits.append(hit)
    if embedded_hits:
        return _dedupe(embedded_hits)

    if not _HAS_PYMUPDF:
        return []

    # Open the document only to learn how many pages to rasterise.
    doc = fitz.open(path)
    page_count = min(len(doc), max_pages)
    doc.close()

    raster_hits: List[Dict[str, Any]] = []
    for dpi in raster_dpis:
        for page_no in range(page_count):
            page_image = _pdf_render_page(path, page_no, dpi=dpi)
            for hit in _decode_any(page_image):
                hit.update({"source": f"pdf_raster_{dpi}dpi", "page": page_no})
                raster_hits.append(hit)
        if raster_hits:
            break
    return _dedupe(raster_hits)
| # ========================= | |
| # Public API | |
| # ========================= | |
def read_barcodes_from_path(path: str,
                            max_pages: int = 8,
                            raster_dpis: Tuple[int, ...] = (400, 600, 900)) -> List[Dict[str, Any]]:
    """Read barcodes from a file, dispatching on its extension.

    Paths ending in ``.pdf`` (case-insensitive) go through the PDF pipeline
    with ``max_pages`` and ``raster_dpis``; every other path is opened as an
    image.
    """
    _, suffix = os.path.splitext(path.lower())
    if suffix != ".pdf":
        return _decode_image_path(path)
    return _decode_pdf_path(path, max_pages=max_pages, raster_dpis=raster_dpis)
| # ========================= | |
| # Drawing helpers (tight boxes) | |
| # ========================= | |
def draw_barcodes(bgr: np.ndarray, detections: List[Dict[str, Any]], color=(0,255,0), thickness: int = 2) -> np.ndarray:
    """Draw each detection's tight rotated rectangle and a ``TYPE: TEXT`` label.

    Args:
        bgr: Image to annotate; it is copied, not modified.
        detections: Dicts carrying ``polygon`` plus optional ``type``/``text``.
        color: Outline colour in BGR order (green by default).
        thickness: Outline thickness in pixels.

    Returns:
        The annotated copy of ``bgr``.

    Detections whose polygon is missing or has no points are skipped, since
    there is nothing to outline or anchor the label to. The label is cut to 48
    characters and placed just above the first corner of the drawn box.
    """
    out = bgr.copy()
    for d in detections:
        raw = d.get("polygon")
        if raw is None:
            continue
        poly = np.array(raw, dtype=np.float32).reshape(-1, 2)
        if poly.shape[0] == 0:
            continue

        # A single point cannot span a rectangle; draw it as-is.
        box = _tight_rotated_box(poly) if poly.shape[0] >= 2 else poly
        box_i = box.astype(np.int32).reshape(-1, 1, 2)
        cv2.polylines(out, [box_i], True, color, thickness, cv2.LINE_AA)

        x, y = int(box[0, 0]), int(box[0, 1])
        label = f'{d.get("type","")}: {d.get("text","")}'
        # Keep the text baseline at least 14 px from the top edge so it stays visible.
        cv2.putText(out, label[:48], (x, max(14, y - 6)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 50, 255), 1, cv2.LINE_AA)
    return out
def render_preview_bgr(path: str, page: int = 0, dpi: int = 220) -> np.ndarray:
    """Load a preview of ``path`` as a BGR image.

    PDFs (extension ``.pdf``, case-insensitive) are rasterised at ``dpi`` for
    the requested ``page``; any other file is opened as an image, in which case
    ``page`` and ``dpi`` are ignored. The image file handle is released once
    the pixel data has been converted to RGB.
    """
    ext = os.path.splitext(path.lower())[1]
    if ext == ".pdf":
        return _pdf_render_page(path, page=page, dpi=dpi)
    # convert() produces an independent image, so the source can be closed.
    with Image.open(path) as src:
        pil = src.convert("RGB")
    return _to_bgr(pil)