Helped a colleague look at two data-processing challenges.
dataIdSort
Challenge name: dataIdSort. Challenge description: You are given a data file, data.txt. As the data analyst you must analyze it and perform data identification and classification. The file contains five categories of sensitive information: ID-card numbers (idcard), phone numbers (phone), bank-card numbers (bankcard), IP addresses (ip), and MAC addresses (mac). Following the rules laid out in the attached 个人信息数据规范文档.pdf (the personal-information data specification), identify these five categories of sensitive data and classify them. Save the identified and classified content to a CSV file (UTF-8 encoding) and upload it to the challenge's verification platform (the platform offers a sample file, example.csv, as a format reference); once verification passes you get the flag. Points: answered 0 times, initial score 500.0, current score 500.0, solve score 500.0. Difficulty: medium. Attachment: dataIdSort 的附件.zip (download)
The script is below. The example file made it look like rows had to be sorted by category, but order actually doesn't matter. The script's output still falls slightly short of the required accuracy, mainly because the IPv4 regex is incomplete: something like 2127.0.0.1 should be treated as an invalid IP and skipped entirely, but the regex below still matches the 127.0.0.1 inside it.
from datetime import date
import re
idcard_result = ""
phone_result = ""
bankcard_result = ""
ip_result = ""
mac_result = ""
result = "category,value\n"
phone_prefix = [
134, 135, 136, 137, 138, 139, 147, 148, 150, 151, 152, 157, 158, 159, 172, 178,
182, 183, 184, 187, 188, 195, 198, 130, 131, 132, 140, 145, 146, 155, 156, 166,
167, 171, 175, 176, 185, 186, 196, 133, 149, 153, 173, 174, 177, 180, 181, 189,
190, 191, 193, 199
]
phone_regex = re.compile(r"(\+86 |\(\+86\))?(\d{11}|\d{3} \d{4} \d{4}|\d{3}-\d{4}-\d{4})")
idcard_regex = re.compile(r"\d{17}[\dXx]|\d{6} \d{8} \d{3}[\dXx]|\d{6}-\d{8}-\d{3}[\dXx]")
bankcard_prefix = [622848, 622700, 621700, 622262, 622188, 622200, 622568, 622609, 622908, 622518]
bankcard_regex = re.compile(r"\d{16,19}")
ipv4_regex = re.compile(r'(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)){3}')
mac_regex = re.compile(r'(?:[0-9A-Fa-f]{2}[:]){5}[0-9A-Fa-f]{2}')
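# Quick reference for what the patterns above accept (illustrative strings, not
# from the challenge data; these are format-level matches only, and the Luhn /
# checksum / prefix validation happens further down):
#   phone_regex:  "13812345678", "138 1234 5678", "(+86)138-1234-5678"
#   idcard_regex: "110105194912310021", "110105 19491231 002X"
#   mac_regex:    "00:1A:2B:3C:4D:5E"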
def luhn_check(number: str) -> bool:
"""
使用 Luhn 算法校验字符串是否为有效的卡号格式(忽略非数字字符)。
返回 True 表示校验通过,False 表示不通过。
"""
# 只保留数字
digits = [int(ch) for ch in number if ch.isdigit()]
if not digits:
return False
total = 0
# 从右到左,索引0表示最右侧数字
reversed_digits = digits[::-1]
for i, d in enumerate(reversed_digits):
if i % 2 == 1:
# 偶数位(从右边数,第二、第四...)乘2
doubled = d * 2
# 如果大于9,则各位相加(等价于减9)
if doubled > 9:
doubled -= 9
total += doubled
else:
# 奇数位不变
total += d
return total % 10 == 0
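# Sanity check with the classic Visa test number (format-valid, not a real card):
#   luhn_check("4111111111111111")  # -> True
#   luhn_check("4111111111111112")  # -> False (checksum off by one)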
_WEIGHTS = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]
_CHECK_MAP = ['1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2']
def is_valid_idcard(id_str: str) -> bool:
"""
仅返回 bool:True 表示为合法的18位中国居民身份证号(包括出生日期和校验码校验)。
"""
s = id_str.strip().upper()
# 出生日期校验(YYYYMMDD)
try:
y = int(s[6:10])
m = int(s[10:12])
d = int(s[12:14])
bd = date(y, m, d)
if 2025-y>100:
return False
except Exception:
return False
if bd > date.today():
return False
# 校验码计算
try:
digits17 = [int(ch) for ch in s[:17]]
except ValueError:
return False
total = sum(w * n for w, n in zip(_WEIGHTS, digits17))
expected = _CHECK_MAP[total % 11]
return s[17] == expected
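# The weights implement ISO 7064 MOD 11-2 as specified in GB 11643-1999
# (the weight of digit i is 2**(18-i) mod 11). Sanity check with the widely
# circulated sample number (not a real person's ID):
#   is_valid_idcard("11010519491231002X")  # -> True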
def extract_phone(text: str):
global phone_result
for m in phone_regex.findall(text):
pn = m[1] if len(m) == 2 else m[0]
if int(pn[:3]) in phone_prefix:
phone_result += f"phone,{''.join(m)}\n"
def extract_idcard(text: str):
global idcard_result
for m in idcard_regex.findall(text):
if is_valid_idcard(m.replace(" ", "").replace("-", "")):
idcard_result += f"idcard,{''.join(m)}\n"
def extract_bankcard(text: str):
global bankcard_result
for m in bankcard_regex.findall(text):
if luhn_check(m) and int(m[:6]) in bankcard_prefix:
bankcard_result += f"bankcard,{''.join(m)}\n"
def extract_ip(text: str):
global ip_result
for m in ipv4_regex.findall(text):
ip_result += f"ip,{''.join(m)}\n"
def extract_mac(text: str):
global mac_result
for m in mac_regex.findall(text):
mac_result += f"mac,{''.join(m)}\n"
with open("data.txt") as data:
for line in data.readlines():
extract_phone(line)
extract_idcard(line)
extract_bankcard(line)
extract_ip(line)
extract_mac(line)
result += idcard_result+phone_result+bankcard_result+ip_result+mac_result
with open("result.csv", "w") as result_file:
result_file.write(result)为了方便,我是跑完脚本后手动删除了几个 ip 达到了正确率要求😅
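For the record, a minimal sketch of a stricter IPv4 pattern, assuming the goal is to reject candidates embedded in longer digit runs. The lookaround guards are my addition and untested against the challenge data; note they also reject an IP that is immediately followed by a dot, which may or may not be desired here:

import re

# Guards: no digit or dot may touch either end of the candidate,
# so "2127.0.0.1" yields no match at all instead of "127.0.0.1".
ipv4_regex = re.compile(
    r"(?<![\d.])"
    r"(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)"
    r"(?:\.(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)){3}"
    r"(?![\d.])"
)

assert ipv4_regex.search("2127.0.0.1") is None
assert ipv4_regex.search("x 127.0.0.1 y").group() == "127.0.0.1"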
dataPic
Challenge name: dataPic. Challenge description: A company ran an internal employee-information collection drive: a simple survey system was set up on the intranet, and employees had to log in, upload a photo of their ID card for real-name verification, and fill in a personal-information questionnaire. Because some employees filled it in incorrectly, the collected data now needs cleaning and validation. You have the traffic capture from the collection (dataPic.pcapng); as the data-analysis expert, extract and validate the employee information and output the data that passes the requirements. The complete employee-information schema and cleaning rules are in the attached 个人信息数据规范文档.pdf. Save the validated, complete employee records to a CSV file (UTF-8 encoding) and upload it to the challenge's verification platform (the platform offers a sample file, example.csv, as a format reference); once verification passes you get the flag. (Note: all data in this challenge is randomly generated.) Points: answered 0 times, initial score 500.0, current score 500.0, solve score 500.0. Difficulty: hard.
My accuracy came out under 60%, and I'm not sure whether the ID-card OCR is to blame (that part of the code was written directly by an LLM).
from datetime import date
import os
from urllib.parse import unquote
import re
import struct
from collections import defaultdict, namedtuple
import pyshark
import cv2
import numpy as np
import easyocr
from PIL import Image
# Helper: image preprocessing (grayscale, contrast boost, binarization;
# perspective correction could be added as needed)
def preprocess_image(path, max_dim=1600):
    # imdecode on np.fromfile handles non-ASCII (e.g. Chinese) paths
    img = cv2.imdecode(np.fromfile(path, dtype=np.uint8), cv2.IMREAD_COLOR)
    h, w = img.shape[:2]
    scale = min(max_dim / max(h, w), 1.0)
    if scale < 1.0:
        img = cv2.resize(
            img, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_AREA
        )
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Contrast-limited adaptive histogram equalization
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    gray = clahe.apply(gray)
    # Otsu binarization (optional; the OCR pass below runs on the color image)
    _, th = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return img, th
# Recognize Chinese text with EasyOCR
def ocr_easyocr(img_arr, lang_list=["ch_sim", "en"]):
    # Note: building a Reader per call is slow; set gpu=True if a GPU is available
    reader = easyocr.Reader(lang_list, gpu=False)
    # EasyOCR accepts a path or an RGB numpy array
    if len(img_arr.shape) == 3:
        rgb = cv2.cvtColor(img_arr, cv2.COLOR_BGR2RGB)
    else:
        rgb = cv2.cvtColor(img_arr, cv2.COLOR_GRAY2RGB)
    result = reader.readtext(rgb, detail=0)  # detail=0: text only, no coordinates
    # Join the fragments into one line-per-entry text block
    text = "\n".join(result)
    return text, result
# Simple regex/keyword-based field extraction
def parse_id_fields(text):
    # Strip whitespace and normalize common noise (full-width colons, newlines)
    t = text.replace(" ", "").replace("：", ":").replace("\n", "|")
    # ID number: 18 digits, or 17 digits plus X
    id_match = re.search(
        r"([1-9]\d{5}(19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[\dXx])", t
    )
    idno = id_match.group(1) if id_match else None
    # Date of birth (fallback source)
    dob_match = re.search(
        r"((19|20)\d{2}[-/年]?(0[1-9]|1[0-2])[-/月]?(0[1-9]|[12]\d|3[01])日?)", t
    )
    dob = dob_match.group(1) if dob_match else None
    # Sex: look for "男"/"女", preferably right after a "性别" label
    sex = None
    sex_match = re.search(r"(性别[：:]?)(男|女|M|F)", t)
    if sex_match:
        sex = sex_match.group(2)
    else:
        # \b never fires between CJK characters, so use plain containment instead
        if "男" in t:
            sex = "男"
        elif "女" in t:
            sex = "女"
    # Name: usually follows the "姓名" label; otherwise try short leading lines
    name = None
    # Search the raw text (newlines intact) and stop the match at the line end
    name_match = re.search(r"(姓名[：:]?)([^\|\n]{2,6})", text)
    if name_match:
        name = name_match.group(2).strip()
    else:
        # Fallback: an early line of 2-4 Chinese characters is likely the name
        lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
        for ln in lines[:3]:
            if re.match(r"^[\u4e00-\u9fa5]{2,4}$", ln):
                name = ln
                break
    return {"name": name, "sex": sex, "id_number": idno, "birthday": dob}
# Main pipeline: pull structured ID-card info out of one image
def extract_id_info(image_path):
    orig, bin_img = preprocess_image(image_path)
    text, tokens = ocr_easyocr(orig)
    fields = parse_id_fields(text)
    # Return the raw OCR text plus the structured fields
    return {"raw_text": text, "tokens": tokens, "fields": fields}
result = "username,password,name,sex,birth,idcard,phone\n"
PNG_SIG = b"\x89PNG\r\n\x1a\n"
PCAP_PATH = "dataPic.pcapng"
phone_prefix = [
134, 135, 136, 137, 138, 139, 147, 148, 150, 151, 152, 157, 158, 159, 172, 178, 182, 183, 184, 187, 188, 195, 198, 130, 131, 132,
140, 145, 146, 155, 156, 166, 167, 171, 175, 176, 185, 186, 196, 133, 149, 153, 173, 174, 177, 180, 181, 189, 190, 191, 193, 199,
]
# Convert pkt.tcp.payload ("89:50:4e:..."-style hex) to bytes
def pkt_payload_bytes(pkt):
try:
p = pkt.tcp.payload # hex like "89:50:4e:..."
if p:
return bytes.fromhex(p.replace(":", ""))
except Exception:
pass
    # Some pyshark versions expose payload_binary instead
try:
pb = getattr(pkt.tcp, "payload_binary", None)
if pb:
return pb
except Exception:
pass
return b""
# Parse a big-endian uint32
def be32(b):
return struct.unpack(">I", b)[0]
# Walk reassembled bytes and pull out every complete PNG, following the chunk rules
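# PNG layout refresher: an 8-byte signature, then a sequence of chunks, each
# [4-byte big-endian length][4-byte type][length bytes of data][4-byte CRC];
# a file is complete once its IEND chunk has been consumed.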
def extract_pngs_from_bytes(data):
res = []
idx = 0
n = len(data)
while True:
p = data.find(PNG_SIG, idx)
if p == -1:
break
pos = p + len(PNG_SIG)
try:
while True:
if pos + 8 > n:
raise ValueError("truncated chunk header")
length = be32(data[pos : pos + 4])
ctype = data[pos + 4 : pos + 8]
pos += 8
if pos + length + 4 > n:
raise ValueError("truncated chunk data")
pos += length + 4 # skip data + CRC
if ctype == b"IEND":
png_bytes = data[p:pos]
res.append(png_bytes)
idx = pos
break
except Exception:
idx = p + 1
continue
return res
# Precise reassembly by tcp.seq: collect each packet's seq and payload, sort by seq, concatenate, and dedupe overlaps
Segment = namedtuple("Segment", ["seq", "data"])
def reassemble_stream_by_seq(stream_id):
    # Read every packet in this stream (both directions are possible, hence keyed by src/dst)
with pyshark.FileCapture(
PCAP_PATH, display_filter=f"tcp.stream == {stream_id}", keep_packets=False
) as cap:
segs_by_dir = defaultdict(list) # key: (src_ip, src_port) -> list of Segment
for pkt in cap:
try:
                # Make sure the packet has tcp.seq and a payload
if not hasattr(pkt.tcp, "seq"):
continue
seq = int(pkt.tcp.seq)
payload = pkt_payload_bytes(pkt)
if not payload:
continue
# direction key: use tuple of src/dst (so we separate client->server vs server->client)
src = (pkt.ip.src, pkt.tcp.srcport)
# append
segs_by_dir[src].append(Segment(seq, payload))
except Exception:
continue
    # Reassemble each direction: sort by seq, concatenate, and skip already-covered overlaps
reassembled = {}
for key, segs in segs_by_dir.items():
segs.sort(key=lambda s: s.seq)
out = bytearray()
        # Map absolute seq numbers to buffer offsets relative to the first
        # segment's seq (offset = seq - base_seq)
        base_seq = segs[0].seq
for s in segs:
offset = s.seq - base_seq
if offset < 0:
# overlaps before base: skip prefix
skip = -offset
if skip >= len(s.data):
continue
data = s.data[skip:]
offset = 0
else:
data = s.data
# ensure out large enough
            if offset > len(out):
                # Gap in the stream: pad with zeros to keep byte positions stable
                # (HTTP bodies here are expected to be mostly contiguous)
                out.extend(b"\x00" * (offset - len(out)))
            # Write data, keeping any bytes already present from earlier segments
            write_pos = offset
            for i in range(len(data)):
                if write_pos + i < len(out):
                    # Byte already present; keep the earlier copy
                    continue
                out.append(data[i])
        reassembled[key] = bytes(out)
return reassembled # dict: direction_key -> bytes
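# Worked example: segments (seq=1000, b"AB"), (seq=1004, b"EF"), (seq=1002, b"CD")
# reassemble to b"ABCDEF"; a retransmitted (seq=1000, b"AB") is skipped because
# those byte positions are already filled.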
extracted_files = {}
def extract_png_from_stream(stream_id, out_dir="idcards"):
os.makedirs(out_dir, exist_ok=True)
reassembled = reassemble_stream_by_seq(stream_id)
saved = []
for dir_key, data in reassembled.items():
# For HTTP upload, the client->server direction likely contains the request body.
# Locate HTTP header end then body
hdr_end = data.find(b"\r\n\r\n")
if hdr_end != -1:
body = data[hdr_end + 4 :]
else:
body = data # fallback: search whole data
# extract PNGs from body
pngs = extract_pngs_from_bytes(body)
for i, pb in enumerate(pngs):
fname = f"stream{stream_id}_{dir_key[0]}_{dir_key[1]}_{i}.png"
path = os.path.join(out_dir, fname)
with open(path, "wb") as f:
f.write(pb)
saved.append(path)
print("Saved", path, "size", len(pb))
extracted_files[stream_id] = path
if not saved:
print("No PNG found in stream", stream_id)
def get_data_from_pkt(pkt) -> bytes:
return bytes.fromhex(pkt.tcp.payload.replace(":", ""))
streams_to_solve = []
_WEIGHTS = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]
_CHECK_MAP = ["1", "0", "X", "9", "8", "7", "6", "5", "4", "3", "2"]
def is_valid_idcard(id_str: str) -> bool:
"""
仅返回 bool:True 表示为合法的18位中国居民身份证号(包括出生日期和校验码校验)。
"""
s = id_str.strip().upper()
# 出生日期校验(YYYYMMDD)
try:
y = int(s[6:10])
m = int(s[10:12])
d = int(s[12:14])
bd = date(y, m, d)
except Exception:
return False
if bd > date.today():
return False
    # Check-digit calculation
try:
digits17 = [int(ch) for ch in s[:17]]
except ValueError:
return False
total = sum(w * n for w, n in zip(_WEIGHTS, digits17))
expected = _CHECK_MAP[total % 11]
return s[17] == expected
def handle_http_stream(stream_id):
global result
pkts = []
with pyshark.FileCapture(
PCAP_PATH,
display_filter=f"tcp.stream == {stream_id}",
keep_packets=False,
) as cap:
for pkt in cap:
if hasattr(pkt, "http"):
pkts.append(pkt)
login_req = pkts[0]
login_data = get_data_from_pkt(login_req)
m = re.match(r"username=(.*)&password=(.*)", login_data.decode())
username, password = m.group(1), m.group(2)
    # global extracted_files
    # extracted = extracted_files[stream_id]
    # NOTE: assumes the PNG saved by extract_png_from_stream was renamed to stream{id}.png
    idcard_info = extract_id_info(f"idcards/stream{stream_id}.png")
    idcard_text = idcard_info["raw_text"]
    idcard = idcard_info["fields"]["id_number"]
    assert idcard and len(idcard) == 18 and is_valid_idcard(idcard), f"Bad ID number: {idcard} [{stream_id=}]"
    # The questionnaire POST sits at a fixed position in these streams (pkts[6])
    survey_req = pkts[6]
    personal_data = get_data_from_pkt(survey_req)
m = re.match(
r"name=(.*)&phone=(.*)&gender=(.*)&birth_date=(.*)", personal_data.decode()
)
name, phone, gender, birth_date = (
unquote(m.group(1)),
m.group(2),
unquote(m.group(3)),
m.group(4).replace("%2F", ""),
)
    assert name in idcard_text, f"Name doesn't match the ID card: {name} [{stream_id=}]"
    assert len(phone) == 11 and int(phone[:3]) in phone_prefix, (
        f"Bad phone number: {phone} [{stream_id=}]"
    )
    assert gender in ("男", "女"), f"Bad gender: {gender} [{stream_id=}]"
    # If sex extraction from the card failed, skip the cross-check (treat as a match)
    if sex_field := idcard_info["fields"]["sex"]:
        assert gender == sex_field, f"Gender doesn't match the ID card: {gender} [{stream_id=}]"
    assert birth_date in idcard, f"Birth date doesn't match the ID card: {birth_date} [{stream_id=}]"
    item = f"{username},{password},{name},{gender},{birth_date},{idcard},{phone}"
    print(f"✅ {item}")
    result += f"{item}\n"
# Successful logins are answered with a 302 redirect to index.php; only those streams are processed
def get_valid_streams():
global streams_to_solve
with pyshark.FileCapture(
PCAP_PATH,
display_filter='http.response.code == 302 && http.location == "index.php"',
) as cap:
for pkt in cap:
streams_to_solve.append(int(pkt.tcp.stream))
if __name__ == "__main__":
get_valid_streams()
for stream in streams_to_solve:
try:
            # PNG extraction only needs to run once; commented out after the first pass
# extract_png_from_stream(stream)
handle_http_stream(stream)
except Exception as e:
print(f"❌ {e}")
continue
with open("result.csv", "w") as result_file:
result_file.write(result)
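If the bottleneck really is OCR, one direction worth trying is a second, constrained pass over just the ID-number region. A minimal sketch of the idea, assuming the number sits in the bottom strip of the card photo (the crop ratio is a guess on my part; EasyOCR's allowlist keyword restricts the recognizable characters so digits can't be misread as letters):

import re
import cv2
import easyocr

reader = easyocr.Reader(["ch_sim", "en"], gpu=False)  # build once, reuse across images

def ocr_id_number(img_bgr):
    # Assumed layout: the ID number occupies the bottom quarter of the card photo
    h = img_bgr.shape[0]
    strip = img_bgr[int(h * 0.75):, :]
    rgb = cv2.cvtColor(strip, cv2.COLOR_BGR2RGB)
    # Constrain recognition to the characters an ID number can actually contain
    tokens = reader.readtext(rgb, detail=0, allowlist="0123456789X")
    m = re.search(r"\d{17}[\dX]", "".join(tokens))
    return m.group(0) if m else None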