竞赛总结:智能驾驶汽车虚拟仿真视频数据理解

  • 赛题名称:2023全球智能汽车AI挑战赛——赛道二:智能驾驶汽车虚拟仿真视频数据理解赛道
  • 赛题任务:对视频中的信息进行综合理解,以指定的json文件格式,按照数据说明中的关键词(key)填充描述型的文本信息
  • 赛题类型:计算机视觉、目标检测

比赛链接:2023全球智能汽车AI挑战赛——赛道二:智能驾驶汽车虚拟仿真视频数据理解赛道

Datawhale教学视频:二次元的Datawhale的个人空间-二次元的Datawhale个人主页)

赛事背景

当前,全球新一轮科技革命和产业变革蓬勃发展,汽车与人工智能技术加速融合,电动化、网联化、智能化成为汽车产业的发展潮流和趋势,AI技术将更广泛地和汽车产业的各个领域,应用于汽车的智能维护、智能制造、智能驾驶等诸多方面。作为人工智能技术和汽车产业先进技术的倡导者,吉利汽车集团、阿里云、NVIDIA 英伟达一直致力于推动未来出行方式的发展,共同发起了本届2023全球智能汽车AI挑战赛。本届比赛将汇聚来自全球各地的杰出AI领域人才,推动自动驾驶、AI大模型、加速计算、云计算技术三者深度结合,为未来智能出行提供更加安全、高效、舒适的解决方案。

赛事任务

输入:元宇宙仿真平台生成的前视摄像头虚拟视频数据(8-10秒左右);

输出:对视频中的信息进行综合理解,以指定的json文件格式,按照数据说明中的关键词(key)填充描述型的文本信息(value,中文/英文均可以);

数据说明

文本描述结构树

image-20240114211932916

