这篇文章上次修改于 930 天前,可能其部分内容已经发生变化,如有疑问可询问作者。
根据bcc的样例,写了一个自己逻辑看着清楚一点的demo脚本
目标是libc.so
的open
函数,效果如下
from bcc import BPF
BPF_CODE = """
#include <linux/ptrace.h>
struct probe_open_data_t {
u64 timestamp_ns;
u32 pid;
u32 tid;
u32 uid;
char pathname[256];
int flags;
};
BPF_PERCPU_ARRAY(open_data, struct probe_open_data_t, 1);
BPF_PERF_OUTPUT(perf_open);
// int open(const char* pathname, int flags, ...)
int probe_hook_open_enter(struct pt_regs *ctx) {
// 先获取pid tid uid信息 用于过滤或者记录
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 pid = pid_tgid >> 32;
u32 tid = (u32)pid_tgid;
u32 uid = bpf_get_current_uid_gid();
// 记录下时间
u64 ts = bpf_ktime_get_ns();
// 过滤模板 在BPF程序正式编译前进行替换
PID_FILTER
TID_FILTER
UID_FILTER
// bpf_trace_printk("[probe_hook_open_enter] pid:%d tid:%d uid:%d\\n", pid, tid, uid);
// 这个时候其实是能直接读到数据的?
char *pathname = (char *)PT_REGS_PARM1(ctx);
int flags = PT_REGS_PARM2(ctx);
// bpf_trace_printk("[probe_hook_open_enter] pathname:%s flags:%d\\n", pathname, flags);
u32 zero = 0;
struct probe_open_data_t *data = open_data.lookup(&zero);
if (!data)
return 0;
// 给结构体赋值数据
data->timestamp_ns = ts;
data->pid = pid;
data->tid = tid;
data->uid = uid;
data->flags = flags;
int ret = bpf_probe_read_user(data->pathname, sizeof(data->pathname), pathname);
// bpf_trace_printk("[probe_hook_open_enter] ret:%d pathname:%s\\n", ret, &data->pathname);
// 等于是像缓冲区提交数据 触发事件 调用python脚本中的回调方法
perf_open.perf_submit(ctx, data, sizeof(struct probe_open_data_t));
return 0;
}
"""
class BPFHooker:
def __init__(self, library: str, uid: int, pid: int = -1, tid: int = -1) -> None:
# 可以是库名 比如 libssl.so 取 ssl
# 可以是是目标ELF程序的完整路径 比如 /apex/com.android.conscrypt/lib64/libssl.so
# 如果要写路径 那么内存里面是什么就是什么 不要使用软链接的路径
self.uid = uid
self.pid = pid
self.tid = tid
self.library = library
self.bpf_module = None # type: BPF
def hook(self):
text = BPF_CODE
if self.pid > 0:
text = text.replace('PID_FILTER', f'if (pid != {self.pid}) {{ return 0; }}')
else:
text = text.replace('PID_FILTER', '')
if self.tid > 0:
text = text.replace('TID_FILTER', f'if (tid != {self.tid}) {{ return 0; }}')
else:
text = text.replace('TID_FILTER', '')
if self.uid > 0:
text = text.replace('UID_FILTER', f'if (uid != {self.uid}) {{ return 0; }}')
else:
text = text.replace('UID_FILTER', '')
self.bpf_module = BPF(text=text)
self.bpf_module.attach_uprobe(
name=self.library,
sym='open',
fn_name='probe_hook_open_enter',
pid=self.pid,
)
print('attach end')
def print_event(self, ctx, data, size):
# print('[print_event]', ctx, data, size)
# 这里的data其实是个地址
# 要用下面的方法转换到python可读取的对象
event = self.bpf_module['perf_open'].event(data)
# print(f'[print_event] event => {event}')
# print(
# f'[open] {event.uid} {event.pid} {event.tid} '
# f'pathname: {event.pathname.decode("utf-8")} flags: {hex(event.flags)}'
# )
print(f'[open] {event.pathname.decode("utf-8")}')
def show(self):
self.bpf_module["perf_open"].open_perf_buffer(self.print_event)
while 1:
try:
self.bpf_module.perf_buffer_poll()
except KeyboardInterrupt:
exit()
def main():
uid = 10222
library = "/apex/com.android.runtime/lib64/bionic/libc.so"
bpf_hooker = BPFHooker(library, uid)
bpf_hooker.hook()
bpf_hooker.show()
if __name__ == '__main__':
main()
没有评论