Full project link: GitHub - xfliu1998/kinectProject

Kinect V2 Camera Calibration and Image Processing (Python)

Kinect V2 Camera Calibration

The Kinect V2 camera is calibrated with Zhang's calibration method. See the following links for the theory and reference code:

Monocular camera calibration with Zhang's method: https://blog.csdn.net/weixin_43763292/article/details/128546103?spm=1001.2014.3001.5506
Camera calibration with OpenCV in Python (full version): https://blog.csdn.net/dgut_guangdian/article/details/107467070?spm=1001.2014.3001.5506
Calling Kinect 2.0 from Python with PyKinect2: https://blog.csdn.net/zcz0101/article/details/115718427?spm=1001.2014.3001.5506

  1. Import the required packages.
import os
import time
import datetime
import glob as gb
import h5py
import keyboard
import cv2
import numpy as np
from utils.calibration import Calibrator
from utils.kinect import Kinect
  2. Capture calibration images from different angles. Shooting requirements: take 10-20 calibration images from different angles and positions, with the checkerboard occupying roughly 1/4 to 2/3 of the frame. The function below captures 20 images by default; press the space bar to take a photo and q to stop. The captured calibration images are saved to the specified path.
def get_chess_image(image_num=20):
    out_path = './data/chess/'
    if not os.path.exists(out_path):
        os.makedirs(out_path)
    camera = cv2.VideoCapture(0)
    i = 0
    while 1:
        (grabbed, img) = camera.read()
        cv2.imshow('img', img)
        if cv2.waitKey(1) & keyboard.is_pressed('space'):  # press space to save an image
            i += 1
            filename = str(f'{out_path}img{str(i)}.jpg')
            cv2.imwrite(filename, img)
            print('write: ', filename)
        if cv2.waitKey(1) & 0xFF == ord('q') or i == image_num:  # press q to finish
            break
    camera.release()
    cv2.destroyAllWindows()
  3. Calibrate the camera. The function writes the camera's intrinsic matrix and distortion coefficients to the specified path, and the extrinsic matrix of each calibration image can also be obtained (a short sketch for recovering per-image extrinsics follows the class code). The function relies on the calibration class, whose code lives in the utils package.
def camera_calibrator(shape_inner_corner=(11, 8), size_grid=0.025):
    """
    :param shape_inner_corner: number of inner corners of the checkerboard (cols, rows), i.e. 11*8 for a 12*9 board
    :param size_grid: side length of one checkerboard square in meters, here 0.025 m (25 mm)
    """
    chess_dir = "./data/chess"
    out_path = "./data/dedistortion/chess"
    if not os.path.exists(out_path):
        os.makedirs(out_path)
    # create calibrator
    calibrator = Calibrator(chess_dir, shape_inner_corner, size_grid)
    # calibrate the camera
    mat_intri, coff_dis = calibrator.calibrate_camera()
    np.save('./data/intrinsic_matrix.npy', mat_intri)
    np.save('./data/distortion_cofficients.npy', coff_dis)
    print("intrinsic matrix: \n {}".format(mat_intri))
    print("distortion coefficients: \n {}".format(coff_dis))  # (k_1, k_2, p_1, p_2, k_3)
    # dedistortion
    calibrator.dedistortion(chess_dir, out_path)
    return mat_intri, coff_dis
import os
import glob
import cv2
import numpy as np


class Calibrator(object):
    def __init__(self, img_dir, shape_inner_corner, size_grid, visualization=True):
        """
        --parameters--
        img_dir: the directory that saves the images for calibration, str
        shape_inner_corner: the shape of the inner-corner grid, array of int, (w, h)
        size_grid: the real size of a grid square on the calibration board, float
        visualization: whether to visualize, bool
        """
        self.img_dir = img_dir
        self.shape_inner_corner = shape_inner_corner
        self.size_grid = size_grid
        self.visualization = visualization
        self.mat_intri = None  # intrinsic matrix
        self.coff_dis = None   # coefficients of distortion

        # create the corners in world space
        w, h = shape_inner_corner
        # cp_int: coordinates of corner points in world space in 'int' form, like (0,0,0), ..., (10,7,0)
        cp_int = np.zeros((w * h, 3), np.float32)
        cp_int[:, :2] = np.mgrid[0:w, 0:h].T.reshape(-1, 2)
        # cp_world: corner points in world space, scaled by the real grid size
        self.cp_world = cp_int * size_grid

        # images
        self.img_paths = []
        for extension in ["jpg", "png", "jpeg"]:
            self.img_paths += glob.glob(os.path.join(img_dir, "*.{}".format(extension)))
        assert len(self.img_paths), "No images for calibration found!"

    def calibrate_camera(self):
        w, h = self.shape_inner_corner
        # criteria: only for subpixel refinement, which is not used here
        # criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
        points_world = []  # the points in world space
        points_pixel = []  # the points in pixel space (corresponding to points_world)
        for img_path in self.img_paths:
            img = cv2.imread(img_path)
            gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            # find the corners, cp_img: corner points in pixel space
            ret, cp_img = cv2.findChessboardCorners(gray_img, (w, h), None)
            # if ret is True, save
            if ret:
                # cv2.cornerSubPix(gray_img, cp_img, (11,11), (-1,-1), criteria)
                points_world.append(self.cp_world)
                points_pixel.append(cp_img)
                # view the corners
                if self.visualization:
                    cv2.drawChessboardCorners(img, (w, h), cp_img, ret)
                    _, img_name = os.path.split(img_path)
                    corner_dir = os.path.join(self.img_dir, "dedistortion/coner_detect")
                    if not os.path.exists(corner_dir):
                        os.makedirs(corner_dir)
                    cv2.imwrite(os.path.join(corner_dir, img_name), img)
                    cv2.imshow('FoundCorners', img)
                    cv2.waitKey(500)

        # calibrate the camera
        ret, mat_intri, coff_dis, v_rot, v_trans = cv2.calibrateCamera(points_world, points_pixel, gray_img.shape[::-1], None, None)
        # print("ret: {}".format(ret))
        # print("intrinsic matrix: \n {}".format(mat_intri))
        # # in the form of (k_1, k_2, p_1, p_2, k_3)
        # print("distortion coefficients: \n {}".format(coff_dis))
        # print("rotation vectors: \n {}".format(v_rot))
        # print("translation vectors: \n {}".format(v_trans))

        # calculate the reprojection error
        total_error = 0
        for i in range(len(points_world)):
            points_pixel_repro, _ = cv2.projectPoints(points_world[i], v_rot[i], v_trans[i], mat_intri, coff_dis)
            error = cv2.norm(points_pixel[i], points_pixel_repro, cv2.NORM_L2) / len(points_pixel_repro)
            total_error += error
        # print("Average reprojection error: {}".format(total_error / len(points_world)))

        self.mat_intri = mat_intri
        self.coff_dis = coff_dis
        return mat_intri, coff_dis

    def dedistortion(self, img_dir, save_dir):
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        # if not calibrated yet, calibrate first
        if self.mat_intri is None:
            assert self.coff_dis is None
            self.calibrate_camera()

        img_paths = []
        for extension in ["jpg", "png", "jpeg"]:
            img_paths += glob.glob(os.path.join(img_dir, "*.{}".format(extension)))

        for img_path in img_paths:
            _, img_name = os.path.split(img_path)
            img = cv2.imread(img_path)
            h, w = img.shape[0], img.shape[1]
            newcameramtx, roi = cv2.getOptimalNewCameraMatrix(self.mat_intri, self.coff_dis, (w, h), 1, (w, h))
            dst = cv2.undistort(img, self.mat_intri, self.coff_dis, None, newcameramtx)
            # crop the valid region
            # x, y, w, h = roi
            # dst = dst[y:y+h, x:x+w]
            cv2.imwrite(os.path.join(save_dir, img_name), dst)
        # print("Dedistorted images have been saved to: {}".format(save_dir))

Capturing Images with the Kinect V2 and Image Preprocessing

  1. Capture an RGB-D stream with the Kinect V2 camera. Set the name of the capture target; after the function starts it waits 1 s and then begins recording, and recording stops when ESC is pressed. The RGB-D frames are first saved to an h5 file under the specified path (a read-back sketch follows the function), and the total recording time is printed. Capturing Kinect images requires the Kinect class, whose code is in the utils package.
def capture_image(name):
    out_dir = './data/h5/'
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    file_name = out_dir + name + '.h5'
    if os.path.exists(file_name):
        os.remove(file_name)
    f = h5py.File(file_name, 'w')
    i = 1
    kinect = Kinect()

    time.sleep(1)
    s = time.time()
    while 1:
        data = kinect.get_the_data_of_color_depth_infrared_image()
        # save the color frame (data[0]) and depth frame (data[1]) under consecutive keys
        f.create_dataset(str(i), data=data[0])
        f.create_dataset(str(i + 1), data=data[1])
        i += 2
        cv2.imshow('kinect', data[0])
        cv2.waitKey(1)
        if keyboard.is_pressed('esc'):  # press ESC to exit
            break
    f.close()
    print('record time: %f s' % (time.time() - s))
    return file_name
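The h5 layout written by capture_image is simply a sequence of datasets keyed by consecutive integers, with each color frame followed by its depth frame. A minimal read-back sketch (the file name assumes name='object', as in the main program further below):

import h5py

with h5py.File('./data/h5/object.h5', 'r') as f:
    for i in range(1, len(f.keys()), 2):
        color = f[str(i)][:]       # (1080, 1920, 3) color frame
        depth = f[str(i + 1)][:]   # (424, 512) depth frame in millimeters
        print(i, color.shape, depth.shape)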
from pykinect2 import PyKinectV2
from pykinect2.PyKinectV2 import *
from pykinect2 import PyKinectRuntime
import numpy as np
import matplotlib.pyplot as plt
import cv2 as cv
import ctypes
import math
import time
import copy
import keyboard

# counter of depth points successfully mapped into the color frame
valid = 0


class Kinect(object):

    def __init__(self):
        self._kinect = PyKinectRuntime.PyKinectRuntime(PyKinectV2.FrameSourceTypes_Color | PyKinectV2.FrameSourceTypes_Depth | PyKinectV2.FrameSourceTypes_Infrared)
        self.color_frame = None
        self.depth_frame = None
        self.infrared_frame = None
        self.w_color = 1920
        self.h_color = 1080
        self.w_depth = 512
        self.h_depth = 424
        self.csp_type = _ColorSpacePoint * int(self.w_color * self.h_color)
        self.csp = ctypes.cast(self.csp_type(), ctypes.POINTER(_DepthSpacePoint))
        self.color = None
        self.depth = None
        self.infrared = None
        self.first_time = True

    # Copying this image directly in python is not as efficient as getting another frame directly from C++
    """Get the latest color data"""
    def get_the_last_color(self):
        if self._kinect.has_new_color_frame():
            # the obtained image data is 1d and needs to be converted to the desired format
            frame = self._kinect.get_last_color_frame()
            # 4 channels are returned, one of which is not used
            gbra = frame.reshape([self._kinect.color_frame_desc.Height, self._kinect.color_frame_desc.Width, 4])
            # keep the color channels; by default the mirror image needs to be flipped
            color_frame = gbra[..., :3][:, ::-1, :]
            return color_frame

    """Get the latest depth data"""
    def get_the_last_depth(self):
        if self._kinect.has_new_depth_frame():
            frame = self._kinect.get_last_depth_frame()
            depth_frame_all = frame.reshape([self._kinect.depth_frame_desc.Height, self._kinect.depth_frame_desc.Width])
            self.depth_frame = depth_frame_all[:, ::-1]
            return self.depth_frame

    """Get the latest infrared data"""
    def get_the_last_infrared(self, Infrared_threshold=16000):
        if self._kinect.has_new_infrared_frame():
            frame = self._kinect.get_last_infrared_frame()
            image_infrared_all = frame.reshape([self._kinect.infrared_frame_desc.Height, self._kinect.infrared_frame_desc.Width])
            # image_infrared_all[image_infrared_all > Infrared_threshold] = 0
            # image_infrared_all = image_infrared_all / Infrared_threshold * 255
            self.infrared_frame = image_infrared_all[:, ::-1]
            return self.infrared_frame

    """Map depth pixels into the color image"""
    def map_depth_points_to_color_points(self, depth_points):
        depth_points = [self.map_depth_point_to_color_point(x) for x in depth_points]
        return depth_points

    def map_depth_point_to_color_point(self, depth_point):
        global valid
        depth_point_to_color = copy.deepcopy(depth_point)
        n = 0
        while 1:
            self.get_the_last_depth()
            self.get_the_last_color()
            color_point = self._kinect._mapper.MapDepthPointToColorSpace(_DepthSpacePoint(511 - depth_point_to_color[1], depth_point_to_color[0]), self.depth_frame[depth_point_to_color[0], 511 - depth_point_to_color[1]])
            if math.isinf(float(color_point.y)):
                n += 1
                if n >= 10000:
                    color_point = [0, 0]
                    break
            else:
                color_point = [np.int0(color_point.y), 1920 - np.int0(color_point.x)]  # image coordinates, human eye view
                valid += 1
                print('valid number:', valid)
                break
        return color_point

    """Map an array of color pixels into the depth image"""
    def map_color_points_to_depth_points(self, color_points):
        self.get_the_last_depth()
        self.get_the_last_color()
        self._kinect._mapper.MapColorFrameToDepthSpace(ctypes.c_uint(512 * 424), self._kinect._depth_frame_data, ctypes.c_uint(1920 * 1080), self.csp)
        depth_points = [self.map_color_point_to_depth_point(x, True) for x in color_points]
        return depth_points

    def map_color_point_to_depth_point(self, color_point, if_call_flg=False):
        n = 0
        color_point_to_depth = copy.deepcopy(color_point)
        color_point_to_depth[1] = 1920 - color_point_to_depth[1]
        while 1:
            self.get_the_last_depth()
            self.get_the_last_color()
            if not if_call_flg:
                self._kinect._mapper.MapColorFrameToDepthSpace(ctypes.c_uint(512 * 424), self._kinect._depth_frame_data, ctypes.c_uint(1920 * 1080), self.csp)
            if math.isinf(float(self.csp[color_point_to_depth[0] * 1920 + color_point_to_depth[1] - 1].y)) \
                    or np.isnan(self.csp[color_point_to_depth[0] * 1920 + color_point_to_depth[1] - 1].y):
                n += 1
                if n >= 10000:
                    print('Color mapping depth, invalid points')
                    depth_point = [0, 0]
                    break
            else:
                try:
                    depth_point = [np.int0(self.csp[color_point_to_depth[0] * 1920 + color_point_to_depth[1] - 1].y),
                                   np.int0(self.csp[color_point_to_depth[0] * 1920 + color_point_to_depth[1] - 1].x)]
                except OverflowError as e:
                    print('Color mapping depth, invalid points')
                    depth_point = [0, 0]
                break
        depth_point[1] = 512 - depth_point[1]
        return depth_point

    """Get the latest color, depth and infrared images"""
    def get_the_data_of_color_depth_infrared_image(self):
        time_s = time.time()
        if self.first_time:
            while 1:
                n = 0
                self.color = self.get_the_last_color()
                n += 1

                self.depth = self.get_the_last_depth()
                n += 1

                if self._kinect.has_new_infrared_frame():
                    frame = self._kinect.get_last_infrared_frame()
                    image_infrared_all = frame.reshape([self._kinect.depth_frame_desc.Height, self._kinect.depth_frame_desc.Width])
                    self.infrared = image_infrared_all[:, ::-1]
                    n += 1

                t = time.time() - time_s
                if n == 3:
                    self.first_time = False
                    break
                elif t > 5:
                    print('No image data is obtained, please check that the Kinect2 connection is normal')
                    break
        else:
            if self._kinect.has_new_color_frame():
                frame = self._kinect.get_last_color_frame()
                gbra = frame.reshape([self._kinect.color_frame_desc.Height, self._kinect.color_frame_desc.Width, 4])
                gbr = gbra[:, :, :3][:, ::-1, :]
                self.color = gbr

            if self._kinect.has_new_depth_frame():
                frame = self._kinect.get_last_depth_frame()
                image_depth_all = frame.reshape([self._kinect.depth_frame_desc.Height, self._kinect.depth_frame_desc.Width])
                depth = image_depth_all[:, ::-1]
                self.depth = depth

            if self._kinect.has_new_infrared_frame():
                frame = self._kinect.get_last_infrared_frame()
                image_infrared_all = frame.reshape([self._kinect.depth_frame_desc.Height, self._kinect.depth_frame_desc.Width])
                self.infrared = image_infrared_all[:, ::-1]

        return self.color, self.depth, self.infrared


if __name__ == '__main__':
    i = 1
    kinect = Kinect()
    s = time.time()

    while 1:
        data = kinect.get_the_data_of_color_depth_infrared_image()
        img = data[0]
        mat_intri = np.load('./data/intrinsic_matrix.npy')
        coff_dis = np.load('./data/distortion_cofficients.npy')
        h, w = img.shape[0], img.shape[1]
        newcameramtx, roi = cv.getOptimalNewCameraMatrix(mat_intri, coff_dis, (w, h), 1, (w, h))
        dst = cv.undistort(img, mat_intri, coff_dis, None, newcameramtx)
        dst = cv.cvtColor(dst, cv.COLOR_BGR2RGB)
        plt.imshow(dst / 255)
        plt.show()

        """
        # store the mapping matrix in an npy file
        color_points = np.zeros((512 * 424, 2), dtype=np.int)  # valid number: 207662
        k = 0
        for i in range(424):
            for j in range(512):
                color_points[k] = [i, j]
                k += 1
        depth_map_color = kinect.map_depth_points_to_color_points(color_points)

        # set points that are not in the mapping range to 0
        depth_map_color[..., 0] = np.where(depth_map_color[..., 0] >= 1080, 0, depth_map_color[..., 0])
        depth_map_color[..., 0] = np.where(depth_map_color[..., 0] < 0, 0, depth_map_color[..., 0])
        depth_map_color[..., 1] = np.where(depth_map_color[..., 1] >= 1920, 0, depth_map_color[..., 1])
        depth_map_color[..., 1] = np.where(depth_map_color[..., 1] < 0, 0, depth_map_color[..., 1])

        # interpolate to fill the 0 values
        zeros = np.array(list(set(np.where(depth_map_color == 0)[0])))
        for zero in zeros:
            if zero < 40 * 512 or zero > 360 * 512:
                continue
            j = 1
            while depth_map_color[zero - j].any() == 0 or depth_map_color[zero + j].any() == 0:
                j += 1
            depth_map_color[zero][0] = (depth_map_color[zero - j][0] + depth_map_color[zero + j][0]) // 2
            depth_map_color[zero][1] = (depth_map_color[zero - j][1] + depth_map_color[zero + j][1]) // 2
        np.save('full_depth_map_color.npy', full_depth_map_color)
        """

        depth_map_color = np.load('./data/full_depth_map_color.npy')  # (424*512, 2)
        full_depth_map_color = depth_map_color
        map_color = dst[full_depth_map_color[..., 0], full_depth_map_color[..., 1]]  # (424*512, 3)
        map_color = map_color.reshape((424, 512, 3))
        plt.imshow(map_color / 255)
        plt.show()

        if keyboard.is_pressed('esc'):
            break
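A minimal usage sketch for the Kinect class above (it assumes a connected Kinect V2 with the PyKinect2 runtime installed; the output file names are placeholders):

import cv2
from utils.kinect import Kinect

kinect = Kinect()
color, depth, infrared = kinect.get_the_data_of_color_depth_infrared_image()
print(color.shape, depth.shape, infrared.shape)   # (1080, 1920, 3) (424, 512) (424, 512)
cv2.imwrite('color_sample.png', color)
cv2.imwrite('depth_sample.png', depth.astype('uint16'))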
  2. Undistort the color images. Use the camera parameters computed above to undistort every color image and save the undistorted results. The following two functions are needed:
def dedistortion(mat_intri, coff_dis, img):
    h, w = img.shape[0], img.shape[1]
    newcameramtx, roi = cv2.getOptimalNewCameraMatrix(mat_intri, coff_dis, (w, h), 1, (w, h))
    dst = cv2.undistort(img, mat_intri, coff_dis, None, newcameramtx)
    return dst


def save_image(data, name, type, dir='test'):
    global num
    idx = str(num).zfill(6)
    if dir == 'raw':
        color_path = './data/' + name + '/color'
        depth_path = './data/' + name + '/depth'
    else:
        color_path = "./test_data/" + name + '/color'
        depth_path = "./test_data/" + name + '/depth'
    if not os.path.exists(color_path):
        os.makedirs(color_path)
    if not os.path.exists(depth_path):
        os.makedirs(depth_path)
    if type == 'color':
        cv2.imwrite(color_path + '/color-' + idx + '.png', data)
    else:
        cv2.imwrite(depth_path + '/depth-' + idx + '.png', data)
    if dir == 'test':
        num += 1
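The ROI crop is commented out in dedistortion above; if the black borders introduced by undistortion are unwanted, a variant that keeps only the valid region returned by getOptimalNewCameraMatrix looks like this (a sketch, not part of the project code):

def dedistortion_cropped(mat_intri, coff_dis, img):
    h, w = img.shape[:2]
    newcameramtx, roi = cv2.getOptimalNewCameraMatrix(mat_intri, coff_dis, (w, h), 1, (w, h))
    dst = cv2.undistort(img, mat_intri, coff_dis, None, newcameramtx)
    x, y, rw, rh = roi             # valid-pixel rectangle after undistortion
    return dst[y:y + rh, x:x + rw]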
  3. Align the depth and color images. Because of the physical offset between the Kinect's color camera and depth sensor, depth and color pixels do not correspond one to one, so the two images must be aligned. The functions below align color and depth and then center-crop the result to (480, 360). Note that the aligned color image may still contain registration errors (the alignment .npy file is stored in the data package; see the main function of kinect.py for how it is generated).
def center_crop(img, crop_size):
    tw, th = crop_size
    h, w = img.shape[0], img.shape[1]
    if len(img.shape) == 2:
        crop_img = img[(h - th) // 2:(h + th) // 2, (w - tw) // 2:(w + tw) // 2]
    else:
        crop_img = img[(h - th) // 2:(h + th) // 2, (w - tw) // 2:(w + tw) // 2, :]
    return crop_img


def match_color_depth(color, depth):
    # crop + resize is worse
    full_depth_map_color = np.load('data/full_depth_map_color.npy')
    map_color = color[full_depth_map_color[..., 0], full_depth_map_color[..., 1]]  # (424*512, 3)
    map_color = map_color.reshape((424, 512, 3))
    # 512 * 424
    color = center_crop(map_color, (480, 360))
    depth = center_crop(depth, (480, 360))
    # plt.subplot(1, 2, 1)
    # plt.imshow(color)
    # plt.subplot(1, 2, 2)
    # plt.imshow(depth)
    # plt.show()
    return color, depth
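A quick sanity check of the alignment is to display a saved color/depth pair side by side; misregistration shows up as the object silhouette in the depth map being shifted relative to the color image. A sketch using the file layout written by save_image (the paths are placeholders):

import cv2
import matplotlib.pyplot as plt

color = cv2.imread('./test_data/object/color/color-000000.png')
depth = cv2.imread('./test_data/object/depth/depth-000000.png', cv2.IMREAD_UNCHANGED)
plt.subplot(1, 2, 1)
plt.imshow(color[..., ::-1])   # BGR -> RGB
plt.subplot(1, 2, 2)
plt.imshow(depth, cmap='gray')
plt.show()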
  4. Export the color video. Write the rectified color image stream to a video file under the specified path.
def trans_video(image_path, video_name, fps, res, type):
    img_path = gb.glob(image_path + "/*.png")
    videoWriter = cv2.VideoWriter(video_name, cv2.VideoWriter_fourcc(*'DIVX'), fps, res)
    for path in img_path:
        img = cv2.imread(path)
        img = cv2.resize(img, res)
        videoWriter.write(img)
    videoWriter.release()
    print('transform ' + type + ' video done!')


def save_video(name):
    currentdate = datetime.datetime.now()
    file_name = str(currentdate.day) + "." + str(currentdate.month) + "." + str(currentdate.hour) + "." + str(currentdate.minute)
    color_path = './data/' + name + '/color'
    # depth_path = './data/' + name + '/depth'
    video_path = './data/' + name + '/video'
    if not os.path.exists(video_path):
        os.makedirs(video_path)
    trans_video(color_path, video_path + '/color-' + file_name + '.avi', 30, (1920, 1080), 'color')
    # trans_video(depth_path, depth_path + '/depth-' + file_name + '.avi', 30, (512, 424), 'depth')
  5. Full pipeline. The main program that calls the routines above is as follows:
if __name__ == '__main__':
    # 1. shooting calibration images
    get_chess_image()

    # 2. camera calibration
    mat_intri, coff_dis = camera_calibrator()
    # mat_intri = np.load('./data/intrinsic_matrix.npy')
    # coff_dis = np.load('./data/distortion_cofficients.npy')

    # 3. capture object images to save h5 file
    name = 'object'
    file_name = capture_image(name)

    f = h5py.File(file_name, 'r')
    num = 0
    for i in range(1, len(f.keys()), 2):
        color = f[str(i)][:]
        depth = f[str(i + 1)][:]

        # 4. data process: dedistortion; match color and depth images; save color/depth images
        dedistortion_color = dedistortion(mat_intri, coff_dis, color)
        save_image(dedistortion_color, name, 'color', 'raw')
        save_image(depth, name, 'depth', 'raw')
        new_color, new_depth = match_color_depth(dedistortion_color, depth)
        save_image(new_color, name, 'color', 'test')
        save_image(new_depth, name, 'depth', 'test')
    f.close()
    print('image save done!')

    # 5. convert to video
    save_video(name)
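For orientation, a full run of this main program produces roughly the following layout (directory and file names are taken from the functions above; the exact frame and video names depend on the capture):

data/
    chess/                          calibration shots from get_chess_image()
    dedistortion/chess/             undistorted calibration shots
    intrinsic_matrix.npy
    distortion_cofficients.npy
    h5/object.h5                    raw RGB-D stream from capture_image()
    object/color/, object/depth/    full-resolution frames (save_image ... 'raw')
    object/video/                   color video from save_video()
test_data/
    object/color/, object/depth/    aligned, center-cropped frames (save_image ... 'test')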

Real-Time 3D Reconstruction from RGB-D Data and Masks

Given the paths to a specified number of RGB-D images and their masks (the masks can be obtained with the SAM model), the code fills holes in the masked point cloud, upsamples it by an arbitrary factor, visualizes the result with Open3D's non-blocking visualizer, and saves the reconstruction images to the specified path.
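The hole filling inside the mask is an inverse-distance-weighted (IDW) interpolation over a precomputed circular neighborhood (see uneven_upsample_based_mask in the code below). A toy illustration of the same weighting scheme on a single missing pixel, purely for intuition and not part of the project code:

import numpy as np

grid = np.array([[1.0, 2.0, 3.0],
                 [4.0, 0.0, 6.0],      # 0.0 marks the hole at (1, 1)
                 [7.0, 8.0, 9.0]])
hole = np.array([1, 1])
neighbors = np.array([[-1, 0], [0, -1], [0, 1], [1, 0]])   # radius-1 neighborhood
coords = hole + neighbors
d = np.linalg.norm(neighbors, axis=1) + 1e-6
w = (1 / d) / np.sum(1 / d)            # normalized inverse-distance weights
grid[hole[0], hole[1]] = np.sum(w * grid[coords[:, 0], coords[:, 1]])
print(grid[1, 1])                      # -> 5.0, the weighted mean of 2, 4, 6, 8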

import os
import time
import math
import numpy as np
from skimage import io
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import cv2
import open3d as o3d


def cal_neighbors():
# preprocessing neighborhood point coordinates
neighbors_list = dict()
for r in range(1, 9):
neighbors = []
for x in range(-r, r + 1):
for y in range(-r, r + 1):
if math.sqrt(x ** 2 + y ** 2) <= r:
neighbors.append([x, y])
neighbors.sort(key=lambda x: x[0] ** 2 + x[1] ** 2)
neighbors_list[r] = neighbors[1:]
print(neighbors_list)


neighbors_list = {1: [[-1, 0], [0, -1], [0, 1], [1, 0]],
2: [[-1, 0], [0, -1], [0, 1], [1, 0], [-1, -1], [-1, 1], [1, -1], [1, 1], [-2, 0], [0, -2], [0, 2], [2, 0]],
3: [[-1, 0], [0, -1], [0, 1], [1, 0], [-1, -1], [-1, 1], [1, -1], [1, 1], [-2, 0], [0, -2], [0, 2], [2, 0], [-2, -1], [-2, 1], [-1, -2], [-1, 2], [1, -2], [1, 2], [2, -1], [2, 1], [-2, -2], [-2, 2], [2, -2], [2, 2], [-3, 0], [0, -3], [0, 3], [3, 0]],
4: [[-1, 0], [0, -1], [0, 1], [1, 0], [-1, -1], [-1, 1], [1, -1], [1, 1], [-2, 0], [0, -2], [0, 2], [2, 0], [-2, -1], [-2, 1], [-1, -2], [-1, 2], [1, -2], [1, 2], [2, -1], [2, 1], [-2, -2], [-2, 2], [2, -2], [2, 2], [-3, 0], [0, -3], [0, 3], [3, 0], [-3, -1], [-3, 1], [-1, -3], [-1, 3], [1, -3], [1, 3], [3, -1], [3, 1], [-3, -2], [-3, 2], [-2, -3], [-2, 3], [2, -3], [2, 3], [3, -2], [3, 2], [-4, 0], [0, -4], [0, 4], [4, 0]],
5: [[-1, 0], [0, -1], [0, 1], [1, 0], [-1, -1], [-1, 1], [1, -1], [1, 1], [-2, 0], [0, -2], [0, 2], [2, 0], [-2, -1], [-2, 1], [-1, -2], [-1, 2], [1, -2], [1, 2], [2, -1], [2, 1], [-2, -2], [-2, 2], [2, -2], [2, 2], [-3, 0], [0, -3], [0, 3], [3, 0], [-3, -1], [-3, 1], [-1, -3], [-1, 3], [1, -3], [1, 3], [3, -1], [3, 1], [-3, -2], [-3, 2], [-2, -3], [-2, 3], [2, -3], [2, 3], [3, -2], [3, 2], [-4, 0], [0, -4], [0, 4], [4, 0], [-4, -1], [-4, 1], [-1, -4], [-1, 4], [1, -4], [1, 4], [4, -1], [4, 1], [-3, -3], [-3, 3], [3, -3], [3, 3], [-4, -2], [-4, 2], [-2, -4], [-2, 4], [2, -4], [2, 4], [4, -2], [4, 2], [-5, 0], [-4, -3], [-4, 3], [-3, -4], [-3, 4], [0, -5], [0, 5], [3, -4], [3, 4], [4, -3], [4, 3], [5, 0]],
6: [[-1, 0], [0, -1], [0, 1], [1, 0], [-1, -1], [-1, 1], [1, -1], [1, 1], [-2, 0], [0, -2], [0, 2], [2, 0], [-2, -1], [-2, 1], [-1, -2], [-1, 2], [1, -2], [1, 2], [2, -1], [2, 1], [-2, -2], [-2, 2], [2, -2], [2, 2], [-3, 0], [0, -3], [0, 3], [3, 0], [-3, -1], [-3, 1], [-1, -3], [-1, 3], [1, -3], [1, 3], [3, -1], [3, 1], [-3, -2], [-3, 2], [-2, -3], [-2, 3], [2, -3], [2, 3], [3, -2], [3, 2], [-4, 0], [0, -4], [0, 4], [4, 0], [-4, -1], [-4, 1], [-1, -4], [-1, 4], [1, -4], [1, 4], [4, -1], [4, 1], [-3, -3], [-3, 3], [3, -3], [3, 3], [-4, -2], [-4, 2], [-2, -4], [-2, 4], [2, -4], [2, 4], [4, -2], [4, 2], [-5, 0], [-4, -3], [-4, 3], [-3, -4], [-3, 4], [0, -5], [0, 5], [3, -4], [3, 4], [4, -3], [4, 3], [5, 0], [-5, -1], [-5, 1], [-1, -5], [-1, 5], [1, -5], [1, 5], [5, -1], [5, 1], [-5, -2], [-5, 2], [-2, -5], [-2, 5], [2, -5], [2, 5], [5, -2], [5, 2], [-4, -4], [-4, 4], [4, -4], [4, 4], [-5, -3], [-5, 3], [-3, -5], [-3, 5], [3, -5], [3, 5], [5, -3], [5, 3], [-6, 0], [0, -6], [0, 6], [6, 0]],
7: [[-1, 0], [0, -1], [0, 1], [1, 0], [-1, -1], [-1, 1], [1, -1], [1, 1], [-2, 0], [0, -2], [0, 2], [2, 0], [-2, -1], [-2, 1], [-1, -2], [-1, 2], [1, -2], [1, 2], [2, -1], [2, 1], [-2, -2], [-2, 2], [2, -2], [2, 2], [-3, 0], [0, -3], [0, 3], [3, 0], [-3, -1], [-3, 1], [-1, -3], [-1, 3], [1, -3], [1, 3], [3, -1], [3, 1], [-3, -2], [-3, 2], [-2, -3], [-2, 3], [2, -3], [2, 3], [3, -2], [3, 2], [-4, 0], [0, -4], [0, 4], [4, 0], [-4, -1], [-4, 1], [-1, -4], [-1, 4], [1, -4], [1, 4], [4, -1], [4, 1], [-3, -3], [-3, 3], [3, -3], [3, 3], [-4, -2], [-4, 2], [-2, -4], [-2, 4], [2, -4], [2, 4], [4, -2], [4, 2], [-5, 0], [-4, -3], [-4, 3], [-3, -4], [-3, 4], [0, -5], [0, 5], [3, -4], [3, 4], [4, -3], [4, 3], [5, 0], [-5, -1], [-5, 1], [-1, -5], [-1, 5], [1, -5], [1, 5], [5, -1], [5, 1], [-5, -2], [-5, 2], [-2, -5], [-2, 5], [2, -5], [2, 5], [5, -2], [5, 2], [-4, -4], [-4, 4], [4, -4], [4, 4], [-5, -3], [-5, 3], [-3, -5], [-3, 5], [3, -5], [3, 5], [5, -3], [5, 3], [-6, 0], [0, -6], [0, 6], [6, 0], [-6, -1], [-6, 1], [-1, -6], [-1, 6], [1, -6], [1, 6], [6, -1], [6, 1], [-6, -2], [-6, 2], [-2, -6], [-2, 6], [2, -6], [2, 6], [6, -2], [6, 2], [-5, -4], [-5, 4], [-4, -5], [-4, 5], [4, -5], [4, 5], [5, -4], [5, 4], [-6, -3], [-6, 3], [-3, -6], [-3, 6], [3, -6], [3, 6], [6, -3], [6, 3], [-7, 0], [0, -7], [0, 7], [7, 0]],
8: [[-1, 0], [0, -1], [0, 1], [1, 0], [-1, -1], [-1, 1], [1, -1], [1, 1], [-2, 0], [0, -2], [0, 2], [2, 0], [-2, -1], [-2, 1], [-1, -2], [-1, 2], [1, -2], [1, 2], [2, -1], [2, 1], [-2, -2], [-2, 2], [2, -2], [2, 2], [-3, 0], [0, -3], [0, 3], [3, 0], [-3, -1], [-3, 1], [-1, -3], [-1, 3], [1, -3], [1, 3], [3, -1], [3, 1], [-3, -2], [-3, 2], [-2, -3], [-2, 3], [2, -3], [2, 3], [3, -2], [3, 2], [-4, 0], [0, -4], [0, 4], [4, 0], [-4, -1], [-4, 1], [-1, -4], [-1, 4], [1, -4], [1, 4], [4, -1], [4, 1], [-3, -3], [-3, 3], [3, -3], [3, 3], [-4, -2], [-4, 2], [-2, -4], [-2, 4], [2, -4], [2, 4], [4, -2], [4, 2], [-5, 0], [-4, -3], [-4, 3], [-3, -4], [-3, 4], [0, -5], [0, 5], [3, -4], [3, 4], [4, -3], [4, 3], [5, 0], [-5, -1], [-5, 1], [-1, -5], [-1, 5], [1, -5], [1, 5], [5, -1], [5, 1], [-5, -2], [-5, 2], [-2, -5], [-2, 5], [2, -5], [2, 5], [5, -2], [5, 2], [-4, -4], [-4, 4], [4, -4], [4, 4], [-5, -3], [-5, 3], [-3, -5], [-3, 5], [3, -5], [3, 5], [5, -3], [5, 3], [-6, 0], [0, -6], [0, 6], [6, 0], [-6, -1], [-6, 1], [-1, -6], [-1, 6], [1, -6], [1, 6], [6, -1], [6, 1], [-6, -2], [-6, 2], [-2, -6], [-2, 6], [2, -6], [2, 6], [6, -2], [6, 2], [-5, -4], [-5, 4], [-4, -5], [-4, 5], [4, -5], [4, 5], [5, -4], [5, 4], [-6, -3], [-6, 3], [-3, -6], [-3, 6], [3, -6], [3, 6], [6, -3], [6, 3], [-7, 0], [0, -7], [0, 7], [7, 0], [-7, -1], [-7, 1], [-5, -5], [-5, 5], [-1, -7], [-1, 7], [1, -7], [1, 7], [5, -5], [5, 5], [7, -1], [7, 1], [-6, -4], [-6, 4], [-4, -6], [-4, 6], [4, -6], [4, 6], [6, -4], [6, 4], [-7, -2], [-7, 2], [-2, -7], [-2, 7], [2, -7], [2, 7], [7, -2], [7, 2], [-7, -3], [-7, 3], [-3, -7], [-3, 7], [3, -7], [3, 7], [7, -3], [7, 3], [-6, -5], [-6, 5], [-5, -6], [-5, 6], [5, -6], [5, 6], [6, -5], [6, 5], [-8, 0], [0, -8], [0, 8], [8, 0]]}


def get_mask_from_dist(image, d=1):
mask = np.where(image[..., -1] <= d, image[..., -1], 0)
# mask1 = load_mask(data_path + '/mask/0.png')[..., 0] # (h, w)
# mask2 = load_mask(data_path + '/mask/1.png')[..., 0] # (h, w)
# mask += mask1
# mask += mask2
mask = np.tile(np.expand_dims(mask, axis=-1), 3)
mask_data = np.where(mask, 255, 0)
return mask_data


def crop(img, crop_size, crop_type):
tw, th = crop_size
h, w = img.shape[0], img.shape[1]
if crop_type == 'center':
if len(img.shape) == 2:
crop_img = img[(h - th) // 2:(h + th) // 2, (w - tw) // 2:(w + tw) // 2]
else:
crop_img = img[(h - th) // 2:(h + th) // 2, (w - tw) // 2:(w + tw) // 2, :]
# down sample: INTER_NEAREST INTER_AREA
elif crop_type == 'cv2resize':
crop_img = cv2.resize(img, crop_size, interpolation=cv2.INTER_NEAREST)
else: # INTER_LINEAR
crop_img = cv2.resize(img, crop_size)
return crop_img


def normalize(img, normal_type):
h, w = img.shape[0], img.shape[1]
source_color_img = img[:, :, :3]
source_depth_img = img[:, :, 3:]
if normal_type == 'standard_scaler':
color_img = source_color_img.reshape(h * w, 3)
depth_img = source_depth_img.reshape(h * w, 3)
std_sca1 = StandardScaler()
std_sca2 = StandardScaler()
color_img_sca = std_sca1.fit_transform(color_img)
depth_img_sca = std_sca2.fit_transform(depth_img)
color_img_sca = color_img_sca.reshape(h, w, 3)
depth_img_sca = depth_img_sca.reshape(h, w, 3)
elif normal_type == 'minMax_scalar':
color_img = source_color_img.reshape(h * w, 3)
depth_img = source_depth_img.reshape(h * w, 3)
mm_sca1 = MinMaxScaler()
mm_sca2 = MinMaxScaler()
color_img_sca = mm_sca1.fit_transform(color_img)
depth_img_sca = mm_sca2.fit_transform(depth_img)
color_img_sca = color_img_sca.reshape(h, w, 3)
depth_img_sca = depth_img_sca.reshape(h, w, 3)
else:
color_img_sca = source_color_img / 255
depth_img_sca = source_depth_img
img_sca = np.concatenate((color_img_sca, depth_img_sca, source_color_img, source_depth_img), axis=-1)
return img_sca


def backproject_depth(depth_image, intrinsics):
assert len(depth_image.shape) == 2
intrinsics[-1][-1] = 1
height, width = depth_image.shape
if depth_image.dtype != np.float32:
depth_image = depth_image.astype(np.float32)

depth_image /= 1000 # unit: m
img = np.ones((width, height, 3))
img[..., 0] = np.array([[i] * height for i in range(width)]).reshape(width, height)
img[..., 1] = np.array(list(range(height)) * width).reshape(width, height)
Z = np.repeat(np.transpose(depth_image).reshape(width, height, 1), 3, axis=2)
img2d = img * Z # (h, w, 3)
point_image = np.matmul(img2d, np.linalg.inv(np.transpose(intrinsics))) # 3, hxw xyz
point_image = point_image.swapaxes(0, 1).astype(np.float32)

"""
point_image = np.zeros((height, width, 3), dtype=np.float32)
k_x, k_y, u_0, v_0 = intrinsics[0, 0], intrinsics[1, 1], intrinsics[0, 2], intrinsics[1, 2]
for v in range(height): # row -> y
for u in range(width): # col -> x
if depth_image[v, u] == 0:
continue
depth = depth_image[v, u]
z_c = depth / 1000 # unit: m
x_c = (u - u_0) * z_c / k_x
y_c = (v - v_0) * z_c / k_y
point_image[v, u] = np.array([x_c, y_c, z_c], dtype=np.float32)
"""
return point_image


def load_image(color_image_path, depth_image_path, intrinsics):
color_image = io.imread(color_image_path) # (h, w, 3) RGB
depth_image = io.imread(depth_image_path) # (h, w)
# depth_image = cv2.GaussianBlur(depth_image, (3, 3), 1)
depth_image = backproject_depth(depth_image, intrinsics) # (h, w, 3) xyz
image = np.concatenate((color_image, depth_image), axis=-1) # (h, w, 6)
return image # (h, w, 6)


def load_mask(mask_image_path):
mask_image = cv2.imread(mask_image_path)
return mask_image / 255. # (h, w, 3)


def display_inlier_outlier(cloud, ind):
inlier_cloud = cloud.select_by_index(ind)
outlier_cloud = cloud.select_by_index(ind, invert=True)
outlier_cloud.paint_uniform_color([1, 0, 0]) # red
inlier_cloud.paint_uniform_color([0.8, 0.8, 0.8]) # gray
o3d.visualization.draw_geometries([inlier_cloud, outlier_cloud])


def remove_noisy(image, nb_points=30, radius=0.02):
height, width = image.shape[:2]
point = image[..., 3:6]
point = point.reshape(height * width, 3)
pc = o3d.geometry.PointCloud()
pc.points = o3d.utility.Vector3dVector(point)
# convert outlier coordinates to 0
down_sample_points = 1
uni_down_src_pc = pc.uniform_down_sample(every_k_points=down_sample_points)
# delete points that have few neighborhood points around a sphere of a given radius
cl1, ind = uni_down_src_pc.remove_radius_outlier(nb_points=nb_points, radius=radius)
# display outlier and inlier
# display_inlier_outlier(pc, ind)
ind = set(ind)
for i in range(height * width):
if i not in ind:
point[i] = np.zeros((1, 3), dtype=np.float32)
point = point.reshape(height, width, 3)
image[..., 3:6] = point
image[..., 9:] = point
return image


def data_process(data):
intrinsics_path = data['intrinsics_path']
if intrinsics_path.endswith('txt'):
intrinsics = []
with open(intrinsics_path, 'r', encoding='utf-8') as f:
for line in f.readlines():
intrinsics.append(list(map(float, line.split()))[:-1])
intrinsics = np.array(intrinsics[:-1], dtype=np.float32)
else:
intrinsics = np.load(intrinsics_path)

color_image_path = os.path.join(data_path, data["color"])
depth_image_path = os.path.join(data_path, data["depth"])
image = load_image(color_image_path, depth_image_path, intrinsics)

# mask = get_mask_from_dist(image) # move_dragon

# crop and normalize
crop_size = input_height, input_width
image = crop(image, crop_size, crop_type)
image = normalize(image, normal_type='')

# mask image
mask = load_mask(data["mask"]) # (h, w)
mask = crop(mask, crop_size, crop_type)
mask = np.where(mask, True, False)
image = np.where(np.tile(mask, 4), image, 0.) # (h, w, 12) normal_RGB+normal_xyz+rgb+xyz

# point cloud denoising
image = remove_noisy(image)
return image, mask, intrinsics


def batch_data_process(img_num, data_path):
pre_data = []
for i, img_number in enumerate(range(img_num)):
img_number = f"{img_number:06d}"
data = {'color': data_path + 'rgbd/frame-' + img_number + '.color.png',
'depth': data_path + 'rgbd/frame-' + img_number + '.depth.png',
'mask': data_path + '/mask/mask' + img_number + '.png',
'intrinsics_path': data_path + 'colorIntrinsics.txt'}
image, mask, intrinsics = data_process(data)
# cv2.imwrite(f'{data_path}/mask/mask{img_number}.png', mask)
pre_data.append([image, mask, intrinsics])
print(i)
return pre_data


def map_3d_to_2d(point_set, intrinsics, height, width):
# x = KX / z_c
point_rearr = point_set.transpose(1, 0) # (3, num_points)
point_proj2D = np.divide(np.matmul(intrinsics, point_rearr), point_rearr[2:, :])
point_pred = point_proj2D[:2, :].transpose(1, 0) # (num_points, 2) u v
pred_image = np.zeros((height, width, 3), dtype=np.float32)
alpha = 1.5 if data_type == 'kinect' else 2 # 640/width 480/height
# normalization, truncation, assignment
point_pred_int = np.round(point_pred / alpha).astype(np.int) # (num_points, 2) u v
point_pred_int[..., 0] = np.clip(point_pred_int[..., 0], 0, width - 1)
point_pred_int[..., 1] = np.clip(point_pred_int[..., 1], 0, height - 1)
pred_image[point_pred_int[..., 1], point_pred_int[..., 0]] = point_set
return pred_image


def uneven_upsample_based_mask(pred_image, mask_image, radius):
# pred_image: h*w*3 xyz mask_image: h*w*3
up_image = pred_image.copy()
# the index that is inside mask and the current point is null
mask_image_inf = np.where(mask_image, 0, float('inf')) + pred_image
# n1: number of points to be interpolated 2:uv
idx = np.argwhere(mask_image_inf[..., 0] == 0) # (n1, 2)
# n2: neighbor count 2:uv
neighbors = np.array(neighbors_list[radius]) # (n2, 2)
n1, n2 = idx.shape[0], neighbors.shape[0]
idx_expand = np.tile(np.expand_dims(idx, axis=-1), n2).swapaxes(1, 2)
neighbors_expand = np.tile(np.expand_dims(neighbors, axis=-1), n1).swapaxes(0, 2).swapaxes(1, 2)
coordinate = idx_expand + neighbors_expand # (n1, n2, 2)
# remove those that are not in mask and have no coordinate value
coordinate[..., 0] = np.where(coordinate[..., 0] >= input_height, input_height - 1, coordinate[..., 0])
coordinate[..., 0] = np.where(coordinate[..., 0] < 0, 0, coordinate[..., 0])
coordinate[..., 1] = np.where(coordinate[..., 1] >= input_width, input_width - 1, coordinate[..., 1])
coordinate[..., 1] = np.where(coordinate[..., 1] < 0, 0, coordinate[..., 1])
x_val = mask_image_inf[coordinate[..., 0], coordinate[..., 1]][..., 0] # (n1, n2)
# set the true to 1, false to 0
neighbor_mask = np.where((x_val == float('inf')) | (x_val == 0), 0, 1) # (n1, n2)
# calculate distance matrix
D = np.linalg.norm(coordinate - idx_expand, axis=2) + 1e-6 # (n1, n2)
# calculate the inverse distance weight and update the new matrix
W = (1 / D) / np.expand_dims(np.sum(1 / D * neighbor_mask, axis=1), axis=1) # (n1, n2)
diff_val = np.matmul((neighbor_mask * W).reshape(n1, 1, n2),
pred_image[coordinate[..., 0], coordinate[..., 1]]).reshape(n1, 3)
up_image[idx[..., 0], idx[..., 1]] = diff_val # (n1, 3) xyz
return up_image


def up_sample(pred_point, mask_image, intrinsics, radius, interpolation_type, ratio):
# point: n*3 mask_image: h*w*3
height, width = mask_image.shape[0], mask_image.shape[1]
pred_image = map_3d_to_2d(pred_point, intrinsics, height, width) # (h, w, 3) xyz
pred_image = np.where(mask_image, pred_image, 0) # (h, w, 3)
up_point_image = uneven_upsample_based_mask(pred_image, mask_image, radius)

tw, th = up_point_image.shape[1] * ratio, up_point_image.shape[0] * ratio
# INTER_NEAREST INTER_AREA = INTER_LINEAR
interpolation = cv2.INTER_NEAREST if interpolation_type == 'nearest' else cv2.INTER_LINEAR
up_point_image = cv2.resize(up_point_image, (tw, th), interpolation=interpolation)
mask_image_float = np.where(mask_image, 1., 0.)
# uses nearest neighbor interpolation, linear interpolation has serrated edges
mask_image_float = cv2.resize(mask_image_float, (tw, th), interpolation=interpolation)
up_mask_image_bool = np.where(mask_image_float == 1., True, False)

intrinsics = intrinsics / 2 if data_type == 'dataset' else intrinsics / 1.5
point_image = backproject_depth(up_point_image[..., -1] * 1000, intrinsics * ratio)
# interpolation edge processing: remove edge interference
point_set = point_image[up_mask_image_bool].reshape(-1, 3)
return point_set, up_mask_image_bool # (point_num, 3)


def color_interpolation(color_img, mask_image_bool, interpolation_type, ratio):
interpolation = cv2.INTER_NEAREST if interpolation_type == 'nearest' else cv2.INTER_LINEAR
tw, th = color_img.shape[1] * ratio, color_img.shape[0] * ratio
up_color_image = cv2.resize(color_img, (tw, th), interpolation=interpolation)
color_set = up_color_image[mask_image_bool].reshape(-1, 3)
return color_set # (point_num, 3)


def transform_visual(points):
matrix = np.array(
[[1.0, 0.0, 0.0],
[0.0, -1.0, 0.0],
[0.0, 0.0, -1.0]]
)
points = np.matmul(points, matrix.transpose())
return points


def reconstruction(image, mask, intrinsics, radius=8, ratio=2, interpolation_type='linear'):
"""
:param radius: the neighborhood radius of the sample
:param ratio: sampling multiple
:param interpolation_type: nearest area linear
"""
t1 = time.time()
point = image[..., 9:][mask]
point_pred = point[np.where(point != 0)].reshape(-1, 3)
# print('before sample:', len(point_pred))
point_up_sample, mask_image_bool = up_sample(point_pred, mask, intrinsics, radius, interpolation_type, ratio)
# print('after sample:', len(point_up_sample))
t2 = time.time()
pred_color = color_interpolation(image[..., :3], mask_image_bool, interpolation_type, ratio)
t3 = time.time()
print(f'the time of point sample:{t2 - t1}, the time of color sample:{t3 - t2}')
point_up_sample = transform_visual(point_up_sample)
return point_up_sample, pred_color


def reconstruct_and_visualize(pre_data, out_path, save_img=True):
if not os.path.exists(out_path):
os.makedirs(out_path)
vis = o3d.visualization.Visualizer()
vis.create_window(width=960 * 2, height=640 * 2, left=10, top=10)
pcd = o3d.geometry.PointCloud()
for i, (image, mask, intrinsics) in enumerate(pre_data):
point_up_sample, pred_color = reconstruction(image, mask, intrinsics)
pcd.points = o3d.utility.Vector3dVector(point_up_sample.reshape(-1, 3))
pcd.colors = o3d.utility.Vector3dVector(pred_color.reshape(-1, 3))
# visual the single image
# o3d.visualization.draw_geometries([pcd])
vis.add_geometry(pcd)
vis.poll_events()
vis.update_renderer()
if save_img:
vis.capture_screen_image(f"{out_path}{i:06d}.png")
vis.destroy_window()


if __name__ == '__main__':
crop_type = 'inter_nearest'
input_height, input_width = 240, 320
obj = 'move_dragon'
data_type = 'dataset'
img_num = 90
data_path = f'/home/PycharmProjects/data/{obj}/'

pre_data = batch_data_process(img_num, data_path)
print('data preprocess done!')

out_path = data_path + '/recon_img/'
reconstruct_and_visualize(pre_data, out_path)
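reconstruct_and_visualize follows the non-blocking visualization pattern from the Open3D reference linked below; the canonical loop looks like this (a standalone sketch with random points standing in for the reconstructed frames):

import numpy as np
import open3d as o3d

vis = o3d.visualization.Visualizer()
vis.create_window(width=960, height=640)
pcd = o3d.geometry.PointCloud()
for frame in range(30):
    pts = np.random.rand(2000, 3)
    pcd.points = o3d.utility.Vector3dVector(pts)
    pcd.colors = o3d.utility.Vector3dVector(np.random.rand(2000, 3))
    if frame == 0:
        vis.add_geometry(pcd)        # add the geometry once
    else:
        vis.update_geometry(pcd)     # then only update it
    vis.poll_events()
    vis.update_renderer()
vis.destroy_window()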

Reference:

Non-blocking visualization with Open3D: https://blog.csdn.net/u014072827/article/details/113766353?spm=1001.2014.3001.5506

Traditional 2D/3D Registration in Python: SIFT / SURF / BRISK / ORB / AKAZE / ICP

  1. Import the required packages.
import random
import math
import matplotlib.pyplot as plt
import numpy as np
import open3d as o3d
import cv2
  2. 2D registration based on classical feature-point matching. The function takes the source and target images to be registered, both ndarrays of shape (h, w, 3). Different feature extractors can be selected: SIFT / SURF / BRISK / ORB / AKAZE. The function also visualizes the matched keypoints and the registration result.
def akaze_registration2d(source_image, target_image, optical_flow_gt=None):
    # running speed and robustness comparison: SIFT < SURF < BRISK < FREAK < ORB < AKAZE
    # pick one detector; the alternatives are listed for reference
    # sift = cv2.xfeatures2d.SIFT_create()
    # surf = cv2.xfeatures2d.SURF_create()
    # brisk = cv2.BRISK_create()
    # orb = cv2.ORB_create()
    akaze = cv2.AKAZE_create()
    # find key points and descriptors
    kp1, des1 = akaze.detectAndCompute(source_image, None)
    kp2, des2 = akaze.detectAndCompute(target_image, None)

    # BFMatcher (brute-force matching)
    bf = cv2.BFMatcher()  # cv2.NORM_HAMMING, crossCheck=True
    # matches = bf.match(des1, des2)
    matches = bf.knnMatch(des1, des2, k=2)

    """
    # get the FLANN matcher
    FLANN_INDEX_KDTREE = 0
    # para1: indexParams
    #   for SIFT and SURF: index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    #   for ORB: index_params = dict(algorithm=FLANN_INDEX_LSH, table_number=6, key_size=12)
    indexParams = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    # para2: searchParams (the number of recursive traversals)
    searchParams = dict(checks=50)
    # use FlannBasedMatcher to find the nearest neighbors
    flann = cv2.FlannBasedMatcher(indexParams, searchParams)
    # use knnMatch and return matches
    matches = flann.knnMatch(des1, des2, k=2)
    """

    # sort by similarity
    matches = sorted(matches, key=lambda x: x[0].distance)
    # ratio test
    good_matches = []
    n = 50
    for d1, d2 in matches:
        # the smaller the coefficient, the fewer the matching points
        if d1.distance < 0.9 * d2.distance:
            good_matches.append([d1])

    # draw matches
    # img3 = cv2.drawMatches(img1, kp1, img2, kp2, matches[: n], img2, flags=2)
    img3 = cv2.drawMatchesKnn(np.uint8(source_image), kp1, np.uint8(target_image), kp2, good_matches, None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS)
    plt.imshow(img3.astype('uint8'))
    plt.show()
    cv2.imwrite('akaze_matches.jpg', img3)

    # select matched keypoints
    ref_matched_kpts = np.float32([kp1[m[0].queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
    sensed_matched_kpts = np.float32([kp2[m[0].trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)

    plt.ion()  # switch the display to interactive mode
    ndarray_image = np.concatenate([source_image, target_image], axis=1)  # (h, 2w, 3)
    plt.imshow(ndarray_image.astype('uint8'))
    plt.show()
    plt.savefig('akaze_registration2d.png')

    # calculate homography
    H, status = cv2.findHomography(ref_matched_kpts, sensed_matched_kpts, cv2.RANSAC, 5.0)
    # transform
    warped_image = cv2.warpPerspective(source_image, H, (source_image.shape[1], source_image.shape[0]))
    warped_image = cv2.cvtColor(warped_image, cv2.COLOR_RGB2BGR)
    cv2.imwrite('akaze_warped.jpg', warped_image)

    # calculate the EPE2D error against the ground-truth optical flow, if provided
    if optical_flow_gt is not None:
        error_matrix = np.zeros((ref_matched_kpts.shape[0],))
        num = 0
        for i in range(ref_matched_kpts.shape[0]):
            u, v = round(ref_matched_kpts[i, 0, 0]), round(ref_matched_kpts[i, 0, 1])
            if np.isinf(optical_flow_gt[v, u, 0]):
                continue
            u_error = abs(optical_flow_gt[v, u, 0]) - abs(ref_matched_kpts[i, 0, 0] - sensed_matched_kpts[i, 0, 0])
            v_error = abs(optical_flow_gt[v, u, 1]) - abs(ref_matched_kpts[i, 0, 1] - sensed_matched_kpts[i, 0, 1])
            error = math.sqrt(u_error ** 2 + v_error ** 2)
            error_matrix[i] = error
            if num < n:
                plt.plot([ref_matched_kpts[i, 0, 0], sensed_matched_kpts[i, 0, 0] + source_image.shape[1]],
                         [ref_matched_kpts[i, 0, 1], sensed_matched_kpts[i, 0, 1]],
                         color=[(10 + 3 * i) / 255, (80 + i) / 255, 220 / 255], linewidth=0.5, marker='.', markersize=2)
                num += 1
        # remove invalid errors
        error_matrix = error_matrix[error_matrix != 0]
        print('akaze EPE2D error: %f pixel ' % np.mean(error_matrix), error_matrix.shape[0])
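A usage sketch for the function above, with two overlapping frames from the capture pipeline (the paths are placeholders; any pair of RGB images with common content works):

import cv2

source = cv2.imread('./data/object/color/color-000000.png')
target = cv2.imread('./data/object/color/color-000010.png')
akaze_registration2d(source, target)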
  3. 3D registration with the ICP algorithm. The function takes the source and target point clouds to be registered, both ndarrays of shape (n, 3), where n is the number of points and the three columns are the xyz coordinates. Two ways of computing and visualizing the alignment are provided: an Open3D ICP call (kept as a commented block) and a closed-form SVD solution.
def icp_registration3d(source_point, target_point, target_point_gt=None):
    source_pcd = o3d.geometry.PointCloud()
    num_point = source_point.shape[0]
    idx = random.sample(range(num_point), num_point)
    source_point = source_point[idx]
    source_pcd.points = o3d.utility.Vector3dVector(source_point)
    source_pcd.paint_uniform_color([255 / 255, 127 / 255, 0 / 255])  # orange
    target_pcd = o3d.geometry.PointCloud()
    target_pcd.points = o3d.utility.Vector3dVector(target_point)
    target_pcd.paint_uniform_color([50 / 255, 205 / 255, 50 / 255])  # green

    """
    threshold = 1.0  # the threshold of the moving range
    trans_init = np.asarray([[1, 0, 0, 0],   # 4x4 identity matrix
                             [0, 1, 0, 0],   # initial matrix: no displacement, no rotation
                             [0, 0, 1, 0],
                             [0, 0, 0, 1]])
    reg_p2p = o3d.pipelines.registration.registration_icp(source_pcd, target_pcd, threshold, trans_init, o3d.pipelines.registration.TransformationEstimationPointToPoint())
    source_pcd.transform(reg_p2p.transformation)
    target_pred_pcd = source_pcd
    target_point_pred = np.array(source_pcd.points)
    target_pred_pcd.paint_uniform_color([67 / 255, 110 / 255, 238 / 255])  # blue
    """

    # closed-form rigid alignment (SVD) of the two point sets
    c0 = np.mean(source_point, axis=0)
    c1 = np.mean(target_point, axis=0)
    H = (source_point - c0).transpose() @ (target_point - c1)
    U, S, Vt = np.linalg.svd(H)
    R = np.dot(Vt.T, U.T)
    t = c1 - R @ c0
    target_point_pred = np.dot(source_point, R.transpose()) + t
    target_pred_pcd = o3d.geometry.PointCloud()
    target_pred_pcd.points = o3d.utility.Vector3dVector(target_point_pred)
    target_pred_pcd.paint_uniform_color([67 / 255, 110 / 255, 238 / 255])  # blue

    vis = o3d.visualization.Visualizer()
    vis.create_window()
    vis.add_geometry(target_pred_pcd)
    vis.add_geometry(target_pcd)

    """
    # draw correspondence lines between the aligned and target points
    align_colors = [[(10 + 2 * i) % 255 / 255, 80 / 255, 200 / 255] for i in range(num_point)]
    icp_points = np.concatenate([target_point_pred, target_point], axis=0)
    icp_lines = [[i, i + num_point] for i in range(num_point)]
    icp_align = o3d.geometry.LineSet(
        points=o3d.utility.Vector3dVector(icp_points),
        lines=o3d.utility.Vector2iVector(icp_lines))
    icp_align.colors = o3d.utility.Vector3dVector(align_colors)
    vis.add_geometry(icp_align)
    """

    # vis.update_geometry()
    vis.poll_events()
    vis.update_renderer()
    vis.run()

    # calculate the EPE3D error against the ground truth, if provided
    if target_point_gt is not None:
        point_error = np.linalg.norm(target_point_pred - target_point_gt, axis=1, ord=2)
        print('ICP EPE3D error: %f m' % np.mean(point_error))
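The SVD step above is the standard Kabsch/orthogonal-Procrustes solution for corresponding point sets. A self-contained sketch with the usual det(R) < 0 reflection guard added (the guard is not in the function above) and a synthetic check:

import numpy as np

def kabsch_rigid_transform(source, target):
    # closed-form rigid alignment of corresponding point sets
    c0, c1 = source.mean(axis=0), target.mean(axis=0)
    H = (source - c0).T @ (target - c1)
    U, S, Vt = np.linalg.svd(H)
    R = Vt.T @ U.T
    if np.linalg.det(R) < 0:            # avoid an improper rotation (reflection)
        Vt[-1, :] *= -1
        R = Vt.T @ U.T
    t = c1 - R @ c0
    return R, t

src = np.random.rand(100, 3)
t_true = np.array([0.1, -0.2, 0.3])
dst = src + t_true                      # pure translation as a toy case
R, t = kabsch_rigid_transform(src, dst)
print(np.allclose(src @ R.T + t, dst))  # True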
  4. Calling the functions. Pass in the 2D images and 3D point clouds to be registered.
def predict(data):
    source_color = data['source_color']  # (h, w, 3)
    target_color = data['target_color']  # (h, w, 3)
    akaze_registration2d(source_color, target_color)

    # n: number of points
    source_point = data['source_point']  # (n, 3)
    target_point = data['source_target_point']  # (n, 3)
    icp_registration3d(source_point, target_point)