上传json格式示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
建议用英文提交:
{
"author" : "abc" ,
"time" : "YYMMDD",
"model" : "model_name",
"test_results" :[
{
"clip_id" : "xxxx_1",
"scenario" : "cityroad",
"weather":"unknown",
"period":"night",
"road_structure":"ramp",
"general_obstacle":"nothing",
"abnormal_condition":"nothing",
"ego_car_behavior":"turning right",
"closest_participants_type":"passenger car",
"closest_participants_behavior":"braking"
},
{
"clip_id" : "xxxx_2"
... ...
},
... ...
}

为了减少程序编译过程中的问题,提交答案的json文件中的 key & value 请使用英文,key请不要进行更改,value使用以下列表中的元素。

1
2
3
4
5
6
7
8
9
"scenario" : ["suburbs","city street","expressway","tunnel","parking-lot","gas or charging stations","unknown"]
"weather" : ["clear","cloudy","raining","foggy","snowy","unknown"]
"period" : ["daytime","dawn or dusk","night","unknown"]
"road_structure" : ["normal","crossroads","T-junction","ramp","lane merging","parking lot entrance","round about","unknown"]
"general_obstacle" : ["nothing","speed bumper","traffic cone","water horse","stone","manhole cover","nothing","unknown"]
"abnormal_condition" : ["uneven","oil or water stain","standing water","cracked","nothing","unknown"]
"ego_car_behavior" : ["slow down","go straight","turn right","turn left","stop","U-turn","speed up","lane change","others"]
"closest_participants_type" : ["passenger car","bus","truck","pedestrian","policeman","nothing","others","unknown"]
"closest_participants_behavior" : ["slow down","go straight","turn right","turn left","stop","U-turn","speed up","lane change","others"]

评测指标

初赛阶段:排行榜总分=视频理解准确度分数
复赛阶段:复赛总成绩=复赛排行榜视频理解准确度分数(100%)+代码复现时效分数(10%)
具体成绩计算方法和晋级标准请参考【赛制介绍】

视频理解准确度分数评测规则如下:

参赛者可采用不同的人工智能的模型和算法,推理出对应视频的描述语言,参赛者可以在给定的备选答案中选出一个正确的答案,如果其描述语言不在给定的备选答案中,也可以给出一个最佳的答案。

系统会针对参赛者提交的json文件,通过描述型的文本信息与真值进行对比,综合得出分数;其中,“距离最近的交通参与者的行为”的题目为2分,其它题目为1分;每个视频的满分为10分。每一个视频结果中的key值,需要参考数据说明的json格式示例,请勿进行修改。

对于真值部分,组织者会建立对应的中英文近义词作为真值列表,只要在该列表中就获得分数,例如真值“雨天” = [“雨天”, “雨”, “小雨”… , “rainy”, “rain”, “raining”…],参赛选手可以选择对应的近义词来进行作答,但每一项的真值列表不公开,仅体现在后台程序中。

解题思路

基本思路

  • 使用文本与图像进行匹配

datawhale学习组织将Baseline部署在线上平台百度AI Studio上,可一键fork运行代码:

https://aistudio.baidu.com/projectdetail/7033846?contributionType=1&sUid=40990&shared=1&ts=1699415726984

baseline代码解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 导入必要的库和模块
import paddle # 导入 PaddlePaddle 深度学习框架
from PIL import Image # 从 PIL 库导入 Image 模块,用于图像处理
from clip import tokenize, load_model # 导入 clip 模块,可能用于图像和文本的联合处理
import glob, json, os # 导入文件处理和 JSON 处理的库
import cv2 # 导入 OpenCV 库,用于计算机视觉任务
from tqdm import tqdm_notebook # 导入 tqdm_notebook 以在笔记本中显示进度条
import numpy as np # 导入 NumPy 用于数值处理
from sklearn.preprocessing import normalize # 从 sklearn.preprocessing 导入 normalize 用于数据归一化
import matplotlib.pyplot as plt # 导入 matplotlib.pyplot 用于绘图

# 加载模型和转换工具
model, transforms = load_model('ViT_B_32', pretrained=True) # 加载预训练的 ViT_B_32 模型和其转换

# 为各个类别和相应词汇定义字典
en_match_words = {
# 各个类别的关键词列表
}

# 初始化提交的 JSON 结构
submit_json = {
"author": "abc", # 作者姓名
"time": "231011", # 时间戳
"model": "model_name", # 使用的模型名称
"test_results": [] # 测试结果的列表,初始为空
}

# 获取并排序视频路径
paths = glob.glob('./PreliminaryTestVideos/*') # 使用 glob 获取指定路径下的所有视频文件
paths.sort() # 对路径进行排序

# 遍历每个视频文件进行处理
for video_path in paths:
print(video_path) # 打印视频路径

# 从路径中提取视频剪辑的 ID
clip_id = video_path.split('/')[-1]
cap = cv2.VideoCapture(video_path) # 使用 OpenCV 读取视频
img = cap.read()[1] # 读取视频的第一帧
image = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # 将图像从 BGR 转换为 RGB 格式
image = Image.fromarray(image) # 将数组转换为 PIL 图像
image = transforms(image).unsqueeze(0) # 应用预处理转换并添加一个维度

# 初始化用于单个视频结果的字典
single_video_result = {
# 视频的各种属性
}

# 针对特定关键词进行预测
for keyword in en_match_words.keys():
if keyword not in ["weather", "road_structure"]:
continue # 如果关键词不是 weather 或 road_structure,则跳过

texts = np.array(en_match_words[keyword]) # 将关键词转换为 NumPy 数组

with paddle.no_grad(): # 禁用梯度计算
# 使用模型进行预测
logits_per_image, logits_per_text = model(image, tokenize(en_match_words[keyword]))
probs = paddle.nn.functional.softmax(logits_per_image, axis=-1) # 应用 softmax 获取概率分布

probs = probs.numpy() # 将概率转换为 NumPy 数组
single_video_result[keyword] = texts[probs[0].argsort()[::-1][0]] # 选择具有最高概率的词汇作为结果

submit_json["test_results"].append(single_video_result) # 将结果添加到测试结果列表

# 将最终结果写入 JSON 文件
with open('clip_result.json', 'w', encoding='utf-8') as up:
json.dump(submit_json, up, ensure_ascii=False) # 以 UTF-8 编码将结果保存到文件中

进阶思路

  • 使用图像进行视觉问答
  • 时序视频进行视频问答
  • 使用多模态大模型(CLIP)进行问答

多模态大模型CLIP简介

CLIP(Contrastive Language-Image Pre-training)是一种多模态大模型,由OpenAI开发。它是一种能够同时理解文本和图像的模型,通过对文本和图像进行对比性学习,使其在多模态任务上表现出色。以下是CLIP的一些关键特点和工作原理的简介:

  1. 多模态表示学习: CLIP的设计目标是使模型能够理解文本和图像之间的语义关系,而不是仅限于特定任务。这使得CLIP在各种任务上都能表现良好,而无需针对每个任务进行专门的微调。
  2. 对比性学习: CLIP使用对比损失进行训练。这意味着模型学会将相关的文本和图像样本靠近,而不相关的样本分开。这种对比性学习的方法使得CLIP在理解语义关系时更为强大。
  3. 零样本学习: CLIP在零样本学习方面表现出色。这意味着模型可以在没有特定任务样本的情况下执行任务,因为它已经学会了通用的文本-图像表示。
  4. 大规模预训练: CLIP是在大规模文本和图像数据上进行预训练的。这使得模型能够捕捉更广泛的语义信息,从而在多种任务上通用。
  5. 应用广泛: 由于其多模态的性质,CLIP可以用于多种任务,包括图像分类、物体检测、文本检索等。

总体而言,CLIP代表了一种强大的多模态学习方法,使得模型能够理解文本和图像之间的语义关系,并在各种任务上表现出色。

大佬代码解读

大佬代码地址(大家可以关注膜拜一下大佬):self drive | Kaggle

推理天气,时间和道路结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# 导入必要的库
import glob
import cv2
import numpy as np

# 创建一个JSON格式的字典,包含作者信息、时间戳、模型名称和一个空的测试结果列表
submit_json = {
"author" : "abc" ,
"time" : "231011",
"model" : "model_name",
"test_results" : []
}

# 获取指定路径下的视频文件列表,排序后存储在paths变量中
paths = glob.glob('/kaggle/input/clip-test/初赛测试视频/*')
paths.sort()
debug = False

# 如果设置了debug标志,则只选择一个特定的视频路径用于调试
if debug:
paths = ['/kaggle/input/clip-test/初赛测试视频/40.avi']

# 遍历每个视频文件
for video_path in paths:
print(video_path)

# 提取视频文件名作为clip_id
clip_id = video_path.split('/')[-1]

# 打开视频文件
cap = cv2.VideoCapture(video_path)

# 读取视频的第一帧图像
img = cap.read()[1]
img = cap.read()[1]

# 获取图像的高度、宽度和通道数
height, width, _ = img.shape

# 截取图像的上部分,保留下部分的 75%
end_row = int(height * 0.75)
img2 = img[0:end_row, :]

# 对图像进行预处理,转换为模型所需的格式
image1 = preprocess(Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))).unsqueeze(0).to(device)
image2 = preprocess(Image.fromarray(cv2.cvtColor(img2, cv2.COLOR_BGR2RGB))).unsqueeze(0).to(device)

