
Detect by YOLO and track by dlib

* dlib sort of misses a lot
sipp11 committed 5 years ago (branch: dev, commit 84a7df960a)
4 changed files:

1. .gitignore (+4 lines)
2. src/main.py (+337 lines)
3. src/utils.py (+119 lines)
4. src/yolo.py (+10 lines)
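The core pattern in this commit: run the expensive YOLO forward pass only every 15th frame, and let a cheap dlib correlation tracker follow each detected object in between. A minimal sketch of that loop (illustrative only; `detect` stands in for the YOLO step, and the real code in src/main.py below also de-duplicates detections against live trackers):

import cv2
import dlib

DETECT_EVERY = 15  # assumes ~15 fps footage, so detection runs about once a second
vs = cv2.VideoCapture("input.mp4")
trackers, n = [], 0
while True:
    grabbed, frame = vs.read()
    if not grabbed:
        break
    n += 1
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # dlib tracks on RGB frames
    if n % DETECT_EVERY == 1:
        for (x, y, w, h) in detect(frame):  # hypothetical detector call
            t = dlib.correlation_tracker()
            t.start_track(rgb, dlib.rectangle(x, y, x + w, y + h))
            trackers.append(t)
    for t in trackers:
        t.update(rgb)           # cheap per-frame position update
        pos = t.get_position()  # drectangle: left()/top()/right()/bottom()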

.gitignore (vendored, +4)

@@ -0,0 +1,4 @@
*__pycache__*
*.mp4
.DS_Store

src/main.py (+337)

@@ -0,0 +1,337 @@
"""USAGE:
time python src/main.py --input ~/Desktop/5min.mp4 -o output.mp4
time python src/main.py --input ~/Desktop/5min.mp4 -l
"""
# import the necessary packages
import numpy as np
import argparse
import imutils
import time
import cv2
import os
import dlib
from utils import check_if_inside_the_boxes, is_it_the_same_obj, distance
# tracking
OPENCV_OBJECT_TRACKERS = {"csrt": cv2.TrackerCSRT_create}
trackers = []
finished = []
# construct the argument parse and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-i", "--input", required=True, help="path to input video")
ap.add_argument("-o", "--output", required=False, help="path to output video")
ap.add_argument("-l", "--live", action="store_true", help="Show live detection")
# ap.add_argument("-y", "--yolo", required=True,
# help="base path to YOLO directory")
ap.add_argument(
    "-c",
    "--confidence",
    type=float,
    default=0.95,
    help="minimum probability to filter weak detections",
)
ap.add_argument(
    "-t",
    "--threshold",
    type=float,
    default=0.3,
    help="threshold when applying non-maxima suppression",
)
args = vars(ap.parse_args())
# load the class labels our custom YOLO model was trained on
# labelsPath = os.path.sep.join([args["yolo"], "coco.names"])
labelsPath = "/Users/sipp11/syncthing/dropbox/tracking-obj/mytrain.names"
LABELS = open(labelsPath).read().strip().split("\n")
# 0 person, 1 wheelchair, 2 bicycle, 3 motorbike, 4 car, 5 bus, 6 truck
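# mytrain.names is a standard darknet label file -- plain text, one
# class name per line; given the index mapping above it presumably
# reads (the file itself is not part of this commit):
#   person
#   wheelchair
#   bicycle
#   motorbike
#   car
#   bus
#   truck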
# initialize a list of colors to represent each possible class label
np.random.seed(42)
COLORS = np.random.randint(0, 255, size=(len(LABELS), 3), dtype="uint8")
# derive the paths to the YOLO weights and model configuration
weightsPath = "/Users/sipp11/syncthing/dropbox/tracking-obj/mytrain_final.weights"
configPath = "/Users/sipp11/syncthing/dropbox/tracking-obj/mytrain.cfg"
# load our YOLO object detector trained on the custom 7-class dataset
# and determine only the *output* layer names that we need from YOLO
print("[INFO] loading YOLO from disk...")
net = cv2.dnn.readNetFromDarknet(configPath, weightsPath)
ln = net.getLayerNames()
ln = [ln[i[0] - 1] for i in net.getUnconnectedOutLayers()]
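# compatibility note (not in the original commit): on OpenCV >= 4.5.4,
# getUnconnectedOutLayers() returns a flat array of ints, so the
# `i[0]` indexing above raises an IndexError; a version-agnostic
# variant would be:
#   ln = [ln[i - 1] for i in net.getUnconnectedOutLayers().flatten()]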


def detect_stuffs(net, frame):
    # construct a blob from the input frame and then perform a forward
    # pass of the YOLO object detector, giving us our bounding boxes
    # and associated probabilities
    blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
    net.setInput(blob)
    start = time.time()
    layerOutputs = net.forward(ln)
    end = time.time()
    # initialize our lists of detected bounding boxes, confidences,
    # and class IDs, respectively
    boxes = []
    confidences = []
    classIDs = []
    # loop over each of the layer outputs
    for output in layerOutputs:
        # loop over each of the detections
        for detection in output:
            # extract the class ID and confidence (i.e., probability)
            # of the current object detection
            scores = detection[5:]
            classID = np.argmax(scores)
            confidence = scores[classID]
            # filter out weak predictions by ensuring the detected
            # probability is greater than the minimum probability
            if confidence <= args["confidence"]:
                continue
            # scale the bounding box coordinates back relative to the
            # size of the image, keeping in mind that YOLO actually
            # returns the center (x, y)-coordinates of the bounding
            # box followed by the box's width and height
            box = detection[0:4] * np.array([W, H, W, H])
            (centerX, centerY, width, height) = box.astype("int")
            # use the center (x, y)-coordinates to derive the top
            # and left corner of the bounding box
            x = int(centerX - (width / 2))
            y = int(centerY - (height / 2))
            # update our list of bounding box coordinates,
            # confidences, and class IDs
            boxes.append([x, y, int(width), int(height)])
            confidences.append(float(confidence))
            classIDs.append(classID)
    # apply non-maxima suppression to suppress weak, overlapping
    # bounding boxes; wrap in np.asarray so an empty result (a plain
    # tuple) still supports .flatten() at the call site
    idxs = np.asarray(
        cv2.dnn.NMSBoxes(boxes, confidences, args["confidence"], args["threshold"])
    )
    # NOTE: we are not going to draw anything from DETECTION,
    # only from tracking
    # for i in idxs.flatten():
    #     # extract the bounding box coordinates
    #     (x, y) = (boxes[i][0], boxes[i][1])
    #     (w, h) = (boxes[i][2], boxes[i][3])
    #     # draw a bounding box rectangle and label on the frame
    #     color = [int(c) for c in COLORS[classIDs[i]]]
    #     cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
    #     text = "{}: {:.4f}".format(LABELS[classIDs[i]], confidences[i])
    #     cv2.putText(
    #         frame, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2
    #     )
    return idxs, boxes, confidences, classIDs, start, end
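# example of consuming detect_stuffs (illustrative sketch, not part of
# the pipeline below):
#   idxs, boxes, confidences, classIDs, t0, t1 = detect_stuffs(net, frame)
#   for i in idxs.flatten():  # empty array -> loop body simply never runs
#       x, y, w, h = boxes[i]
#       print(LABELS[classIDs[i]], confidences[i], (x, y, w, h))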
# initialize the video stream, pointer to output video file, and
# frame dimensions
vs = cv2.VideoCapture(args["input"])
writer = None
(W, H) = (None, None)
# try to determine the total number of frames in the video file
try:
    prop = (
        cv2.cv.CV_CAP_PROP_FRAME_COUNT if imutils.is_cv2() else cv2.CAP_PROP_FRAME_COUNT
    )
    total = int(vs.get(prop))
    print("[INFO] {} total frames in video".format(total))
# an error occurred while trying to determine the total
# number of frames in the video file
except:
    print("[INFO] could not determine # of frames in video")
    print("[INFO] no approx. completion time can be provided")
    total = -1
_frame_count = 0
tracker_counter = 1
# loop over frames from the video file stream
while True:
    # read the next frame from the file
    (grabbed, frame) = vs.read()
    # if the frame was not grabbed, then we have reached the end
    # of the stream
    if not grabbed:
        break
    # if the frame dimensions are empty, grab them
    if W is None or H is None:
        (H, W) = frame.shape[:2]
    _frame_count += 1
    # dlib's correlation tracker expects an RGB frame
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # only run the full YOLO detection every 15 frames, i.e. roughly
    # once a second for ~15 fps footage
    if _frame_count % 15 == 1:
        idxs, boxes, confidences, classIDs, start, end = detect_stuffs(net, frame)
        # loop over the indexes we are keeping
        for i in idxs.flatten():
            # extract the bounding box coordinates
            (x, y) = (boxes[i][0], boxes[i][1])
            (w, h) = (boxes[i][2], boxes[i][3])
            _class = LABELS[classIDs[i]]
            _good = check_if_inside_the_boxes(x, y, w, h, _class)
            if not _good:
                continue
            # (1) check whether it's the same object as one in trackers
            is_same = False
            for t in trackers:
                tracker = t["tracker"]
                if _class != t["type"]:
                    continue
                pos = tracker.get_position()
                # use ti/tj for the tracker corner so we don't clobber
                # the detection index `i` of the enclosing loop
                ti = int(pos.left())
                tj = int(pos.top())
                _w = int(pos.right()) - ti
                _h = int(pos.bottom()) - tj
                print(f"[{t['id']}] - {t['type']}")
                is_same = is_it_the_same_obj(x, y, w, h, ti, tj, _w, _h, id=t["id"])
                if is_same:
                    break
            if not is_same:
                # create a new object tracker for the bounding box and
                # add it to our multi-object tracker
                # tracker = OPENCV_OBJECT_TRACKERS[args["tracker"]]()
                # trackers.add(tracker, frame, box)
                tracker = dlib.correlation_tracker()
                rect = dlib.rectangle(x, y, x + w, y + h)
                print("NEW TRACKER rect", rect)
                t = {
                    "id": tracker_counter,
                    "type": _class,
                    "tracker": tracker,
                    "direction": "",
                    "last_distance": -1,
                    "last_position": (x + w / 2, y + h / 2),
                    "still": 0,
                }
                tracker_counter += 1
                tracker.start_track(frame_rgb, rect)
                trackers.append(t)
                print(f" i -> {i} ({x},{y}), {w},{h} ({x + w},{y + h})")
                # # draw a bounding box rectangle and label on the frame
                # color = [int(c) for c in COLORS[classIDs[i]]]
                # cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                # text = "{}: {:.4f}".format(LABELS[classIDs[i]], confidences[i])
                # cv2.putText(
                #     frame, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2
                # )
        _what = ",".join([LABELS[c] for c in classIDs])
        print(f"[{_frame_count:08d}] :: {_what}")
    untracking = []
    for tk in trackers:
        tk["tracker"].update(frame_rgb)
        pos = tk["tracker"].get_position()
        # unpack the position object
        startX = int(pos.left())
        startY = int(pos.top())
        endX = int(pos.right())
        endY = int(pos.bottom())
        tcx, tcy = (startX + endX) / 2, (startY + endY) / 2
        # distance moved since the previous frame
        _x, _y = tk["last_position"]
        _d = distance(_x, _y, tcx, tcy)
        _last_distance = tk["last_distance"]
        tk["last_distance"] = _d
        tk["last_position"] = (tcx, tcy)
        # retire a tracker that has barely moved for 30 consecutive
        # frames, or whose centroid has drifted off the frame edges
        STILL_DISTANCE_IN_PX = 2
        if _last_distance < STILL_DISTANCE_IN_PX and _d < STILL_DISTANCE_IN_PX:
            tk["still"] += 1
        else:
            tk["still"] = 0
        if tk["still"] > 30 or tcx < 10 or tcx > 1200:
            untracking.append(tk)
        cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2)
        color = [int(c) for c in COLORS[0]]
        print(
            f"{tk['id']} - {tk['type']} - centroid: {tcx, tcy} - "
            f"distance: [stl:{tk['still']}] {_last_distance:.3f} -> {_d:.3f}"
        )
        cv2.putText(
            frame,
            f"{tk['id']} - {tk['type']}",
            (startX, startY - 5),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            color,
            2,
        )
    # drop retired trackers, keeping them in `finished` for later use
    untracking_ids = [ut["id"] for ut in untracking]
    trackers = [tk for tk in trackers if tk["id"] not in untracking_ids]
    finished += untracking
    if args["live"]:
        cv2.imshow("Frame", frame)
        key = cv2.waitKey(1) & 0xFF
        # if the `q` key was pressed, break from the loop
        if key == ord("q"):
            break
    if args["output"]:
        # check if the video writer is None
        if writer is None:
            # initialize our video writer
            fourcc = cv2.VideoWriter_fourcc(*"MJPG")
            writer = cv2.VideoWriter(
                args["output"], fourcc, 30, (frame.shape[1], frame.shape[0]), True
            )
            # some information on processing a single frame
            if total > 0:
                elap = end - start
                print("[INFO] single frame took {:.4f} seconds".format(elap))
                print(
                    "[INFO] estimated total time to finish: {:.4f}".format(elap * total)
                )
        # write the output frame to disk
        writer.write(frame)
# release the file pointers
print("[INFO] cleaning up...")
if writer:
    writer.release()
vs.release()

src/utils.py (+119)

@@ -0,0 +1,119 @@
import math
# detecting area
AREAS = [
    [
        ("id", 1),
        ("area", ((0, 40), (12, 129))),
        ("target", ["car", "bus", "motorbike"]),
    ],
    [("id", 2), ("area", ((85, 0), (222, 74))), ("target", ["person", "bicycle"])],
    [("id", 3), ("area", ((38, 340), (99, 482))), ("target", ["person", "wheelchair"])],
    [
        ("id", 4),
        ("area", ((106, 310), (164, 461))),
        ("target", ["person", "wheelchair"]),
    ],
    [
        ("id", 5),
        ("area", ((286, 230), (441, 346))),
        ("target", ["person", "wheelchair"]),
    ],
    [
        ("id", 6),
        ("area", ((421, 190), (555, 304))),
        ("target", ["car", "bus", "motorbike"]),
    ],
    [
        ("id", 7),
        ("area", ((555, 170), (720, 295))),
        ("target", ["person", "wheelchair", "bicycle"]),
    ],
    [
        ("id", 8),
        ("area", ((877, 224), (947, 334))),
        ("target", ["person", "wheelchair"]),
    ],
    [
        ("id", 9),
        # NOTE: x2 (112) < x1 (1047), so this area can never match a
        # centroid under the x1 < cx < x2 test below; likely a typo
        ("area", ((1047, 229), (112, 338))),
        ("target", ["person", "wheelchair"]),
    ],
    [
        ("id", 10),
        ("area", ((1158, 200), (1230, 307))),
        ("target", ["person", "wheelchair"]),
    ],
]


def distance(x2, y2, x1, y1):
    return math.sqrt(math.pow(x2 - x1, 2) + math.pow(y2 - y1, 2))
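# quick sanity check (illustrative): a 3-4-5 right triangle
#   distance(0, 0, 3, 4) -> 5.0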


def check_if_inside_the_boxes(x, y, w, h, _type):
    cx, cy = x + w / 2, y + h / 2
    # print(cx, cy)
    is_inside = False
    for _box in AREAS:
        if is_inside:
            break
        box = dict(_box)
        ((x1, y1), (x2, y2)) = box["area"]
        # print(x1, cx, x2, ' -- ', y1, cy, y2, _type, box['target'])
        if x1 < cx and cx < x2 and y1 < cy and cy < y2 and _type in box["target"]:
            # print('inside --> ', _type, cx, cy, box['id'])
            is_inside = True
    if is_inside:
        print("INSIDE!! this -> ", box)
    return is_inside


def is_it_the_same_obj(x1, y1, w1, h1, i1, j1, w2, h2, **kwargs):
    """We use the centroid location to check whether they are the same
    object and, of course, the dimensions too.
    """
    _id = kwargs.get("id", None)
    if _id:
        print(" :: check against id:", _id)
    DIMENSION_SHIFT = 0.15
    # from experience, we have to compare centroids
    cx1, cy1, cx2, cy2 = x1 + w1 / 2, y1 + h1 / 2, i1 + w2 / 2, j1 + h2 / 2
    c_dff_x, c_dff_y = abs(cx2 - cx1), abs(cy2 - cy1)
    w_shift, h_shift = w1 * DIMENSION_SHIFT, h1 * DIMENSION_SHIFT
    print(" ::SAME:: shift")
    print(f" ---> SHIFT --> w:{w_shift}, h:{h_shift}")
    print(f" ---> centroid {c_dff_x}, {c_dff_y}")
    if c_dff_x > w_shift and c_dff_y > h_shift:
        print(" ::SAME:: shifted too much already -- NOT THE SAME")
        return False
    # if one is inside the other
    if i1 > x1 and (w1 - w2) > i1 - x1 and j1 > y1 and h1 - h2 > j1 - y1:
        print(" ::SAME:: new one inside existing tracker")
        return True
    if x1 > i1 and (w2 - w1) > x1 - i1 and y1 > j1 and h2 - h1 > y1 - j1:
        print(" ::SAME:: existing tracker inside new tracker")
        return True
    # if neither is inside the other, fall back to comparing sizes
    size1, size2 = w1 * h1, w2 * h2
    # if the sizes differ by more than 45%, it's not the same object
    print(f" ---> size {size1}, {size2}, diff % : {abs(size2 - size1)/size1}")
    print(" ::SAME:: size")
    if abs(size2 - size1) / size1 > 0.45:
        print(" ::SAME:: too diff in size -- NOT THE SAME")
        return False
    print(" ::SAME:: last")
    return True


if __name__ == "__main__":
    check_if_inside_the_boxes(461, 263, 24, 65, "person")
    check_if_inside_the_boxes(8, 45, 172, 193, "bus")
    check_if_inside_the_boxes(300, 300, 24, 65, "person")
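    # hypothetical extra check (not in the original commit): two nearly
    # identical boxes should be judged the same object -- the centroids
    # differ by (4, 3) px, under the 15% shift allowance of (7.5, 12.0),
    # and the areas differ by ~1.4%, under the 45% cutoff
    print(is_it_the_same_obj(100, 100, 50, 80, 103, 104, 52, 78))  # True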

src/yolo.py (+10)

@@ -0,0 +1,10 @@
import cv2
import time
import numpy as np


def detect_stuffs(frame, net, ln, confidence, threshold, W, H):
    # TODO: unfinished extraction of the detector; see detect_stuffs()
    # in src/main.py for the working implementation. `idxs`, `start`
    # and `end` are not computed here yet.
    raise NotImplementedError("port the detection logic from src/main.py")
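# intended wiring once implemented (hypothetical, mirroring src/main.py):
#   idxs, start, end = detect_stuffs(frame, net, ln, 0.95, 0.3, W, H)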