Compare commits
16 Commits
Author | SHA1 | Date |
---|---|---|
sipp11 | c8987ecd7b | 5 years ago |
sipp11 | ae3fa32138 | 5 years ago |
sipp11 | cf30d2df47 | 5 years ago |
sipp11 | 616d017a0d | 5 years ago |
sipp11 | 47536e673d | 5 years ago |
sipp11 | 33e0c4324e | 5 years ago |
sipp11 | 277c1deb60 | 5 years ago |
sipp11 | bfc4dc1b0b | 5 years ago |
sipp11 | e27db785b2 | 5 years ago |
sipp11 | 0b343a748a | 5 years ago |
sipp11 | 58075c4cec | 5 years ago |
sipp11 | 75a776deb1 | 5 years ago |
sipp11 | 780d5a910e | 5 years ago |
sipp11 | 84a7df960a | 5 years ago |
sipp11 | 1e025ece42 | 5 years ago |
sipp11 | f83352cee2 | 5 years ago |
12 changed files with 1518 additions and 389 deletions
@ -1,4 +1,7 @@
|
||||
{ |
||||
"python.pythonPath": "~/.virtualenvs/obj-tracking/bin/python", |
||||
"python.formatting.provider": "black" |
||||
"python.formatting.provider": "black", |
||||
"python.linting.pylintArgs": [ |
||||
"--extension-pkg-whitelist=cv2" |
||||
] |
||||
} |
@ -0,0 +1,109 @@
|
||||
"""USAGE |
||||
python examples/yolo_obj_detector.py \ |
||||
-c ~/dev/obj-tracking/yolov3.cfg \ |
||||
-w ~/dev/obj-tracking/yolov3.weights \ |
||||
-cl ~/dev/obj-tracking/yolo/darknet/data/coco.names \ |
||||
-i ~/dev/obj-tracking/person.jpg |
||||
|
||||
python examples/yolo_obj_detector.py \ |
||||
-c ~/syncthing/dropbox/tracking-obj/mytrain.cfg \ |
||||
-w ~/syncthing/dropbox/tracking-obj/mytrain_final.weights \ |
||||
-cl ~/syncthing/dropbox/tracking-obj/mytrain.names \ |
||||
-i /media/sipp11/500BUP/handai_photos/test/6294.jpg |
||||
""" |
||||
import cv2 |
||||
import argparse |
||||
import numpy as np |
||||
|
||||
ap = argparse.ArgumentParser() |
||||
ap.add_argument("-i", "--image", required=True, help="path to input image") |
||||
ap.add_argument("-c", "--config", required=True, help="path to yolo config file") |
||||
ap.add_argument( |
||||
"-w", "--weights", required=True, help="path to yolo pre-trained weights" |
||||
) |
||||
ap.add_argument( |
||||
"-cl", "--classes", required=True, help="path to text file containing class names" |
||||
) |
||||
args = ap.parse_args() |
||||
|
||||
|
||||
def get_output_layers(net): |
||||
layer_names = net.getLayerNames() |
||||
output_layers = [layer_names[i[0] - 1] for i in net.getUnconnectedOutLayers()] |
||||
return output_layers |
||||
|
||||
|
||||
def draw_prediction(img, class_id, confidence, x, y, x_plus_w, y_plus_h): |
||||
label = str(classes[class_id]) |
||||
color = COLORS[class_id] |
||||
cv2.rectangle(img, (x, y), (x_plus_w, y_plus_h), color, 2) |
||||
cv2.putText(img, label, (x - 10, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) |
||||
|
||||
image = cv2.imread(args.image) |
||||
|
||||
Width = image.shape[1] |
||||
Height = image.shape[0] |
||||
scale = 0.00392 |
||||
|
||||
classes = None |
||||
|
||||
with open(args.classes, "r") as f: |
||||
classes = [line.strip() for line in f.readlines()] |
||||
|
||||
COLORS = np.random.uniform(0, 255, size=(len(classes), 3)) |
||||
|
||||
net = cv2.dnn.readNet(args.weights, args.config) |
||||
blob = cv2.dnn.blobFromImage(image, scale, (416, 416), (0, 0, 0), True, crop=False) |
||||
|
||||
net.setInput(blob) |
||||
|
||||
outs = net.forward(get_output_layers(net)) |
||||
|
||||
class_ids = [] |
||||
confidences = [] |
||||
boxes = [] |
||||
conf_threshold = 0.5 |
||||
nms_threshold = 0.4 |
||||
|
||||
|
||||
for out in outs: |
||||
for detection in out: |
||||
scores = detection[5:] |
||||
class_id = np.argmax(scores) |
||||
confidence = scores[class_id] |
||||
if confidence > 0.5: |
||||
center_x = int(detection[0] * Width) |
||||
center_y = int(detection[1] * Height) |
||||
w = int(detection[2] * Width) |
||||
h = int(detection[3] * Height) |
||||
x = center_x - w / 2 |
||||
y = center_y - h / 2 |
||||
class_ids.append(class_id) |
||||
confidences.append(float(confidence)) |
||||
boxes.append([x, y, w, h]) |
||||
|
||||
|
||||
indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, nms_threshold) |
||||
|
||||
for i in indices: |
||||
i = i[0] |
||||
box = boxes[i] |
||||
x = box[0] |
||||
y = box[1] |
||||
w = box[2] |
||||
h = box[3] |
||||
draw_prediction( |
||||
image, |
||||
class_ids[i], |
||||
confidences[i], |
||||
round(x), |
||||
round(y), |
||||
round(x + w), |
||||
round(y + h), |
||||
) |
||||
|
||||
cv2.imshow("object detection", image) |
||||
cv2.waitKey() |
||||
|
||||
cv2.imwrite("object-detection.jpg", image) |
||||
cv2.destroyAllWindows() |
@ -1,103 +1,198 @@
|
||||
"""USAGE |
||||
python examples/yolo_obj_detector.py \ |
||||
-c ~/dev/obj-tracking/yolov3.cfg \ |
||||
-w ~/dev/obj-tracking/yolov3.weights \ |
||||
-cl ~/dev/obj-tracking/yolo/darknet/data/coco.names \ |
||||
-i ~/dev/obj-tracking/person.jpg |
||||
"""USAGE: |
||||
|
||||
time python examples/test.py --input ~/Desktop/5min.mp4 -o output.mp4 |
||||
time python examples/test.py --input ~/Desktop/5min.mp4 -l |
||||
|
||||
""" |
||||
import cv2 |
||||
import argparse |
||||
# import the necessary packages |
||||
import numpy as np |
||||
import argparse |
||||
import imutils |
||||
import time |
||||
import cv2 |
||||
import os |
||||
|
||||
# construct the argument parse and parse the arguments |
||||
ap = argparse.ArgumentParser() |
||||
ap.add_argument("-i", "--image", required=True, help="path to input image") |
||||
ap.add_argument("-c", "--config", required=True, help="path to yolo config file") |
||||
ap.add_argument("-i", "--input", required=True, help="path to input video") |
||||
ap.add_argument("-o", "--output", required=False, help="path to output video") |
||||
ap.add_argument("-l", "--live", action='store_true', help="Show live detection") |
||||
# ap.add_argument("-y", "--yolo", required=True, |
||||
# help="base path to YOLO directory") |
||||
ap.add_argument( |
||||
"-w", "--weights", required=True, help="path to yolo pre-trained weights" |
||||
"-c", |
||||
"--confidence", |
||||
type=float, |
||||
default=0.5, |
||||
help="minimum probability to filter weak detections", |
||||
) |
||||
ap.add_argument( |
||||
"-cl", "--classes", required=True, help="path to text file containing class names" |
||||
"-t", |
||||
"--threshold", |
||||
type=float, |
||||
default=0.3, |
||||
help="threshold when applyong non-maxima suppression", |
||||
) |
||||
args = ap.parse_args() |
||||
|
||||
|
||||
def get_output_layers(net): |
||||
layer_names = net.getLayerNames() |
||||
output_layers = [layer_names[i[0] - 1] for i in net.getUnconnectedOutLayers()] |
||||
return output_layers |
||||
|
||||
|
||||
def draw_prediction(img, class_id, confidence, x, y, x_plus_w, y_plus_h): |
||||
label = str(classes[class_id]) |
||||
color = COLORS[class_id] |
||||
cv2.rectangle(img, (x, y), (x_plus_w, y_plus_h), color, 2) |
||||
cv2.putText(img, label, (x - 10, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) |
||||
|
||||
image = cv2.imread(args.image) |
||||
|
||||
Width = image.shape[1] |
||||
Height = image.shape[0] |
||||
scale = 0.00392 |
||||
|
||||
classes = None |
||||
|
||||
with open(args.classes, "r") as f: |
||||
classes = [line.strip() for line in f.readlines()] |
||||
|
||||
COLORS = np.random.uniform(0, 255, size=(len(classes), 3)) |
||||
|
||||
net = cv2.dnn.readNet(args.weights, args.config) |
||||
blob = cv2.dnn.blobFromImage(image, scale, (416, 416), (0, 0, 0), True, crop=False) |
||||
|
||||
net.setInput(blob) |
||||
args = vars(ap.parse_args()) |
||||
|
||||
# load the COCO class labels our YOLO model was trained on |
||||
# labelsPath = os.path.sep.join([args["yolo"], "coco.names"]) |
||||
labelsPath = "/home/sipp11/syncthing/dropbox/tracking-obj/mytrain.names" |
||||
LABELS = open(labelsPath).read().strip().split("\n") |
||||
|
||||
# initialize a list of colors to represent each possible class label |
||||
np.random.seed(42) |
||||
COLORS = np.random.randint(0, 255, size=(len(LABELS), 3), dtype="uint8") |
||||
|
||||
# derive the paths to the YOLO weights and model configuration |
||||
# weightsPath = os.path.sep.join([args["yolo"], "yolov3.weights"]) |
||||
# configPath = os.path.sep.join([args["yolo"], "yolov3.cfg"]) |
||||
|
||||
weightsPath = "/home/sipp11/syncthing/dropbox/tracking-obj/mytrain_final.weights" |
||||
configPath = "/home/sipp11/syncthing/dropbox/tracking-obj/mytrain.cfg" |
||||
|
||||
# load our YOLO object detector trained on COCO dataset (80 classes) |
||||
# and determine only the *output* layer names that we need from YOLO |
||||
print("[INFO] loading YOLO from disk...") |
||||
net = cv2.dnn.readNetFromDarknet(configPath, weightsPath) |
||||
ln = net.getLayerNames() |
||||
ln = [ln[i[0] - 1] for i in net.getUnconnectedOutLayers()] |
||||
|
||||
|
||||
# initialize the video stream, pointer to output video file, and |
||||
# frame dimensions |
||||
vs = cv2.VideoCapture(args["input"]) |
||||
writer = None |
||||
(W, H) = (None, None) |
||||
|
||||
# try to determine the total number of frames in the video file |
||||
try: |
||||
prop = ( |
||||
cv2.cv.CV_CAP_PROP_FRAME_COUNT if imutils.is_cv2() else cv2.CAP_PROP_FRAME_COUNT |
||||
) |
||||
total = int(vs.get(prop)) |
||||
print("[INFO] {} total frames in video".format(total)) |
||||
|
||||
# an error occurred while trying to determine the total |
||||
# number of frames in the video file |
||||
except: |
||||
print("[INFO] could not determine # of frames in video") |
||||
print("[INFO] no approx. completion time can be provided") |
||||
total = -1 |
||||
|
||||
|
||||
# loop over frames from the video file stream |
||||
while True: |
||||
# read the next frame from the file |
||||
(grabbed, frame) = vs.read() |
||||
|
||||
# if the frame was not grabbed, then we have reached the end |
||||
# of the stream |
||||
if not grabbed: |
||||
break |
||||
|
||||
# if the frame dimensions are empty, grab them |
||||
if W is None or H is None: |
||||
(H, W) = frame.shape[:2] |
||||
|
||||
# construct a blob from the input frame and then perform a forward |
||||
# pass of the YOLO object detector, giving us our bounding boxes |
||||
# and associated probabilities |
||||
blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False) |
||||
net.setInput(blob) |
||||
start = time.time() |
||||
layerOutputs = net.forward(ln) |
||||
end = time.time() |
||||
|
||||
# initialize our lists of detected bounding boxes, confidences, |
||||
# and class IDs, respectively |
||||
boxes = [] |
||||
confidences = [] |
||||
classIDs = [] |
||||
|
||||
# loop over each of the layer outputs |
||||
for output in layerOutputs: |
||||
# loop over each of the detections |
||||
for detection in output: |
||||
# extract the class ID and confidence (i.e., probability) |
||||
# of the current object detection |
||||
scores = detection[5:] |
||||
classID = np.argmax(scores) |
||||
confidence = scores[classID] |
||||
|
||||
# filter out weak predictions by ensuring the detected |
||||
# probability is greater than the minimum probability |
||||
if confidence > args["confidence"]: |
||||
# scale the bounding box coordinates back relative to |
||||
# the size of the image, keeping in mind that YOLO |
||||
# actually returns the center (x, y)-coordinates of |
||||
# the bounding box followed by the boxes' width and |
||||
# height |
||||
box = detection[0:4] * np.array([W, H, W, H]) |
||||
(centerX, centerY, width, height) = box.astype("int") |
||||
|
||||
# use the center (x, y)-coordinates to derive the top |
||||
# and and left corner of the bounding box |
||||
x = int(centerX - (width / 2)) |
||||
y = int(centerY - (height / 2)) |
||||
|
||||
# update our list of bounding box coordinates, |
||||
# confidences, and class IDs |
||||
boxes.append([x, y, int(width), int(height)]) |
||||
confidences.append(float(confidence)) |
||||
classIDs.append(classID) |
||||
|
||||
outs = net.forward(get_output_layers(net)) |
||||
# apply non-maxima suppression to suppress weak, overlapping |
||||
# bounding boxes |
||||
idxs = cv2.dnn.NMSBoxes( |
||||
boxes, confidences, args["confidence"], args["threshold"] |
||||
) |
||||
|
||||
class_ids = [] |
||||
confidences = [] |
||||
boxes = [] |
||||
conf_threshold = 0.5 |
||||
nms_threshold = 0.4 |
||||
# ensure at least one detection exists |
||||
if len(idxs) > 0: |
||||
# loop over the indexes we are keeping |
||||
for i in idxs.flatten(): |
||||
# extract the bounding box coordinates |
||||
(x, y) = (boxes[i][0], boxes[i][1]) |
||||
(w, h) = (boxes[i][2], boxes[i][3]) |
||||
|
||||
# draw a bounding box rectangle and label on the frame |
||||
color = [int(c) for c in COLORS[classIDs[i]]] |
||||
cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2) |
||||
text = "{}: {:.4f}".format(LABELS[classIDs[i]], confidences[i]) |
||||
cv2.putText( |
||||
frame, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2 |
||||
) |
||||
|
||||
if args["live"]: |
||||
cv2.imshow("Frame", frame) |
||||
key = cv2.waitKey(1) & 0xFF |
||||
|
||||
# if the `q` key was pressed, break from the loop |
||||
if key == ord("q"): |
||||
break |
||||
|
||||
if args["output"]: |
||||
# check if the video writer is None |
||||
if writer is None: |
||||
# initialize our video writer |
||||
fourcc = cv2.VideoWriter_fourcc(*"MJPG") |
||||
writer = cv2.VideoWriter( |
||||
args["output"], fourcc, 30, (frame.shape[1], frame.shape[0]), True |
||||
) |
||||
|
||||
for out in outs: |
||||
for detection in out: |
||||
scores = detection[5:] |
||||
class_id = np.argmax(scores) |
||||
confidence = scores[class_id] |
||||
if confidence > 0.5: |
||||
center_x = int(detection[0] * Width) |
||||
center_y = int(detection[1] * Height) |
||||
w = int(detection[2] * Width) |
||||
h = int(detection[3] * Height) |
||||
x = center_x - w / 2 |
||||
y = center_y - h / 2 |
||||
class_ids.append(class_id) |
||||
confidences.append(float(confidence)) |
||||
boxes.append([x, y, w, h]) |
||||
|
||||
|
||||
indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, nms_threshold) |
||||
|
||||
for i in indices: |
||||
i = i[0] |
||||
box = boxes[i] |
||||
x = box[0] |
||||
y = box[1] |
||||
w = box[2] |
||||
h = box[3] |
||||
draw_prediction( |
||||
image, |
||||
class_ids[i], |
||||
confidences[i], |
||||
round(x), |
||||
round(y), |
||||
round(x + w), |
||||
round(y + h), |
||||
# some information on processing single frame |
||||
if total > 0: |
||||
elap = end - start |
||||
print("[INFO] single frame took {:.4f} seconds".format(elap)) |
||||
print( |
||||
"[INFO] estimated total time to finish: {:.4f}".format(elap * total) |
||||
) |
||||
|
||||
cv2.imshow("object detection", image) |
||||
cv2.waitKey() |
||||
# write the output frame to disk |
||||
writer.write(frame) |
||||
|
||||
cv2.imwrite("object-detection.jpg", image) |
||||
cv2.destroyAllWindows() |
||||
# release the file pointers |
||||
print("[INFO] cleaning up...") |
||||
writer.release() |
||||
vs.release() |
||||
|
@ -0,0 +1,38 @@
|
||||
import argparse |
||||
import matplotlib.pyplot as plt |
||||
import matplotlib.patches as patches |
||||
from PIL import Image |
||||
import numpy as np |
||||
from utils import AREAS |
||||
from matplotlib.cm import get_cmap |
||||
|
||||
|
||||
ap = argparse.ArgumentParser() |
||||
ap.add_argument("-i", "--input", required=True, help="path to input iamge") |
||||
args = vars(ap.parse_args()) |
||||
|
||||
im = np.array(Image.open(args['input']), dtype=np.uint8) |
||||
|
||||
# Create figure and axes |
||||
fig, ax = plt.subplots(1) |
||||
|
||||
# Display the image |
||||
ax.imshow(im) |
||||
|
||||
for _area in AREAS: |
||||
a = dict(_area) |
||||
color = get_cmap('tab20')(a['id']) |
||||
x1, y1 = a['area'][0] |
||||
x2, y2 = a['area'][1] |
||||
w, h = x2 - x1, y2- y1 |
||||
# Create a Rectangle patch |
||||
rect = patches.Rectangle( |
||||
a['area'][0], w, h, linewidth=1, edgecolor=color, facecolor="none" |
||||
) |
||||
# Add the patch to the Axes |
||||
ax.add_patch(rect) |
||||
ax.text(x1, y1 - 5, f"AREA: {a['id']}", fontsize=5, color=color) |
||||
|
||||
|
||||
plt.savefig("test.jpg", dpi=300) |
||||
|
@ -0,0 +1,19 @@
|
||||
import logging |
||||
|
||||
|
||||
def init_logger(): |
||||
logger = logging.getLogger() # get the root logger |
||||
logger.setLevel(logging.DEBUG) |
||||
|
||||
handler = logging.StreamHandler() |
||||
handler.setLevel(logging.DEBUG) |
||||
formatter = logging.Formatter("[%(levelname)s] %(message)s") |
||||
handler.setFormatter(formatter) |
||||
logger.addHandler(handler) |
||||
|
||||
trace_handler = logging.FileHandler("trace.log", "w") |
||||
trace_handler.setLevel(logging.INFO) |
||||
# trace_formatter = logging.Formatter("%(asctime)s[%(levelname)s],%(message)s") |
||||
trace_formatter = logging.Formatter("%(message)s") |
||||
trace_handler.setFormatter(trace_formatter) |
||||
logger.addHandler(trace_handler) |
@ -0,0 +1,566 @@
|
||||
"""USAGE: |
||||
|
||||
time python src/_detector.py --input ~/Desktop/5min.mp4 -o output.mp4 |
||||
time python src/_detector.py --input ~/Desktop/5min.mp4 -l |
||||
|
||||
""" |
||||
import sys |
||||
import time |
||||
import argparse |
||||
import pprint |
||||
import numpy as np |
||||
import imutils |
||||
import cv2 |
||||
from utils import ( |
||||
check_if_inside_the_boxes, |
||||
is_it_the_same_obj, |
||||
box_distance, |
||||
get_heading, |
||||
get_avg_heading, |
||||
OBJ_LEAVING_COND, |
||||
DONTCARE, |
||||
) |
||||
from log import init_logger, logging |
||||
|
||||
init_logger() |
||||
|
||||
pp = pprint.PrettyPrinter(indent=2) |
||||
# tracking |
||||
OPENCV_OBJECT_TRACKERS = {"csrt": cv2.TrackerCSRT_create} |
||||
# initialize OpenCV's special multi-object tracker |
||||
cv_trackers = cv2.MultiTracker_create() |
||||
trackers = [] |
||||
finished = [] |
||||
W4A = {} # this stands for "wait for arrival [at ...]" |
||||
|
||||
# construct the argument parse and parse the arguments |
||||
ap = argparse.ArgumentParser() |
||||
ap.add_argument("-i", "--input", required=True, help="path to input video") |
||||
ap.add_argument("-o", "--output", required=False, help="path to output video") |
||||
ap.add_argument("-l", "--live", action="store_true", help="Show live detection") |
||||
# ap.add_argument("-y", "--yolo", required=True, |
||||
# help="base path to YOLO directory") |
||||
ap.add_argument( |
||||
"-c", |
||||
"--confidence", |
||||
type=float, |
||||
default=0.95, |
||||
help="minimum probability to filter weak detections", |
||||
) |
||||
ap.add_argument( |
||||
"-t", |
||||
"--threshold", |
||||
type=float, |
||||
default=0.3, |
||||
help="threshold when applyong non-maxima suppression", |
||||
) |
||||
args = vars(ap.parse_args()) |
||||
|
||||
# load the COCO class labels our YOLO model was trained on |
||||
# labelsPath = os.path.sep.join([args["yolo"], "coco.names"]) |
||||
labelsPath = "../../syncthing/dropbox/tracking-obj/mytrain.names" |
||||
LABELS = open(labelsPath).read().strip().split("\n") |
||||
# 0 person, 1 wheelchair, 2 bicycle, 3 motorbike, 4 car, 5 bus, 6 truck |
||||
|
||||
# initialize a list of colors to represent each possible class label |
||||
np.random.seed(42) |
||||
COLORS = np.random.randint(0, 255, size=(len(LABELS), 3), dtype="uint8") |
||||
|
||||
# derive the paths to the YOLO weights and model configuration |
||||
weightsPath = "../../syncthing/dropbox/tracking-obj/mytrain_final.weights" |
||||
configPath = "../../syncthing/dropbox/tracking-obj/mytrain.cfg" |
||||
|
||||
|
||||
# load our YOLO object detector trained on COCO dataset (80 classes) |
||||
# and determine only the *output* layer names that we need from YOLO |
||||
logging.debug("[INFO] loading YOLO from disk...") |
||||
net = cv2.dnn.readNetFromDarknet(configPath, weightsPath) |
||||
ln = net.getLayerNames() |
||||
ln = [ln[i[0] - 1] for i in net.getUnconnectedOutLayers()] |
||||
start = end = 0 |
||||
|
||||
|
||||
def detect_stuffs(_frame): |
||||
# construct a blob from the input frame and then perform a forward |
||||
# pass of the YOLO object detector, giving us our bounding boxes |
||||
# and associated probabilities |
||||
blob = cv2.dnn.blobFromImage(_frame, 1 / 255.0, (416, 416), swapRB=True, crop=False) |
||||
net.setInput(blob) |
||||
start = time.time() |
||||
layerOutputs = net.forward(ln) |
||||
end = time.time() |
||||
|
||||
# initialize our lists of detected bounding boxes, confidences, |
||||
# and class IDs, respectively |
||||
boxes = [] |
||||
confidences = [] |
||||
classIDs = [] |
||||
|
||||
# loop over each of the layer outputs |
||||
for output in layerOutputs: |
||||
# print(f'[{_frame_count:08d}] output -> ', len(output)) |
||||
# loop over each of the detections |
||||
for detection in output: |
||||
# extract the class ID and confidence (i.e., probability) |
||||
# of the current object detection |
||||
scores = detection[5:] |
||||
classID = np.argmax(scores) |
||||
confidence = scores[classID] |
||||
|
||||
# filter out weak predictions by ensuring the detected |
||||
# probability is greater than the minimum probability |
||||
if confidence <= args["confidence"]: |
||||
continue |
||||
|
||||
# scale the bounding box coordinates back relative to |
||||
# the size of the image, keeping in mind that YOLO |
||||
# actually returns the center (x, y)-coordinates of |
||||
# the bounding box followed by the boxes' width and |
||||
# height |
||||
box = detection[0:4] * np.array([W, H, W, H]) |
||||
(centerX, centerY, width, height) = box.astype("int") |
||||
|
||||
# use the center (x, y)-coordinates to derive the top |
||||
# and and left corner of the bounding box |
||||
x = int(centerX - (width / 2)) |
||||
y = int(centerY - (height / 2)) |
||||
|
||||
# update our list of bounding box coordinates, |
||||
# confidences, and class IDs |
||||
boxes.append([x, y, int(width), int(height)]) |
||||
confidences.append(float(confidence)) |
||||
classIDs.append(classID) |
||||
|
||||
# apply non-maxima suppression to suppress weak, overlapping |
||||
# bounding boxes |
||||
idxs = cv2.dnn.NMSBoxes( |
||||
boxes, confidences, args["confidence"], args["threshold"] |
||||
) |
||||
|
||||
# ensure at least one detection exists |
||||
if len(idxs) == 0: |
||||
continue |
||||
|
||||
return idxs, boxes, confidences, classIDs, start, end |
||||
|
||||
|
||||
# initialize the video stream, pointer to output video file, and |
||||
# frame dimensions |
||||
vs = cv2.VideoCapture(args["input"]) |
||||
writer = None |
||||
(W, H) = (None, None) |
||||
|
||||
# try to determine the total number of frames in the video file |
||||
try: |
||||
prop = ( |
||||
cv2.cv.CV_CAP_PROP_FRAME_COUNT if imutils.is_cv2() else cv2.CAP_PROP_FRAME_COUNT |
||||
) |
||||
total = int(vs.get(prop)) |
||||
print("[INFO] {} total frames in video".format(total)) |
||||
|
||||
# an error occurred while trying to determine the total |
||||
# number of frames in the video file |
||||
except: |
||||
print("[INFO] could not determine # of frames in video") |
||||
print("[INFO] no approx. completion time can be provided") |
||||
total = -1 |
||||
|
||||
_frame_count = 0 |
||||
tracker_counter = 1 |
||||
|
||||
logging.debug("INIT") |
||||
|
||||
# for xx in range(3996): |
||||
# (grabbed, frame) = vs.read() |
||||
# _frame_count += 1 |
||||
|
||||
# loop over frames from the video file stream |
||||
while True: |
||||
# read the next frame from the file |
||||
(grabbed, frame) = vs.read() |
||||
|
||||
# if the frame was not grabbed, then we have reached the end |
||||
# of the stream |
||||
if not grabbed: |
||||
break |
||||
|
||||
# if the frame dimensions are empty, grab them |
||||
if W is None or H is None: |
||||
(H, W) = frame.shape[:2] |
||||
|
||||
_frame_count += 1 |
||||
|
||||
# grab the updated bounding box coordinates (if any) for each |
||||
# object that is being tracked |
||||
(success, boxes) = cv_trackers.update(frame) |
||||
# print("success", success) |
||||
# print("boxes", boxes) |
||||
print(f"[{_frame_count:08d}] ::") |
||||
|
||||
# quit if unable to read the video file |
||||
if not success: |
||||
print("Failed to read video", grabbed, frame) |
||||
|
||||
cv_trackers.clear() |
||||
cv_trackers = cv2.MultiTracker_create() |
||||
# print(" INIT AGAIN cv_tracker", type(cv_trackers), cv_trackers) |
||||
# get rid of them in trackers too |
||||
trackers = [_ for _ in trackers if _["id"] not in ut_ids] |
||||
# add tracker again |
||||
for _trckr in trackers: |
||||
__tkr = OPENCV_OBJECT_TRACKERS["csrt"]() |
||||
cv_trackers.add(__tkr, frame, tuple(_trckr["curr_position"])) |
||||
(success, boxes) = cv_trackers.update(frame) |
||||
# print('SUCCESS? ', success) |
||||
|
||||
untracking = [] |
||||
# loop over the bounding boxes and draw then on the frame |
||||
|
||||
obj_cnt = len(boxes) |
||||
# print(f"obj_cnt: ", obj_cnt, ' | tracker #', len(trackers)) |
||||
for idx in range(obj_cnt): |
||||
GONE = False |
||||
|
||||
box = boxes[idx] |
||||
(x, y, w, h) = [int(v) for v in box] |
||||
|
||||
logging.info(f"{_frame_count},{trackers[idx]['id']},{trackers[idx]['type']},POSITION,{x},{y},{w},{h}") |
||||
|
||||
# check if size is growing? if more than twice then, untrack it |
||||
# TODO: make it static bound should be better |
||||
ow, oh = trackers[idx]["size"] |
||||
if ow / w > 2 or oh / h > 2: |
||||
print(f" {tk['id']} GROW_TOO_BIG") |
||||
trackers[idx]["status"] = "grow-too-big" |
||||
logging.info( |
||||
f"{_frame_count},{tk['id']},{tk['type']},GROW_TOO_BIG,{x},{y},{w},{h}" |
||||
) |
||||
untracking.append(trackers[idx]) |
||||
continue |
||||
|
||||
_last_pos = trackers[idx]["history"][0] |
||||
curr_distance = box_distance(box, _last_pos) |
||||
last_distance = trackers[idx]["distance"] |
||||
trackers[idx]["distance"] = curr_distance |
||||
trackers[idx]["curr_position"] = box # [int(v) for v in box] |
||||
trackers[idx]["history"].insert(0, box) |
||||
|
||||
_last_idx = 2 if len(trackers[idx]["history"]) > 2 else 1 |
||||
_x, _y, _w, _h = trackers[idx]["history"][_last_idx] |
||||
_heading = get_heading(_x, _y, x, y) |
||||
tk = trackers[idx] |
||||
|
||||
STILL_DIST_PX = 1 |
||||
if last_distance < STILL_DIST_PX and curr_distance < STILL_DIST_PX: |
||||
trackers[idx]["still"] += 1 |
||||
else: |
||||
trackers[idx]["still"] = 0 |
||||
|
||||
if trackers[idx]["still"] == 0: |
||||
trackers[idx]["heading"] = [_heading] + trackers[idx]["heading"] |
||||
|
||||
# trackers[idx]["heading"] = trackers[idx]["heading"][:20] |
||||
|
||||
if trackers[idx]["still"] > 30 or x < 5 or x > 1250: |
||||
logging.info( |
||||
f"{_frame_count},{tk['id']},{tk['type']},LEFT,{x},{y},{w},{h},STILL" |
||||
) |
||||
untracking.append(trackers[idx]) |
||||
|
||||
if trackers[idx]["still"] > 10 and len(trackers[idx]["heading"]) < 5: |
||||
print(f" {tk['id']} LEFT - short-life") |
||||
logging.info( |
||||
f"{_frame_count},{tk['id']},{tk['type']},LEFT,{x},{y},{w},{h},SHORT_LIFE" |
||||
) |
||||
trackers[idx]["status"] = "short-life" |
||||
untracking.append(trackers[idx]) |
||||
|
||||
# check if it's hit the first |
||||
avg_heading = get_avg_heading(trackers[idx]["heading"]) |
||||
|
||||
h_count = len(tk["heading"]) |
||||
if 5 < h_count and h_count < 10: |
||||
# enough to validate, but never an established one |
||||
DONTCARE_IDS = [_[0][1] for _ in DONTCARE] |
||||
if tk["origin"]["id"] in DONTCARE_IDS: |
||||
dc_id = DONTCARE_IDS.index(tk["origin"]["id"]) |
||||
dc_dict = dict(DONTCARE[dc_id]) |
||||
print(DONTCARE_IDS, tk["origin"]["id"], dc_id, dc_dict["heading"]) |
||||
if avg_heading in dc_dict["heading"]: |
||||
print(f" {tk['id']} DONT-CARE condition") |
||||
logging.info( |
||||
f"{_frame_count},{tk['id']},{tk['type']},LEFT,{x},{y},{w},{h},WRONG_DETECTION" |
||||
) |
||||
trackers[idx]["status"] = "dont-care" |
||||
untracking.append(trackers[idx]) |
||||
|
||||
# TODO: assign id here! for anything that is -1 id |
||||
gid = None |
||||
if tk["id"] == -1 and 5 < h_count and h_count < 15: |
||||
_area_id = f"id_{tk['origin']['id']}" |
||||
if _area_id in W4A: |
||||
# check candidates which has "matched" opportunity too |
||||
_po = W4A[_area_id]["objects"] |
||||
_po = [_ for _ in _po if _frame_count > _[2]] # opportunity |
||||
_po = sorted(_po, key=lambda kk: kk[2]) # first one first |
||||
if _po: |
||||
trackers[idx]["id"] = gid = _po[0][0]["id"] |
||||
# remove this id out of next W4A |
||||
W4A[_area_id]["objects"] = [ |
||||
_ for _ in W4A[_area_id]["objects"] if _[0]["id"] != gid |
||||
] |
||||
# print(f" --- {len(_po)} candicate: picked id={gid}") |
||||
# else: |
||||
# print(f" --- no candidate") |
||||
print( |
||||
f" {gid} RE-ENTERS (origin: {trackers[idx]['origin']['id']})" |
||||
) |
||||
logging.info( |
||||
f"{_frame_count},{tk['id']},{tk['type']},RE-ENTER,{x},{y},{w},{h}" |
||||
) |
||||
elif tk["id"] == -1 and h_count >= 15: |
||||
# assign an id |
||||
trackers[idx]["id"] = tracker_counter |
||||
tracker_counter += 1 |
||||
logging.info( |
||||
f"{_frame_count},{tk['id']},{tk['type']},ENTER,{x},{y},{w},{h}" |
||||
) |
||||
print(f" {trackers[idx]['id']} ENTERS (& ID assigned)") |
||||
|
||||
for lvng_cnd in OBJ_LEAVING_COND: |
||||
_origin = tk["origin"]["id"] |
||||
cond = dict(lvng_cnd) |
||||
if cond["origin_id"] != _origin: |
||||
continue |
||||
if cond["heading"] != "" and avg_heading != cond["heading"]: |
||||
continue |
||||
REACH_condition = False |
||||
if cond["x"] != -1: |
||||
# print("REACH condition: X", x, cond["x"]) |
||||
if cond["heading"] in "NE": |
||||
_cond = cond["x"](y + w) if callable(cond["x"]) else cond["x"] |
||||
if x + w > _cond: |
||||
REACH_condition = True |
||||
elif cond["heading"] in "SW": |
||||
_cond = cond["x"](y + h) if callable(cond["x"]) else cond["x"] |
||||
if x < _cond: |
||||
REACH_condition = True |
||||
elif cond["heading"] == "": |
||||
_cond = cond["x"](y) if callable(cond["x"]) else cond["x"] |
||||
if x < _cond: |
||||
REACH_condition = True |
||||
elif cond["y"] != -1: |
||||
_cond = cond["y"](x + h) if callable(cond["y"]) else cond["y"] |
||||
# Don't have one yet |
||||
if cond["heading"] in "SW" and y + h > _cond: |
||||
print("REACH condition: Y", y + h, _cond) |
||||
REACH_condition = True |
||||
# pass |
||||
if not REACH_condition: |
||||
continue |
||||
|
||||
# print("MATCH COND") |
||||
# pp.pprint(cond) |
||||
if not cond["next_area"]: |
||||
# left the frame |
||||
print(f" {tk['id']} LEFT from frame") |
||||
logging.info( |
||||
f"{_frame_count},{tk['id']},{tk['type']},LEFT,{x},{y},{w},{h}" |
||||
) |
||||
untracking.append(tk) |
||||
continue |
||||
|
||||
_nid = f"id_{cond['next_area'][0]}" |
||||
# print(f"#{tk['id']} origin:#{_origin} to#{_nid}", end="") |
||||
print(f" {tk['id']} LEFT from {_origin} -> {_nid}") |
||||
|
||||
logging.info( |
||||
f"{_frame_count},{tk['id']},{tk['type']},WAIT,{x},{y},{w},{h},SWITCH_ZONE:{_origin}:{_nid}" |
||||
) |
||||
|
||||
if _nid not in W4A: |
||||
# print(f">>add AREA {_nid} to W4A", end="") |
||||
W4A[_nid] = {"objects": []} |
||||
# put this object to W4A for next area if doesn't exist |
||||
has_this = [_ for _ in W4A[_nid]["objects"] if _[0]["id"] == tk["id"]] |
||||
if not has_this: |
||||
GONE = True |
||||
# unit in frame |
||||
_expected_frame = _frame_count + cond["duration_to_next"] |
||||
W4A[_nid]["objects"].append((tk, _frame_count, _expected_frame)) |
||||
untracking.append(tk) |
||||
# print(f'>>GONE - W#{len(W4A[_nid]["objects"])}') |
||||
# print(f' {_nid} objs: ') |
||||
# pp.pprint(W4A[_nid]["objects"]) |
||||
# print(f' {_nid} untracking: ', [_['id'] for _ in untracking]) |
||||
|
||||
if GONE: |
||||
continue |
||||
|
||||
print( |
||||
f"[{tk['id']}{tk['type'][:1]}] o#{tk['origin']['id']} (x,y)=({x},{y},{w},{h})" |
||||
f" | still #{tk['still']} | distance: " |
||||
f"{last_distance:.3f} -> {curr_distance:.3f}", |
||||
end="", |
||||
) |
||||
_htxt = ",".join(trackers[idx]["heading"][:18]) |
||||
print(f"|heading:{_htxt}", end="") |
||||
print(f"|avg:{avg_heading}", end="") |
||||
print("") |
||||
|
||||
# DRAW on FRAME |
||||
color = [int(c) for c in COLORS[0]] |
||||
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2) |
||||
cv2.putText( |
||||
frame, |
||||
f"{tk['id'] if tk['id'] > 0 else '?'} - {tk['type']}", |
||||
(x, y - 5), |
||||
cv2.FONT_HERSHEY_SIMPLEX, |
||||
0.5, |
||||
color, |
||||
2, |
||||
) |
||||
|
||||
# Cleanup Tracker |
||||
if untracking: |
||||
ut_ids = [_["id"] for _ in untracking] # untracking ids |
||||
# cv_trackers.clear() |
||||
cv_trackers.clear() |
||||
cv_trackers = None |
||||
time.sleep(1) |
||||
cv_trackers = cv2.MultiTracker_create() |
||||
# print(" cv_tracker", type(cv_trackers), cv_trackers) |
||||
# get rid of them in trackers too |
||||
trackers = [_ for _ in trackers if _["id"] not in ut_ids] |
||||
# add tracker again |
||||
for _trckr in trackers: |
||||
__tkr = OPENCV_OBJECT_TRACKERS["csrt"]() |
||||
cv_trackers.add(__tkr, frame, tuple(_trckr["curr_position"])) |
||||
# print(f"=== AFTER CLEANUP ---- UNTRACKING ===") |
||||
# print(f" cv #{len(cv_trackers.getObjects())} trackers #{len(trackers)}") |
||||
print(f" Total #{len(cv_trackers.getObjects())}") |
||||
|
||||
# only detect once a sec |
||||
if _frame_count % 15 == 1: |
||||
idxs, boxes, confidences, classIDs, start, end = detect_stuffs(frame) |
||||
# loop over the indexes we are keeping |
||||
if len(idxs) == 0: |
||||
print("CAN NOT DETECT STUFFS??") |
||||
continue |
||||
if isinstance(idxs, tuple): |
||||
pp.pprint(idxs) |
||||
|
||||
for i in idxs.flatten(): |
||||
# extract the bounding box coordinates |
||||
(x, y) = (boxes[i][0], boxes[i][1]) |
||||
(w, h) = (boxes[i][2], boxes[i][3]) |
||||
|
||||
_class = LABELS[classIDs[i]] |
||||
found_at = check_if_inside_the_boxes(x, y, w, h, _class) |
||||
if not found_at: |
||||
continue |
||||
|
||||
# only do for person/wheelchair |
||||
if _class not in ["person", "wheelchair", "bicycle"]: |
||||
continue |
||||
|
||||
# (1) check whether it's the same object as one in trackers |
||||
is_same = False |
||||
for t in trackers: |
||||
if _class != t["type"]: |
||||
continue |
||||
|
||||
_x, _y, _w, _h = t["curr_position"] |
||||
# print(f"[{t['id']}] - {t['type']}") |
||||
is_same = is_it_the_same_obj(x, y, w, h, _x, _y, _w, _h, id=t["id"]) |
||||
if is_same: |
||||
break |
||||
|
||||
if not is_same: |
||||
# create tracker and add to multi-object tracker |
||||
_tracker = OPENCV_OBJECT_TRACKERS["csrt"]() |
||||
bbox = (x, y, w, h) |
||||
cv_trackers.add(_tracker, frame, bbox) |
||||
t = { |
||||
"id": -1, # NOTE: assign later; tracker_counter if gid is None else gid, |
||||
"type": _class, |
||||
"status": "", |
||||
"curr_position": bbox, |
||||
"size": (w, h), |
||||
"heading": [], |
||||
"origin": found_at, |
||||
"distance": -1, |
||||
"first_position": bbox, |
||||
"still": 0, |
||||
"history": [bbox], |
||||
} |
||||
trackers.append(t) |
||||
print(f" total #{len(trackers)}") |
||||
pp.pprint(t) |
||||
# print(f" i -> {i} ({x},{y}), {w},{h} ({x + w},{y + h})") |
||||
|
||||
# _what = ",".join([LABELS[c] for c in classIDs]) |
||||
# print(f"[{_frame_count:08d}] :: {_what}") |
||||
|
||||
# CLEANUP |
||||
cleanups = [] |
||||
for k in W4A: |
||||
print(f" CLEANUP[area {k}] ", end="") |
||||
for _o in W4A[k]["objects"]: |
||||
print(f" {_o[0]['id']} - {_o[1]} - {_o[2]}", end="") |
||||
if _o[2] < _frame_count: |
||||
continue |
||||
wf = _o[2] - _o[1] |
||||
wf = wf if wf < 30 * 6 else 180 |
||||
if _o[2] - wf > _frame_count: |
||||
# get rid of this |
||||
cleanups.append(_o) |
||||
# remove this id out of next W4A |
||||
W4A[k]["objects"] = [ |
||||
_ for _ in W4A[_area_id]["objects"] if _[0]["id"] != _o[0]["id"] |
||||
] |
||||
print(f" | tracked #{len(trackers)}") |
||||
for obj, _s, _e in cleanups: |
||||
print( |
||||
f" {obj['id']} CLEANED UP should found at {_e - _frame_count} ago" |
||||
) |
||||
|
||||
if args["live"]: |
||||
cv2.imshow("Frame", frame) |
||||
key = cv2.waitKey(1) & 0xFF |
||||
|
||||
if key == ord("w"): |
||||
while cv2.waitKey(1) & 0xFF != ord("w"): |
||||
pass |
||||
# if the `q` key was pressed, break from the loop |
||||
elif key == ord("q"): |
||||
break |
||||
|
||||
if args["output"]: |
||||
# check if the video writer is None |
||||
if writer is None: |
||||
# initialize our video writer |
||||
fourcc = cv2.VideoWriter_fourcc(*"MJPG") |
||||
writer = cv2.VideoWriter( |
||||
args["output"], fourcc, 30, (frame.shape[1], frame.shape[0]), True |
||||
) |
||||
|
||||
# some information on processing single frame |
||||
if total > 0: |
||||
elap = end - start |
||||
print("[INFO] single frame took {:.4f} seconds".format(elap)) |
||||
print( |
||||
"[INFO] estimated total time to finish: {:.4f}".format(elap * total) |
||||
) |
||||
|
||||
# write the output frame to disk |
||||
writer.write(frame) |
||||
|
||||
# TODO: find a way to get rid of IMPOSSIBLE obj for example |
||||
# suddenly appear on area 7 then move up NORTH... -- this is incorrectly detected indeed. |
||||
|
||||
# release the file pointers |
||||
print("[INFO] cleaning up...") |
||||
if writer: |
||||
writer.release() |
||||
vs.release() |
@ -0,0 +1,291 @@
|
||||
import collections |
||||
import math |
||||
from collections import namedtuple |
||||
|
||||
Rectangle = namedtuple("Rectangle", "xmin ymin xmax ymax") |
||||
|
||||
|
||||
def area(a, b): |
||||
# returns None if rectangles don't intersect |
||||
dx = min(a.xmax, b.xmax) - max(a.xmin, b.xmin) |
||||
dy = min(a.ymax, b.ymax) - max(a.ymin, b.ymin) |
||||
if (dx >= 0) and (dy >= 0): |
||||
return dx * dy |
||||
|
||||
|
||||
# detecting area |
||||
AREAS = [ |
||||
[ |
||||
("id", 1), |
||||
("area", ((0, 40), (120, 129))), |
||||
("target", ["car", "bus", "motorbike"]), |
||||
("next", [6]), |
||||
], |
||||
[ |
||||
("id", 2), |
||||
("area", ((85, 0), (222, 74))), |
||||
("target", ["person", "bicycle"]), |
||||
("next", [7]), |
||||
], |
||||
[ |
||||
("id", 3), |
||||
("area", ((38, 340), (130, 482))), |
||||
("target", ["person", "wheelchair"]), |
||||
("next", [5]), |
||||
], |
||||
[ |
||||
("id", 4), |
||||
("area", ((96, 310), (145, 461))), |
||||
("target", ["person", "wheelchair"]), |
||||
], |
||||
[ |
||||
("id", 5), |
||||
("area", ((286, 230), (441, 346))), |
||||
("target", ["person", "wheelchair"]), |
||||
("next", [8]), |
||||
], |
||||
[ |
||||
("id", 6), |
||||
("area", ((421, 190), (555, 304))), |
||||
("target", ["car", "bus", "motorbike"]), |
||||
("next", []), |
||||
], |
||||
[ |
||||
("id", 7), |
||||
("area", ((555, 170), (700, 295))), |
||||
("target", ["person", "wheelchair", "bicycle"]), |
||||
("next", [4]), |
||||
], |
||||
[ |
||||
("id", 11), |
||||
("area", ((875, 179), (945, 278))), |
||||
("target", ["person", "wheelchair"]), |
||||
("next", [9]), |
||||
], |
||||
[ |
||||
("id", 8), |
||||
("area", ((901, 223), (976, 342))), |
||||
("target", ["person", "wheelchair"]), |
||||
("next", [7, 5]), |
||||
], |
||||
[ |
||||
("id", 9), |
||||
("area", ((1047, 229), (1120, 338))), |
||||
("target", ["person", "wheelchair"]), |
||||
("next", [8]), |
||||
], |
||||
[ |
||||
("id", 10), |
||||
("area", ((1158, 200), (1230, 307))), |
||||
("target", ["person", "wheelchair"]), |
||||
("next", [9]), |
||||
], |
||||
] |
||||
|
||||
OBJ_LEAVING_COND = [ |
||||
# [ |
||||
# ('origin_id', ), |
||||
# ('heading', ''), |
||||
# ('x', ), |
||||
# ('y', ), |
||||
# ('next_area', []), |
||||
# ('duration_to_next', ) |
||||
# ], |
||||
[ |
||||
("origin_id", 2), |
||||
("heading", ""), |
||||
("x", lambda y: (y - 112.6) / -0.2270), |
||||
("y", -1), |
||||
("next_area", [5, 7]), |
||||
("duration_to_next", 30 * 5.5), |
||||
], |
||||
[ |
||||
("origin_id", 3), |
||||
("heading", "N"), |
||||
("x", lambda y: (y + 1993) / 15), |
||||
("y", -1), |
||||
("next_area", [5]), |
||||
("duration_to_next", 30 * 4), |
||||
], |
||||
[ |
||||
("origin_id", 4), |
||||
("heading", "S"), |
||||
("x", 34), |
||||
("y", -1), |
||||
("next_area", []), |
||||
("duration_to_next", 30 * 0), |
||||
], |
||||
[ |
||||
("origin_id", 5), |
||||
("heading", "N"), |
||||
("x", 715), |
||||
("y", -1), |
||||
("next_area", [11]), |
||||
("duration_to_next", 30 * 3), |
||||
], |
||||
[ |
||||
("origin_id", 5), |
||||
("heading", "S"), |
||||
("x", lambda y: (y - 165.3) / 0.4143), |
||||
("y", -1), |
||||
("next_area", [4]), |
||||
("duration_to_next", 30 * 4), |
||||
], |
||||
[ |
||||
("origin_id", 7), |
||||
("heading", "S"), |
||||
("x", -1), |
||||
("y", lambda x: 0.4 * x + 152), |
||||
("next_area", [4]), |
||||
("duration_to_next", 30 * 4), |
||||
], |
||||
[ |
||||
("origin_id", 11), |
||||
("heading", "S"), |
||||
("x", 870), |
||||
("y", -1), |
||||
("next_area", [7]), |
||||
("duration_to_next", 30 * 2), |
||||
], |
||||
# [ |
||||
# ('origin_id', ), |
||||
# ('heading', ''), |
||||
# ('x', ), |
||||
# ('y', ), |
||||
# ('next_area', []), |
||||
# ('duration_to_next', ) |
||||
# ], |
||||
] |
||||
|
||||
DONTCARE = ( |
||||
(("origin_id", 5), ("heading", ["S", "W"])), |
||||
(("origin_id", 4), ("heading", ["S", "W"])), |
||||
(("origin_id", 7), ("heading", ["N", "E"])), |
||||
(("origin_id", 10), ("heading", ["N", "E"])), |
||||
) |
||||
|
||||
|
||||
def get_linear_equation(pnt1, pnt2): |
||||
if not (isinstance(pnt1, tuple) or isinstance(pnt2, tuple)): |
||||
return None |
||||
a = (pnt2[1] - pnt1[1]) / (pnt2[0] - pnt1[0]) |
||||
b = pnt1[1] - a * pnt1[0] |
||||
return f"y = {a:.4f}x + {b:.1f}" |
||||
|
||||
|
||||
def get_avg_heading(headings): |
||||
latest = headings[:15] |
||||
_h = "".join(latest).replace("W", "").replace("E", "") |
||||
chars = collections.Counter(_h).most_common(10) |
||||
if chars: |
||||
return chars[0][0] |
||||
return None |
||||
|
||||
|
||||
def get_heading(x1, y1, x2, y2): |
||||
diff_x, diff_y = x2 - x1, y2 - y1 |
||||
if diff_x > 0 and diff_y > 0: |
||||
return "NE" |
||||
if diff_x > 0 and diff_y == 0: |
||||
return "N" |
||||
if diff_x > 0: |
||||
return "NW" |
||||
if diff_y > 0: |
||||
return "SE" |
||||
if diff_y == 0: |
||||
return "S" |
||||
return "SW" |
||||
|
||||
|
||||
def box_distance(pos1, pos2): |
||||
x1, y1, w1, h1 = pos1 |
||||
x2, y2, w2, h2 = pos2 |
||||
tx1, tx2 = x1 + w1 / 2, x2 + w2 / 2 |
||||
ty1, ty2 = y1 + h1 / 2, y2 + h2 / 2 |
||||
return math.sqrt(math.pow(tx2 - tx1, 2) + math.pow(ty2 - ty1, 2)) |
||||
|
||||
|
||||
def distance(x2, y2, x1, y1): |
||||
return math.sqrt(math.pow(x2 - x1, 2) + math.pow(y2 - y1, 2)) |
||||
|
||||
|
||||
def check_if_inside_the_boxes(x, y, w, h, _type): |
||||
cx, cy = x + w / 2, y + h / 2 |
||||
# print(cx, cy) |
||||
is_inside = False |
||||
for _box in AREAS: |
||||
if is_inside: |
||||
break |
||||
box = dict(_box) |
||||
((x1, y1), (x2, y2)) = box["area"] |
||||
# print(x1, cx, x2, ' -- ', y1, cy, y2, _type, box['target']) |
||||
if x1 < cx and cx < x2 and y1 < cy and cy < y2 and _type in box["target"]: |
||||
# print('inside --> ', _type, cx, cy, box['id']) |
||||
is_inside = True |
||||
# if diff_x < box_w |
||||
if is_inside: |
||||
# print("INSIDE!! this -> ", box) |
||||
return box |
||||
return False |
||||
|
||||
|
||||
def is_it_the_same_obj(x1, y1, w1, h1, i1, j1, w2, h2, **kwargs): |
||||
"""We would use the centroid location to check whether they are the same |
||||
object and of course, dimension too. |
||||
""" |
||||
_id = kwargs.get("id", None) |
||||
# if _id: |
||||
# print(" :: check against id:", _id, end="") |
||||
|
||||
# if first coords are pretty much the same spot, then they are the same |
||||
if abs(x1 - i1) / x1 < 0.05 and abs(y1 - j1) / y1 < 0.05: |
||||
# print(" same 1st coords") |
||||
return True |
||||
|
||||
DIMENSION_SHIFT = 0.15 |
||||
# we have to use centroid !! from the experience |
||||
cx1, cy1, cx2, cy2 = x1 + w1 / 2, y1 + h1 / 2, i1 + w2 / 2, j1 + h2 / 2 |
||||
|
||||
c_dff_x, c_dff_y = abs(cx2 - cx1), abs(cy2 - cy1) |
||||
w_shift, h_shift = w1 * DIMENSION_SHIFT, h1 * DIMENSION_SHIFT |
||||
# print(" ::SAME:: shift", end="") |
||||
# print(f" | SHIFT w:{w_shift},h:{h_shift}", end="") |
||||
# print(f" | centroid {c_dff_x}, {c_dff_y}", end="") |
||||
if c_dff_x > w_shift and c_dff_y > h_shift: |
||||
# print(" ::SAME:: shift too much already -- NOT THE SAME") |
||||
return False |
||||
|
||||
# if one inside the other |
||||
if i1 > x1 and (w1 - w2) > (i1 - x1) and j1 > y1 and h1 - h2 > j1 - y1: |
||||
# one is inside the other |
||||
# print(" ::SAME:: new one inside existing tracker") |
||||
return True |
||||
if x1 > i1 and (w2 - w1) > x1 - i1 and y1 > j1 and h2 - h1 > y1 - j1: |
||||
# one is inside the other |
||||
# print(" ::SAME:: existing tracker inside new tracker") |
||||
return True |
||||
|
||||
# if it's 90% overlapped then, assumed it's the same |
||||
# ra = Rectangle(x1, y1, x1 + w1, y1 + h1) |
||||
# rb = Rectangle(i1, j1, i1 + w2, j1 + w2) |
||||
# print(f'overlapped area: {area(ra, rb)} 1:{w1*h1:.1f} 1:{w2*h2:.1f}') |
||||
# print(f'||one {x1},{y1},{w1},{h1} two {i1},{j1},{w2},{h2}||') |
||||
|
||||
# if it's not inside the other, then we can use "size" if it's different |
||||
size1, size2 = w1 * h1, w2 * h2 |
||||
# if size is larger than 20%, then it's not the same thing |
||||
# print(" ::SAME:: size") |
||||
if abs(size2 - size1) / size1 > 0.45: |
||||
# print(f" sz {size1}, {size2}, diff%{abs(size2 - size1)/size1}", end="") |
||||
# print(" ^^ too diff in size -- NOT THE SAME", end="") |
||||
return False |
||||
|
||||
# print(" ::SAME:: last") |
||||
# print("") |
||||
return True |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
check_if_inside_the_boxes(461, 263, 24, 65, "person") |
||||
check_if_inside_the_boxes(8, 45, 172, 193, "bus") |
||||
check_if_inside_the_boxes(300, 300, 24, 65, "person") |
Loading…
Reference in new issue