# 创建包含初始信息的单个视频结果字典
single_video_result = {
"clip_id": clip_id,
"scerario" : "city street",
"weather":"clear",
"period":"night",
"road_structure":"normal",
"general_obstacle":"nothing",
"abnormal_condition":"nothing",
"ego_car_behavior":"go straight",
"closest_participants_type":"passenger car",
"closest_participants_behavior":"braking"
}

# 遍历关键词(en_match_words中的关键词)
for keyword in en_match_words.keys():
# 如果关键词不是["weather", "period", "road_structure"]中的一个,跳过
if keyword not in ["weather", "period", "road_structure"]:
continue

# 获取关键词对应的文本列表
texts = np.array(en_match_words[keyword])
text = clip.tokenize(texts).to(device)

# 使用torch.no_grad()上下文,避免计算梯度
with torch.no_grad():
if keyword == 'period':
# 检查关键词是否为 'period'
# 对于关键词 'period',根据模型预测白天或夜晚
height, width, _ = img.shape
end_row = int(height * 0.5)
img_day = img[0:end_row, :]
img_day = preprocess(Image.fromarray(cv2.cvtColor(img_day, cv2.COLOR_BGR2RGB))).unsqueeze(0).to(device)
# 预处理 'period' 关键词的图像
logits_per_image, logits_per_text = model(img_day, text)
# 获取模型预测结果
probs = logits_per_image.softmax(dim=-1).cpu().numpy()
# 将预测转换为概率
if probs[0][0] < 0.85:
# 如果是 'daytime' 的概率小于 0.85,则分类为 'night'
single_video_result[keyword] = 'night'
else:
# 否则分类为 'daytime'
single_video_result[keyword] = 'daytime'

