Source code for augraphy.base.augmentationpipeline

import os
import random
import time
from copy import copy
from copy import deepcopy
from glob import glob

import cv2
import numpy as np

from augraphy.base.augmentation import Augmentation
from augraphy.base.augmentationresult import AugmentationResult
from augraphy.base.augmentationsequence import AugmentationSequence
from augraphy.base.oneof import OneOf
from augraphy.utilities.detectdpi import dpi_resize
from augraphy.utilities.detectdpi import DPIMetrics
from augraphy.utilities.overlaybuilder import OverlayBuilder


[docs] class AugraphyPipeline: """Contains phases of image augmentations and their results. :param pre_phase: Collection of Augmentations to apply :param ink_phase: Collection of Augmentations to apply. :type ink_phase: base.augmentationsequence or list :param paper_phase: Collection of Augmentations to apply. :type paper_phase: base.augmentationsequence or list :param post_phase: Collection of Augmentations to apply. :type post_phase: base.augmentationsequence or list :param overlay_type: Blending method to print ink into paper. :type overlay_type: string, optional. :param overlay_alpha: The alpha value for certain overlay methods. :type overlay_alpha: float, optional. :param ink_color_range: Pair of ints determining the range from which to sample the ink color. :type ink_color_range: tuple, optional :param paper_color_range: Pair of ints determining the range from which to sample the paper color. :type paper_color_range: tuple, optional :param mask: The mask of labels for each pixel. Mask value should be in range of 1 to 255. Value of 0 will be assigned to the filled area after the transformation. :type mask: numpy array (uint8), optional :param keypoints: A dictionary of single or multiple labels where each label is a nested list of points coordinate. For example: keypoints = {"label1":[[xpoint1, ypoint1], [xpoint2, ypoint2]], "label2":[[xpoint3, ypoint3]]}. :type keypoints: dictionary, optional :param bounding_boxes: A nested list where each nested list contains box location (x1, y1, x2, y2). For example: bounding_boxes = [[xa1,ya1,xa2,ya2], [xb1,yb2,xb2,yb2]] :type bounding_boxes: list, optional :param save_outputs: Flag to enable saving each phase output image. :type save_outputs: bool, optional :param fixed_dpi: Flag to enable a same DPI in both input and augmented image. :type fixed_dpi: bool, optional :param log: Flag to enable logging. :type log: bool, optional :param random_seed: The initial value for PRNGs used in Augraphy. :type random_seed: int, optional """ def __init__( self, ink_phase=[], paper_phase=[], post_phase=[], pre_phase=[], overlay_type="ink_to_paper", overlay_alpha=0.3, ink_color_range=(-1, -1), paper_color_range=(255, 255), mask=None, keypoints=None, bounding_boxes=None, save_outputs=False, fixed_dpi=False, log=False, random_seed=None, ): """Constructor method""" self.pre_phase = self.wrapListMaybe(pre_phase) self.ink_phase = self.wrapListMaybe(ink_phase) self.paper_phase = self.wrapListMaybe(paper_phase) self.post_phase = self.wrapListMaybe(post_phase) self.overlay_type = overlay_type self.overlay_alpha = overlay_alpha self.ink_color_range = ink_color_range self.paper_color_range = paper_color_range self.mask = mask self.keypoints = keypoints self.bounding_boxes = bounding_boxes self.save_outputs = save_outputs self.fixed_dpi = fixed_dpi self.log = log self.random_seed = random_seed # ensure determinism if random_seed set if self.random_seed: random.seed(self.random_seed) np.random.seed(self.random_seed) cv2.setRNGSeed(self.random_seed) # create directory to store log files if self.log: self.log_prob_path = os.path.join(os.getcwd(), "logs/") os.makedirs(self.log_prob_path, exist_ok=True) if self.save_outputs: # create each phase folder self.save_paths = [] self.save_paths.append(os.path.join(os.getcwd(), "augmentation_images/pre/")) self.save_paths.append(os.path.join(os.getcwd(), "augmentation_images/ink/")) self.save_paths.append(os.path.join(os.getcwd(), "augmentation_images/paper/")) self.save_paths.append(os.path.join(os.getcwd(), "augmentation_images/post/")) for i in range(len(self.save_paths)): os.makedirs(self.save_paths[i], exist_ok=True)
[docs] def wrapListMaybe(self, augs): """Converts a bare list to an AugmentationSequence, or does nothing.""" if type(augs) is list: return AugmentationSequence(augs) else: return augs
[docs] def augment(self, image, return_dict=1): """Applies the Augmentations in each phase of the pipeline. :param image: The image to apply Augmentations to. Minimum 30x30 pixels. :type image: numpy.array or list :return: 1. A dictionary of AugmentationResults representing the changes in each phase of the pipeline if the input is image. 2. A list contains output images if the input is list of images. 3. A four dimensional numpy array if the input is a four dimensional numpy array (batch size, channels, height, width). :rtype: 1. dictionary 2. list 3. numpy array (B, C, H, W) :param return_dict: Flag to return output in dictionary format. Not applicable when input is 4 dimensional array. When input is 4 dimensional numpy array, output will be a 4 dimensional array too. :type return_dict: int """ # image is a list of images if isinstance(image, list): output = [] for i, single_image in enumerate(image): # retrieve mask, keypoints, bounding boxes (if there's any) mask, keypoints, bounding_boxes = None, None, None if self.mask is not None: mask = self.mask[i] if self.keypoints is not None: keypoints = self.keypoints[i] if self.bounding_boxes is not None: bounding_boxes = self.bounding_boxes[i] data = self.augment_single_image( image=single_image, mask=mask, keypoints=keypoints, bounding_boxes=bounding_boxes, ) if return_dict: output.append(data) else: if (mask is not None) or (keypoints is not None) or (bounding_boxes is not None): output.append([data["output"], data["mask"], data["keypoints"], data["bounding_boxes"]]) else: output.append(data["output"]) # image is a 4 dimensional numpy array elif len(image.shape) == 4: batch_size, channels, height, width = image.shape output = np.zeros((batch_size, channels, height, width), dtype=image.dtype) output_masks = [] output_keypoints = [] output_bounding_boxes = [] for i in range(batch_size): # retrieve mask, keypoints, bounding boxes (if there's any) mask, keypoints, bounding_boxes = None, None, None if self.mask is not None: mask = self.mask[i] if self.keypoints is not None: keypoints = self.keypoints[i] if self.bounding_boxes is not None: bounding_boxes = self.bounding_boxes[i] # perform augmentation single_image = image[i].reshape(height, width, channels) output_data = self.augment_single_image( single_image, mask=mask, keypoints=keypoints, bounding_boxes=bounding_boxes, ) # retrieve image and each additional output format output_image = output_data["output"] output_masks.append(output_data["mask"]) output_keypoints.append(output_data["keypoints"]) output_bounding_boxes.append(output_data["bounding_boxes"]) # output is color image but input is in grayscale, convert output to grayscale if len(output_image.shape) > channels: output_image = cv2.cvtColor(output_image, cv2.COLOR_BGR2GRAY) # output is in grayscale but input is color image, convert output to color image if len(output_image.shape) != channels: output_image = cv2.cvtColor(output_image, cv2.COLOR_GRAY2BGR) # rescale image size if the image size is changes after the augmentation if output_image.shape[0] != height or output_image.shape[1] != width: output_image = cv2.resize(output_image, (width, height), interpolation=cv2.INTER_AREA) output[i] = output_image.reshape(channels, height, width) if (mask is not None) or (keypoints is not None) or (bounding_boxes is not None): output = [output, output_masks, output_keypoints, output_bounding_boxes] # single image else: data = self.augment_single_image( image, mask=self.mask, keypoints=self.keypoints, bounding_boxes=self.bounding_boxes, ) if return_dict: output = data else: # returns output in [image, mask, keypoints and bounding boxes if ( (data["mask"] is not None) or (data["keypoints"] is not None) or (data["bounding_boxes"] is not None) ): output = [data["output"], data["mask"], data["keypoints"], data["bounding_boxes"]] else: output = data["output"] return output
[docs] def augment_single_image(self, image, mask, keypoints, bounding_boxes): """Applies the Augmentations in each phase of the pipeline. :param image: The image to apply Augmentations to. Minimum 30x30 pixels. :type image: numpy.array :param mask: The mask of labels for each pixel. Mask value should be in range of 0 to 255. :type mask: numpy array (uint8), optional :param keypoints: A dictionary of single or multiple labels where each label is a nested list of points coordinate (x, y). :type keypoints: dictionary, optional :param bounding_boxes: A nested list where each nested list contains box location (x1, y1, x2, y2). :type bounding_boxes: list, optional :return: A dictionary of AugmentationResults representing the changes in each phase of the pipeline. :rtype: dictionary """ # Check if image has correct channel if len(image.shape) > 2 and (image.shape[2] != 3 and image.shape[2] != 4): raise Exception( "Image should have channel number of 3 (BGR) or 4 (BGRA), but actual dimensions were {}.".format( image.shape, ), ) # Check that image is the correct size. if (image.shape[0] < 30) or (image.shape[1] < 30): raise Exception( "Image should have dimensions greater than 30x30, but actual dimensions were {}.".format( image.shape, ), ) # get and check valid image type ( uint or float) image_type = str(image.dtype) image_max_value = 255 if image_type[:5] == "float": if np.max(image) <= 1: image_max_value = 1 image = np.uint8(image * 255) else: image = np.uint8(image) elif image_type[:4] != "uint": raise Exception( "Image type should be uint or float, but the image type is {}.".format( image_type, ), ) # create augraphy cache folder cache_folder_path = os.path.join(os.getcwd() + "/augraphy_cache/") os.makedirs(cache_folder_path, exist_ok=True) cache_image_paths = glob(cache_folder_path + "*.png", recursive=True) file_indices = [] modified_time = [] for image_path in cache_image_paths: file_name = os.path.basename(image_path) file_indices.append(int(file_name[file_name.index("_") + 1 : -4])) modified_time.append(os.path.getmtime(image_path)) # store 30 cache image files if len(cache_image_paths) >= 30: oldest_index = np.argmin(modified_time) outfilename = cache_folder_path + "image_" + str(file_indices[oldest_index]) + ".png" cv2.imwrite( outfilename, image, ) else: current_image_index = len(cache_image_paths) outfilename = cache_folder_path + "image_" + str(current_image_index) + ".png" cv2.imwrite( outfilename, image, ) data = dict() # Store performance metadata and other logs here. data["log"] = dict() # For storing augmentation execution times. data["log"]["time"] = list() data["log"]["augmentation_name"] = list() data["log"]["augmentation_status"] = list() data["log"]["augmentation_parameters"] = list() # This is useful. data["log"]["image_shape"] = image.shape data["image"] = image.copy() data["mask"] = None data["keypoints"] = None data["bounding_boxes"] = None data["pipeline"] = self data["pre"] = list() data["ink"] = list() data["paper"] = list() data["post"] = list() # If phases were defined None or [] in a custom pipeline, they wouldn't # be callable objects, so make them empty AugmentationSequences if len(self.ink_phase) == 0: self.ink_phase = AugmentationSequence([]) if len(self.paper_phase) == 0: self.paper_phase = AugmentationSequence([]) if len(self.post_phase) == 0: self.post_phase = AugmentationSequence([]) # the input image image_input = data["image"].copy() # pre phase if len(self.pre_phase) > 0: if self.fixed_dpi: # compute and save a copy of image original dpi and doc dimensions dpi_object = DPIMetrics(image_input) original_dpi, doc_dimensions = dpi_object() # pre phase input data["pre"].append( AugmentationResult(None, image_input, mask=mask, keypoints=keypoints, bounding_boxes=bounding_boxes), ) # apply pre phase augmentations self.apply_phase(data, layer="pre", phase=self.pre_phase) # the output of pre phase is the input for ink phase ink = data["pre"][-1].result mask = data["pre"][-1].mask keypoints = data["pre"][-1].keypoints bounding_boxes = data["pre"][-1].bounding_boxes else: ink = image_input # ink phase # ink phase input data["ink"].append(AugmentationResult(None, ink, mask=mask, keypoints=keypoints, bounding_boxes=bounding_boxes)) # apply ink phase augmentations self.apply_phase(data, layer="ink", phase=self.ink_phase) # paper phase if (self.paper_color_range[0] != 0) | (self.paper_color_range[1] != 0): paper_color = random.randint( self.paper_color_range[0], self.paper_color_range[1], ) else: paper_color = 255 data["log"]["paper_color"] = paper_color # paper phase input data["paper"].append( AugmentationResult( None, np.full( (data["ink"][-1].result.shape[0], data["ink"][-1].result.shape[1], 3), paper_color, dtype=np.uint8, ), mask=data["ink"][-1].mask, keypoints=data["ink"][-1].keypoints, bounding_boxes=data["ink"][-1].bounding_boxes, ), ) # apply paper phase augmentations self.apply_phase(data, layer="paper", phase=self.paper_phase) # post phase # post phase input: ink and paper phases always have at least one result by now data["post"].append( AugmentationResult( None, self.print_ink_to_paper( data, data["ink"][-1].result.copy(), data["paper"][-1].result.copy(), ), mask=data["ink"][-1].mask, keypoints=data["ink"][-1].keypoints, bounding_boxes=data["ink"][-1].bounding_boxes, ), ) # apply post phase augmentations self.apply_phase(data, layer="post", phase=self.post_phase) if self.fixed_dpi and len(self.pre_phase) > 0: dpi_object = DPIMetrics(data["post"][-1].result) current_dpi, current_doc_dimensions = dpi_object() # resize to original input dpi if dpi is changed if current_dpi != original_dpi: iysize, ixsize = image_input.shape[:2] # rescale image image_resize = cv2.resize(data["post"][-1].result, (ixsize, iysize), interpolation=cv2.INTER_AREA) # rescale mask if data["post"][-1].mask is not None: mask_resize = cv2.resize(data["post"][-1].mask, (ixsize, iysize), interpolation=cv2.INTER_AREA) else: mask_resize = None # rescale keypoints if data["post"][-1].keypoints is not None: scale_x = ixsize / data["post"][-1].result.shape[1] scale_y = iysize / data["post"][-1].result.shape[0] keypoints_resize = {} for name, points in data["post"][-1].keypoints.items(): keypoints_resize[name] = [] for i, (xpoint, ypoint) in enumerate(points): keypoints_resize[name].append([round(xpoint * scale_x), round(ypoint * scale_y)]) else: keypoints_resize = None # scale bounding boxes if bounding_boxes is not None: scale_x = ixsize / data["post"][-1].result.shape[1] scale_y = iysize / data["post"][-1].result.shape[0] bounding_boxes_resize = [] for i, bounding_box in enumerate(bounding_boxes): xspoint, yspoint, xepoint, yepoint = bounding_box bounding_boxes_resize.append( [ round(xspoint * scale_x), round(yspoint * scale_y), round(xepoint * scale_x), round(yepoint * scale_y), ], ) else: bounding_boxes_resize = None data["post"].append( AugmentationResult( None, image_resize, mask_resize, keypoints_resize, bounding_boxes_resize, ), ) # revert to input image type if image_type[:5] == "float": if image_max_value == 1: data["output"] = (data["post"][-1].result.astype(image_type)) / 255 else: data["output"] = data["post"][-1].result.astype(image_type) else: data["output"] = data["post"][-1].result.astype("uint8") # additional outputs data["mask"] = data["post"][-1].mask data["keypoints"] = data["post"][-1].keypoints data["bounding_boxes"] = data["post"][-1].bounding_boxes # save each phase augmented images if self.save_outputs: self.save_images(data) # log probability if self.log: self.write_log(data) return data
[docs] def save_images(self, data): """Save each augmented image in each phases to local disk. :param data: A dictionary of AugmentationResults representing the changes in each phase of the pipeline. :type data: dictionary """ layer_names = ["pre", "ink", "paper", "post"] pre_layers = data["pre"] ink_layers = data["ink"] paper_layers = data["paper"] post_layers = data["post"] n = 0 for j, layers in enumerate([pre_layers, ink_layers, paper_layers, post_layers]): # output path for each ink, paper and post phase images save_path = self.save_paths[j] # name of layer or phase layer_name = layer_names[j] for i, layer_data in enumerate(layers): if layer_data.metadata is None: result = layer_data.result # input layer if layer_data.augmentation is None: augmentation_name = layer_name + "_layer_input" cv2.imwrite( save_path + "p" + str(n) + "_" + layer_name + str(i) + "_" + augmentation_name + ".png", result, ) n += 1 # one of elif layer_data.augmentation.__class__.__name__ == "OneOf": augmentation_name = "oneof_" n = self.get_oneof_data( layer_data.augmentation, result, save_path, layer_name, augmentation_name, i, n, ) # sequence elif layer_data.augmentation.__class__.__name__ == "AugmentationSequence": augmentation_name = "sequence_" n = self.get_sequence_data( layer_data.augmentation, result, save_path, layer_name, augmentation_name, i, n, ) # normal augmentations else: augmentation_name = layer_data.augmentation.__class__.__name__ cv2.imwrite( save_path + "p" + str(n) + "_" + layer_name + str(i) + "_" + augmentation_name + ".png", result, ) n += 1
[docs] def get_oneof_data(self, augmentation, result, save_path, layer_name, augmentation_name, i, n): """Get augmentation information from OneOf augmentation recursively or save the augmented image in disk. :param augmentation: Augmentation object of OneOf augmentation. :type augmentation: class instance :param result: Augmentation output, it may be nested in a list. :type result: list or numpy array. :param save_path: Output path of result. :type save_path: list :param layer_name: Name of current layer. :type layer_name: list :param augmentation_name: A combined name of augmentations, seperated by _. :type augmentation_name: list :param i: Index of current augmentation in total number of augmentations. :type i: int :param n: Index of current augmented in total number of augmented images. :type n: int """ current_augmentation = augmentation.augmentations[np.argmax(augmentation.augmentation_probabilities)] # sequence inside oneof if current_augmentation.__class__.__name__ == "AugmentationSequence": augmentation_name += "sequence_" n = self.get_sequence_data(current_augmentation, result, save_path, layer_name, augmentation_name, i, n) # oneof inside oneof elif current_augmentation.__class__.__name__ == "OneOf": augmentation_name += "oneof_" n = self.get_oneof_data(current_augmentation, result, save_path, layer_name, augmentation_name, i, n) # augmentations inside oneof else: augmentation_name += current_augmentation.__class__.__name__ cv2.imwrite(save_path + "p" + str(n) + "_" + layer_name + str(i) + "_" + augmentation_name + ".png", result) n += 1 return n
[docs] def get_sequence_data(self, augmentation, result, save_path, layer_name, input_augmentation_name, i, n): """Get augmentation information from AugmentationSequence augmentation recursively or save the augmented image in disk. :param augmentation: Augmentation object of OneOf augmentation. :type augmentation: class instance :param result: Augmentation output, it may be nested in a list. :type result: list or numpy array. :param save_path: Output path of result. :type save_path: list :param layer_name: Name of current layer. :type layer_name: list :param augmentation_name: A combined name of augmentations, seperated by _. :type augmentation_name: list :param i: Index of current augmentation in total number of augmentations. :type i: int :param n: Index of current augmented in total number of augmented images. :type n: int """ s = 0 for current_augmentation, result in zip(augmentation.augmentations, augmentation.results): augmentation_name = copy(input_augmentation_name) + str(s) + "_" # sequence inside sequence if current_augmentation.__class__.__name__ == "AugmentationSequence": # sequence returns (result, self.augmentations), so get result only here result = result[0] augmentation_name += "sequence_" n = self.get_sequence_data(current_augmentation, result, save_path, layer_name, augmentation_name, i, n) # oneof inside sequence elif current_augmentation.__class__.__name__ == "OneOf": # oneof returns (image, [augmentation]), so get image only here result = result[0] augmentation_name += "oneof_" n = self.get_oneof_data(current_augmentation, result, save_path, layer_name, augmentation_name, i, n) # augmentations inside sequence else: augmentation_name += current_augmentation.__class__.__name__ if result is not None: cv2.imwrite( save_path + "p" + str(n) + "_" + layer_name + str(i) + "_" + augmentation_name + ".png", result, ) n += 1 s += 1 return n
[docs] def write_log(self, data): """Save augmentations log to local disk. :param data: A dictionary of AugmentationResults representing the changes in each phase of the pipeline. :type data: dictionary """ # path to log file log_file_name = "log_" + time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()) + ".txt" log_prob_file_path = self.log_prob_path + log_file_name augmentation_names = data["log"]["augmentation_name"] augmentation_status = data["log"]["augmentation_status"] augmentation_parameters = deepcopy(data["log"]["augmentation_parameters"]) # remove image array and replace it with shape for j, augmentation_parameter in enumerate(augmentation_parameters): # check and convert from tuple to list if isinstance(augmentation_parameter, tuple): augmentation_parameter = list(augmentation_parameter) augmentation_parameters[j] = augmentation_parameter check_values = [augmentation_parameter] while check_values: value = check_values.pop(0) if value: if isinstance(value, list): for i, nested_value in enumerate(value): if hasattr(nested_value, "shape"): value[i] = nested_value.shape elif ( isinstance(nested_value, list) or isinstance(nested_value, tuple) or hasattr(nested_value, "shape") ): # convert from tuple to list if isinstance(nested_value, tuple): nested_value = list(nested_value) value[i] = nested_value check_values.append(nested_value) elif hasattr(value, "items"): for parameter, nested_value in value.items(): if hasattr(nested_value, "shape"): value[parameter] = nested_value.shape elif ( isinstance(nested_value, list) or isinstance(nested_value, tuple) or hasattr(nested_value, "shape") ): # convert from tuple to list if isinstance(nested_value, tuple): nested_value = list(nested_value) value[parameter] = nested_value check_values.append(nested_value) with open(log_prob_file_path, "w+") as file: for (name, status, parameters) in zip( augmentation_names, augmentation_status, augmentation_parameters, ): file.write("%s,%s,%s \n" % (name, status, parameters)) # put a space file.write("\n") file.close()
[docs] def apply_phase(self, data, layer, phase): """Applies every augmentation in a phase. :param data: A dictionary of AugmentationResults representing the changes in each phase of the pipeline. :type data: dictionary :param layer: The name of current layer or phase. :type layer: string :param phase: Collection of Augmentations to apply. :type phase: base.augmentationsequence or list """ for augmentation in phase.augmentations: result = data[layer][-1].result.copy() mask, keypoints, bounding_boxes = None, None, None if data[layer][-1].mask is not None: mask = data[layer][-1].mask.copy() if data[layer][-1].keypoints is not None: keypoints = deepcopy(data[layer][-1].keypoints) if data[layer][-1].bounding_boxes is not None: bounding_boxes = deepcopy(data[layer][-1].bounding_boxes) if augmentation.should_run(): start = time.process_time() # time at start of execution result = augmentation( image=result, layer=layer, mask=mask, keypoints=keypoints, bounding_boxes=bounding_boxes, force=True, ) end = time.process_time() # time at end of execution elapsed = end - start # execution duration data["log"]["time"].append((augmentation, elapsed)) # not "OneOf" or "AugmentationSequence" if ( isinstance(augmentation, Augmentation) and not isinstance(augmentation, AugmentationSequence) and not isinstance(augmentation, OneOf) ): # unpacking augmented image, mask, keypoints and bounding boxes from output if (mask is not None) or (keypoints is not None) or (bounding_boxes is not None): result, mask, keypoints, bounding_boxes = result else: result = None data["log"]["augmentation_name"].append(augmentation.__class__.__name__) if result is None: data["log"]["augmentation_status"].append(False) data["log"]["augmentation_parameters"].append("") data[layer].append( AugmentationResult( augmentation, data[layer][-1].result.copy(), mask, keypoints, bounding_boxes, 'This augmentation did not run, its "result" is unchanged.', ), ) else: data["log"]["augmentation_status"].append(True) data["log"]["augmentation_parameters"].append(augmentation.__dict__) # for "OneOf" or "AugmentationSequence" while isinstance(result, tuple) or isinstance(result, list): result, augmentations = result for nested_augmentation in augmentations: data["log"]["augmentation_name"].append( nested_augmentation.__class__.__name__, ) data["log"]["augmentation_status"].append(True) data["log"]["augmentation_parameters"].append( nested_augmentation.__dict__, ) # unpacking augmented image, mask, keypoints and bounding boxes from output if (mask is not None) or (keypoints is not None) or (bounding_boxes is not None): result, mask, keypoints, bounding_boxes = result data[layer].append(AugmentationResult(augmentation, result, mask, keypoints, bounding_boxes))
[docs] def print_ink_to_paper(self, data, overlay, background): """Applies the ink layer to the paper layer. :param data: A dictionary of AugmentationResults representing the changes in each phase of the pipeline. :type data: dictionary :param overlay: Foreground of overlay process, output from ink phase. :type overlay: numpy array :param background: Background of overlay process, output from paper phase. :type background: numpy array """ if (self.ink_color_range[0] != -1) or (self.ink_color_range[1] != -1): ink_color = random.randint(self.ink_color_range[0], self.ink_color_range[1]) else: ink_color = -1 data["log"]["ink_color"] = ink_color # prevent inconsistency in size between background and overlay if overlay.shape[:2] != background.shape[:2]: overlay_y, overlay_x = overlay.shape[:2] background = cv2.resize( background, (overlay_x, overlay_y), interpolation=cv2.INTER_AREA, ) # preserve alpha layer has_alpha = 0 if len(overlay.shape) > 2 and overlay.shape[2] == 4: has_alpha = 1 image_alpha = overlay[:, :, 3] overlay = overlay[:, :, :3] ink_to_paper_builder = OverlayBuilder( overlay_types=self.overlay_type, foreground=overlay, background=background, ntimes=1, nscales=(1, 1), edge="center", edge_offset=0, alpha=self.overlay_alpha, ink_color=ink_color, ) image_blended = ink_to_paper_builder.build_overlay() if has_alpha: image_blended = np.dstack((image_blended, image_alpha)) return image_blended
def __repr__(self): r = f"pre_phase = {repr(self.pre_phase)}\n\n" r += f"ink_phase = {repr(self.ink_phase)}\n\n" r += f"paper_phase = {repr(self.paper_phase)}\n\n" r += f"post_phase = {repr(self.post_phase)}\n\n" r += f"AugraphyPipeline(pre_phase , ink_phase, paper_phase, post_phase, overlay_type={self.overlay_type}, overlay_alpha={self.overlay_alpha}, ink_color_range={self.ink_color_range}, paper_color_range={self.paper_color_range}, save_outputs={self.save_outputs}, log={self.log}, random_seed={self.random_seed})" return r
[docs] def visualize(self): print(repr(self))
def __call__(self, image): return self.augment(image, return_dict=0)