Compare commits

...

16 Commits
master ... dev

  1. 4
      .gitignore
  2. 5
      .vscode/settings.json
  3. 2
      README.md
  4. 166
      examples/dlib_objs_tracking.py
  5. 302
      examples/dlib_objs_tracking_queue.py
  6. 122
      examples/opencv_objs_tracking.py
  7. 109
      examples/yolo_img_obj_detector.py
  8. 283
      examples/yolo_obj_detector.py
  9. 38
      src/draw_area.py
  10. 19
      src/log.py
  11. 566
      src/main.py
  12. 291
      src/utils.py

4
.gitignore vendored

@ -0,0 +1,4 @@
*__pycache__*
*.mp4
.DS_Store

5
.vscode/settings.json vendored

@ -1,4 +1,7 @@
{ {
"python.pythonPath": "~/.virtualenvs/obj-tracking/bin/python", "python.pythonPath": "~/.virtualenvs/obj-tracking/bin/python",
"python.formatting.provider": "black" "python.formatting.provider": "black",
"python.linting.pylintArgs": [
"--extension-pkg-whitelist=cv2"
]
} }

2
README.md

@ -1,7 +1,7 @@
# Handai aerial detector # Handai aerial detector
We need to analyze road users in mid-block crossing area between Handai monorail station and handai hospital. We need to analyze road users in mid-block crossing area between Handai monorail station and Handai hospital.
## Output we need ## Output we need

166
examples/dlib_objs_tracking.py

@ -14,21 +14,21 @@ import dlib
# construct the argument parser and parse the arguments # construct the argument parser and parse the arguments
ap = argparse.ArgumentParser() ap = argparse.ArgumentParser()
ap.add_argument("-v", "--video", type=str, ap.add_argument("-v", "--video", type=str,
help="path to input video file") help="path to input video file")
ap.add_argument("-t", "--tracker", type=str, default="kcf", ap.add_argument("-t", "--tracker", type=str, default="kcf",
help="OpenCV object tracker type") help="OpenCV object tracker type")
args = vars(ap.parse_args()) args = vars(ap.parse_args())
# initialize a dictionary that maps strings to their corresponding # initialize a dictionary that maps strings to their corresponding
# OpenCV object tracker implementations # OpenCV object tracker implementations
OPENCV_OBJECT_TRACKERS = { OPENCV_OBJECT_TRACKERS = {
"csrt": cv2.TrackerCSRT_create, "csrt": cv2.TrackerCSRT_create,
"kcf": cv2.TrackerKCF_create, "kcf": cv2.TrackerKCF_create,
"boosting": cv2.TrackerBoosting_create, "boosting": cv2.TrackerBoosting_create,
"mil": cv2.TrackerMIL_create, "mil": cv2.TrackerMIL_create,
"tld": cv2.TrackerTLD_create, "tld": cv2.TrackerTLD_create,
"medianflow": cv2.TrackerMedianFlow_create, "medianflow": cv2.TrackerMedianFlow_create,
"mosse": cv2.TrackerMOSSE_create "mosse": cv2.TrackerMOSSE_create
} }
# initialize OpenCV's special multi-object tracker # initialize OpenCV's special multi-object tracker
@ -37,92 +37,92 @@ trackers = []
# if a video path was not supplied, grab the reference to the web cam # if a video path was not supplied, grab the reference to the web cam
if not args.get("video", False): if not args.get("video", False):
print("[INFO] starting video stream...") print("[INFO] starting video stream...")
vs = VideoStream(src=0).start() vs = VideoStream(src=0).start()
time.sleep(1.0) time.sleep(1.0)
# otherwise, grab a reference to the video file # otherwise, grab a reference to the video file
else: else:
vs = cv2.VideoCapture(args["video"]) vs = cv2.VideoCapture(args["video"])
# loop over frames from the video stream # loop over frames from the video stream
while True: while True:
# grab the current frame, then handle if we are using a # grab the current frame, then handle if we are using a
# VideoStream or VideoCapture object # VideoStream or VideoCapture object
frame = vs.read() frame = vs.read()
frame = frame[1] if args.get("video", False) else frame frame = frame[1] if args.get("video", False) else frame
# check to see if we have reached the end of the stream # check to see if we have reached the end of the stream
if frame is None: if frame is None:
break break
# resize the frame (so we can process it faster) # resize the frame (so we can process it faster)
# frame = imutils.resize(frame, width=600) # frame = imutils.resize(frame, width=600)
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# grab the updated bounding box coordinates (if any) for each # grab the updated bounding box coordinates (if any) for each
# object that is being tracked # object that is being tracked
# (success, boxes) = trackers.update(frame) # (success, boxes) = trackers.update(frame)
# print('success', success) # print('success', success)
# print('boxes', boxes) # print('boxes', boxes)
for tk in trackers: for tk in trackers:
tk.update(frame_rgb) tk.update(frame_rgb)
pos = tk.get_position() pos = tk.get_position()
# unpack the position object # unpack the position object
startX = int(pos.left()) startX = int(pos.left())
startY = int(pos.top()) startY = int(pos.top())
endX = int(pos.right()) endX = int(pos.right())
endY = int(pos.bottom()) endY = int(pos.bottom())
cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2) cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2)
# loop over the bounding boxes and draw then on the frame # loop over the bounding boxes and draw then on the frame
# for box in boxes: # for box in boxes:
# (x, y, w, h) = [int(v) for v in box] # (x, y, w, h) = [int(v) for v in box]
# cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2) # cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
# show the output frame # show the output frame
cv2.imshow("Frame", frame) cv2.imshow("Frame", frame)
key = cv2.waitKey(1) & 0xFF key = cv2.waitKey(1) & 0xFF
# if the 's' key is selected, we are going to "select" a bounding # if the 's' key is selected, we are going to "select" a bounding
# box to track # box to track
if key == ord("s"): if key == ord("s"):
# select the bounding box of the object we want to track (make # select the bounding box of the object we want to track (make
# sure you press ENTER or SPACE after selecting the ROI) # sure you press ENTER or SPACE after selecting the ROI)
box = cv2.selectROI("Frame", frame, fromCenter=False, box = cv2.selectROI("Frame", frame, fromCenter=False,
showCrosshair=True) showCrosshair=True)
print('select box: ', box) print('select box: ', box)
(x,y,w,h) = box (x,y,w,h) = box
startX = x startX = x
startY = y startY = y
endX = x + w endX = x + w
endY = y + h endY = y + h
print(startX, startY, endX, endY) print(startX, startY, endX, endY)
# create a new object tracker for the bounding box and add it # create a new object tracker for the bounding box and add it
# to our multi-object tracker # to our multi-object tracker
# tracker = OPENCV_OBJECT_TRACKERS[args["tracker"]]() # tracker = OPENCV_OBJECT_TRACKERS[args["tracker"]]()
# trackers.add(tracker, frame, box) # trackers.add(tracker, frame, box)
tracker = dlib.correlation_tracker() tracker = dlib.correlation_tracker()
rect = dlib.rectangle(startX, startY, endX, endY) rect = dlib.rectangle(startX, startY, endX, endY)
print('rect', rect) print('rect', rect)
tracker.start_track(frame_rgb, rect) tracker.start_track(frame_rgb, rect)
trackers.append(tracker) trackers.append(tracker)
# if the `q` key was pressed, break from the loop # if the `q` key was pressed, break from the loop
elif key == ord("q"): elif key == ord("q"):
break break
# if we are using a webcam, release the pointer # if we are using a webcam, release the pointer
if not args.get("video", False): if not args.get("video", False):
vs.stop() vs.stop()
# otherwise, release the file pointer # otherwise, release the file pointer
else: else:
vs.release() vs.release()
# close all windows # close all windows
cv2.destroyAllWindows() cv2.destroyAllWindows()

302
examples/dlib_objs_tracking_queue.py