else:
# 对于其他关键词,根据模型预测关键词的可能类别,并选择概率最高的类别作为结果
# 使用两个不同的图像进行模型预测
logits_per_image1, logits_per_text1 = model(image1, text)
logits_per_image2, logits_per_text2 = model(image2, text)
# 从两个预测中获取预测结果
probs1 = logits_per_image1.softmax(dim=-1).cpu().numpy()
probs2 = logits_per_image2.softmax(dim=-1).cpu().numpy()
# 将两个预测的概率进行组合
probs = probs1 + probs2
# 选择概率最高的类别作为结果
single_video_result[keyword] = texts[probs[0].argsort()[::-1][0]]

# 如果关键词为 "parking lot entrance"
if texts[probs[0].argsort()[::-1][0]] == "parking lot entrance":
# 再次使用模型预测其他可能的条件
texts = ["uneven","水渍","油渍","积水","cracked"]
text = clip.tokenize(texts).to(device)
logits_per_image1, logits_per_text1 = model(image2, text)
probs = logits_per_image1.softmax(dim=-1).cpu().numpy()
# 根据概率值确定结果
if texts[probs[0].argsort()[::-1][0]] in ["水渍","油渍"]:
single_video_result['abnormal_condition'] = "oil or water stain"
elif texts[probs[0].argsort()[::-1][0]] == '积水':
single_video_result['abnormal_condition'] = "standing water"
else :
single_video_result['abnormal_condition'] = texts[probs[0].argsort()[::-1][0]]
print(single_video_result['abnormal_condition'])

# 调整特定的 "road_structure" 值
if single_video_result["road_structure"] == "Ordinary roads":
single_video_result["road_structure"] = "normal"
if single_video_result["road_structure"] == 'lane merging':
# 如果 "road_structure" 为 'lane merging',再次使用模型预测并调整结果
texts = np.array(['车道合并','普通道路'])
text = clip.tokenize(texts).to(device)
logits_per_image1, logits_per_text1 = model(image1, text)
probs1 = logits_per_image1.softmax(dim=-1).cpu().detach().numpy()
if texts[probs1[0].argsort()[::-1][0]] != '车道合并':
single_video_result["road_structure"] = 'normal'

# 将单个视频结果添加到submit_json中的测试结果列表
submit_json["test_results"].append(single_video_result)

推理最近交通参与者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# 导入必要的库
import glob
import cv2
import numpy as np

# 初始化一个 JSON 格式的字典,用于存储视频结果
submit_json_video = {
"author": "abc",
"time": "231011",
"model": "model_name",
"test_results": []
}

# 定义包含不同场景关键词可能取值的字典
en_match_words = {
"scerario": ["suburbs", "city street", "expressway", "tunnel", "parking-lot", "gas or charging stations", "unknown"],
"weather": ["clear", "cloudy", "raining", "foggy", "snowying", "unknown"],
"period": ["daytime", "dawn or dusk", "night", "unknown"],
"road_structure": ["Ordinary roads", "crossroads", "T-junction", "ramp", "lane merging", "parking lot entrance", "round about", "unknown"],
"general_obstacle": ["nothing", "speed bumper", "traffic cone", "water horse", "stone", "manhole cover", "nothing", "unknown"],
"abnormal_condition": ["uneven", "oil or water stain", "standing water", "cracked", "nothing", "unknown"],
"ego_car_behavior": ["slow down", "go straight", "turn right", "turn left", "stop", "U-turn", "speed up", "lane change", "others"],
"closest_participants_type": ["normal car", "bus", "truck", "people", "police", "nothing", "others", "unknown"],
"closest_participants_behavior": ["slow down", "go straight", "turn right", "turn left", "stop", "U-turn", "speed up", "lane change", "others"],
}

