<a href="https://github.com/CSP-GD/notes/blob/master/theory/%E6%B7%B1%E5%BA%A6%E5%AD%B8%E7%BF%92/%E7%89%A9%E4%BB%B6%E8%AD%98%E5%88%A5/faster-rcnn/fasterRCNN.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

In [0]:
%tensorflow_version 2.x
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np

print(tf.__version__)
print(tfds.__version__)
print(np.__version__)

2.2.0-rc1
2.1.0
1.18.2


In [0]:
# Scratch notes kept from exploring tfds:
# tfds.object_detection.voc.xml.parsers.expat
# tfds.load('c')

# Load the MNIST training split as a tf.data.Dataset.
ds = tfds.load('mnist', split='train', shuffle_files=True)

# Input pipeline: shuffle with a 1024-element buffer, batch by 32, prefetch.
ds = (
    ds
    .shuffle(1024)
    .batch(32)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

# Pull a single batch so `image` and `label` are available for inspection.
for example in ds.take(1):
    image = example["image"]
    label = example["label"]

[1mDownloading and preparing dataset mnist/3.0.0 (download: 11.06 MiB, generated: Unknown size, total: 11.06 MiB) to /root/tensorflow_datasets/mnist/3.0.0...[0m


local data directory. If you'd instead prefer to read directly from our public
GCS bucket (recommended if you're running on GCP), you can instead set
data_dir=gs://tfds-data/datasets.



HBox(children=(IntProgress(value=0, description='Dl Completed...', max=4, style=ProgressStyle(description_widt…



[1mDataset mnist downloaded and prepared to /root/tensorflow_datasets/mnist/3.0.0. Subsequent calls will reuse this data.[0m


In [0]:
# Reference: https://blog.csdn.net/Eddy_zheng/article/details/52126641

# detection_result : [batch, 4 => x1, y1, x2, y2]
# ground_truth : [batch, 4 => x1, y1, x2, y2]
def IoU(detection_result, ground_truth):
    """Element-wise intersection-over-union of two batches of boxes.

    Row i of ``detection_result`` is compared with row i of ``ground_truth``.

    Args:
        detection_result: boxes given as (x1, y1, x2, y2) rows.
        ground_truth: boxes given as (x1, y1, x2, y2) rows, same row count.

    Returns:
        A 1-D tensor with one IoU value per row.  When the union area is
        exactly zero (e.g. two zero-area boxes) the denominator is replaced
        by 1, so well-formed degenerate boxes yield 0 instead of NaN.
    """
    def xy_wh(box):
        # Split each (x1, y1, x2, y2) row into its first corner and its
        # (width, height) extent.
        corners = tf.unstack(tf.reshape(box, [-1, 2, 2]), axis=1)
        first_corner = corners[0]
        return [first_corner, tf.subtract(corners[1], first_corner)]

    [dr_xy, dr_wh] = xy_wh(detection_result)
    dr_area = tf.reduce_prod(dr_wh, 1)

    [gt_xy, gt_wh] = xy_wh(ground_truth)
    gt_area = tf.reduce_prod(gt_wh, 1)

    # Corners of the smallest box enclosing both inputs.
    start = tf.minimum(dr_xy, gt_xy)
    end = tf.maximum(tf.add(dr_xy, dr_wh), tf.add(gt_xy, gt_wh))
    # Per-axis overlap = (sum of the two extents) - (extent of the enclosing
    # box).  A negative value means the boxes are disjoint on that axis, so
    # it is clamped to zero.
    overlap_wh = tf.nn.relu(
        tf.subtract(
            tf.add(dr_wh, gt_wh),
            tf.subtract(end, start)
        )
    )
    area = tf.reduce_prod(overlap_wh, 1)

    union = tf.subtract(tf.add(dr_area, gt_area), area)
    # Guard the 0 / 0 case: swap an exactly-zero union for 1 before dividing.
    safe_union = tf.where(tf.not_equal(union, 0), union, tf.ones_like(union))

    return tf.divide(area, safe_union)

In [0]:
def generate_anchor(scales,radios):
    """Build the base anchor sizes for every scale/ratio combination.

    For each scale (outer loop) and each ratio (inner loop) one pair
    ``[scale * ratio / m, scale / m]`` is produced, with ``m = min(ratio, 1)``.
    A ratio of 1 therefore gives ``[scale, scale]``; a ratio above 1
    stretches the first component, a ratio below 1 stretches the second.

    Args:
        scales: iterable of numeric base sizes.
        radios: iterable of numeric ratios; a ratio of 0 raises
            ZeroDivisionError.

    Returns:
        A list of two-element lists, ordered scale-major.
    """
    return [
        [scale * radio / min(radio, 1), scale / min(radio, 1)]
        for scale in scales
        for radio in radios
    ]

In [0]:
@tf.function
def generate_coordinate(w,h):
    """Return a [w, h, 2] float32 grid whose entry [i, j] is (i, j).

    The previous implementation called ``tf.linspace(0., w-1, w)``, which
    mixes a float start with an integer stop and fails with a dtype mismatch
    when ``w``/``h`` are int32 tensors (for example values taken from
    ``tf.shape``).  ``tf.range`` followed by a cast yields the same values
    0 .. n-1 for both Python ints and integer tensors.

    Args:
        w: size of the first grid axis (Python int or scalar integer tensor).
        h: size of the second grid axis (Python int or scalar integer tensor).

    Returns:
        float32 tensor of shape [w, h, 2]; channel 0 holds the index along
        the first axis, channel 1 the index along the second axis.
    """
    first_axis = tf.cast(tf.range(w), tf.float32)
    second_axis = tf.cast(tf.range(h), tf.float32)
    # indexing="ij" keeps the [w, h] layout: x[i, j] == i and y[i, j] == j.
    x, y = tf.meshgrid(first_axis, second_axis, indexing="ij")
    return tf.stack([x, y], axis=2)

# Region Proposal Networks (RPN)

> RPN 會接受前一層特徵提取器輸出的 feature map，  
> 並且輸出該位置的 anchor 有物件的機率以及對 anchor 的 offset，  
> 使用 offset 應用至 anchor 後即可得出 bbox。

In [0]:
def RPN(
        last_channel = 512,
        base_anchors = generate_anchor((128, 256, 512),(0.5, 1, 2))
    ):
    """Build a Region Proposal Network head.

    Allocates the variables for one 3x3 convolution followed by two parallel
    1x1 convolutions (object scores and box offsets) and returns a closure
    that applies them, plus an accessor for the variables.

    Args:
        last_channel: channel count of the incoming feature map; also the
            output channel count of the 3x3 convolution.
        base_anchors: list of k two-element base anchor sizes.

    Returns:
        ``[rpn, get_weights]``.  ``rpn(feature_map, image_size, top_k,
        bottom_k)`` returns ``[positive_anchor, negative_anchor]``;
        ``get_weights()`` returns the six variables created here.
    """
    k = len(base_anchors)

    conv_w_base = tf.Variable(tf.random.truncated_normal((3, 3, last_channel, last_channel)))
    # The bias length follows last_channel (it used to be hard-coded to 512,
    # which broke bias_add for any other channel count).
    conv_b_base = tf.Variable(tf.random.truncated_normal((last_channel,)))

    conv_w_for_object = tf.Variable(tf.random.truncated_normal((1, 1, last_channel, 2 * k)))
    conv_b_for_object = tf.Variable(tf.random.truncated_normal((2 * k,)))

    conv_w_for_offset = tf.Variable(tf.random.truncated_normal((1, 1, last_channel, 4 * k)))
    conv_b_for_offset = tf.Variable(tf.random.truncated_normal((4 * k,)))

    @tf.function
    def rpn(feature_map, image_size, top_k=128,bottom_k=None):
        """Score every anchor and return the best and worst scoring boxes.

        Args:
            feature_map: tensor of shape [batch, W, H, last_channel].
            image_size: two-element size of the input image, paired with the
                feature map's axes 1 and 2 respectively.
            top_k: number of highest-scoring "positive" anchors per image.
            bottom_k: number of highest-scoring "negative" anchors per image;
                defaults to ``top_k``.

        Returns:
            ``[positive_anchor, negative_anchor]`` with shapes
            [batch, top_k, 4] and [batch, bottom_k, 4]; each row is the
            (coordinate, size) pair of one adjusted anchor.
        """
        bottom_k = bottom_k if bottom_k else top_k
        shape = tf.shape(feature_map)
        # Image units per feature-map cell along each spatial axis.  Cast to
        # float32 explicitly: int / int32-tensor would produce float64, which
        # cannot be multiplied with the float32 coordinate grid below.
        factor = tf.divide(
            tf.cast(tf.stack([image_size[0], image_size[1]]), tf.float32),
            tf.cast(shape[1:3], tf.float32)
        )

        # 3 x 3 x last_channel convolution.
        # tf.nn.conv2d documents the padding strings "SAME" / "VALID".
        base = tf.nn.bias_add(
            tf.nn.conv2d(feature_map, conv_w_base, 1, "SAME"),
            conv_b_base
        )
        base = tf.nn.relu(base)

        # 1 x 1 x 2k convolution: two scores per anchor.
        for_object = tf.nn.bias_add(
            tf.nn.conv2d(base, conv_w_for_object, 1, "SAME"),
            conv_b_for_object
        )
        # Reshape to [batch, W * H * k, 2] so softmax runs over the score pair.
        for_object = tf.reshape(for_object, [shape[0], shape[1] * shape[2] * k, 2])
        for_object = tf.nn.softmax(for_object, 2)
        # Split into two [batch, W * H * k] score maps.
        [positive_score, negative_score] = tf.unstack(for_object, axis = 2)
        # Indices of the top-scoring anchors in each map, per image.
        positive_anchor_idx = tf.nn.top_k(positive_score, top_k)[1]
        negative_anchor_idx = tf.nn.top_k(negative_score, bottom_k)[1]

        # The anchors are gathered from a tensor flattened over the batch, so
        # shift each image's indices by that image's starting row.
        offset_of_expansion_batch = tf.expand_dims(
            tf.range(
                0,
                shape[0] * shape[1] * shape[2] * k,
                shape[1] * shape[2] * k
            ),
            1
        )
        # Flattened to [batch * top_k].
        positive_anchor_idx = tf.reshape(
            tf.add(
                positive_anchor_idx,
                offset_of_expansion_batch
            ),
            [-1]
        )
        # Flattened to [batch * bottom_k].
        negative_anchor_idx = tf.reshape(
            tf.add(
                negative_anchor_idx,
                offset_of_expansion_batch
            ),
            [-1]
        )

        # 1 x 1 x 4k convolution: the raw anchor offsets.
        for_offset = tf.nn.bias_add(
            tf.nn.conv2d(base, conv_w_for_offset, 1, "SAME"),
            conv_b_for_offset
        )
        # Split into a coordinate offset and a size multiplier; reshaping to
        # [batch, W, H, k, 2, 2] first makes the split a plain unstack.
        [coordinate_offset, scale_offset] = tf.unstack(
            tf.reshape(
                for_offset,
                [shape[0], shape[1], shape[2], k, 2, 2]
            ),
            axis=4
        )

        # Centre of every feature-map cell in image units, shape [W, H, 2].
        cell_centre = tf.multiply(
            tf.add(generate_coordinate(shape[1], shape[2]), 0.5),
            factor
        )
        # Add the absolute position; [1, W, H, 1, 2] broadcasts over the
        # batch and anchor axes of the [batch, W, H, k, 2] offsets.
        anchor_coordinate = tf.add(
            coordinate_offset,
            tf.reshape(cell_centre, [1, shape[1], shape[2], 1, 2])
        )

        # Scale each base anchor size by its predicted multiplier.
        anchor_scale = tf.multiply(
            scale_offset,
            base_anchors
        )

        # Combine into anchors of shape [batch, W, H, k, 2, 2] ...
        anchor = tf.stack([anchor_coordinate, anchor_scale], 4)
        # ... and flatten to [batch * W * H * k, 4] for gathering.
        anchor = tf.reshape(anchor, [-1, 4])

        # [batch, top_k, 4]
        positive_anchor = tf.reshape(
            tf.gather(
                anchor,
                positive_anchor_idx
            ),
            [-1, top_k, 4]
        )

        # [batch, bottom_k, 4]
        negative_anchor = tf.reshape(
            tf.gather(
                anchor,
                negative_anchor_idx
            ),
            [-1, bottom_k, 4]
        )

        return [positive_anchor, negative_anchor]
    def get_weights():
        """Return the six variables of this head (weight, bias per conv)."""
        # Previously referenced the undefined names conv_w / conv_b.
        return [
                conv_w_base,
                conv_b_base,
                conv_w_for_object,
                conv_b_for_object,
                conv_w_for_offset,
                conv_b_for_offset
            ]

    return [rpn, get_weights]


In [0]:
# anchor : tensor<shape = [batch, -1, 4 => x, y, w, h]>
@tf.function
def cull_anchor(anchor, image_size, min_size):
    """Drop anchors that leave the image or are smaller than ``min_size``.

    An anchor (centre x, y and size w, h) is kept only when its box
    ``centre - size / 2 .. centre + size / 2`` lies within
    ``[0, image_size]`` on both axes and both of its sizes are at least
    ``min_size``.  Previously ``min_size`` was ignored and the size check
    compared against a literal ``[0, 0]``.

    Args:
        anchor: tensor of shape [batch, N, 4] with (x, y, w, h) rows.
        image_size: two-element upper bound for the box's far corner.
        min_size: minimum allowed size; a scalar or a two-element (w, h)
            value that broadcasts against the size columns.

    Returns:
        The result of ``tf.split`` on the surviving anchors: one
        [kept_i, 4] tensor per batch element.
    """
    [xy, wh] = tf.split(anchor, 2, axis=2)
    half_wh = tf.divide(wh, 2)

    inside_min = tf.greater_equal(tf.subtract(xy, half_wh), [0, 0])
    inside_max = tf.less_equal(
        tf.add(xy, half_wh),
        tf.cast(image_size, xy.dtype)
    )

    # Cast so an integer min_size can be compared with float sizes.
    big_enough = tf.greater_equal(wh, tf.cast(min_size, wh.dtype))

    # Every one of the six per-anchor conditions must hold.
    mask = tf.reduce_all(tf.concat([inside_min, inside_max, big_enough], 2), 2)

    # boolean_mask flattens the batch axis; split the rows back per image
    # using the number of survivors in each image.
    return tf.split(
        tf.boolean_mask(anchor, mask),
        tf.reduce_sum(tf.cast(mask, "int32"), -1)
    )
   

In [0]:
# anchor : tensor<shape = [batch, -1, 4 => x, y, w, h]>
@tf.function
def clip_anchor(anchor, image_size, min_size):
    """Clip every anchor box to the rectangle ``[0, image_size]``.

    Each (x, y, w, h) anchor is converted to its two corners, the corners
    are clamped to the image bounds, and the result is converted back to
    centre/size form.

    Args:
        anchor: tensor of shape [batch, N, 4] with (x, y, w, h) rows.
        image_size: two-element upper bound for the far corner.
        min_size: accepted but not used by this function.

    Returns:
        Tensor of the same layout as ``anchor`` holding the clipped boxes.
    """
    centre, extent = tf.split(anchor, 2, axis=2)
    half_extent = tf.divide(extent, 2)

    # Clamp the near corner at the origin and the far corner at the image size.
    lower = tf.maximum(tf.subtract(centre, half_extent), [0, 0])
    upper = tf.minimum(tf.add(centre, half_extent), image_size)

    clipped = [
        tf.divide(tf.add(lower, upper), 2),  # new centre
        tf.subtract(upper, lower),           # new size
    ]
    return tf.concat(clipped, axis=2)
   

# RoI Pooling
> 使用 RPN 輸出的 bbox 對特徵提取器輸出的 feature map 進行裁切，  
> 將裁切下來的部分做 pooling 變成 7 * 7 * Channel 後送出。

In [0]:
# anchor : [batch] : tensor<shape = [-1, 4 => x, y, w, h]>
@tf.function
def roi_pooling(feature_map, anchor, image_size, output_shape = (7, 7)):
    """Crop each anchor's region from the feature map and resize it.

    The previous body could not execute: it assigned into ``range`` objects,
    called ``len(anchor[b],shape[0])``, used ``tf.ceil`` (not a TF2 name),
    subtracted the half extent twice and passed float, rank-mismatched
    begin/size values to ``tf.slice``.  This version delegates the crop and
    the bilinear resize to ``tf.image.crop_and_resize``, which also accepts
    boxes that reach outside the feature map.

    Args:
        feature_map: tensor of shape [batch, W, H, C].
        anchor: Python sequence with one [N_b, 4] tensor of (x, y, w, h)
            rows per batch element, in image units.
        image_size: two-element image size; element 0 pairs with feature-map
            axis 1 and element 1 with axis 2.
        output_shape: spatial size of every pooled region.

    Returns:
        A list with one [N_b, output_shape[0], output_shape[1], C] float32
        tensor per batch element.
    """
    image_extent = tf.cast(tf.stack([image_size[0], image_size[1]]), tf.float32)

    pooled = []
    # `anchor` is a Python sequence, so this loop is unrolled while tracing.
    for b, boxes_xywh in enumerate(anchor):
        [xy, wh] = tf.split(tf.cast(boxes_xywh, tf.float32), 2, axis=1)
        half_wh = tf.divide(wh, 2)

        # crop_and_resize expects corners normalised to 0..1 and ordered
        # (axis-1 start, axis-2 start, axis-1 end, axis-2 end).  In this
        # notebook "x" runs along feature-map axis 1 and "y" along axis 2,
        # so (x1, y1, x2, y2) is already in the required order.
        normalised_boxes = tf.concat(
            [
                tf.divide(tf.subtract(xy, half_wh), image_extent),
                tf.divide(tf.add(xy, half_wh), image_extent),
            ],
            axis=1
        )

        pooled.append(
            tf.image.crop_and_resize(
                feature_map[b:b + 1],
                normalised_boxes,
                # Every box refers to the single image in the slice above.
                tf.zeros_like(normalised_boxes[:, 0], dtype=tf.int32),
                output_shape
            )
        )
    return pooled

# 分類與再次調整 bbox

> 對從 RoI pooling 取得的 feature map 做 conv 與 dense，  
> 取得該 feature map 的分類以及再次對 bbox 做 offset，  
> 對 bbox 使用 offset 便能取得最終的 bbox

In [0]:
# Classification and bounding-box regression
def Pred(
        input_shape = (7, 7, 512),
        class_number = 21
    ):
    """Build the classification / box-regression head.

    Allocates the variables for a full-window convolution, a 1x1 convolution
    and two parallel 1x1 output convolutions, and returns a closure applying
    them plus an accessor for the variables.

    Fixes over the previous version: ``class_number`` followed a defaulted
    parameter without a default of its own (a SyntaxError), the body read
    the undefined name ``base_anchors``, and every ``tf.nn.conv2d`` call
    lacked the required ``padding`` argument.

    Args:
        input_shape: (height, width, channels) of one pooled region.
        class_number: number of output classes.  The default of 21 assumes
            Pascal VOC (20 classes + background) -- TODO confirm for the
            dataset actually used.

    Returns:
        ``[pred, get_weights]``.  ``pred(feature_map, ...)`` returns
        ``[classtify, bbox]``; ``get_weights()`` returns the eight variables.
    """
    conv_w_base_1 = tf.Variable(tf.random.truncated_normal((input_shape[0], input_shape[1], input_shape[2], input_shape[2])))
    conv_b_base_1 = tf.Variable(tf.random.truncated_normal((input_shape[2],)))

    conv_w_base_2 = tf.Variable(tf.random.truncated_normal((1, 1, input_shape[2], input_shape[2])))
    conv_b_base_2 = tf.Variable(tf.random.truncated_normal((input_shape[2],)))

    conv_w_classtify = tf.Variable(tf.random.truncated_normal((1, 1, input_shape[2], class_number)))
    conv_b_classtify = tf.Variable(tf.random.truncated_normal((class_number,)))

    conv_w_bbox = tf.Variable(tf.random.truncated_normal((1, 1, input_shape[2], 4 * class_number)))
    conv_b_bbox = tf.Variable(tf.random.truncated_normal((4 * class_number,)))

    # feature_map : tensor<shape = [anchor number, ...input_shape]>
    @tf.function
    def pred(feature_map, image_size, top_k=128,bottom_k=None):
        """Classify each pooled region and regress per-class box offsets.

        Args:
            feature_map: tensor of shape [N, *input_shape].
            image_size, top_k, bottom_k: accepted so existing call sites
                keep working; not used by this head.

        Returns:
            ``[classtify, bbox]``: softmax class scores with
            ``class_number`` channels and raw offsets with
            ``4 * class_number`` channels.
        """
        # The kernel spans the whole input window, so with "VALID" padding
        # an input of exactly input_shape collapses to 1 x 1 spatially.
        base_1 = tf.nn.bias_add(
            tf.nn.conv2d(feature_map, conv_w_base_1, 1, "VALID"),
            conv_b_base_1
        )
        base_1 = tf.nn.relu(base_1)

        # 1 x 1 convolution over the collapsed feature.
        base_2 = tf.nn.bias_add(
            tf.nn.conv2d(base_1, conv_w_base_2, 1, "VALID"),
            conv_b_base_2
        )
        base_2 = tf.nn.relu(base_2)

        # Class scores, normalised over the channel axis.
        classtify = tf.nn.bias_add(
            tf.nn.conv2d(base_2, conv_w_classtify, 1, "VALID"),
            conv_b_classtify
        )
        classtify = tf.nn.softmax(classtify, -1)

        # Four offset values per class.
        bbox = tf.nn.bias_add(
            tf.nn.conv2d(base_2, conv_w_bbox, 1, "VALID"),
            conv_b_bbox
        )

        return [classtify, bbox]
    def get_weights():
        """Return the eight variables of this head (weight, bias per conv)."""
        return [
                conv_w_base_1,
                conv_b_base_1,
                conv_w_base_2,
                conv_b_base_2,
                conv_w_classtify,
                conv_b_classtify,
                conv_w_bbox,
                conv_b_bbox
            ]

    return [pred, get_weights]

In [0]:
# Stock Keras VGG16 with its default arguments.
vgg16 = tf.keras.applications.VGG16()
# vgg16.summary()

# Truncated copy used as the feature extractor: same input, output taken
# from layer index 17 (presumably the last conv layer before the final
# pooling -- confirm with vgg16.summary()).
vgg16_ = tf.keras.models.Model(
    inputs=vgg16.layers[0].input,
    outputs=vgg16.layers[17].output,
)
vgg16_.summary()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5
Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 224, 224, 3)]     0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 224, 224, 64)      1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 224, 224, 64)      36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 112, 112, 64)      0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 112, 112, 128)     73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 112, 112, 128)     14758

In [0]:
# Number of classes predicted by the detection head.
# Assumes Pascal VOC (20 classes + background) -- TODO confirm for the
# dataset actually used.
NUM_CLASSES = 21

# Each factory returns [forward function, weight accessor].
[rpn, get_rpn_weights] = RPN()
# Pred needs the class count; pass it explicitly instead of calling Pred().
[pred, get_pred_weights] = Pred(class_number=NUM_CLASSES)

In [0]:
# Intended inference pipeline, in order.  These are bare references only --
# nothing is called yet.
vgg16_.predict
rpn
cull_anchor
roi_pooling  # was `roi_pool`, which is not defined anywhere in this notebook
pred

# Loss 計算

## anchor

> 每張圖片經過特徵提取器後的大小會變成 [M, N, C]，  
> 那麼 anchor 的總數量就會是 M * N * K，  
> K 代表 anchor 的類別數量。

* ### positive anchor

    當 anchor 與 ground truth 之間的 IoU > 0.7，便將此 anchor 視為 positive anchor。

* ### negative anchor

    當 anchor 與 ground truth 之間的 IoU < 0.3，便將此 anchor 視為 negative anchor。