@ -12,47 +12,47 @@ import dlib
import cv2 import cv2
def start_tracker(box, label, rgb, inputQueue, outputQueue): def start_tracker(box, label, rgb, inputQueue, outputQueue):
# construct a dlib rectangle object from the bounding box # construct a dlib rectangle object from the bounding box
# coordinates and then start the correlation tracker # coordinates and then start the correlation tracker
t = dlib.correlation_tracker() t = dlib.correlation_tracker()
rect = dlib.rectangle(box[0], box[1], box[2], box[3]) rect = dlib.rectangle(box[0], box[1], box[2], box[3])
t.start_track(rgb, rect) t.start_track(rgb, rect)
# loop indefinitely -- this function will be called as a daemon # loop indefinitely -- this function will be called as a daemon
# process so we don't need to worry about joining it # process so we don't need to worry about joining it
while True: while True:
# attempt to grab the next frame from the input queue # attempt to grab the next frame from the input queue
rgb = inputQueue.get() rgb = inputQueue.get()
# if there was an entry in our queue, process it # if there was an entry in our queue, process it
if rgb is not None: if rgb is not None:
# update the tracker and grab the position of the tracked # update the tracker and grab the position of the tracked
# object # object
t.update(rgb) t.update(rgb)
pos = t.get_position() pos = t.get_position()
# unpack the position object # unpack the position object
startX = int(pos.left()) startX = int(pos.left())
startY = int(pos.top()) startY = int(pos.top())
endX = int(pos.right()) endX = int(pos.right())
endY = int(pos.bottom()) endY = int(pos.bottom())
# add the label + bounding box coordinates to the output # add the label + bounding box coordinates to the output
# queue # queue
outputQueue.put((label, (startX, startY, endX, endY))) outputQueue.put((label, (startX, startY, endX, endY)))
# construct the argument parser and parse the arguments # construct the argument parser and parse the arguments
ap = argparse.ArgumentParser() ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True, ap.add_argument("-p", "--prototxt", required=True,
help="path to Caffe 'deploy' prototxt file") help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True, ap.add_argument("-m", "--model", required=True,
help="path to Caffe pre-trained model") help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True, ap.add_argument("-v", "--video", required=True,
help="path to input video file") help="path to input video file")
ap.add_argument("-o", "--output", type=str, ap.add_argument("-o", "--output", type=str,
help="path to optional output video file") help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2, ap.add_argument("-c", "--confidence", type=float, default=0.2,
help="minimum probability to filter weak detections") help="minimum probability to filter weak detections")
args = vars(ap.parse_args()) args = vars(ap.parse_args())
# initialize our list of queues -- both input queue and output queue # initialize our list of queues -- both input queue and output queue
@ -63,9 +63,9 @@ outputQueues = []
# initialize the list of class labels MobileNet SSD was trained to # initialize the list of class labels MobileNet SSD was trained to
# detect # detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat", CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
"bottle", "bus", "car", "cat", "chair", "cow", "diningtable", "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
"dog", "horse", "motorbike", "person", "pottedplant", "sheep", "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
"sofa", "train", "tvmonitor"] "sofa", "train", "tvmonitor"]
# load our serialized model from disk # load our serialized model from disk
print("[INFO] loading model...") print("[INFO] loading model...")
@ -81,120 +81,120 @@ fps = FPS().start()
# loop over frames from the video file stream # loop over frames from the video file stream
while True: while True:
# grab the next frame from the video file # grab the next frame from the video file
(grabbed, frame) = vs.read() (grabbed, frame) = vs.read()
# check to see if we have reached the end of the video file # check to see if we have reached the end of the video file
if frame is None: if frame is None:
break break
# resize the frame for faster processing and then convert the # resize the frame for faster processing and then convert the
# frame from BGR to RGB ordering (dlib needs RGB ordering) # frame from BGR to RGB ordering (dlib needs RGB ordering)
frame = imutils.resize(frame, width=600) frame = imutils.resize(frame, width=600)
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# if we are supposed to be writing a video to disk, initialize # if we are supposed to be writing a video to disk, initialize
# the writer # the writer
if args["output"] is not None and writer is None: if args["output"] is not None and writer is None:
fourcc = cv2.VideoWriter_fourcc(*"MJPG") fourcc = cv2.VideoWriter_fourcc(*"MJPG")
writer = cv2.VideoWriter(args["output"], fourcc, 30, writer = cv2.VideoWriter(args["output"], fourcc, 30,
(frame.shape[1], frame.shape[0]), True) (frame.shape[1], frame.shape[0]), True)
# if our list of queues is empty then we know we have yet to # if our list of queues is empty then we know we have yet to
# create our first object tracker # create our first object tracker
if len(inputQueues) == 0: if len(inputQueues) == 0:
# grab the frame dimensions and convert the frame to a blob # grab the frame dimensions and convert the frame to a blob
(h, w) = frame.shape[:2] (h, w) = frame.shape[:2]
blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5) blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)
# pass the blob through the network and obtain the detections # pass the blob through the network and obtain the detections
# and predictions # and predictions
net.setInput(blob) net.setInput(blob)
detections = net.forward() detections = net.forward()
# loop over the detections # loop over the detections
for i in np.arange(0, detections.shape[2]): for i in np.arange(0, detections.shape[2]):
# extract the confidence (i.e., probability) associated # extract the confidence (i.e., probability) associated
# with the prediction # with the prediction
confidence = detections[0, 0, i, 2] confidence = detections[0, 0, i, 2]
# filter out weak detections by requiring a minimum # filter out weak detections by requiring a minimum
# confidence # confidence
if confidence > args["confidence"]: if confidence > args["confidence"]:
# extract the index of the class label from the # extract the index of the class label from the
# detections list # detections list
idx = int(detections[0, 0, i, 1]) idx = int(detections[0, 0, i, 1])
label = CLASSES[idx] label = CLASSES[idx]
# if the class label is not a person, ignore it # if the class label is not a person, ignore it
if CLASSES[idx] != "person": if CLASSES[idx] != "person":
continue continue
# compute the (x, y)-coordinates of the bounding box # compute the (x, y)-coordinates of the bounding box
# for the object # for the object
box = detections[0, 0, i, 3:7] * np.array([w, h, w, h]) box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
(startX, startY, endX, endY) = box.astype("int") (startX, startY, endX, endY) = box.astype("int")
bb = (startX, startY, endX, endY) bb = (startX, startY, endX, endY)
# create two brand new input and output queues, # create two brand new input and output queues,
# respectively # respectively
iq = multiprocessing.Queue() iq = multiprocessing.Queue()
oq = multiprocessing.Queue() oq = multiprocessing.Queue()
inputQueues.append(iq) inputQueues.append(iq)
outputQueues.append(oq) outputQueues.append(oq)
# spawn a daemon process for a new object tracker # spawn a daemon process for a new object tracker
p = multiprocessing.Process( p = multiprocessing.Process(
target=start_tracker, target=start_tracker,
args=(bb, label, rgb, iq, oq)) args=(bb, label, rgb, iq, oq))
p.daemon = True p.daemon = True
p.start() p.start()
# grab the corresponding class label for the detection # grab the corresponding class label for the detection
# and draw the bounding box # and draw the bounding box
cv2.rectangle(frame, (startX, startY), (endX, endY), cv2.rectangle(frame, (startX, startY), (endX, endY),
(0, 255, 0), 2) (0, 255, 0), 2)
cv2.putText(frame, label, (startX, startY - 15), cv2.putText(frame, label, (startX, startY - 15),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2) cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
# otherwise, we've already performed detection so let's track # otherwise, we've already performed detection so let's track
# multiple objects # multiple objects
else: else:
# loop over each of our input ques and add the input RGB # loop over each of our input ques and add the input RGB
# frame to it, enabling us to update each of the respective # frame to it, enabling us to update each of the respective
# object trackers running in separate processes # object trackers running in separate processes
for iq in inputQueues: for iq in inputQueues:
iq.put(rgb) iq.put(rgb)
# loop over each of the output queues # loop over each of the output queues
for oq in outputQueues: for oq in outputQueues:
# grab the updated bounding box coordinates for the # grab the updated bounding box coordinates for the
# object -- the .get method is a blocking operation so # object -- the .get method is a blocking operation so
# this will pause our execution until the respective # this will pause our execution until the respective
# process finishes the tracking update # process finishes the tracking update
(label, (startX, startY, endX, endY)) = oq.get() (label, (startX, startY, endX, endY)) = oq.get()
# draw the bounding box from the correlation object # draw the bounding box from the correlation object
# tracker # tracker
cv2.rectangle(frame, (startX, startY), (endX, endY), cv2.rectangle(frame, (startX, startY), (endX, endY),
(0, 255, 0), 2) (0, 255, 0), 2)
cv2.putText(frame, label, (startX, startY - 15), cv2.putText(frame, label, (startX, startY - 15),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2) cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
# check to see if we should write the frame to disk # check to see if we should write the frame to disk
if writer is not None: if writer is not None:
writer.write(frame) writer.write(frame)
# show the output frame # show the output frame
cv2.imshow("Frame", frame) cv2.imshow("Frame", frame)
key = cv2.waitKey(1) & 0xFF key = cv2.waitKey(1) & 0xFF
# if the `q` key was pressed, break from the loop # if the `q` key was pressed, break from the loop
if key == ord("q"): if key == ord("q"):
break break
# update the FPS counter # update the FPS counter
fps.update() fps.update()
# stop the timer and display FPS information # stop the timer and display FPS information
fps.stop() fps.stop()
@ -203,7 +203,7 @@ print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# check to see if we need to release the video writer pointer # check to see if we need to release the video writer pointer
if writer is not None: if writer is not None:
writer.release() writer.release()
# do a bit of cleanup # do a bit of cleanup
cv2.destroyAllWindows() cv2.destroyAllWindows()

122
examples/opencv_objs_tracking.py

@ -11,21 +11,21 @@ import cv2
# construct the argument parser and parse the arguments # construct the argument parser and parse the arguments
ap = argparse.ArgumentParser() ap = argparse.ArgumentParser()
ap.add_argument("-v", "--video", type=str, ap.add_argument("-v", "--video", type=str,
help="path to input video file") help="path to input video file")
ap.add_argument("-t", "--tracker", type=str, default="kcf", ap.add_argument("-t", "--tracker", type=str, default="kcf",
help="OpenCV object tracker type") help="OpenCV object tracker type")
args = vars(ap.parse_args()) args = vars(ap.parse_args())
# initialize a dictionary that maps strings to their corresponding # initialize a dictionary that maps strings to their corresponding
# OpenCV object tracker implementations # OpenCV object tracker implementations
OPENCV_OBJECT_TRACKERS = { OPENCV_OBJECT_TRACKERS = {
"csrt": cv2.TrackerCSRT_create, "csrt": cv2.TrackerCSRT_create,
"kcf": cv2.TrackerKCF_create, "kcf": cv2.TrackerKCF_create,
"boosting": cv2.TrackerBoosting_create, "boosting": cv2.TrackerBoosting_create,
"mil": cv2.TrackerMIL_create, "mil": cv2.TrackerMIL_create,
"tld": cv2.TrackerTLD_create, "tld": cv2.TrackerTLD_create,
"medianflow": cv2.TrackerMedianFlow_create, "medianflow": cv2.TrackerMedianFlow_create,
"mosse": cv2.TrackerMOSSE_create "mosse": cv2.TrackerMOSSE_create
} }
# initialize OpenCV's special multi-object tracker # initialize OpenCV's special multi-object tracker
@ -33,68 +33,72 @@ trackers = cv2.MultiTracker_create()
# if a video path was not supplied, grab the reference to the web cam # if a video path was not supplied, grab the reference to the web cam
if not args.get("video", False): if not args.get("video", False):
print("[INFO] starting video stream...") print("[INFO] starting video stream...")
vs = VideoStream(src=0).start() vs = VideoStream(src=0).start()
time.sleep(1.0) time.sleep(1.0)
# otherwise, grab a reference to the video file # otherwise, grab a reference to the video file
else: else:
vs = cv2.VideoCapture(args["video"]) vs = cv2.VideoCapture(args["video"])
# loop over frames from the video stream # loop over frames from the video stream
while True: while True:
# grab the current frame, then handle if we are using a # grab the current frame, then handle if we are using a
# VideoStream or VideoCapture object # VideoStream or VideoCapture object
frame = vs.read() frame = vs.read()
frame = frame[1] if args.get("video", False) else frame frame = frame[1] if args.get("video", False) else frame
# check to see if we have reached the end of the stream # check to see if we have reached the end of the stream
if frame is None: if frame is None:
break break
# resize the frame (so we can process it faster) # resize the frame (so we can process it faster)
# frame = imutils.resize(frame, width=600) # frame = imutils.resize(frame, width=600)
# grab the updated bounding box coordinates (if any) for each # grab the updated bounding box coordinates (if any) for each
# object that is being tracked # object that is being tracked
(success, boxes) = trackers.update(frame) (success, boxes) = trackers.update(frame)
print('success', success) print('success', success)
print('boxes', boxes) print('boxes', boxes)
trackers.d
# loop over the bounding boxes and draw then on the frame
# loop over the bounding boxes and draw then on the frame for box in boxes:
for box in boxes: (x, y, w, h) = [int(v) for v in box]
(x, y, w, h) = [int(v) for v in box] cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
# objs = trackers.getObjects()
# show the output frame # for o in objs:
cv2.imshow("Frame", frame) # print(type(o), o)
key = cv2.waitKey(1) & 0xFF
# show the output frame
# if the 's' key is selected, we are going to "select" a bounding cv2.imshow("Frame", frame)
# box to track key = cv2.waitKey(1) & 0xFF
if key == ord("s"):
# select the bounding box of the object we want to track (make # if the 's' key is selected, we are going to "select" a bounding
# sure you press ENTER or SPACE after selecting the ROI) # box to track
box = cv2.selectROI("Frame", frame, fromCenter=False, if key == ord("s"):
showCrosshair=True) # select the bounding box of the object we want to track (make
# sure you press ENTER or SPACE after selecting the ROI)
# create a new object tracker for the bounding box and add it box = cv2.selectROI("Frame", frame, fromCenter=False,
# to our multi-object tracker showCrosshair=True)
tracker = OPENCV_OBJECT_TRACKERS[args["tracker"]]()
trackers.add(tracker, frame, box) # create a new object tracker for the bounding box and add it
# to our multi-object tracker
# if the `q` key was pressed, break from the loop tracker = OPENCV_OBJECT_TRACKERS[args["tracker"]]()
elif key == ord("q"): # print('handpick box', type(box), box)
break n = trackers.add(tracker, frame, box)
# print(type(n), n)
# if the `q` key was pressed, break from the loop
elif key == ord("q"):
break
# if we are using a webcam, release the pointer # if we are using a webcam, release the pointer
if not args.get("video", False): if not args.get("video", False):
vs.stop() vs.stop()
# otherwise, release the file pointer # otherwise, release the file pointer
else: else:
vs.release() vs.release()
# close all windows # close all windows
cv2.destroyAllWindows() cv2.destroyAllWindows()

109
examples/yolo_img_obj_detector.py

@ -0,0 +1,109 @@
"""USAGE
python examples/yolo_obj_detector.py \
-c ~/dev/obj-tracking/yolov3.cfg \
-w ~/dev/obj-tracking/yolov3.weights \
-cl ~/dev/obj-tracking/yolo/darknet/data/coco.names \
-i ~/dev/obj-tracking/person.jpg
python examples/yolo_obj_detector.py \
-c ~/syncthing/dropbox/tracking-obj/mytrain.cfg \
-w ~/syncthing/dropbox/tracking-obj/mytrain_final.weights \
-cl ~/syncthing/dropbox/tracking-obj/mytrain.names \
-i /media/sipp11/500BUP/handai_photos/test/6294.jpg
"""
import cv2
import argparse
import numpy as np
ap = argparse.ArgumentParser()
ap.add_argument("-i", "--image", required=True, help="path to input image")
ap.add_argument("-c", "--config", required=True, help="path to yolo config file")
ap.add_argument(
"-w", "--weights", required=True, help="path to yolo pre-trained weights"
)
ap.add_argument(
"-cl", "--classes", required=True, help="path to text file containing class names"
)
args = ap.parse_args()
def get_output_layers(net):
layer_names = net.getLayerNames()
output_layers = [layer_names[i[0] - 1] for i in net.getUnconnectedOutLayers()]
return output_layers
def draw_prediction(img, class_id, confidence, x, y, x_plus_w, y_plus_h):
label = str(classes[class_id])
color = COLORS[class_id]
cv2.rectangle(img, (x, y), (x_plus_w, y_plus_h), color, 2)
cv2.putText(img, label, (x - 10, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
image = cv2.imread(args.image)
Width = image.shape[1]
Height = image.shape[0]
scale = 0.00392
classes = None
with open(args.classes, "r") as f:
classes = [line.strip() for line in f.readlines()]
COLORS = np.random.uniform(0, 255, size=(len(classes), 3))
net = cv2.dnn.readNet(args.weights, args.config)
blob = cv2.dnn.blobFromImage(image, scale, (416, 416), (0, 0, 0), True, crop=False)
net.setInput(blob)
outs = net.forward(get_output_layers(net))
class_ids = []
confidences = []
boxes = []
conf_threshold = 0.5
nms_threshold = 0.4
for out in outs:
for detection in out:
scores = detection[5:]
class_id = np.argmax(scores)
confidence = scores[class_id]
if confidence > 0.5:
center_x = int(detection[0] * Width)
center_y = int(detection[1] * Height)
w = int(detection[2] * Width)
h = int(detection[3] * Height)
x = center_x - w / 2
y = center_y - h / 2
class_ids.append(class_id)
confidences.append(float(confidence))
boxes.append([x, y, w, h])
indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, nms_threshold)
for i in indices:
i = i[0]
box = boxes[i]
x = box[0]
y = box[1]
w = box[2]
h = box[3]
draw_prediction(
image,
class_ids[i],
confidences[i],
round(x),
round(y),
round(x + w),
round(y + h),
)
cv2.imshow("object detection", image)
cv2.waitKey()
cv2.imwrite("object-detection.jpg", image)
cv2.destroyAllWindows()

283
examples/yolo_obj_detector.py

@ -1,103 +1,198 @@
"""USAGE """USAGE:
python examples/yolo_obj_detector.py \
-c ~/dev/obj-tracking/yolov3.cfg \ time python examples/test.py --input ~/Desktop/5min.mp4 -o output.mp4
-w ~/dev/obj-tracking/yolov3.weights \ time python examples/test.py --input ~/Desktop/5min.mp4 -l
-cl ~/dev/obj-tracking/yolo/darknet/data/coco.names \
-i ~/dev/obj-tracking/person.jpg
""" """
import cv2 # import the necessary packages
import argparse
import numpy as np import numpy as np
import argparse
import imutils
import time
import cv2
import os
# construct the argument parse and parse the arguments
ap = argparse.ArgumentParser() ap = argparse.ArgumentParser()
ap.add_argument("-i", "--image", required=True, help="path to input image") ap.add_argument("-i", "--input", required=True, help="path to input video")
ap.add_argument("-c", "--config", required=True, help="path to yolo config file") ap.add_argument("-o", "--output", required=False, help="path to output video")
ap.add_argument("-l", "--live", action='store_true', help="Show live detection")
# ap.add_argument("-y", "--yolo", required=True,
# help="base path to YOLO directory")
ap.add_argument( ap.add_argument(
"-w", "--weights", required=True, help="path to yolo pre-trained weights" "-c",
"--confidence",
type=float,
default=0.5,
help="minimum probability to filter weak detections",
) )
ap.add_argument( ap.add_argument(
"-cl", "--classes", required=True, help="path to text file containing class names" "-t",
"--threshold",
type=float,
default=0.3,
help="threshold when applyong non-maxima suppression",
) )
args = ap.parse_args() args = vars(ap.parse_args())
# load the COCO class labels our YOLO model was trained on
def get_output_layers(net): # labelsPath = os.path.sep.join([args["yolo"], "coco.names"])
layer_names = net.getLayerNames() labelsPath = "/home/sipp11/syncthing/dropbox/tracking-obj/mytrain.names"
output_layers = [layer_names[i[0] - 1] for i in net.getUnconnectedOutLayers()] LABELS = open(labelsPath).read().strip().split("\n")
return output_layers
# initialize a list of colors to represent each possible class label
np.random.seed(42)
def draw_prediction(img, class_id, confidence, x, y, x_plus_w, y_plus_h): COLORS = np.random.randint(0, 255, size=(len(LABELS), 3), dtype="uint8")
label = str(classes[class_id])
color = COLORS[class_id] # derive the paths to the YOLO weights and model configuration
cv2.rectangle(img, (x, y), (x_plus_w, y_plus_h), color, 2) # weightsPath = os.path.sep.join([args["yolo"], "yolov3.weights"])
cv2.putText(img, label, (x - 10, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) # configPath = os.path.sep.join([args["yolo"], "yolov3.cfg"])
image = cv2.imread(args.image) weightsPath = "/home/sipp11/syncthing/dropbox/tracking-obj/mytrain_final.weights"
configPath = "/home/sipp11/syncthing/dropbox/tracking-obj/mytrain.cfg"
Width = image.shape[1]
Height = image.shape[0] # load our YOLO object detector trained on COCO dataset (80 classes)
scale = 0.00392 # and determine only the *output* layer names that we need from YOLO
print("[INFO] loading YOLO from disk...")
classes = None net = cv2.dnn.readNetFromDarknet(configPath, weightsPath)
ln = net.getLayerNames()
with open(args.classes, "r") as f: ln = [ln[i[0] - 1] for i in net.getUnconnectedOutLayers()]
classes = [line.strip() for line in f.readlines()]
COLORS = np.random.uniform(0, 255, size=(len(classes), 3)) # initialize the video stream, pointer to output video file, and
# frame dimensions
net = cv2.dnn.readNet(args.weights, args.config) vs = cv2.VideoCapture(args["input"])
blob = cv2.dnn.blobFromImage(image, scale, (416, 416), (0, 0, 0), True, crop=False) writer = None
(W, H) = (None, None)
net.setInput(blob)
# try to determine the total number of frames in the video file
outs = net.forward(get_output_layers(net)) try:
prop = (
class_ids = [] cv2.cv.CV_CAP_PROP_FRAME_COUNT if imutils.is_cv2() else cv2.CAP_PROP_FRAME_COUNT
confidences = []
boxes = []
conf_threshold = 0.5
nms_threshold = 0.4
for out in outs:
for detection in out:
scores = detection[5:]
class_id = np.argmax(scores)
confidence = scores[class_id]
if confidence > 0.5:
center_x = int(detection[0] * Width)
center_y = int(detection[1] * Height)
w = int(detection[2] * Width)
h = int(detection[3] * Height)
x = center_x - w / 2
y = center_y - h / 2
class_ids.append(class_id)
confidences.append(float(confidence))
boxes.append([x, y, w, h])
indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, nms_threshold)
for i in indices:
i = i[0]
box = boxes[i]
x = box[0]
y = box[1]
w = box[2]
h = box[3]
draw_prediction(
image,
class_ids[i],
confidences[i],
round(x),
round(y),
round(x + w),
round(y + h),
) )
total = int(vs.get(prop))
cv2.imshow("object detection", image) print("[INFO] {} total frames in video".format(total))
cv2.waitKey()
# an error occurred while trying to determine the total
cv2.imwrite("object-detection.jpg", image) # number of frames in the video file
cv2.destroyAllWindows() except:
print("[INFO] could not determine # of frames in video")
print("[INFO] no approx. completion time can be provided")
total = -1
# loop over frames from the video file stream
while True:
# read the next frame from the file
(grabbed, frame) = vs.read()
# if the frame was not grabbed, then we have reached the end
# of the stream
if not grabbed:
break
# if the frame dimensions are empty, grab them
if W is None or H is None:
(H, W) = frame.shape[:2]
# construct a blob from the input frame and then perform a forward
# pass of the YOLO object detector, giving us our bounding boxes
# and associated probabilities
blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
net.setInput(blob)
start = time.time()
layerOutputs = net.forward(ln)
end = time.time()
# initialize our lists of detected bounding boxes, confidences,
# and class IDs, respectively
boxes = []
confidences = []
classIDs = []
# loop over each of the layer outputs
for output in layerOutputs:
# loop over each of the detections
for detection in output:
# extract the class ID and confidence (i.e., probability)
# of the current object detection
scores = detection[5:]
classID = np.argmax(scores)
confidence = scores[classID]
# filter out weak predictions by ensuring the detected
# probability is greater than the minimum probability
if confidence > args["confidence"]:
# scale the bounding box coordinates back relative to
# the size of the image, keeping in mind that YOLO
# actually returns the center (x, y)-coordinates of
# the bounding box followed by the boxes' width and
# height
box = detection[0:4] * np.array([W, H, W, H])
(centerX, centerY, width, height) = box.astype("int")
# use the center (x, y)-coordinates to derive the top
# and and left corner of the bounding box
x = int(centerX - (width / 2))
y = int(centerY - (height / 2))
# update our list of bounding box coordinates,
# confidences, and class IDs
boxes.append([x, y, int(width), int(height)])
confidences.append(float(confidence))
classIDs.append(classID)
# apply non-maxima suppression to suppress weak, overlapping
# bounding boxes
idxs = cv2.dnn.NMSBoxes(
boxes, confidences, args["confidence"], args["threshold"]
)
# ensure at least one detection exists
if len(idxs) > 0:
# loop over the indexes we are keeping
for i in idxs.flatten():
# extract the bounding box coordinates
(x, y) = (boxes[i][0], boxes[i][1])
(w, h) = (boxes[i][2], boxes[i][3])
# draw a bounding box rectangle and label on the frame
color = [int(c) for c in COLORS[classIDs[i]]]
cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
text = "{}: {:.4f}".format(LABELS[classIDs[i]], confidences[i])
cv2.putText(
frame, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2
)
if args["live"]:
cv2.imshow("Frame", frame)
key = cv2.waitKey(1) & 0xFF
# if the `q` key was pressed, break from the loop
if key == ord("q"):
break
if args["output"]:
# check if the video writer is None
if writer is None:
# initialize our video writer
fourcc = cv2.VideoWriter_fourcc(*"MJPG")
writer = cv2.VideoWriter(
args["output"], fourcc, 30, (frame.shape[1], frame.shape[0]), True
)
# some information on processing single frame
if total > 0:
elap = end - start
print("[INFO] single frame took {:.4f} seconds".format(elap))
print(
"[INFO] estimated total time to finish: {:.4f}".format(elap * total)
)
# write the output frame to disk
writer.write(frame)
# release the file pointers
print("[INFO] cleaning up...")
writer.release()
vs.release()

38
src/draw_area.py

@ -0,0 +1,38 @@
import argparse
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image
import numpy as np
from utils import AREAS
from matplotlib.cm import get_cmap
ap = argparse.ArgumentParser()
ap.add_argument("-i", "--input", required=True, help="path to input iamge")
args = vars(ap.parse_args())
im = np.array(Image.open(args['input']), dtype=np.uint8)
# Create figure and axes
fig, ax = plt.subplots(1)
# Display the image
ax.imshow(im)
for _area in AREAS:
a = dict(_area)
color = get_cmap('tab20')(a['id'])
x1, y1 = a['area'][0]
x2, y2 = a['area'][1]
w, h = x2 - x1, y2- y1
# Create a Rectangle patch
rect = patches.Rectangle(
a['area'][0], w, h, linewidth=1, edgecolor=color, facecolor="none"
)
# Add the patch to the Axes
ax.add_patch(rect)
ax.text(x1, y1 - 5, f"AREA: {a['id']}", fontsize=5, color=color)
plt.savefig("test.jpg", dpi=300)

19
src/log.py

@ -0,0 +1,19 @@
import logging
def init_logger():
logger = logging.getLogger() # get the root logger
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("[%(levelname)s] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
trace_handler = logging.FileHandler("trace.log", "w")
trace_handler.setLevel(logging.INFO)
# trace_formatter = logging.Formatter("%(asctime)s[%(levelname)s],%(message)s")
trace_formatter = logging.Formatter("%(message)s")
trace_handler.setFormatter(trace_formatter)
logger.addHandler(trace_handler)

566
src/main.py

@ -0,0 +1,566 @@
"""USAGE:
time python src/_detector.py --input ~/Desktop/5min.mp4 -o output.mp4
time python src/_detector.py --input ~/Desktop/5min.mp4 -l
"""
import sys
import time
import argparse
import pprint
import numpy as np
import imutils
import cv2
from utils import (
check_if_inside_the_boxes,
is_it_the_same_obj,
box_distance,
get_heading,
get_avg_heading,
OBJ_LEAVING_COND,
DONTCARE,
)
from log import init_logger, logging
init_logger()
pp = pprint.PrettyPrinter(indent=2)
# tracking
OPENCV_OBJECT_TRACKERS = {"csrt": cv2.TrackerCSRT_create}
# initialize OpenCV's special multi-object tracker
cv_trackers = cv2.MultiTracker_create()
trackers = []
finished = []
W4A = {} # this stands for "wait for arrival [at ...]"
# construct the argument parse and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-i", "--input", required=True, help="path to input video")
ap.add_argument("-o", "--output", required=False, help="path to output video")
ap.add_argument("-l", "--live", action="store_true", help="Show live detection")
# ap.add_argument("-y", "--yolo", required=True,
# help="base path to YOLO directory")
ap.add_argument(
"-c",
"--confidence",
type=float,
default=0.95,
help="minimum probability to filter weak detections",
)
ap.add_argument(
"-t",
"--threshold",
type=float,
default=0.3,
help="threshold when applyong non-maxima suppression",
)
args = vars(ap.parse_args())
# load the COCO class labels our YOLO model was trained on
# labelsPath = os.path.sep.join([args["yolo"], "coco.names"])
labelsPath = "../../syncthing/dropbox/tracking-obj/mytrain.names"
LABELS = open(labelsPath).read().strip().split("\n")
# 0 person, 1 wheelchair, 2 bicycle, 3 motorbike, 4 car, 5 bus, 6 truck
# initialize a list of colors to represent each possible class label
np.random.seed(42)
COLORS = np.random.randint(0, 255, size=(len(LABELS), 3), dtype="uint8")
# derive the paths to the YOLO weights and model configuration
weightsPath = "../../syncthing/dropbox/tracking-obj/mytrain_final.weights"
configPath = "../../syncthing/dropbox/tracking-obj/mytrain.cfg"
# load our YOLO object detector trained on COCO dataset (80 classes)
# and determine only the *output* layer names that we need from YOLO
logging.debug("[INFO] loading YOLO from disk...")
net = cv2.dnn.readNetFromDarknet(configPath, weightsPath)
ln = net.getLayerNames()
ln = [ln[i[0] - 1] for i in net.getUnconnectedOutLayers()]
start = end = 0
def detect_stuffs(_frame):
# construct a blob from the input frame and then perform a forward
# pass of the YOLO object detector, giving us our bounding boxes
# and associated probabilities
blob = cv2.dnn.blobFromImage(_frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
net.setInput(blob)
start = time.time()
layerOutputs = net.forward(ln)
end = time.time()
# initialize our lists of detected bounding boxes, confidences,
# and class IDs, respectively
boxes = []
confidences = []
classIDs = []
# loop over each of the layer outputs
for output in layerOutputs:
# print(f'[{_frame_count:08d}] output -> ', len(output))
# loop over each of the detections
for detection in output:
# extract the class ID and confidence (i.e., probability)
# of the current object detection
scores = detection[5:]
classID = np.argmax(scores)
confidence = scores[classID]
# filter out weak predictions by ensuring the detected
# probability is greater than the minimum probability
if confidence <= args["confidence"]:
continue
# scale the bounding box coordinates back relative to
# the size of the image, keeping in mind that YOLO
# actually returns the center (x, y)-coordinates of
# the bounding box followed by the boxes' width and
# height
box = detection[0:4] * np.array([W, H, W, H])
(centerX, centerY, width, height) = box.astype("int")
# use the center (x, y)-coordinates to derive the top
# and and left corner of the bounding box
x = int(centerX - (width / 2))
y = int(centerY - (height / 2))
# update our list of bounding box coordinates,
# confidences, and class IDs
boxes.append([x, y, int(width), int(height)])
confidences.append(float(confidence))
classIDs.append(classID)
# apply non-maxima suppression to suppress weak, overlapping
# bounding boxes
idxs = cv2.dnn.NMSBoxes(
boxes, confidences, args["confidence"], args["threshold"]
)
# ensure at least one detection exists
if len(idxs) == 0:
continue
return idxs, boxes, confidences, classIDs, start, end
# initialize the video stream, pointer to output video file, and
# frame dimensions
vs = cv2.VideoCapture(args["input"])
writer = None
(W, H) = (None, None)
# try to determine the total number of frames in the video file
try:
prop = (
cv2.cv.CV_CAP_PROP_FRAME_COUNT if imutils.is_cv2() else cv2.CAP_PROP_FRAME_COUNT
)
total = int(vs.get(prop))
print("[INFO] {} total frames in video".format(total))
# an error occurred while trying to determine the total
# number of frames in the video file
except:
print("[INFO] could not determine # of frames in video")
print("[INFO] no approx. completion time can be provided")
total = -1
_frame_count = 0
tracker_counter = 1
logging.debug("INIT")
# for xx in range(3996):
# (grabbed, frame) = vs.read()
# _frame_count += 1
# loop over frames from the video file stream
while True:
# read the next frame from the file
(grabbed, frame) = vs.read()
# if the frame was not grabbed, then we have reached the end
# of the stream
if not grabbed:
break
# if the frame dimensions are empty, grab them
if W is None or H is None:
(H, W) = frame.shape[:2]
_frame_count += 1
# grab the updated bounding box coordinates (if any) for each
# object that is being tracked
(success, boxes) = cv_trackers.update(frame)
# print("success", success)
# print("boxes", boxes)
print(f"[{_frame_count:08d}] ::")
# quit if unable to read the video file
if not success:
print("Failed to read video", grabbed, frame)
cv_trackers.clear()
cv_trackers = cv2.MultiTracker_create()
# print(" INIT AGAIN cv_tracker", type(cv_trackers), cv_trackers)
# get rid of them in trackers too
trackers = [_ for _ in trackers if _["id"] not in ut_ids]
# add tracker again
for _trckr in trackers:
__tkr = OPENCV_OBJECT_TRACKERS["csrt"]()
cv_trackers.add(__tkr, frame, tuple(_trckr["curr_position"]))
(success, boxes) = cv_trackers.update(frame)
# print('SUCCESS? ', success)
untracking = []
# loop over the bounding boxes and draw then on the frame
obj_cnt = len(boxes)
# print(f"obj_cnt: ", obj_cnt, ' | tracker #', len(trackers))
for idx in range(obj_cnt):
GONE = False
box = boxes[idx]
(x, y, w, h) = [int(v) for v in box]
logging.info(f"{_frame_count},{trackers[idx]['id']},{trackers[idx]['type']},POSITION,{x},{y},{w},{h}")
# check if size is growing? if more than twice then, untrack it
# TODO: make it static bound should be better
ow, oh = trackers[idx]["size"]
if ow / w > 2 or oh / h > 2:
print(f" {tk['id']} GROW_TOO_BIG")
trackers[idx]["status"] = "grow-too-big"
logging.info(
f"{_frame_count},{tk['id']},{tk['type']},GROW_TOO_BIG,{x},{y},{w},{h}"
)
untracking.append(trackers[idx])
continue
_last_pos = trackers[idx]["history"][0]
curr_distance = box_distance(box, _last_pos)
last_distance = trackers[idx]["distance"]
trackers[idx]["distance"] = curr_distance
trackers[idx]["curr_position"] = box # [int(v) for v in box]
trackers[idx]["history"].insert(0, box)
_last_idx = 2 if len(trackers[idx]["history"]) > 2 else 1
_x, _y, _w, _h = trackers[idx]["history"][_last_idx]
_heading = get_heading(_x, _y, x, y)
tk = trackers[idx]
STILL_DIST_PX = 1
if last_distance < STILL_DIST_PX and curr_distance < STILL_DIST_PX:
trackers[idx]["still"] += 1
else:
trackers[idx]["still"] = 0
if trackers[idx]["still"] == 0:
trackers[idx]["heading"] = [_heading] + trackers[idx]["heading"]
# trackers[idx]["heading"] = trackers[idx]["heading"][:20]
if trackers[idx]["still"] > 30 or x < 5 or x > 1250:
logging.info(
f"{_frame_count},{tk['id']},{tk['type']},LEFT,{x},{y},{w},{h},STILL"
)
untracking.append(trackers[idx])
if trackers[idx]["still"] > 10 and len(trackers[idx]["heading"]) < 5:
print(f" {tk['id']} LEFT - short-life")
logging.info(
f"{_frame_count},{tk['id']},{tk['type']},LEFT,{x},{y},{w},{h},SHORT_LIFE"
)
trackers[idx]["status"] = "short-life"
untracking.append(trackers[idx])
# check if it's hit the first
avg_heading = get_avg_heading(trackers[idx]["heading"])
h_count = len(tk["heading"])
if 5 < h_count and h_count < 10:
# enough to validate, but never an established one
DONTCARE_IDS = [_[0][1] for _ in DONTCARE]
if tk["origin"]["id"] in DONTCARE_IDS:
dc_id = DONTCARE_IDS.index(tk["origin"]["id"])
dc_dict = dict(DONTCARE[dc_id])
print(DONTCARE_IDS, tk["origin"]["id"], dc_id, dc_dict["heading"])
if avg_heading in dc_dict["heading"]:
print(f" {tk['id']} DONT-CARE condition")
logging.info(
f"{_frame_count},{tk['id']},{tk['type']},LEFT,{x},{y},{w},{h},WRONG_DETECTION"
)
trackers[idx]["status"] = "dont-care"
untracking.append(trackers[idx])
# TODO: assign id here! for anything that is -1 id
gid = None
if tk["id"] == -1 and 5 < h_count and h_count < 15:
_area_id = f"id_{tk['origin']['id']}"
if _area_id in W4A:
# check candidates which has "matched" opportunity too
_po = W4A[_area_id]["objects"]
_po = [_ for _ in _po if _frame_count > _[2]] # opportunity
_po = sorted(_po, key=lambda kk: kk[2]) # first one first
if _po:
trackers[idx]["id"] = gid = _po[0][0]["id"]
# remove this id out of next W4A
W4A[_area_id]["objects"] = [
_ for _ in W4A[_area_id]["objects"] if _[0]["id"] != gid
]
# print(f" --- {len(_po)} candicate: picked id={gid}")
# else:
# print(f" --- no candidate")
print(
f" {gid} RE-ENTERS (origin: {trackers[idx]['origin']['id']})"
)
logging.info(
f"{_frame_count},{tk['id']},{tk['type']},RE-ENTER,{x},{y},{w},{h}"
)
elif tk["id"] == -1 and h_count >= 15:
# assign an id
trackers[idx]["id"] = tracker_counter
tracker_counter += 1
logging.info(
f"{_frame_count},{tk['id']},{tk['type']},ENTER,{x},{y},{w},{h}"
)
print(f" {trackers[idx]['id']} ENTERS (& ID assigned)")
for lvng_cnd in OBJ_LEAVING_COND:
_origin = tk["origin"]["id"]
cond = dict(lvng_cnd)
if cond["origin_id"] != _origin:
continue
if cond["heading"] != "" and avg_heading != cond["heading"]:
continue
REACH_condition = False
if cond["x"] != -1:
# print("REACH condition: X", x, cond["x"])
if cond["heading"] in "NE":
_cond = cond["x"](y + w) if callable(cond["x"]) else cond["x"]
if x + w > _cond:
REACH_condition = True
elif cond["heading"] in "SW":
_cond = cond["x"](y + h) if callable(cond["x"]) else cond["x"]
if x < _cond:
REACH_condition = True
elif cond["heading"] == "":
_cond = cond["x"](y) if callable(cond["x"]) else cond["x"]
if x < _cond:
REACH_condition = True
elif cond["y"] != -1:
_cond = cond["y"](x + h) if callable(cond["y"]) else cond["y"]
# Don't have one yet
if cond["heading"] in "SW" and y + h > _cond:
print("REACH condition: Y", y + h, _cond)
REACH_condition = True
# pass
if not REACH_condition:
continue
# print("MATCH COND")
# pp.pprint(cond)
if not cond["next_area"]:
# left the frame
print(f" {tk['id']} LEFT from frame")
logging.info(
f"{_frame_count},{tk['id']},{tk['type']},LEFT,{x},{y},{w},{h}"
)
untracking.append(tk)
continue
_nid = f"id_{cond['next_area'][0]}"
# print(f"#{tk['id']} origin:#{_origin} to#{_nid}", end="")
print(f" {tk['id']} LEFT from {_origin} -> {_nid}")
logging.info(
f"{_frame_count},{tk['id']},{tk['type']},WAIT,{x},{y},{w},{h},SWITCH_ZONE:{_origin}:{_nid}"
)
if _nid not in W4A:
# print(f">>add AREA {_nid} to W4A", end="")
W4A[_nid] = {"objects": []}
# put this object to W4A for next area if doesn't exist
has_this = [_ for _ in W4A[_nid]["objects"] if _[0]["id"] == tk["id"]]
if not has_this:
GONE = True
# unit in frame
_expected_frame = _frame_count + cond["duration_to_next"]
W4A[_nid]["objects"].append((tk, _frame_count, _expected_frame))
untracking.append(tk)
# print(f'>>GONE - W#{len(W4A[_nid]["objects"])}')
# print(f' {_nid} objs: ')
# pp.pprint(W4A[_nid]["objects"])
# print(f' {_nid} untracking: ', [_['id'] for _ in untracking])
if GONE:
continue
print(
f"[{tk['id']}{tk['type'][:1]}] o#{tk['origin']['id']} (x,y)=({x},{y},{w},{h})"
f" | still #{tk['still']} | distance: "
f"{last_distance:.3f} -> {curr_distance:.3f}",
end="",
)
_htxt = ",".join(trackers[idx]["heading"][:18])
print(f"|heading:{_htxt}", end="")
print(f"|avg:{avg_heading}", end="")
print("")
# DRAW on FRAME
color = [int(c) for c in COLORS[0]]
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
cv2.putText(
frame,
f"{tk['id'] if tk['id'] > 0 else '?'} - {tk['type']}",
(x, y - 5),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
color,
2,
)
# Cleanup Tracker
if untracking:
ut_ids = [_["id"] for _ in untracking] # untracking ids
# cv_trackers.clear()
cv_trackers.clear()
cv_trackers = None
time.sleep(1)
cv_trackers = cv2.MultiTracker_create()
# print(" cv_tracker", type(cv_trackers), cv_trackers)
# get rid of them in trackers too
trackers = [_ for _ in trackers if _["id"] not in ut_ids]
# add tracker again
for _trckr in trackers:
__tkr = OPENCV_OBJECT_TRACKERS["csrt"]()
cv_trackers.add(__tkr, frame, tuple(_trckr["curr_position"]))
# print(f"=== AFTER CLEANUP ---- UNTRACKING ===")
# print(f" cv #{len(cv_trackers.getObjects())} trackers #{len(trackers)}")
print(f" Total #{len(cv_trackers.getObjects())}")
# only detect once a sec
if _frame_count % 15 == 1:
idxs, boxes, confidences, classIDs, start, end = detect_stuffs(frame)
# loop over the indexes we are keeping
if len(idxs) == 0:
print("CAN NOT DETECT STUFFS??")
continue
if isinstance(idxs, tuple):
pp.pprint(idxs)
for i in idxs.flatten():
# extract the bounding box coordinates
(x, y) = (boxes[i][0], boxes[i][1])
(w, h) = (boxes[i][2], boxes[i][3])
_class = LABELS[classIDs[i]]
found_at = check_if_inside_the_boxes(x, y, w, h, _class)
if not found_at:
continue
# only do for person/wheelchair
if _class not in ["person", "wheelchair", "bicycle"]:
continue
# (1) check whether it's the same object as one in trackers
is_same = False
for t in trackers:
if _class != t["type"]:
continue
_x, _y, _w, _h = t["curr_position"]
# print(f"[{t['id']}] - {t['type']}")
is_same = is_it_the_same_obj(x, y, w, h, _x, _y, _w, _h, id=t["id"])
if is_same:
break
if not is_same:
# create tracker and add to multi-object tracker
_tracker = OPENCV_OBJECT_TRACKERS["csrt"]()
bbox = (x, y, w, h)
cv_trackers.add(_tracker, frame, bbox)
t = {
"id": -1, # NOTE: assign later; tracker_counter if gid is None else gid,
"type": _class,
"status": "",
"curr_position": bbox,
"size": (w, h),
"heading": [],
"origin": found_at,
"distance": -1,
"first_position": bbox,
"still": 0,
"history": [bbox],
}
trackers.append(t)
print(f" total #{len(trackers)}")
pp.pprint(t)
# print(f" i -> {i} ({x},{y}), {w},{h} ({x + w},{y + h})")
# _what = ",".join([LABELS[c] for c in classIDs])
# print(f"[{_frame_count:08d}] :: {_what}")
# CLEANUP
cleanups = []
for k in W4A:
print(f" CLEANUP[area {k}] ", end="")
for _o in W4A[k]["objects"]:
print(f" {_o[0]['id']} - {_o[1]} - {_o[2]}", end="")
if _o[2] < _frame_count:
continue
wf = _o[2] - _o[1]
wf = wf if wf < 30 * 6 else 180
if _o[2] - wf > _frame_count:
# get rid of this
cleanups.append(_o)
# remove this id out of next W4A
W4A[k]["objects"] = [
_ for _ in W4A[_area_id]["objects"] if _[0]["id"] != _o[0]["id"]
]
print(f" | tracked #{len(trackers)}")
for obj, _s, _e in cleanups:
print(
f" {obj['id']} CLEANED UP should found at {_e - _frame_count} ago"
)
if args["live"]:
cv2.imshow("Frame", frame)
key = cv2.waitKey(1) & 0xFF
if key == ord("w"):
while cv2.waitKey(1) & 0xFF != ord("w"):
pass
# if the `q` key was pressed, break from the loop
elif key == ord("q"):
break
if args["output"]:
# check if the video writer is None
if writer is None:
# initialize our video writer
fourcc = cv2.VideoWriter_fourcc(*"MJPG")
writer = cv2.VideoWriter(
args["output"], fourcc, 30, (frame.shape[1], frame.shape[0]), True
)
# some information on processing single frame
if total > 0:
elap = end - start
print("[INFO] single frame took {:.4f} seconds".format(elap))
print(
"[INFO] estimated total time to finish: {:.4f}".format(elap * total)
)
# write the output frame to disk
writer.write(frame)
# TODO: find a way to get rid of IMPOSSIBLE obj for example
# suddenly appear on area 7 then move up NORTH... -- this is incorrectly detected indeed.
# release the file pointers
print("[INFO] cleaning up...")
if writer:
writer.release()
vs.release()

291
src/utils.py

@ -0,0 +1,291 @@
import collections
import math
from collections import namedtuple
Rectangle = namedtuple("Rectangle", "xmin ymin xmax ymax")
def area(a, b):
# returns None if rectangles don't intersect
dx = min(a.xmax, b.xmax) - max(a.xmin, b.xmin)
dy = min(a.ymax, b.ymax) - max(a.ymin, b.ymin)
if (dx >= 0) and (dy >= 0):
return dx * dy
# detecting area
AREAS = [
[
("id", 1),
("area", ((0, 40), (120, 129))),
("target", ["car", "bus", "motorbike"]),
("next", [6]),
],
[
("id", 2),
("area", ((85, 0), (222, 74))),
("target", ["person", "bicycle"]),
("next", [7]),
],
[
("id", 3),
("area", ((38, 340), (130, 482))),
("target", ["person", "wheelchair"]),
("next", [5]),
],
[
("id", 4),
("area", ((96, 310), (145, 461))),
("target", ["person", "wheelchair"]),
],
[
("id", 5),
("area", ((286, 230), (441, 346))),
("target", ["person", "wheelchair"]),
("next", [8]),
],
[
("id", 6),
("area", ((421, 190), (555, 304))),
("target", ["car", "bus", "motorbike"]),
("next", []),
],
[
("id", 7),
("area", ((555, 170), (700, 295))),
("target", ["person", "wheelchair", "bicycle"]),
("next", [4]),
],
[
("id", 11),
("area", ((875, 179), (945, 278))),
("target", ["person", "wheelchair"]),
("next", [9]),
],
[
("id", 8),
("area", ((901, 223), (976, 342))),
("target", ["person", "wheelchair"]),
("next", [7, 5]),
],
[
("id", 9),
("area", ((1047, 229), (1120, 338))),
("target", ["person", "wheelchair"]),
("next", [8]),
],
[
("id", 10),
("area", ((1158, 200), (1230, 307))),
("target", ["person", "wheelchair"]),
("next", [9]),
],
]
OBJ_LEAVING_COND = [
# [
# ('origin_id', ),
# ('heading', ''),
# ('x', ),
# ('y', ),
# ('next_area', []),
# ('duration_to_next', )
# ],
[
("origin_id", 2),
("heading", ""),
("x", lambda y: (y - 112.6) / -0.2270),
("y", -1),
("next_area", [5, 7]),
("duration_to_next", 30 * 5.5),
],
[
("origin_id", 3),
("heading", "N"),
("x", lambda y: (y + 1993) / 15),
("y", -1),
("next_area", [5]),
("duration_to_next", 30 * 4),
],
[
("origin_id", 4),
("heading", "S"),
("x", 34),
("y", -1),
("next_area", []),
("duration_to_next", 30 * 0),
],
[
("origin_id", 5),
("heading", "N"),
("x", 715),
("y", -1),
("next_area", [11]),
("duration_to_next", 30 * 3),
],
[
("origin_id", 5),
("heading", "S"),
("x", lambda y: (y - 165.3) / 0.4143),
("y", -1),
("next_area", [4]),
("duration_to_next", 30 * 4),
],
[
("origin_id", 7),
("heading", "S"),
("x", -1),
("y", lambda x: 0.4 * x + 152),
("next_area", [4]),
("duration_to_next", 30 * 4),
],
[
("origin_id", 11),
("heading", "S"),
("x", 870),
("y", -1),
("next_area", [7]),
("duration_to_next", 30 * 2),
],
# [
# ('origin_id', ),
# ('heading', ''),
# ('x', ),
# ('y', ),
# ('next_area', []),
# ('duration_to_next', )
# ],
]
DONTCARE = (
(("origin_id", 5), ("heading", ["S", "W"])),
(("origin_id", 4), ("heading", ["S", "W"])),
(("origin_id", 7), ("heading", ["N", "E"])),
(("origin_id", 10), ("heading", ["N", "E"])),
)
def get_linear_equation(pnt1, pnt2):
if not (isinstance(pnt1, tuple) or isinstance(pnt2, tuple)):
return None
a = (pnt2[1] - pnt1[1]) / (pnt2[0] - pnt1[0])
b = pnt1[1] - a * pnt1[0]
return f"y = {a:.4f}x + {b:.1f}"
def get_avg_heading(headings):
latest = headings[:15]
_h = "".join(latest).replace("W", "").replace("E", "")
chars = collections.Counter(_h).most_common(10)
if chars:
return chars[0][0]
return None
def get_heading(x1, y1, x2, y2):
diff_x, diff_y = x2 - x1, y2 - y1
if diff_x > 0 and diff_y > 0:
return "NE"
if diff_x > 0 and diff_y == 0:
return "N"
if diff_x > 0:
return "NW"
if diff_y > 0:
return "SE"
if diff_y == 0:
return "S"
return "SW"
def box_distance(pos1, pos2):
x1, y1, w1, h1 = pos1
x2, y2, w2, h2 = pos2
tx1, tx2 = x1 + w1 / 2, x2 + w2 / 2
ty1, ty2 = y1 + h1 / 2, y2 + h2 / 2
return math.sqrt(math.pow(tx2 - tx1, 2) + math.pow(ty2 - ty1, 2))
def distance(x2, y2, x1, y1):
return math.sqrt(math.pow(x2 - x1, 2) + math.pow(y2 - y1, 2))
def check_if_inside_the_boxes(x, y, w, h, _type):
cx, cy = x + w / 2, y + h / 2
# print(cx, cy)
is_inside = False
for _box in AREAS:
if is_inside:
break
box = dict(_box)
((x1, y1), (x2, y2)) = box["area"]
# print(x1, cx, x2, ' -- ', y1, cy, y2, _type, box['target'])
if x1 < cx and cx < x2 and y1 < cy and cy < y2 and _type in box["target"]:
# print('inside --> ', _type, cx, cy, box['id'])
is_inside = True
# if diff_x < box_w
if is_inside:
# print("INSIDE!! this -> ", box)
return box
return False
def is_it_the_same_obj(x1, y1, w1, h1, i1, j1, w2, h2, **kwargs):
"""We would use the centroid location to check whether they are the same
object and of course, dimension too.
"""
_id = kwargs.get("id", None)
# if _id:
# print(" :: check against id:", _id, end="")
# if first coords are pretty much the same spot, then they are the same
if abs(x1 - i1) / x1 < 0.05 and abs(y1 - j1) / y1 < 0.05:
# print(" same 1st coords")
return True
DIMENSION_SHIFT = 0.15
# we have to use centroid !! from the experience
cx1, cy1, cx2, cy2 = x1 + w1 / 2, y1 + h1 / 2, i1 + w2 / 2, j1 + h2 / 2
c_dff_x, c_dff_y = abs(cx2 - cx1), abs(cy2 - cy1)
w_shift, h_shift = w1 * DIMENSION_SHIFT, h1 * DIMENSION_SHIFT
# print(" ::SAME:: shift", end="")
# print(f" | SHIFT w:{w_shift},h:{h_shift}", end="")
# print(f" | centroid {c_dff_x}, {c_dff_y}", end="")
if c_dff_x > w_shift and c_dff_y > h_shift:
# print(" ::SAME:: shift too much already -- NOT THE SAME")
return False
# if one inside the other
if i1 > x1 and (w1 - w2) > (i1 - x1) and j1 > y1 and h1 - h2 > j1 - y1:
# one is inside the other
# print(" ::SAME:: new one inside existing tracker")
return True
if x1 > i1 and (w2 - w1) > x1 - i1 and y1 > j1 and h2 - h1 > y1 - j1:
# one is inside the other
# print(" ::SAME:: existing tracker inside new tracker")
return True
# if it's 90% overlapped then, assumed it's the same
# ra = Rectangle(x1, y1, x1 + w1, y1 + h1)
# rb = Rectangle(i1, j1, i1 + w2, j1 + w2)
# print(f'overlapped area: {area(ra, rb)} 1:{w1*h1:.1f} 1:{w2*h2:.1f}')
# print(f'||one {x1},{y1},{w1},{h1} two {i1},{j1},{w2},{h2}||')
# if it's not inside the other, then we can use "size" if it's different
size1, size2 = w1 * h1, w2 * h2
# if size is larger than 20%, then it's not the same thing
# print(" ::SAME:: size")
if abs(size2 - size1) / size1 > 0.45:
# print(f" sz {size1}, {size2}, diff%{abs(size2 - size1)/size1}", end="")
# print(" ^^ too diff in size -- NOT THE SAME", end="")
return False
# print(" ::SAME:: last")
# print("")
return True
if __name__ == "__main__":
check_if_inside_the_boxes(461, 263, 24, 65, "person")
check_if_inside_the_boxes(8, 45, 172, 193, "bus")
check_if_inside_the_boxes(300, 300, 24, 65, "person")
Loading…
Cancel
Save