# 获取视频文件的路径并进行排序
paths = glob.glob('/kaggle/input/clip-test/初赛测试视频/*')
paths.sort()

# 指定用于分析的关键词(例如,['closest_participants_type'])
keys = ['closest_participants_type']

# 设置调试标志,以控制是处理所有视频还是只处理特定视频
debug = False

# 如果处于调试模式,则使用特定的视频进行测试
if debug:
paths = ['/kaggle/input/clip-test/初赛测试视频/45.avi']

# 遍历每个视频路径
for video_path in paths:
print(video_path)

# 初始化一个数组,用于存储每个关键词的概率总和
ans = [[0, 0, 0, 0, 0, 0, 0, 0]]

# 提取视频文件名作为 clip_id
clip_id = video_path.split('/')[-1]

# 打开视频文件
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

results = []

# 定义用于帧采样率的变量 'x'
x = 5

# 以基于 'x' 的采样率遍历帧
for i in range(0, frame_count, int(fps // x)):
# 将帧位置设置为当前索引
cap.set(cv2.CAP_PROP_POS_FRAMES, i)
ret, img = cap.read()

# 如果帧读取不成功,则中断循环
if not ret:
break

# 从帧底部提取感兴趣区域(ROI)
height, width, _ = img.shape
start_row = int(height * 0)
img = img[start_row:height, :]

# 对帧图像进行预处理
image = preprocess(Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))).unsqueeze(0).to(device)

# 初始化一个用于单帧结果的字典
single_frame_result = {
"clip_id": clip_id,
"scerario": "city street",
"weather": "clear",
"period": "night",
"road_structure": "normal",
"general_obstacle": "nothing",
"abnormal_condition": "nothing",
"ego_car_behavior": "go straight",
"closest_participants_type": "passenger car",
"closest_participants_behavior": "braking"
}

# 遍历指定的关键词(例如,['closest_participants_type'])
for k, keyword in enumerate(keys):
# 对于特定帧跳过处理 'closest_participants_type'
if keyword == "closest_participants_type" and (i < fps * 5 // x or i > fps * 7 // x):
continue

# 获取关键词对应的文本
texts = np.array(en_match_words[keyword])
text = clip.tokenize(texts).to(device)

# 使用无梯度计算推理结果
with torch.no_grad():
logits_per_image, logits_per_text = model(image, text)
probs = logits_per_image.softmax(dim=-1).cpu().numpy()

# 将概率累加到 ans 数组中
for j in range(len(ans[k])):
ans[k][j] += probs[0][j]

# 对 ans 数组中的每个关键词,选择具有最大概率的文本值
for i in range(len(ans)):
single_frame_result[keys[i]] = en_match_words[keys[i]][ans[i].index((max(ans[i])))]

# 将单帧结果添加到 JSON 结果列表中
submit_json_video["test_results"].append(single_frame_result)

# 释放视频文件资源
cap.release()

推理自车行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import glob
import pandas as pd
import cv2
import gc
import numpy as np
import random
import imageio
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
from tensorflow_docs.vis import embed
import matplotlib.pyplot as plt

def format_frames(frame, output_size):
"""
Pad and resize an image from a video.

Args:
frame: Image that needs to resized and padded.
output_size: Pixel size of the output frame image.

Return:
Formatted frame with padding of specified output size.
"""
frame = tf.image.convert_image_dtype(frame, tf.float32)
frame = tf.image.resize_with_pad(frame, *output_size)
return frame

# 定义一个函数,用于从视频文件中提取帧序列
def frames_from_video_file(video_path, n_frames, output_size=(224, 224), frame_step=15):
"""
从每个视频文件中创建帧序列。

参数:
video_path:视频文件的文件路径。
n_frames:要从每个视频文件中创建的帧数。
output_size:输出帧图像的像素大小。
frame_step:帧步长,即每隔多少帧采样一次。

返回:
一个形状为 (n_frames, height, width, channels) 的 NumPy 数组,包含从视频文件中提取的帧。
"""
# 读取每个视频的每一帧
result = []
src = cv2.VideoCapture(str(video_path)) # 打开视频文件

video_length = src.get(cv2.CAP_PROP_FRAME_COUNT) # 获取视频的总帧数

need_length = 1 + (n_frames - 1) * frame_step # 计算需要的帧序列长度

# 根据视频长度和需要的长度计算起始帧的位置
if need_length > video_length:
start = 0
else:
max_start = video_length - need_length
start = random.randint(0, max_start + 1)

src.set(cv2.CAP_PROP_POS_FRAMES, start) # 设置视频的起始帧位置

# 读取第一帧
ret, frame = src.read()
result.append(format_frames(frame, output_size)) # 将第一帧添加到结果列表

# 循环读取后续帧
for _ in range(n_frames - 1):
for _ in range(frame_step):
ret, frame = src.read()
if ret:
frame = format_frames(frame, output_size)
result.append(frame)
else:
result.append(np.zeros_like(result[0])) # 如果视频读取失败,用零填充

src.release() # 释放视频资源
result = np.array(result)[..., [2, 1, 0]] # 将结果转换为 NumPy 数组,并重新排序通道顺序(BGR 到 RGB)

return result

# 定义一个函数,将图像序列保存为 GIF 文件并返回嵌入的文件链接
def to_gif(images):
converted_images = np.clip(images * 255, 0, 255).astype(np.uint8) # 将图像值从 [0, 1] 转换为 [0, 255] 并转为整数
imageio.mimsave('./animation.gif', converted_images, fps=10) # 保存为 GIF 文件
return embed.embed_file('./animation.gif') # 返回嵌入的文件链接

推理场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from torch.utils.data import DataLoader, Dataset
import os

class CustomDataset(Dataset):
def __init__(self, preprocess, image_files, transform=None):
"""
自定义数据集类的初始化函数。

参数:
preprocess:图像预处理函数。
image_files:包含图像文件路径的列表。
transform:可选的图像转换函数。
"""
self.image_files = image_files
self.transform = transform
self.preprocess = preprocess

def __len__(self):
"""
获取数据集的长度。

返回:
数据集的长度。
"""
return len(self.image_files)

def __getitem__(self, i):
"""
获取数据集中索引为 i 的样本。

参数:
i:样本的索引。

返回:
clip_images:视频片段帧的预处理结果列表。
images:视频片段帧的原始图像列表。
filename:图像文件的基本文件名。
"""
cap = cv2.VideoCapture(self.image_files[i]) # 打开视频文件
fps = cap.get(cv2.CAP_PROP_FPS) # 获取视频的帧率
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # 获取视频的帧数
images = [] # 存储原始图像列表
clip_images = [] # 存储预处理后的图像列表

for j in range(0, frame_count, int(fps // 1)): # 以指定帧率采样视频帧
cap.set(cv2.CAP_PROP_POS_FRAMES, j) # 设置当前帧位置
ret, img = cap.read() # 读取当前帧

height, width, _ = img.shape
start_row = int(height * 0.20)
img = img[start_row:height, :]
img = cv2.fastNlMeansDenoisingColored(img, None, 10, 10, 7, 21) # 对图像进行去噪处理

clip_images.append(self.preprocess(Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))).unsqueeze(0))
img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

image = np.array(img.convert('RGB'))

if self.transform is not None:
image = self.transform(image=image)['image']
images.append(image)

cap.release() # 释放视频资源
return clip_images, images, os.path.basename(self.image_files[i]) # 返回视频片段帧的预处理结果列表、原始图像列表和文件名
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import glob
import cv2
import numpy as np
import clip
import torch
import torchvision.models as models
from PIL import Image

# 初始化存放测试结果的字典
submit_json_scerario = {
"author" : "abc" ,
"time" : "231011",
"model" : "model_name",
"test_results" : []
}

# 获取所有视频文件的路径
paths = glob.glob('/kaggle/input/clip-test/初赛测试视频/*')
paths.sort()
debug = False

# 如果处于调试模式,只选择一个视频进行处理
if debug:
paths = ['/kaggle/input/clip-test/初赛测试视频/03.avi']

# 创建自定义数据集
datasets = CustomDataset(preprocess, paths)

# 创建数据加载器
dataloaders = DataLoader(datasets, batch_size=1, num_workers=2, pin_memory=True)

# 遍历数据加载器中的每个视频
for clip_images, datas, clip_id in dataloaders:
# 初始化单个视频的测试结果字典
single_frame_result = {
"clip_id": clip_id,
"scerario" : "city street",
"weather": "clear",
"period": "night",
"road_structure": "normal",
"general_obstacle": "nothing",
"abnormal_condition": "nothing",
"ego_car_behavior": "go straight",
"closest_participants_type": "passenger car",
"closest_participants_behavior": "braking"
}

print(clip_id)

# 初始化用于存放场景类别统计的列表
clip_ans = [0 for x in range(len(scerario_clip))]

# 遍历视频的每一帧
for i, data in enumerate(clip_images):
texts = np.array(scerario_clip)
text = clip.tokenize(texts).to(device)

with torch.no_grad():
# 使用模型进行推理
logits_per_image, logits_per_text = model(data.squeeze(0).to(device), text)
probs = logits_per_image.softmax(dim=-1).cpu().numpy()

# 统计场景类别的数量
clip_ans[probs[0].argsort()[::-1][0]] += 1

print("clip:", clip_ans, scerario_clip[clip_ans.index(max(clip_ans))])

# 根据统计结果确定场景类别
if scerario_clip[clip_ans.index(max(clip_ans))] in ['lush green valley', 'car tunnel in the mountains', 'snowy mountain valley', 'quiet suburban street', 'open highway in the countryside']:
single_frame_result['scerario'] = 'suburban'
elif scerario_clip[clip_ans.index(max(clip_ans))] in ['The city highway', 'open street in the city', 'city street at daylight', 'Street on a rainy night', 'Street on a snowy night', 'busy city street', 'city streets at night']:
single_frame_result['scerario'] = 'city street'
elif scerario_clip[clip_ans.index(max(clip_ans))] in ['busy highway with heavy traffic']:
single_frame_result['scerario'] = 'expressway'
elif scerario_clip[clip_ans.index(max(clip_ans))] in ['subway tunnel']:
single_frame_result['scerario'] = 'tunnel'
elif scerario_clip[clip_ans.index(max(clip_ans))] in ['Indoor parking lot', 'urban gas station at night', 'crowded shopping mall parking lot']:
single_frame_result['scerario'] = 'parking-lot'
elif scerario_clip[clip_ans.index(max(clip_ans))] in ['rural gas station in daylight']:
single_frame_result['scerario'] = 'gas or charging stations'
else:
single_frame_result['scerario'] = 'unknown'

# 将单个视频的测试结果添加到总体测试结果中
submit_json_scerario["test_results"].append(single_frame_result)

推理其余杂项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def data_processing(results):
# 初始化结果字典
ans = {
'ans': []
}

# 遍历结果列表
for i, result in enumerate(results):
# 初始化当前结果的字典
this = {
'frame_count': i,
'name': [names[result[0].boxes.cls.cpu().numpy()[j]] for j in range(len(result[0].boxes.cls.cpu().numpy()))], # 标签名称
'conf': result[0].boxes.conf.cpu().numpy(), # 标签置信度
'box': result[0].boxes.xyxy.cpu().numpy().astype(int)
}

# 将当前结果添加到总体结果字典中
ans['ans'].append(this)

return ans

def is_crossroads(res):
# 遍历结果列表
for re in res:
# 如果某个标签为 'traffic light' 的数量大于等于 2,返回 True
if re['name'].count('traffic light') >= 2:
return True
# 如果没有满足条件的结果,返回 False
return False

def is_manhole(res):
# 遍历结果列表
for re in res:
# 如果某个标签为 'manhole cover',并且置信度大于 0.7,返回 True
if 'manhole cover' in re['name']:
if re['conf'][re['name'].index('manhole cover')] > 0.7:
return True
# 如果没有满足条件的结果,返回 False
return False

def calculate_box_area(box):
# 计算边界框的面积
width = abs(box[2] - box[0])
height = abs(box[3] - box[1])
area = width * height
return area

def is_traffic_cone(res):
# 初始化置信度之和
sum_conf = 0
# 遍历结果列表
for re in res:
# 如果结果中包含 'traffic cone' 标签
if 'traffic cone' in re['name']:
# 找到 'traffic cone' 在标签列表中的索引
idx = [index for index, value in enumerate(re['name']) if value == 'traffic cone']
# 遍历所有 'traffic cone' 的索引
for i in idx:
# 打印当前 'traffic cone' 的面积和置信度
print(calculate_box_area(re['box'][i]), re['conf'][i])
# 如果置信度大于 0.5 且面积小于 20000,返回 True
if re['conf'][i] > 0.5 and calculate_box_area(re['box'][i]) < 20000:
return True
# 如果面积小于 20000,累加置信度
elif calculate_box_area(re['box'][i]) < 20000:
sum_conf += re['conf'][i]
# 如果累加的置信度大于等于 0.5,返回 True;否则返回 False
return False if sum_conf < 0.5 else True

def is_police(res):
# 遍历结果列表
for re in res:
# 如果结果中包含 'police car' 标签
if 'police car' in re['name']:
# 找到 'police car' 在标签列表中的索引
idx = [index for index, value in enumerate(re['name']) if value == 'police car']
# 遍历所有 'police car' 的索引
for i in idx:
# 如果置信度大于 0.5,返回 True
if re['conf'][i] > 0.5:
return True
# 如果未检测到 'police car' 或所有检测到的 'police car' 置信度均不大于 0.5,返回 False
return False

def is_truck(res):
# 遍历结果列表
for re in res:
# 如果结果中包含 'truck' 标签
if 'truck' in re['name']:
# 找到 'truck' 在标签列表中的索引
idx = [index for index, value in enumerate(re['name']) if value == 'truck']
# 遍历所有 'truck' 的索引
for i in idx:
# 如果置信度大于 0.5,返回 True
if re['conf'][i] > 0.5:
return True
# 如果未检测到 'truck' 或所有检测到的 'truck' 置信度均不大于 0.5,返回 False
return False

def is_bus(res):
# 遍历结果列表
for re in res:
# 如果结果中包含 'bus' 标签
if 'bus' in re['name']:
# 找到 'bus' 在标签列表中的索引
idx = [index for index, value in enumerate(re['name']) if value == 'bus']
# 遍历所有 'bus' 的索引
for i in idx:
# 如果置信度大于 0.6,返回 True
if re['conf'][i] > 0.6:
return True
# 如果未检测到 'bus' 或所有检测到的 'bus' 置信度均不大于 0.6,返回 False
return False

def is_other(res):
# 遍历结果列表
for re in res:
# 如果结果中包含 'others' 标签
if 'others' in re['name']:
# 找到 'others' 在标签列表中的索引
idx = [index for index, value in enumerate(re['name']) if value == 'others']
# 遍历所有 'others' 的索引
for i in idx:
# 如果置信度大于 0.3,返回 True
if re['conf'][i] > 0.3:
return True
# 如果未检测到 'others' 或所有检测到的 'others' 置信度均不大于 0.3,返回 False
return False

def is_people(res):
# 遍历结果列表
for re in res:
# 如果结果中包含 'person' 标签
if 'person' in re['name']:
# 找到 'person' 在标签列表中的索引
idx = [index for index, value in enumerate(re['name']) if value == 'person']
# 遍历所有 'person' 的索引
for i in idx:
# 如果置信度大于 0.6,返回 True
if re['conf'][i] > 0.6:
return True
# 如果未检测到 'person' 或所有检测到的 'person' 置信度均不大于 0.6,返回 False
return False

参考

2023全球智能汽车AI挑战赛:智能驾驶汽车虚拟仿真视频数据理解 - 飞书云文档 (feishu.cn)

Python 计算机视觉(八)—— OpenCV 进行图像增强_opencv图像增强-CSDN博客