Loading...
--- xnu/xnu-12377.101.15/tools/lldbmacros/ipc.py
+++ /dev/null
@@ -1,2897 +0,0 @@
-""" Please make sure you read the README file COMPLETELY BEFORE reading anything below.
- It is very critical that you read coding guidelines in Section E in README file.
-"""
-from __future__ import absolute_import, division, print_function
-
-from builtins import hex
-from builtins import range
-
-import sys
-from xnu import *
-from utils import *
-from process import *
-from bank import *
-from waitq import *
-from ioreg import *
-from memory import *
-from core.kernelcore import IterateCircleQueue
-import xnudefines
-import kmemory
-
-# ruff: noqa: F405 F403
-
-# from osfmk/ipc/ipc_entry.h
-IE_BITS_ROLL_MASK = 0x03000000
-
-# from osfmk/mach/port.h
-MACH_PORT_RIGHT_SEND = 0x00010000
-MACH_PORT_RIGHT_RECEIVE = 0x00020000
-MACH_PORT_RIGHT_SEND_ONCE = 0x00040000
-MACH_PORT_RIGHT_PORT_SET = 0x00080000
-MACH_PORT_RIGHT_DEAD_NAME = 0x00100000
-MACH_PORT_RIGHT_LABELH = 0x00200000
-
-# from osfmk/mach/mach_types.h
-TASK_FLAVOR_CONTROL = 0
-TASK_FLAVOR_READ = 1
-TASK_FLAVOR_INSPECT = 2
-TASK_FLAVOR_NAME = 3
-
-
-def IsKObjectType(ty):
- return ty >= GetEnumValue("ipc_object_type_t", "__IKOT_FIRST")
-
-
-def IPCPortTypeToString(ty):
- s = GetEnumName("ipc_object_type_t", ty, "IKOT_")
- if s.startswith("IOT_"):
- s = s[4:].replace("_PORT", "")
- return s.lower()
-
-
-def IsPortType(ty, name):
- return ty == GetEnumValue("ipc_object_type_t", name)
-
-
-def IsPortSetType(ty):
- return IsPortType(ty, "IOT_PORT_SET")
-
-
-def GetPortLabel(port, ty):
- addr = kern.StripKernelPAC(port.xGetScalarByPath(".ip_object.iol_pointer"))
- return port.xCreateValueFromAddress(None, addr, gettype(ty))
-
-
-@lldb_type_summary(["struct ipc_entry_table *", "ipc_entry_table_t"])
-def PrintIpcEntryTable(array):
- t, s = kalloc_array_decode(array, "struct ipc_entry")
- return "ptr = {:#x}, size = {:d}, elem_type = struct ipc_entry".format(
- unsigned(t), s
- )
-
-
-@lldb_type_summary(["struct ipc_port_requests_table *", "ipc_port_requests_table_t"])
-def PrintIpcPortRequestTable(array):
- t, s = kalloc_array_decode(array, "struct ipc_port_requests")
- return "ptr = {:#x}, size = {:d}, elem_type = struct ipc_port_requests".format(
- unsigned(t), s
- )
-
-
-def GetSpaceTable(space):
- """Return the tuple of (entries, size) of the table for a space"""
- table = space.is_table.__smr_ptr
- if table:
- return kalloc_array_decode(table, "struct ipc_entry")
- return (None, 0)
-
-
-def GetSpaceEntriesWithBits(is_tableval, num_entries, mask):
- base = is_tableval.GetSBValue().Dereference()
- return (
- (index, iep)
- for index, iep in enumerate(base.xIterSiblings(1, num_entries), 1)
- if iep.xGetIntegerByName("ie_bits") & mask
- )
-
-
-def GetSpaceObjectsWithBits(is_tableval, num_entries, mask, ty):
- base = is_tableval.GetSBValue().Dereference()
- return (
- iep.xCreateValueFromAddress(
- None,
- iep.xGetIntegerByName("ie_object"),
- ty,
- )
- for iep in base.xIterSiblings(1, num_entries)
- if iep.xGetIntegerByName("ie_bits") & mask
- )
-
-
-def GetGenFromIEBits(ie_bits):
- return (ie_bits >> 24) | 3
-
-
-def GetNameFromIndexAndIEBits(index, ie_bits):
- return (index << 8) | GetGenFromIEBits(ie_bits)
-
-
-@header(
- "{0: <20s} {1: <6s} {2: <6s} {3: <10s} {4: <32s}".format(
- "task", "pid", "#acts", "tablesize", "command"
- )
-)
-def GetTaskIPCSummary(task, show_busy=False):
- """Display a task's ipc summary.
- params:
- task : core.value represeting a Task in kernel
- returns
- str - string of ipc info for the task
- """
- out_string = ""
- format_string = "{0: <#20x} {1: <6d} {2: <6d} {3: <10d} {4: <32s}"
- busy_format = " {0: <10d} {1: <6d}"
- proc_name = ""
- if not task.active:
- proc_name = "terminated: "
- if task.halting:
- proc_name += "halting: "
- proc_name += GetProcNameForTask(task)
- _, table_size = GetSpaceTable(task.itk_space)
- out_string += format_string.format(
- task, GetProcPIDForTask(task), task.thread_count, table_size, proc_name
- )
- if show_busy:
- nbusy, nmsgs = GetTaskBusyPortsSummary(task)
- out_string += busy_format.format(nbusy, nmsgs)
- return (out_string, table_size, nbusy, nmsgs)
- return (out_string, table_size)
-
-
-@header(
- "{0: <20s} {1: <6s} {2: <6s} {3: <10s} {4: <32s} {5: <10s} {6: <6s}".format(
- "task", "pid", "#acts", "tablesize", "command", "#busyports", "#kmsgs"
- )
-)
-def GetTaskBusyIPCSummary(task):
- return GetTaskIPCSummary(task, True)
-
-
-def GetTaskBusyPortsSummary(task):
- is_tableval, num_entries = GetSpaceTable(task.itk_space)
- gettype("struct ipc_port")
- nbusy = 0
- nmsgs = 0
-
- if is_tableval:
- ports = GetSpaceObjectsWithBits(
- is_tableval, num_entries, MACH_PORT_RIGHT_RECEIVE, gettype("struct ipc_port")
- )
-
- for port in ports:
- if not port or port == xnudefines.MACH_PORT_DEAD:
- continue
- count = port.xGetIntegerByPath(".ip_messages.imq_msgcount")
- if count:
- nbusy += 1
- nmsgs += count
-
- return (nbusy, nmsgs)
-
-
-@header(
- "{:<20s} {:<20s} {:<10s} {:>6s} {:<20s} {:>8s} {:<20s} {:s}".format(
- "port",
- "waitqueue",
- "recvname",
- "refs",
- "receiver",
- "nmsgs",
- "kind",
- "dest/kobject",
- )
-)
-def PrintPortSummary(port, show_kmsg_summary=True, show_sets=False, prefix="", O=None):
- """Display a port's summary
- params:
- port : core.value representing a port in the kernel
- returns
- str : string of ipc info for the given port
- """
-
- format_string = "{:<#20x} {:<#20x} {:#010x} {:>6d} {:<#20x} {:>8d} {:<20s} {:<s}"
- receiver_name = port.ip_messages.imq_receiver_name
- space = 0
- kind = IPCPortTypeToString(port.ip_object.io_type)
-
- if unsigned(port.ip_object.io_state):
- if receiver_name:
- space = unsigned(port.ip_receiver)
-
- _, dest_str, service_str = GetPortDestProc(port)
- else:
- dest_str = "inactive-port"
- service_str = ""
-
- print(
- prefix
- + format_string.format(
- unsigned(port),
- addressof(port.ip_waitq),
- unsigned(receiver_name),
- port.ip_object.io_references,
- space,
- port.ip_messages.imq_msgcount,
- kind,
- dest_str + " " + service_str,
- )
- )
-
- if show_kmsg_summary:
- with O.table(prefix + GetKMsgSummary.header):
- for kmsgp in IterateCircleQueue(
- port.ip_messages.imq_messages, "ipc_kmsg", "ikm_link"
- ):
- print(prefix + GetKMsgSummary(kmsgp, prefix))
-
- wq = Waitq(addressof(port.ip_waitq))
- if show_sets and wq.hasSets():
-
- def doit(wq):
- for wqs in Waitq(addressof(port.ip_waitq)).iterateSets():
- PrintPortSetSummary(
- wqs.asPset(), space=port.ip_receiver, verbose=False, O=O
- )
-
- if O is None:
- print(PrintPortSetSummary.header)
- doit(wq)
- else:
- with O.table(PrintPortSetSummary.header, indent=True):
- doit(wq)
- print("")
-
-
-def GetPortDispositionString(disp):
- if disp < 0: ## use negative numbers for request ports
- if disp == -1:
- disp_str = "reqNS"
- elif disp == -2:
- disp_str = "reqPD"
- elif disp == -3:
- disp_str = "reqSPa"
- elif disp == -4:
- disp_str = "reqSPr"
- elif disp == -5:
- disp_str = "reqSPra"
- else:
- disp_str = "-X"
- ## These dispositions should match those found in osfmk/mach/message.h
- elif disp == 16:
- disp_str = "R" ## receive
- elif disp == 24:
- disp_str = "dR" ## dispose receive
- elif disp == 17:
- disp_str = "S" ## (move) send
- elif disp == 19:
- disp_str = "cS" ## copy send
- elif disp == 20:
- disp_str = "mS" ## make send
- elif disp == 25:
- disp_str = "dS" ## dispose send
- elif disp == 18:
- disp_str = "O" ## send-once
- elif disp == 21:
- disp_str = "mO" ## make send-once
- elif disp == 26:
- disp_str = "dO" ## dispose send-once
- ## faux dispositions used to string-ify IPC entry types
- elif disp == 100:
- disp_str = "PS" ## port set
- elif disp == 101:
- disp_str = "dead" ## dead name
- elif disp == 102:
- disp_str = "L" ## LABELH
- elif disp == 103:
- disp_str = "V" ## Thread voucher (thread->ith_voucher->iv_port)
- ## Catch-all
- else:
- disp_str = "X" ## invalid
- return disp_str
-
-
-def GetPortPDRequest(port):
- """Returns the port-destroyed notification port if any"""
- if port.ip_has_watchport:
- return port.ip_twe.twe_pdrequest
- if not IsPortType(port.ip_object.io_type, "IOT_SPECIAL_REPLY_PORT"):
- return port.ip_pdrequest
- return 0
-
-
-def GetKmsgHeader(kmsgp):
- """Helper to get mach message header of a kmsg.
- Assumes the kmsg has not been put to user.
- params:
- kmsgp : core.value representing the given ipc_kmsg_t struct
- returns:
- Mach message header for kmsgp
- """
- if kmsgp.ikm_type == GetEnumValue("ipc_kmsg_type_t", "IKM_TYPE_ALL_INLINED"):
- return kern.GetValueFromAddress(
- int(addressof(kmsgp.ikm_big_data)), "mach_msg_header_t *"
- )
- if kmsgp.ikm_type == GetEnumValue("ipc_kmsg_type_t", "IKM_TYPE_UDATA_OOL"):
- return kern.GetValueFromAddress(
- int(addressof(kmsgp.ikm_small_data)), "mach_msg_header_t *"
- )
- return kern.GetValueFromAddress(unsigned(kmsgp.ikm_kdata), "mach_msg_header_t *")
-
-
-@header(
- "{:<20s} {:<20s} {:<20s} {:<10s} {:>6s} {:<20s} {:<8s} {:<26s} {:<12s} {:<20s}".format(
- "",
- "kmsg",
- "header",
- "msgid",
- "size",
- "reply-port",
- "disp",
- "source",
- "destname",
- "destination",
- )
-)
-def GetKMsgSummary(kmsgp, prefix_str=""):
- """Display a summary for type ipc_kmsg_t
- params:
- kmsgp : core.value representing the given ipc_kmsg_t struct
- returns:
- str : string of summary info for the given ipc_kmsg_t instance
- """
- kmsghp = GetKmsgHeader(kmsgp)
- kmsgh = dereference(kmsghp)
- out_string = ""
- out_string += "{:<20s} {:<#20x} {:<#20x} {kmsgh.msgh_id:#010x} {kmsgh.msgh_size:>6d} {kmsgh.msgh_local_port:<#20x} ".format(
- "", unsigned(kmsgp), unsigned(kmsghp), kmsgh=kmsghp
- )
- prefix_str = "{:<20s} ".format(" ") + prefix_str
- disposition = ""
- bits = kmsgh.msgh_bits & 0xFF
-
- # remote port
- if bits == 17:
- disposition = "rS"
- elif bits == 18:
- disposition = "rO"
- else:
- disposition = "rX" # invalid
-
- out_string += "{:<2s}".format(disposition)
-
- # local port
- disposition = ""
- bits = (kmsgh.msgh_bits & 0xFF00) >> 8
-
- if bits == 17:
- disposition = "lS"
- elif bits == 18:
- disposition = "lO"
- elif bits == 0:
- disposition = "l-"
- else:
- disposition = "lX" # invalid
-
- out_string += "{:<2s}".format(disposition)
-
- # voucher
- disposition = ""
- bits = (kmsgh.msgh_bits & 0xFF0000) >> 16
-
- if bits == 17:
- disposition = "vS"
- elif bits == 0:
- disposition = "v-"
- else:
- disposition = "vX"
-
- out_string += "{:<2s}".format(disposition)
-
- # complex message
- if kmsgh.msgh_bits & 0x80000000:
- out_string += "{0: <1s}".format("c")
- else:
- out_string += "{0: <1s}".format("s")
-
- # importance boost
- if kmsgh.msgh_bits & 0x20000000:
- out_string += "{0: <1s}".format("I")
- else:
- out_string += "{0: <1s}".format("-")
-
- dest_port = GetKmsgHeader(kmsgp).msgh_remote_port
- if dest_port:
- name_str, dest_str, _ = GetPortDestProc(dest_port)
- else:
- name_str = dest_str = ""
-
- out_string += " {:<26s} {:<12s} {:<20s}".format(
- GetKMsgSrc(kmsgp), name_str, dest_str
- )
-
- if kmsgh.msgh_bits & 0x80000000: # MACH_MSGH_BITS_COMPLEX
- out_string += "\n" + prefix_str + "\t" + GetKMsgComplexBodyDesc.header
- out_string += (
- "\n" + prefix_str + "\t" + GetKMsgComplexBodyDesc(kmsgp, prefix_str + "\t")
- )
-
- return out_string + "\n"
-
-
-@header("{: <20s} {: <20s} {: <10s}".format("descriptor", "address", "size"))
-def GetMachMsgOOLDescriptorSummary(desc):
- """Returns description for mach_msg_ool_descriptor_t * object"""
- format_string = "{: <#20x} {: <#20x} {:#010x}"
- out_string = format_string.format(desc, desc.address, desc.size)
- return out_string
-
-
-def GetKmsgDescriptors(kmsgp):
- """Get a list of descriptors in a complex message"""
- kmsghp = GetKmsgHeader(kmsgp)
- kmsgh = dereference(kmsghp)
- if not (kmsgh.msgh_bits & 0x80000000): # pragma pylint: disable=superfluous-parens
- return []
- ## Something in the python/lldb types is not getting alignment correct here.
- ## I'm grabbing a pointer to the body manually, and using tribal knowledge
- ## of the location of the descriptor count to get this correct
- body = Cast(
- addressof(Cast(addressof(kmsgh), "char *")[sizeof(kmsgh)]), "mach_msg_body_t *"
- )
- # dsc_count = body.msgh_descriptor_count
- dsc_count = dereference(Cast(body, "uint32_t *"))
- # dschead = Cast(addressof(body[1]), 'mach_msg_descriptor_t *')
- dschead = Cast(
- addressof(Cast(addressof(body[0]), "char *")[sizeof("uint32_t")]),
- "mach_msg_descriptor_t *",
- )
- dsc_list = []
- for i in range(dsc_count):
- dsc_list.append(dschead[i])
- return (body, dschead, dsc_list)
-
-
-def GetKmsgTotalDescSize(kmsgp):
- """Helper to get total descriptor size of a kmsg.
- Assumes the kmsg has full kernel representation (header and descriptors)
- params:
- kmsgp : core.value representing the given ipc_kmsg_t struct
- returns:
- Total descriptor size
- """
- kmsghp = GetKmsgHeader(kmsgp)
- kmsgh = dereference(kmsghp)
- dsc_count = 0
-
- if kmsgh.msgh_bits & 0x80000000: # MACH_MSGH_BITS_COMPLEX
- (body, _, _) = GetKmsgDescriptors(kmsgp)
- dsc_count = dereference(Cast(body, "uint32_t *"))
-
- return dsc_count * sizeof("mach_msg_descriptor_t")
-
-
-@header(
- "{: <20s} {: <20s} {: >6s} {: <20s}".format(
- "kmsgheader", "body", "descs", "dsc_head"
- )
-)
-def GetKMsgComplexBodyDesc(kmsgp, prefix_str=""):
- """Routine that prints a complex kmsg's body"""
- kmsghp = GetKmsgHeader(kmsgp)
- kmsgh = dereference(kmsghp)
- if not (kmsgh.msgh_bits & 0x80000000): # pragma pylint: disable=superfluous-parens
- return ""
- format_string = "{: <#20x} {: <#20x} {:6d} {: <#20x}"
- out_string = ""
-
- (body, dschead, dsc_list) = GetKmsgDescriptors(kmsgp)
- out_string += format_string.format(kmsghp, body, len(dsc_list), dschead)
- for dsc in dsc_list:
- try:
- dsc_type = unsigned(dsc.type.type)
- out_string += (
- "\n"
- + prefix_str
- + "Descriptor: "
- + xnudefines.mach_msg_type_descriptor_strings[dsc_type]
- )
- if dsc_type == 0:
- # its a port.
- p = dsc.port.name
- dstr = GetPortDispositionString(dsc.port.disposition)
- out_string += " disp:{:s}, name:{: <#20x}".format(dstr, p)
- elif unsigned(dsc.type.type) in (1, 3):
- # its OOL DESCRIPTOR or OOL VOLATILE DESCRIPTOR
- ool = dsc.out_of_line
- out_string += " " + GetMachMsgOOLDescriptorSummary(addressof(ool))
- except Exception:
- out_string += "\n" + prefix_str + "Invalid Descriptor: {}".format(dsc)
- return out_string
-
-
-def GetKmsgTrailer(kmsgp):
- """Helper to get trailer address of a kmsg
- params:
- kmsgp : core.value representing the given ipc_kmsg_t struct
- returns:
- Trailer address
- """
- kmsghp = GetKmsgHeader(kmsgp)
- kmsgh = dereference(kmsghp)
-
- if kmsgp.ikm_type == int(
- GetEnumValue("ipc_kmsg_type_t", "IKM_TYPE_ALL_INLINED")
- ) or kmsgp.ikm_type == int(GetEnumValue("ipc_kmsg_type_t", "IKM_TYPE_KDATA_OOL")):
- return kern.GetValueFromAddress(
- unsigned(kmsghp) + kmsgh.msgh_size, "mach_msg_max_trailer_t *"
- )
- else:
- if kmsgh.msgh_bits & 0x80000000: # MACH_MSGH_BITS_COMPLEX
- content_size = (
- kmsgh.msgh_size
- - sizeof("mach_msg_base_t")
- - GetKmsgTotalDescSize(kmsgp)
- )
- else:
- content_size = kmsgh.msgh_size - sizeof("mach_msg_header_t")
- return kern.GetValueFromAddress(
- unsigned(kmsgp.ikm_udata) + content_size, "mach_msg_max_trailer_t *"
- )
-
-
-def GetKMsgSrc(kmsgp):
- """Routine that prints a kmsg's source process and pid details
- params:
- kmsgp : core.value representing the given ipc_kmsg_t struct
- returns:
- str : string containing the name and pid of the kmsg's source proc
- """
- trailer = GetKmsgTrailer(kmsgp)
- kmsgpid = Cast(trailer, "uint *")[10] # audit_token.val[5]
- return "{0:s}({1:d})".format(GetProcNameForPid(kmsgpid), kmsgpid)
-
-
-@header(
- "{:<20s} {:<20s} {:<10s} {:>6s} {:<6s}".format(
- "portset", "waitqueue", "name", "refs", "flags"
- )
-)
-def PrintPortSetSummary(pset, space=0, verbose=True, O=None):
- """Display summary for a given struct ipc_pset *
- params:
- pset : core.value representing a pset in the kernel
- returns:
- str : string of summary information for the given pset
- """
- show_kmsg_summary = False
- if config["verbosity"] > vHUMAN:
- show_kmsg_summary = True
-
- ips_wqset = pset.ips_wqset
- wqs = Waitq(addressof(ips_wqset))
-
- local_name = unsigned(ips_wqset.wqset_index) << 8
- dest = "-"
- if space:
- is_tableval, _ = GetSpaceTable(space)
- if is_tableval:
- entry_val = GetObjectAtIndexFromArray(is_tableval, local_name >> 8)
- local_name |= GetGenFromIEBits(unsigned(entry_val.ie_bits))
- dest = GetSpaceProcDesc(space)
- else:
- for wq in wqs.iterateMembers():
- dest = GetSpaceProcDesc(wq.asPort().ip_receiver)
-
- if unsigned(pset.ips_object.io_state):
- pass
- else:
- pass
-
- print(
- "{:<#20x} {:<#20x} {:#010x} {:>6d} {:<6s} {:<20s}".format(
- unsigned(pset),
- addressof(pset.ips_wqset),
- local_name,
- pset.ips_object.io_references,
- "ASet",
- dest,
- )
- )
-
- if verbose and wqs.hasThreads():
- with O.table("{:<20s} {:<20s}".format("waiter", "event"), indent=True):
- for thread in wqs.iterateThreads():
- print("{:<#20x} {:<#20x}".format(unsigned(thread), thread.wait_event))
- print("")
-
- if verbose and wqs.hasMembers():
- with O.table(PrintPortSummary.header, indent=True):
- for wq in wqs.iterateMembers():
- PrintPortSummary(wq.asPort(), show_kmsg_summary=show_kmsg_summary, O=O)
- print("")
-
-
-# Macro: showipc
-
-
-@lldb_command("showipc")
-def ShowIPC(cmd_args=None):
- """Routine to print data for the given IPC space
- Usage: showipc <address of ipc space>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("No arguments passed")
-
- ipc = kern.GetValueFromAddress(cmd_args[0], "ipc_space *")
- if not ipc:
- print("unknown arguments:", str(cmd_args))
- return False
- print(PrintIPCInformation.header)
- PrintIPCInformation(ipc, False, False)
- return True
-
-
-# EndMacro: showipc
-
-# Macro: showtaskipc
-
-
-@lldb_command("showtaskipc")
-def ShowTaskIPC(cmd_args=None):
- """Routine to print IPC summary of given task
- Usage: showtaskipc <address of task>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("No arguments passed")
-
- tval = kern.GetValueFromAddress(cmd_args[0], "task *")
- if not tval:
- print("unknown arguments:", str(cmd_args))
- return False
- print(GetTaskSummary.header + " " + GetProcSummary.header)
- pval = GetProcFromTask(tval)
- print(GetTaskSummary(tval) + " " + GetProcSummary(pval))
- print(GetTaskBusyIPCSummary.header)
- summary, _, _, _ = GetTaskBusyIPCSummary(tval)
- print(summary)
- return True
-
-
-# EndMacro: showtaskipc
-
-# Macro: showallipc
-
-
-@lldb_command("showallipc")
-def ShowAllIPC(cmd_args=None):
- """Routine to print IPC summary of all tasks
- Usage: showallipc
- """
- for t in kern.tasks:
- print(GetTaskSummary.header + " " + GetProcSummary.header)
- pval = GetProcFromTask(t)
- print(GetTaskSummary(t) + " " + GetProcSummary(pval))
- print(PrintIPCInformation.header)
- PrintIPCInformation(t.itk_space, False, False)
- print("\n\n")
-
-
-# EndMacro: showallipc
-
-
-@lldb_command("showipcsummary", fancy=True)
-def ShowIPCSummary(cmd_args=None, cmd_options={}, O=None):
- """Summarizes the IPC state of all tasks.
- This is a convenient way to dump some basic clues about IPC messaging. You can use the output to determine
- tasks that are candidates for further investigation.
- """
- with O.table(GetTaskIPCSummary.header):
- ipc_table_size = 0
-
- l = [GetTaskIPCSummary(t) for t in kern.tasks]
- l.sort(key=lambda e: e[1], reverse=True)
-
- for e in l:
- print(e[0])
- ipc_table_size += e[1]
-
- for t in kern.terminated_tasks:
- ipc_table_size += GetTaskIPCSummary(t)[1]
-
- print("Total Table size: {:d}".format(ipc_table_size))
-
-
-def GetKObjectFromPort(portval):
- """Get Kobject description from the port.
- params: portval - core.value representation of 'ipc_port *' object
- returns: str - string of kobject information
- """
- if not portval or portval == xnudefines.MACH_PORT_DEAD:
- return "MACH_PORT_DEAD"
-
- otype = unsigned(portval.ip_object.io_type)
- if not IsKObjectType(otype):
- return "not a kobject"
-
- kobject_addr = kern.StripKernelPAC(unsigned(portval.ip_kobject))
- objtype_str = IPCPortTypeToString(otype)
-
- if kobject_addr and (objtype_str == 'iokit_object' or objtype_str == 'uext_object' or objtype_str == 'iokit_connect'):
- iokit_object = CastIOKitClass(portval.ip_kobject, 'IOMachPort *').object
- desc_str = "{:<#20x}".format(kern.StripKernelPAC(unsigned(iokit_object)))
- else:
- desc_str = "{:<#20x}".format(kobject_addr)
-
- if not kobject_addr:
- pass
-
- elif objtype_str == 'iokit_object' or objtype_str == 'uext_object' or objtype_str == 'iokit_connect' :
- iokit_classnm = GetObjectTypeStr(iokit_object)
- if not iokit_classnm:
- desc_str += " <unknown class>"
- else:
- desc_str += re.sub(r"vtable for ", r" ", iokit_classnm)
-
- elif objtype_str[:5] == "task_" and objtype_str != "task_id_token":
- task = value(
- portval.GetSBValue()
- .xCreateValueFromAddress(None, kobject_addr, gettype("struct task"))
- .AddressOf()
- )
- if GetProcFromTask(task) is not None:
- desc_str += " {:s}({:d})".format(
- GetProcNameForTask(task), GetProcPIDForTask(task)
- )
-
- elif objtype_str[:7] == "thread_":
- thread = value(
- portval.GetSBValue()
- .xCreateValueFromAddress(None, kobject_addr, gettype("struct thread"))
- .AddressOf()
- )
- task = thread.t_tro.tro_task
- if GetProcFromTask(task) is not None:
- desc_str += " {:s}({:d})".format(
- GetProcNameForTask(task), GetProcPIDForTask(task)
- )
-
- return desc_str
-
-
-def GetSpaceProcDesc(space):
- """Display the name and pid of a space's task
- params:
- space: core.value representing a pointer to a space
- returns:
- str : string containing receiver's name and pid
- """
- task = space.is_task
- if GetProcFromTask(task) is None:
- return "task {:<#20x}".format(unsigned(task))
- return "{:s}({:d})".format(GetProcNameForTask(task), GetProcPIDForTask(task))
-
-
-def GetPortDestProc(port):
- """Display the name and pid of a given port's receiver
- params:
- port : core.value representing a pointer to a port in the kernel
- returns:
- a tuple of:
- str : the name of that port in its destination (or dead/in-transit)
- str : the destination/kobject value for this port
- str : the service name for this port
- """
-
- name = unsigned(port.ip_messages.imq_receiver_name)
- otype = unsigned(port.ip_object.io_type)
- dest_str = None
- state = port.ip_object.io_state
-
- if state == GetEnumValue("ipc_object_state_t", "IO_STATE_INACTIVE"):
- name_str = "dead"
- dest_str = ""
- elif state == GetEnumValue("ipc_object_state_t", "IO_STATE_IN_LIMBO"):
- name_str = "in-limbo"
- dest_str = ""
- elif state == GetEnumValue("ipc_object_state_t", "IO_STATE_IN_LIMBO_PD"):
- name_str = "in-limbo-pd"
- dest_str = ""
- elif state == GetEnumValue("ipc_object_state_t", "IO_STATE_IN_TRANSIT"):
- name_str = "in-transit"
- dest_str = "{:<#20x}".format(port.ip_destination)
- elif state == GetEnumValue("ipc_object_state_t", "IO_STATE_IN_TRANSIT_PD"):
- name_str = "in-transit-pd"
- dest_str = "{:<#20x}".format(port.ip_destination)
- else: # in-space / in-space immovable
- if name == 1: # kobjects
- name_str = ""
- else:
- name_str = "{:<#12x}".format(name)
-
- if IsKObjectType(otype):
- return (name_str, GetKObjectFromPort(port), "")
-
- service = ""
- if port.ip_object.io_type == GetEnumValue("ipc_object_type_t", "IOT_SERVICE_PORT"):
- try:
- splabel = GetPortLabel(port.GetSBValue(), "struct ipc_service_port_label")
- service = splabel.xGetCStringByName("ispl_service_name")
- except Exception:
- service = "unknown"
-
- if dest_str is None:
- dest_str = GetSpaceProcDesc(port.ip_receiver)
-
- return (name_str, dest_str, service)
-
-
-@lldb_type_summary(["ipc_entry_t"])
-@header(
- "{: <20s} {: <12s} {: <8s} {: <8s} {: <8s} {: <8s} {: <12s} {: <20s} {: <20s}".format(
- "object",
- "name",
- "rite",
- "urefs",
- "nsets",
- "nmsgs",
- "destname",
- "kind",
- "dest/kobject",
- )
-)
-def GetIPCEntrySummary(entry, ipc_name="", rights_filter=0):
- """Get summary of a ipc entry.
- params:
- entry - core.value representing ipc_entry_t in the kernel
- ipc_name - str of format '0x0123' for display in summary.
- returns:
- str - string of ipc entry related information
-
- types of rights:
- 'Dead' : Dead name
- 'Set' : Port set
- 'S' : Send right
- 'R' : Receive right
- 'O' : Send-once right
- 'm' : Immovable send port
- 'i' : Immovable receive port
- types of notifications:
- 'd' : Dead-Name notification requested
- 's' : Send-Possible notification armed
- 'r' : Send-Possible notification requested
- 'n' : No-Senders notification requested
- 'x' : Port-destroy notification requested
- """
- out_str = ""
- int(hex(entry), 16)
- right_str = ""
- name_str = ""
- destname_str = ""
- service_str = ""
-
- ie_object = entry.ie_object
- ie_bits = int(entry.ie_bits)
- urefs = int(ie_bits & 0xFFFF)
- nsets = 0
- nmsgs = 0
-
- if ie_bits & MACH_PORT_RIGHT_DEAD_NAME:
- right_str = "Dead"
- kind = ""
- elif ie_bits & MACH_PORT_RIGHT_PORT_SET:
- right_str = "Set"
- kind = ""
- psetval = kern.CreateTypedPointerFromAddress(
- unsigned(ie_object), "struct ipc_pset"
- )
- wqs = Waitq(addressof(psetval.ips_wqset))
- members = 0
- for m in wqs.iterateMembers():
- members += 1
- destname_str = "{:d} Members".format(members)
- else:
- if ie_bits & MACH_PORT_RIGHT_SEND:
- if ie_bits & MACH_PORT_RIGHT_RECEIVE:
- # SEND + RECV
- right_str = "SR"
- else:
- # SEND only
- right_str = "S"
- elif ie_bits & MACH_PORT_RIGHT_RECEIVE:
- # RECV only
- right_str = "R"
- elif ie_bits & MACH_PORT_RIGHT_SEND_ONCE:
- # SEND_ONCE
- right_str = "O"
- if ie_bits & xnudefines.IE_BITS_IMMOVABLE_SEND:
- right_str += "m"
- portval = kern.CreateTypedPointerFromAddress(
- unsigned(ie_object), "struct ipc_port"
- )
- if int(entry.ie_request) != 0:
- requestsval, _ = kalloc_array_decode(
- portval.ip_requests, "struct ipc_port_request"
- )
- sorightval = requestsval[int(entry.ie_request)].ipr_soright
- soright_ptr = unsigned(sorightval)
- if soright_ptr != 0:
- # dead-name notification requested
- right_str += "d"
- # send-possible armed
- if soright_ptr & 0x1:
- right_str += "s"
- # send-possible requested
- if soright_ptr & 0x2:
- right_str += "r"
- # No-senders notification requested
- if not IsKObjectType(ie_object.io_type) and portval.ip_nsrequest != 0:
- right_str += "n"
- # port-destroy notification requested
- if GetPortPDRequest(portval):
- right_str += "x"
- # Immovable receive rights
- if portval.ip_object.io_state == GetEnumValue(
- "ipc_object_state_t", "IO_STATE_IN_SPACE_IMMOVABLE"
- ):
- right_str += "i"
- # Immovable send rights
- # Port with SB filtering on
- if portval.ip_object.io_filtered != 0:
- right_str += "f"
-
- # early-out if the rights-filter doesn't match
- if rights_filter != 0 and rights_filter not in right_str:
- return ""
-
- # now show the port destination part
- name_str, destname_str, service_str = GetPortDestProc(portval)
- # Get the number of sets to which this port belongs
- nsets = len([s for s in Waitq(addressof(portval.ip_waitq)).iterateSets()])
- nmsgs = portval.ip_messages.imq_msgcount
- kind = IPCPortTypeToString(ie_object.io_type)
-
- # append the generation to the name value
- # (from osfmk/ipc/ipc_entry.h)
- # bits rollover period
- # 0 0 64
- # 0 1 48
- # 1 0 32
- # 1 1 16
- ie_gen_roll = {0: ".64", 1: ".48", 2: ".32", 3: ".16"}
- ipc_name = "{:s}{:s}".format(
- ipc_name.strip(), ie_gen_roll[(ie_bits & IE_BITS_ROLL_MASK) >> 24]
- )
-
- if rights_filter == 0 or rights_filter in right_str:
- format_string = "{: <#20x} {: <12s} {: <8s} {: <8d} {: <8d} {: <8d} {: <12s} {: <20s} {: <20s}"
- out_str = format_string.format(
- ie_object,
- ipc_name,
- right_str,
- urefs,
- nsets,
- nmsgs,
- name_str,
- kind,
- destname_str + " " + service_str,
- )
- return out_str
-
-
-@header("{0: >20s}".format("user bt"))
-def GetPortUserStack(port, task):
- """Get UserStack information for the given port & task.
- params: port - core.value representation of 'ipc_port *' object
- task - value representing 'task *' object
- returns: str - string information on port's userstack
- """
- out_str = ""
- if not port or port == xnudefines.MACH_PORT_DEAD:
- return out_str
- pid = port.ip_made_pid
- proc_val = GetProcFromTask(task)
- if port.ip_made_bt:
- btlib = kmemory.BTLibrary.get_shared()
- out_str += (
- "\n".join(btlib.get_stack(port.ip_made_bt).symbolicated_frames()) + "\n"
- )
- if pid != GetProcPID(proc_val):
- out_str += " ({:<10d})\n".format(pid)
- return out_str
-
-
-@lldb_type_summary(["ipc_space *"])
-@header(
- "{0: <20s} {1: <20s} {2: <20s} {3: <8s} {4: <10s} {5: >8s} {6: <8s}".format(
- "ipc_space", "is_task", "is_table", "flags", "ports", "low_mod", "high_mod"
- )
-)
-def PrintIPCInformation(
- space, show_entries=False, show_userstack=False, rights_filter=0
-):
- """Provide a summary of the ipc space"""
- out_str = ""
- format_string = (
- "{0: <#20x} {1: <#20x} {2: <#20x} {3: <8s} {4: <10d} {5: >8d} {6: <8d}"
- )
- is_tableval, num_entries = GetSpaceTable(space)
- flags = ""
- if is_tableval:
- flags += "A"
- else:
- flags += " "
- if (space.is_grower) != 0:
- flags += "G"
- print(
- format_string.format(
- space,
- space.is_task,
- is_tableval if is_tableval else 0,
- flags,
- num_entries,
- space.is_low_mod,
- space.is_high_mod,
- )
- )
-
- # should show the each individual entries if asked.
- if show_entries and is_tableval:
- print("\t" + GetIPCEntrySummary.header)
-
- entries = (
- (index, value(iep.AddressOf()))
- for index, iep in GetSpaceEntriesWithBits(
- is_tableval, num_entries, 0x001F0000
- )
- )
-
- for index, entryval in entries:
- entry_ie_bits = unsigned(entryval.ie_bits)
- entry_name = "{0: <#20x}".format(
- GetNameFromIndexAndIEBits(index, entry_ie_bits)
- )
- entry_str = GetIPCEntrySummary(entryval, entry_name, rights_filter)
- if not entry_str:
- continue
-
- print("\t" + entry_str)
- if show_userstack:
- entryport = Cast(entryval.ie_object, "ipc_port *")
- if (
- entryval.ie_object
- and (int(entry_ie_bits) & 0x00070000)
- and entryport.ip_made_bt
- ):
- print(
- GetPortUserStack.header
- + GetPortUserStack(entryport, space.is_task)
- )
-
- # done with showing entries
- return out_str
-
-
-# Macro: showrights
-
-
-@lldb_command("showrights", "R:")
-def ShowRights(cmd_args=None, cmd_options={}):
- """Routine to print rights information for the given IPC space
- Usage: showrights [-R rights_type] <address of ipc space>
- -R rights_type : only display rights matching the string 'rights_type'
-
- types of rights:
- 'Dead' : Dead name
- 'Set' : Port set
- 'S' : Send right
- 'R' : Receive right
- 'O' : Send-once right
- types of notifications:
- 'd' : Dead-Name notification requested
- 's' : Send-Possible notification armed
- 'r' : Send-Possible notification requested
- 'n' : No-Senders notification requested
- 'x' : Port-destroy notification requested
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("No arguments passed")
-
- ipc = kern.GetValueFromAddress(cmd_args[0], "ipc_space *")
- if not ipc:
- print("unknown arguments:", str(cmd_args))
- return False
- rights_type = 0
- if "-R" in cmd_options:
- rights_type = cmd_options["-R"]
- print(PrintIPCInformation.header)
- PrintIPCInformation(ipc, True, False, rights_type)
-
-
-# EndMacro: showrights
-
-
-@lldb_command("showtaskrights", "R:")
-def ShowTaskRights(cmd_args=None, cmd_options={}):
- """Routine to ipc rights information for a task
- Usage: showtaskrights [-R rights_type] <task address>
- -R rights_type : only display rights matching the string 'rights_type'
-
- types of rights:
- 'Dead' : Dead name
- 'Set' : Port set
- 'S' : Send right
- 'R' : Receive right
- 'O' : Send-once right
- 'm' : Immovable send port
- 'i' : Immovable receive port
- 'f' : Port with SB filtering on
- types of notifications:
- 'd' : Dead-Name notification requested
- 's' : Send-Possible notification armed
- 'r' : Send-Possible notification requested
- 'n' : No-Senders notification requested
- 'x' : Port-destroy notification requested
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("No arguments passed")
-
- tval = kern.GetValueFromAddress(cmd_args[0], "task *")
- if not tval:
- print("unknown arguments:", str(cmd_args))
- return False
- rights_type = 0
- if "-R" in cmd_options:
- rights_type = cmd_options["-R"]
- print(GetTaskSummary.header + " " + GetProcSummary.header)
- pval = GetProcFromTask(tval)
- print(GetTaskSummary(tval) + " " + GetProcSummary(pval))
- print(PrintIPCInformation.header)
- PrintIPCInformation(tval.itk_space, True, False, rights_type)
-
-
-# Count the vouchers in a given task's ipc space
-@header("{: <20s} {: <6s} {: <36s} {: <8s}".format("task", "pid", "name", "#vouchers"))
-def GetTaskVoucherCount(t):
- is_tableval, num_entries = GetSpaceTable(t.itk_space)
- count = 0
- voucher_kotype = int(GetEnumValue("ipc_object_type_t", "IKOT_VOUCHER"))
-
- if is_tableval:
- ports = GetSpaceObjectsWithBits(
- is_tableval, num_entries, 0x00070000, gettype("struct ipc_port")
- )
-
- for port in ports:
- if port.xGetIntegerByPath(".ip_object.io_type") == voucher_kotype:
- count += 1
-
- format_str = "{: <#20x} {: <6d} {: <36s} {: <8d}"
- pval = GetProcFromTask(t)
- return format_str.format(t, GetProcPID(pval), GetProcNameForTask(t), count)
-
-
-# Macro: countallvouchers
-@lldb_command("countallvouchers", "SG", fancy=True)
-def CountAllVouchers(cmd_args=None, cmd_options={}, O=None):
- """Routine to count the number of vouchers by task. Useful for finding leaks.
- Usage: countallvouchers [-S] [-G]
- -S : sort tasks by voucher count (highest to lowest)
- -G : group and sum vouchers by process name
- """
-
- sort_results = "-S" in cmd_options
- group_by_name = "-G" in cmd_options
-
- # Collect all task voucher information with progress indicator
- task_results = []
- ellipsis_states = [".", "..", "..."]
- ellipsis_idx = 0
- task_count = 0
-
- for t in kern.tasks:
- # Show progress with cycling ellipsis (update every 10 tasks)
- if task_count % 10 == 0:
- sys.stderr.write("Counting{:<3s}\r".format(ellipsis_states[ellipsis_idx % 3]))
- sys.stderr.flush()
- ellipsis_idx += 1
- task_count += 1
-
- # Get the formatted string and parse out the count for sorting
- result_str = GetTaskVoucherCount(t)
- task_results.append(result_str)
-
- # Clear the progress indicator
- sys.stderr.write("{:80s}\r".format(""))
- sys.stderr.flush()
-
- # Group by name if requested
- if group_by_name:
- # Parse results and group by process name
- # Format is: task (20 chars) + space + pid (6 chars) + space + name (36 chars) + space + vouchers
- grouped = {}
- for result in task_results:
- if len(result) < 64:
- continue
-
- # Parse fixed-width columns
- task_addr = result[0:20].strip()
- pid = result[21:27].strip()
- name = result[28:64].strip()
- voucher_str = result[65:].strip()
-
- try:
- voucher_count = int(voucher_str)
- except ValueError:
- continue
-
- if name not in grouped:
- grouped[name] = {
- 'task_addr': task_addr, # Use first task's address
- 'pids': [],
- 'voucher_sum': 0
- }
- grouped[name]['pids'].append(pid)
- grouped[name]['voucher_sum'] += voucher_count
-
- # Format grouped results
- format_str = "{: <#20x} {: <6s} {: <36s} {: <8d}"
- task_results = []
- for name, data in grouped.items():
- pid_count = len(data['pids'])
- pid_str = f"{pid_count} pids" if pid_count > 1 else data['pids'][0]
- task_addr = int(data['task_addr'], 16)
- task_results.append(format_str.format(task_addr, pid_str, name, data['voucher_sum']))
-
- # Sort if requested (by the last field which is the voucher count)
- if sort_results:
- # Parse and sort by voucher count (extract the last number from each line)
- task_results.sort(key=lambda x: int(x.split()[-1]), reverse=True)
-
- # Display results
- with O.table(GetTaskVoucherCount.header):
- for result in task_results:
- print(result)
-
-
-# Macro: showataskrightsbt
-
-
-@lldb_command("showtaskrightsbt", "R:")
-def ShowTaskRightsBt(cmd_args=None, cmd_options={}):
- """Routine to ipc rights information with userstacks for a task
- Usage: showtaskrightsbt [-R rights_type] <task address>
- -R rights_type : only display rights matching the string 'rights_type'
-
- types of rights:
- 'Dead' : Dead name
- 'Set' : Port set
- 'S' : Send right
- 'R' : Receive right
- 'O' : Send-once right
- 'm' : Immovable send port
- 'i' : Immovable receive port
- types of notifications:
- 'd' : Dead-Name notification requested
- 's' : Send-Possible notification armed
- 'r' : Send-Possible notification requested
- 'n' : No-Senders notification requested
- 'x' : Port-destroy notification requested
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("No arguments passed")
-
- tval = kern.GetValueFromAddress(cmd_args[0], "task *")
- if not tval:
- print("unknown arguments:", str(cmd_args))
- return False
- rights_type = 0
- if "-R" in cmd_options:
- rights_type = cmd_options["-R"]
- print(GetTaskSummary.header + " " + GetProcSummary.header)
- pval = GetProcFromTask(tval)
- print(GetTaskSummary(tval) + " " + GetProcSummary(pval))
- print(PrintIPCInformation.header)
- PrintIPCInformation(tval.itk_space, True, True, rights_type)
-
-
-# EndMacro: showtaskrightsbt
-
-# Macro: showallrights
-
-
-@lldb_command("showallrights", "R:")
-def ShowAllRights(cmd_args=None, cmd_options={}):
- """Routine to print rights information for IPC space of all tasks
- Usage: showallrights [-R rights_type]
- -R rights_type : only display rights matching the string 'rights_type'
-
- types of rights:
- 'Dead' : Dead name
- 'Set' : Port set
- 'S' : Send right
- 'R' : Receive right
- 'O' : Send-once right
- 'm' : Immovable send port
- 'i' : Immovable receive port
- types of notifications:
- 'd' : Dead-Name notification requested
- 's' : Send-Possible notification armed
- 'r' : Send-Possible notification requested
- 'n' : No-Senders notification requested
- 'x' : Port-destroy notification requested
- """
- rights_type = 0
- if "-R" in cmd_options:
- rights_type = cmd_options["-R"]
- for t in kern.tasks:
- print(GetTaskSummary.header + " " + GetProcSummary.header)
- pval = GetProcFromTask(t)
- print(GetTaskSummary(t) + " " + GetProcSummary(pval))
- try:
- print(PrintIPCInformation.header)
- PrintIPCInformation(t.itk_space, True, False, rights_type) + "\n\n"
- except (KeyboardInterrupt, SystemExit):
- raise
- except Exception:
- print(
- "Failed to get IPC information. Do individual showtaskrights <task> to find the error. \n\n"
- )
-
-
-# EndMacro: showallrights
-
-
-def GetInTransitPortSummary(port, disp, holding_port, holding_kmsg):
- """String-ify the in-transit dispostion of a port."""
- ## This should match the summary generated by GetIPCEntrySummary
- ## "object" "name" "rite" "urefs" "nsets" "nmsgs" "destname" "destination"
- format_str = (
- "\t{: <#20x} {: <12} {: <8s} {: <8d} {: <8d} {: <8d} p:{: <#19x} k:{: <#19x}"
- )
-
- disp_str = GetPortDispositionString(disp)
-
- out_str = format_str.format(
- unsigned(port),
- "in-transit",
- disp_str,
- 0,
- 0,
- port.ip_messages.imq_msgcount,
- unsigned(holding_port),
- unsigned(holding_kmsg),
- )
- return out_str
-
-
-def GetDispositionFromEntryType(entry_bits):
- """Translate an IPC entry type into an in-transit disposition. This allows
- the GetInTransitPortSummary function to be re-used to string-ify IPC
- entry types.
- """
- ebits = int(entry_bits)
- if (ebits & 0x003F0000) == 0:
- return 0
-
- if (ebits & MACH_PORT_RIGHT_SEND) != 0:
- return 17
- elif (ebits & MACH_PORT_RIGHT_RECEIVE) != 0:
- return 16
- elif (ebits & MACH_PORT_RIGHT_SEND_ONCE) != 0:
- return 18
- elif (ebits & MACH_PORT_RIGHT_PORT_SET) != 0:
- return 100
- elif (ebits & MACH_PORT_RIGHT_DEAD_NAME) != 0:
- return 101
- elif (ebits & MACH_PORT_RIGHT_LABELH) != 0:
- return 102
- else:
- return 0
-
-
-def GetDispositionFromVoucherPort(th_vport):
- """Translate a thread's voucher port into a 'disposition'"""
- if unsigned(th_vport) > 0:
- return 103 ## Voucher type
- return 0
-
-
-g_kmsg_prog = 0
-g_progmeter = {
- 0: "*",
- 1: "-",
- 2: "\\",
- 3: "|",
- 4: "/",
- 5: "-",
- 6: "\\",
- 7: "|",
- 8: "/",
-}
-
-
-def PrintProgressForKmsg():
- global g_kmsg_prog
- global g_progmeter
- sys.stderr.write(" {:<1s}\r".format(g_progmeter[g_kmsg_prog % 9]))
- g_kmsg_prog += 1
-
-
-def CollectPortsForAnalysis(port, disposition):
- """ """
- if not port or port == xnudefines.MACH_PORT_DEAD:
- return
- p = Cast(port, "struct ipc_port *")
- yield (p, disposition)
-
- # no-senders notification port
- if not IsKObjectType(p.ip_object.io_type) and p.ip_nsrequest != 0:
- PrintProgressForKmsg()
- yield (p.ip_nsrequest, -1)
-
- # port-death notification port
- pdrequest = GetPortPDRequest(p)
- if pdrequest:
- PrintProgressForKmsg()
- yield (pdrequest, -2)
-
- ## ports can have many send-possible notifications armed: go through the table!
- if unsigned(p.ip_requests) != 0:
- table, table_sz = kalloc_array_decode(p.ip_requests, "struct ipc_port_request")
- for i in range(table_sz):
- if i == 0:
- continue
- ipr = table[i]
- if unsigned(ipr.ipr_name) in (0, 0xFFFFFFFE):
- # 0xfffffffe is a host notify request
- continue
- ipr_bits = unsigned(ipr.ipr_soright) & 3
- ipr_port = kern.GetValueFromAddress(
- int(ipr.ipr_soright) & ~3, "struct ipc_port *"
- )
- # skip unused entries in the ipc table to avoid null dereferences
- if not ipr_port:
- continue
- ipr_disp = 0
- if ipr_bits & 3: ## send-possible armed and requested
- ipr_disp = -5
- elif ipr_bits & 2: ## send-possible requested
- ipr_disp = -4
- elif ipr_bits & 1: ## send-possible armed
- ipr_disp = -3
- PrintProgressForKmsg()
- yield (ipr_port, ipr_disp)
- return
-
-
-def CollectKmsgPorts(task, task_port, kmsgp):
- """Look through a message, 'kmsgp' destined for 'task'
- (enqueued on task_port). Collect any port descriptors,
- remote, local, voucher, or other port references
- into a (ipc_port_t, disposition) list.
- """
- kmsgh = dereference(GetKmsgHeader(kmsgp))
-
- p_list = []
-
- PrintProgressForKmsg()
- if kmsgh.msgh_remote_port and unsigned(kmsgh.msgh_remote_port) != unsigned(
- task_port
- ):
- disp = kmsgh.msgh_bits & 0x1F
- p_list += list(CollectPortsForAnalysis(kmsgh.msgh_remote_port, disp))
-
- if (
- kmsgh.msgh_local_port
- and unsigned(kmsgh.msgh_local_port) != unsigned(task_port)
- and unsigned(kmsgh.msgh_local_port) != unsigned(kmsgh.msgh_remote_port)
- ):
- disp = (kmsgh.msgh_bits & 0x1F00) >> 8
- p_list += list(CollectPortsForAnalysis(kmsgh.msgh_local_port, disp))
-
- if kmsgp.ikm_voucher_port:
- p_list += list(CollectPortsForAnalysis(kmsgp.ikm_voucher_port, 0))
-
- if kmsgh.msgh_bits & 0x80000000:
- ## Complex message - look for descriptors
- PrintProgressForKmsg()
- (body, dschead, dsc_list) = GetKmsgDescriptors(kmsgp)
- for dsc in dsc_list:
- PrintProgressForKmsg()
- dsc_type = unsigned(dsc.type.type)
- if dsc_type == 0 or dsc_type == 2: ## 0 == port, 2 == ool port
- if dsc_type == 0:
- ## its a port descriptor
- dsc_disp = dsc.port.disposition
- p_list += list(CollectPortsForAnalysis(dsc.port.name, dsc_disp))
- else:
- ## it's an ool_ports descriptor which is an array of ports
- dsc_disp = dsc.ool_ports.disposition
- dispdata = Cast(dsc.ool_ports.address, "struct ipc_port *")
- for pidx in range(dsc.ool_ports.count):
- PrintProgressForKmsg()
- p_list += list(
- CollectPortsForAnalysis(dispdata[pidx], dsc_disp)
- )
- return p_list
-
-
-def CollectKmsgPortRefs(task, task_port, kmsgp, p_refs):
- """Recursively collect all references to ports inside the kmsg 'kmsgp'
- into the set 'p_refs'
- """
- p_list = CollectKmsgPorts(task, task_port, kmsgp)
-
- ## Iterate over each ports we've collected, to see if they
- ## have messages on them, and then recurse!
- for p, pdisp in p_list:
- ptype = p.ip_object.io_type
- p_refs.add((p, pdisp, ptype))
- if IsPortSetType(ptype):
- continue
- ## If the port that's in-transit has messages already enqueued,
- ## go through each of those messages and look for more ports!
- for p_kmsgp in IterateCircleQueue(
- p.ip_messages.imq_messages, "ipc_kmsg", "ikm_link"
- ):
- CollectKmsgPortRefs(task, p, p_kmsgp, p_refs)
-
-
-def FindKmsgPortRefs(instr, task, task_port, kmsgp, qport):
- """Look through a message, 'kmsgp' destined for 'task'. If we find
- any port descriptors, remote, local, voucher, or other port that
- matches 'qport', return a short description
- which should match the format of GetIPCEntrySummary.
- """
-
- out_str = instr
- p_list = CollectKmsgPorts(task, task_port, kmsgp)
-
- ## Run through all ports we've collected looking for 'qport'
- for p, pdisp in p_list:
- PrintProgressForKmsg()
- if unsigned(p) == unsigned(qport):
- ## the port we're looking for was found in this message!
- if len(out_str) > 0:
- out_str += "\n"
- out_str += GetInTransitPortSummary(p, pdisp, task_port, kmsgp)
-
- ptype = p.ip_object.io_type
- if IsPortSetType(ptype):
- continue
-
- ## If the port that's in-transit has messages already enqueued,
- ## go through each of those messages and look for more ports!
- for p_kmsgp in IterateCircleQueue(
- p.ip_messages.imq_messages, "ipc_kmsg", "ikm_link"
- ):
- out_str = FindKmsgPortRefs(out_str, task, p, p_kmsgp, qport)
-
- return out_str
-
-
-port_iteration_do_print_taskname = False
-registeredport_idx = -10
-excports_idx = -20
-intransit_idx = -1000
-taskports_idx = -2000
-thports_idx = -3000
-
-
-def IterateAllPorts(tasklist, func, ctx, include_psets, follow_busyports, should_log):
- """Iterate over all ports in the system, calling 'func'
- for each entry in
- """
- global port_iteration_do_print_taskname
- global intransit_idx, taskports_idx, thports_idx, registeredport_idx, excports_idx
-
- ## XXX: also host special ports
-
- entry_port_type_mask = 0x00070000
- if include_psets:
- entry_port_type_mask = 0x000F0000
-
- if tasklist is None:
- tasklist = list(kern.tasks)
- tasklist += list(kern.terminated_tasks)
-
- tidx = 1
-
- for t in tasklist:
- # Write a progress line. Using stderr avoids automatic newline when
- # writing to stdout from lldb. Blank spaces at the end clear out long
- # lines.
- if should_log:
- procname = ""
- if not t.active:
- procname = "terminated: "
- if t.halting:
- procname += "halting: "
- procname += GetProcNameForTask(t)
- sys.stderr.write(
- " checking {:s} ({}/{})...{:50s}\r".format(
- procname, tidx, len(tasklist), ""
- )
- )
- tidx += 1
-
- port_iteration_do_print_taskname = True
- space = t.itk_space
- is_tableval, num_entries = GetSpaceTable(space)
-
- if not is_tableval:
- continue
-
- base = is_tableval.GetSBValue().Dereference()
- entries = (value(iep.AddressOf()) for iep in base.xIterSiblings(1, num_entries))
-
- for idx, entry_val in enumerate(entries, 1):
- entry_bits = unsigned(entry_val.ie_bits)
- "{:x}".format(GetNameFromIndexAndIEBits(idx, entry_bits))
-
- entry_disp = GetDispositionFromEntryType(entry_bits)
-
- ## If the entry in the table represents a port of some sort,
- ## then make the callback provided
- if int(entry_bits) & entry_port_type_mask:
- eport = kern.CreateTypedPointerFromAddress(
- unsigned(entry_val.ie_object), "struct ipc_port"
- )
- ## Make the callback
- func(t, space, ctx, idx, entry_val, eport, entry_disp)
-
- ## if the port has pending messages, look through
- ## each message for ports (and recurse)
- if (
- follow_busyports
- and unsigned(eport) > 0
- and eport.ip_messages.imq_msgcount > 0
- ):
- ## collect all port references from all messages
- for kmsgp in IterateCircleQueue(
- eport.ip_messages.imq_messages, "ipc_kmsg", "ikm_link"
- ):
- p_refs = set()
- CollectKmsgPortRefs(t, eport, kmsgp, p_refs)
- for port, pdisp, ptype in p_refs:
- func(t, space, ctx, intransit_idx, None, port, pdisp)
- ## for idx in xrange(1, num_entries)
-
- ## Task ports (send rights)
- if getattr(t, "itk_settable_self", 0) > 0:
- func(t, space, ctx, taskports_idx, 0, t.itk_settable_self, 17)
- if unsigned(t.itk_host) > 0:
- func(t, space, ctx, taskports_idx, 0, t.itk_host, 17)
- if unsigned(t.itk_bootstrap) > 0:
- func(t, space, ctx, taskports_idx, 0, t.itk_bootstrap, 17)
- if unsigned(t.itk_debug_control) > 0:
- func(t, space, ctx, taskports_idx, 0, t.itk_debug_control, 17)
- if unsigned(t.itk_task_access) > 0:
- func(t, space, ctx, taskports_idx, 0, t.itk_task_access, 17)
- if unsigned(t.itk_task_ports[TASK_FLAVOR_READ]) > 0: ## task read port
- func(t, space, ctx, taskports_idx, 0, t.itk_task_ports[TASK_FLAVOR_READ], 17)
- if unsigned(t.itk_task_ports[TASK_FLAVOR_INSPECT]) > 0: ## task inspect port
- func(t, space, ctx, taskports_idx, 0, t.itk_task_ports[TASK_FLAVOR_INSPECT], 17)
-
- ## Task name port (not a send right, just a naked ref); TASK_FLAVOR_NAME = 3
- if unsigned(t.itk_task_ports[TASK_FLAVOR_NAME]) > 0:
- func(t, space, ctx, taskports_idx, 0, t.itk_task_ports[TASK_FLAVOR_NAME], 0)
-
- ## task resume port is a receive right to resume the task
- if unsigned(t.itk_resume) > 0:
- func(t, space, ctx, taskports_idx, 0, t.itk_resume, 16)
-
- ## registered task ports (all send rights)
- tr_idx = 0
- tr_max = sizeof(t.itk_registered) // sizeof(t.itk_registered[0])
- while tr_idx < tr_max:
- tport = t.itk_registered[tr_idx]
- if unsigned(tport) > 0:
- try:
- func(t, space, ctx, registeredport_idx, 0, tport, 17)
- except Exception:
- print(
- "\texception looking through registered port {:d}/{:d} in {:s}".format(
- tr_idx, tr_max, t
- )
- )
- pass
- tr_idx += 1
-
- ## Task exception ports
- exidx = 0
- exmax = sizeof(t.exc_actions) // sizeof(t.exc_actions[0])
- while exidx < exmax: ## see: osfmk/mach/[arm|i386]/exception.h
- export = t.exc_actions[exidx].port ## send right
- if unsigned(export) > 0:
- try:
- func(t, space, ctx, excports_idx, 0, export, 17)
- except Exception:
- print(
- "\texception looking through exception port {:d}/{:d} in {:s}".format(
- exidx, exmax, t
- )
- )
- pass
- exidx += 1
-
- ## XXX: any ports still valid after clearing IPC space?!
-
- for thval in IterateQueue(t.threads, "thread *", "task_threads"):
- ## XXX: look at block reason to see if it's in mach_msg_receive - then look at saved state / message
-
- ## Thread port (send right)
- if unsigned(thval.thread_settable_self_port) > 0:
- thport = thval.thread_settable_self_port
- func(
- t, space, ctx, thports_idx, 0, thport, 17
- ) ## see: osfmk/mach/message.h
- ## Thread special reply port (send-once right)
- if unsigned(thval.ith_special_reply_port) > 0:
- thport = thval.ith_special_reply_port
- func(
- t, space, ctx, thports_idx, 0, thport, 18
- ) ## see: osfmk/mach/message.h
- ## Thread voucher port
- if unsigned(thval.ith_voucher) > 0:
- vport = thval.ith_voucher.iv_port
- if unsigned(vport) > 0:
- vdisp = GetDispositionFromVoucherPort(vport)
- func(t, space, ctx, thports_idx, 0, vport, vdisp)
- ## Thread exception ports
- if unsigned(thval.t_tro.tro_exc_actions) > 0:
- exidx = 0
- while exidx < exmax: ## see: osfmk/mach/[arm|i386]/exception.h
- export = thval.t_tro.tro_exc_actions[exidx].port ## send right
- if unsigned(export) > 0:
- try:
- func(t, space, ctx, excports_idx, 0, export, 17)
- except Exception:
- print(
- "\texception looking through exception port {:d}/{:d} in {:s}".format(
- exidx, exmax, t
- )
- )
- pass
- exidx += 1
- ## XXX: the message on a thread (that's currently being received)
- ## for (thval in t.threads)
- ## for (t in tasklist)
-
-
-# Macro: findportrights
-def FindPortRightsCallback(task, space, ctx, entry_idx, ipc_entry, ipc_port, port_disp):
- """Callback which uses 'ctx' as the (port,rights_types) tuple for which
- a caller is seeking references. This should *not* be used from a
- recursive call to IterateAllPorts.
- """
- global port_iteration_do_print_taskname
-
- (qport, rights_type) = ctx
- entry_name = ""
- entry_str = ""
- if unsigned(ipc_entry) != 0:
- entry_bits = unsigned(ipc_entry.ie_bits)
- entry_name = "{:x}".format(GetNameFromIndexAndIEBits(entry_idx, entry_bits))
- if (int(entry_bits) & 0x001F0000) != 0 and unsigned(
- ipc_entry.ie_object
- ) == unsigned(qport):
- ## it's a valid entry, and it points to the port
- entry_str = "\t" + GetIPCEntrySummary(ipc_entry, entry_name, rights_type)
-
- procname = GetProcNameForTask(task)
- if (
- ipc_port
- and ipc_port != xnudefines.MACH_PORT_DEAD
- and ipc_port.ip_messages.imq_msgcount > 0
- ):
- sys.stderr.write(
- " checking {:s} busy-port {}:{:#x}...{:30s}\r".format(
- procname, entry_name, unsigned(ipc_port), ""
- )
- )
- ## Search through busy ports to find descriptors which could
- ## contain the only reference to this port!
- for kmsgp in IterateCircleQueue(
- ipc_port.ip_messages.imq_messages, "ipc_kmsg", "ikm_link"
- ):
- entry_str = FindKmsgPortRefs(entry_str, task, ipc_port, kmsgp, qport)
-
- if len(entry_str) > 0:
- sys.stderr.write("{:80s}\r".format(""))
- if port_iteration_do_print_taskname:
- print("Task: {0: <#x} {1: <s}".format(task, procname))
- print("\t" + GetIPCEntrySummary.header)
- port_iteration_do_print_taskname = False
- print(entry_str)
-
-
-@lldb_command("findportrights", "R:S:")
-def FindPortRights(cmd_args=None, cmd_options={}):
- """Routine to locate and print all extant rights to a given port
- Usage: findportrights [-R rights_type] [-S <ipc_space_t>] <ipc_port_t>
- -S ipc_space : only search the specified ipc space
- -R rights_type : only display rights matching the string 'rights_type'
-
- types of rights:
- 'Dead' : Dead name
- 'Set' : Port set
- 'S' : Send right
- 'R' : Receive right
- 'O' : Send-once right
- types of notifications:
- 'd' : Dead-Name notification requested
- 's' : Send-Possible notification armed
- 'r' : Send-Possible notification requested
- 'n' : No-Senders notification requested
- 'x' : Port-destroy notification requested
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("no port address provided")
- port = kern.GetValueFromAddress(cmd_args[0], "struct ipc_port *")
-
- rights_type = 0
- if "-R" in cmd_options:
- rights_type = cmd_options["-R"]
-
- tasklist = None
- if "-S" in cmd_options:
- space = kern.GetValueFromAddress(cmd_options["-S"], "struct ipc_space *")
- tasklist = [space.is_task]
-
- ## Don't include port sets
- ## Don't recurse on busy ports (we do that manually)
- ## DO log progress
- IterateAllPorts(
- tasklist, FindPortRightsCallback, (port, rights_type), False, False, True
- )
- sys.stderr.write("{:120s}\r".format(" "))
-
- print("Done.")
-
-
-# EndMacro: findportrights
-
-# Macro: countallports
-
-
-def CountPortsCallback(task, space, ctx, entry_idx, ipc_entry, ipc_port, port_disp):
- """Callback which uses 'ctx' as the set of all ports found in the
- iteration. This should *not* be used from a recursive
- call to IterateAllPorts.
- """
- global intransit_idx
-
- (p_set, p_intransit, p_bytask) = ctx
-
- ## Add the port address to the set of all port addresses
- ipc_port_addr = unsigned(ipc_port)
- p_set.add(ipc_port_addr)
-
- if entry_idx == intransit_idx:
- p_intransit.add(ipc_port_addr)
-
- if task.active or (task.halting and not task.active):
- if task not in p_bytask:
- p_bytask[task] = {"transit": 0, "table": 0, "other": 0}
- if entry_idx == intransit_idx:
- p_bytask[task]["transit"] += 1
- elif entry_idx >= 0:
- p_bytask[task]["table"] += 1
- else:
- p_bytask[task]["other"] += 1
-
-
-@header(f"{'#ports': <10s} {'in transit': <10s} {'Special': <10s}")
-@lldb_command("countallports", "P", fancy=True)
-def CountAllPorts(cmd_args=None, cmd_options={}, O=None):
- """Routine to search for all as many references to ipc_port structures in the kernel
- that we can find.
- Usage: countallports [-P]
- -P : include port sets in the count (default: NO)
- """
- p_set = set()
- p_intransit = set()
- p_bytask = {}
-
- find_psets = False
- if "-P" in cmd_options:
- find_psets = True
-
- ## optionally include port sets
- ## DO recurse on busy ports
- ## DO log progress
- IterateAllPorts(
- None, CountPortsCallback, (p_set, p_intransit, p_bytask), find_psets, True, True
- )
- sys.stderr.write(f"{' ':120s}\r")
-
- # sort by ipc table size
- with O.table(GetTaskIPCSummary.header + " " + CountAllPorts.header):
- for task, port_summary in sorted(
- p_bytask.items(), key=lambda item: item[1]["table"], reverse=True
- ):
- outstring, _ = GetTaskIPCSummary(task)
- outstring += f" {port_summary['table']: <10d} {port_summary['transit']: <10d} {port_summary['other']: <10d}"
- print(outstring)
-
- print(f"\nTotal ports found: {len(p_set)}")
- print(f"Number of ports In Transit: {len(p_intransit)}")
-
-
-# EndMacro: countallports
-# Macro: showpipestats
-
-
-@lldb_command("showpipestats")
-def ShowPipeStats(cmd_args=None):
- """Display pipes usage information in the kernel"""
- print("Number of pipes: {: d}".format(kern.globals.amountpipes))
- print(
- "Memory used by pipes: {:s}".format(sizeof_fmt(int(kern.globals.amountpipekva)))
- )
- print(
- "Max memory allowed for pipes: {:s}".format(
- sizeof_fmt(int(kern.globals.maxpipekva))
- )
- )
-
-
-# EndMacro: showpipestats
-# Macro: showtaskbusyports
-
-
-@lldb_command("showtaskbusyports", fancy=True)
-def ShowTaskBusyPorts(cmd_args=None, cmd_options={}, O=None):
- """Routine to print information about receive rights belonging to this task that
- have enqueued messages. This is often a sign of a blocked or hung process
- Usage: showtaskbusyports <task address>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("No arguments passed. Please pass in the address of a task")
-
- task = kern.GetValueFromAddress(cmd_args[0], "task_t")
- is_tableval, num_entries = GetSpaceTable(task.itk_space)
-
- if is_tableval:
- ports = GetSpaceObjectsWithBits(
- is_tableval, num_entries, MACH_PORT_RIGHT_RECEIVE, gettype("struct ipc_port")
- )
-
- with O.table(PrintPortSummary.header):
- for port in ports:
- if port.xGetIntegerByPath(".ip_messages.imq_msgcount"):
- PrintPortSummary(value(port.AddressOf()), O=O)
-
-
-# EndMacro: showtaskbusyports
-# Macro: showallbusyports
-
-
-@lldb_command("showallbusyports", fancy=True)
-def ShowAllBusyPorts(cmd_args=None, cmd_options={}, O=None):
- """Routine to print information about all receive rights on the system that
- have enqueued messages.
- """
- with O.table(PrintPortSummary.header):
- port_ty = gettype("struct ipc_port")
- for port in kmemory.Zone("ipc ports").iter_allocated(port_ty):
- if port.xGetIntegerByPath(".ip_messages.imq_msgcount") > 0:
- PrintPortSummary(value(port.AddressOf()), O=O)
-
-
-# EndMacro: showallbusyports
-# Macro: showallports
-
-
-@lldb_command("showallports", fancy=True)
-def ShowAllPorts(cmd_args=None, cmd_options={}, O=None):
- """Routine to print information about all allocated ports in the system
-
- usage: showallports
- """
- with O.table(PrintPortSummary.header):
- port_ty = gettype("struct ipc_port")
- for port in kmemory.Zone("ipc ports").iter_allocated(port_ty):
- PrintPortSummary(value(port.AddressOf()), show_kmsg_summary=False, O=O)
-
-
-# EndMacro: showallports
-# Macro: findkobjectport
-
-
-@lldb_command("findkobjectport", fancy=True)
-def FindKobjectPort(cmd_args=None, cmd_options={}, O=None):
- """Locate all ports pointing to a given kobject
-
- usage: findkobjectport <kobject-addr>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError()
-
- kobj_addr = unsigned(kern.GetValueFromAddress(cmd_args[0]))
- kmem = kmemory.KMem.get_shared()
- port_ty = gettype("struct ipc_port")
-
- with O.table(PrintPortSummary.header):
- for port in kmemory.Zone("ipc ports").iter_allocated(port_ty):
- otype = port.xGetIntegerByPath(".ip_object.io_type")
- if not IsKObjectType(otype):
- continue
-
- ip_kobject = kmem.make_address(port.xGetScalarByName("ip_kobject"))
- if ip_kobject == kobj_addr:
- PrintPortSummary(value(port.AddressOf()), show_kmsg_summary=False, O=O)
-
-
-# EndMacro: findkobjectport
-# Macro: showtaskbusypsets
-
-
-@lldb_command("showtaskbusypsets", fancy=True)
-def ShowTaskBusyPortSets(cmd_args=None, cmd_options={}, O=None):
- """Routine to print information about port sets belonging to this task that
- have enqueued messages. This is often a sign of a blocked or hung process
- Usage: showtaskbusypsets <task address>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("No arguments passed. Please pass in the address of a task")
-
- task = kern.GetValueFromAddress(cmd_args[0], "task_t")
- is_tableval, num_entries = GetSpaceTable(task.itk_space)
-
- if is_tableval:
- psets = GetSpaceObjectsWithBits(
- is_tableval, num_entries, MACH_PORT_RIGHT_PORT_SET, gettype("struct ipc_pset")
- )
-
- with O.table(PrintPortSetSummary.header):
- for pset in (value(v.AddressOf()) for v in psets):
- for wq in Waitq(addressof(pset.ips_wqset)).iterateMembers():
- if wq.asPort().ip_messages.imq_msgcount > 0:
- PrintPortSetSummary(pset, space=task.itk_space, O=O)
-
-
-# EndMacro: showtaskbusyports
-# Macro: showallbusypsets
-
-
-@lldb_command("showallbusypsets", fancy=True)
-def ShowAllBusyPortSets(cmd_args=None, cmd_options={}, O=None):
- """Routine to print information about all port sets on the system that
- have enqueued messages.
- """
- with O.table(PrintPortSetSummary.header):
- pset_ty = gettype("struct ipc_pset")
- for pset in kmemory.Zone("ipc port sets").iter_allocated(pset_ty):
- pset = value(pset.AddressOf())
- for wq in Waitq(addressof(pset.ips_wqset)).iterateMembers():
- port = wq.asPort()
- if port.ip_messages.imq_msgcount > 0:
- PrintPortSetSummary(pset, space=port.ip_receiver, O=O)
-
-
-# EndMacro: showallbusyports
-# Macro: showallpsets
-
-
-@lldb_command("showallpsets", fancy=True)
-def ShowAllPortSets(cmd_args=None, cmd_options={}, O=None):
- """Routine to print information about all allocated psets in the system
-
- usage: showallpsets
- """
- with O.table(PrintPortSetSummary.header):
- pset_ty = gettype("struct ipc_pset")
- for pset in kmemory.Zone("ipc port sets").iter_allocated(pset_ty):
- PrintPortSetSummary(value(pset.AddressOf()), O=O)
-
-
-# EndMacro: showallports
-# Macro: showbusyportsummary
-
-
-@lldb_command("showbusyportsummary")
-def ShowBusyPortSummary(cmd_args=None):
- """Routine to print a summary of information about all receive rights
- on the system that have enqueued messages.
- """
-
- ipc_table_size = 0
- ipc_busy_ports = 0
- ipc_msgs = 0
-
- print(GetTaskBusyIPCSummary.header)
- for tsk in kern.tasks:
- (summary, table_size, nbusy, nmsgs) = GetTaskBusyIPCSummary(tsk)
- ipc_table_size += table_size
- ipc_busy_ports += nbusy
- ipc_msgs += nmsgs
- print(summary)
- for tsk in kern.terminated_tasks:
- (summary, table_size, nbusy, nmsgs) = GetTaskBusyIPCSummary(tsk)
- ipc_table_size += table_size
- ipc_busy_ports += nbusy
- ipc_msgs += nmsgs
- print(summary)
- print(
- "Total Table Size: {:d}, Busy Ports: {:d}, Messages in-flight: {:d}".format(
- ipc_table_size, ipc_busy_ports, ipc_msgs
- )
- )
- return
-
-
-# EndMacro: showbusyportsummary
-# Macro: showport / showpset
-
-
-def ShowPortOrPset(obj, space=0, O=None):
- """Routine that lists details about a given IPC port or pset
- Syntax: (lldb) showport 0xaddr
- """
- if not obj or obj == xnudefines.IPC_OBJECT_DEAD:
- print("IPC_OBJECT_DEAD")
- return
-
- if IsPortSetType(obj.io_type):
- with O.table(PrintPortSetSummary.header):
- PrintPortSetSummary(cast(obj, "ipc_pset_t"), space, O=O)
- else:
- with O.table(PrintPortSummary.header):
- PrintPortSummary(cast(obj, "ipc_port_t"), show_sets=True, O=O)
-
-
-@lldb_command("showport", "K", fancy=True)
-def ShowPort(cmd_args=None, cmd_options={}, O=None):
- """Routine that lists details about a given IPC port
-
- usage: showport <address>
- """
- # -K is default and kept for backward compat, it used to mean "show kmsg queue"
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("Missing port argument")
-
- obj = kern.GetValueFromAddress(cmd_args[0], "struct ipc_object *")
- ShowPortOrPset(obj, O=O)
-
-
-@lldb_command("showpset", "S:", fancy=True)
-def ShowPSet(cmd_args=None, cmd_options={}, O=None):
- """Routine that prints details for a given ipc_pset *
-
- usage: showpset [-S <space>] <address>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("Missing port argument")
-
- space = 0
- if "-S" in cmd_options:
- space = kern.GetValueFromAddress(cmd_options["-S"], "struct ipc_space *")
- obj = kern.GetValueFromAddress(cmd_args[0], "struct ipc_object *")
- ShowPortOrPset(obj, space=space, O=O)
-
-
-# EndMacro: showport / showpset
-# Macro: showkmsg:
-
-
-@lldb_command("showkmsg")
-def ShowKMSG(cmd_args=[]):
- """Show detail information about a <ipc_kmsg_t> structure
- Usage: (lldb) showkmsg <ipc_kmsg_t>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("Invalid arguments")
-
- kmsg = kern.GetValueFromAddress(cmd_args[0], "ipc_kmsg_t")
- print(GetKMsgSummary.header)
- print(GetKMsgSummary(kmsg))
-
-
-# EndMacro: showkmsg
-# IPC importance inheritance related macros.
-
-
-@lldb_command("showalliits")
-def ShowAllIITs(cmd_args=[], cmd_options={}):
- """Development only macro. Show list of all iits allocated in the system."""
- try:
- iit_queue = kern.globals.global_iit_alloc_queue
- except ValueError:
- print("This debug macro is only available in development or debug kernels")
- return
-
- print(GetIPCImportantTaskSummary.header)
- for iit in IterateQueue(
- iit_queue, "struct ipc_importance_task *", "iit_allocation"
- ):
- print(GetIPCImportantTaskSummary(iit))
- return
-
-
-@header(
- "{: <18s} {: <3s} {: <18s} {: <32s} {: <18s} {: <8s}".format(
- "ipc_imp_inherit", "don", "to_task", "proc_name", "from_elem", "depth"
- )
-)
-@lldb_type_summary(["ipc_importance_inherit *", "ipc_importance_inherit_t"])
-def GetIPCImportanceInheritSummary(iii):
- """describes iii object of type ipc_importance_inherit_t *"""
- out_str = ""
- fmt = "{o: <#18x} {don: <3s} {o.iii_to_task.iit_task: <#18x} {task_name: <20s} {o.iii_from_elem: <#18x} {o.iii_depth: <#8x}"
- donating_str = ""
- if unsigned(iii.iii_donating):
- donating_str = "DON"
- taskname = GetProcNameForTask(iii.iii_to_task.iit_task)
- if hasattr(iii.iii_to_task, "iit_bsd_pid"):
- taskname = "({:d}) {:s}".format(
- iii.iii_to_task.iit_bsd_pid, iii.iii_to_task.iit_procname
- )
- out_str += fmt.format(o=iii, task_name=taskname, don=donating_str)
- return out_str
-
-
-@static_var("recursion_count", 0)
-@header(
- "{: <18s} {: <4s} {: <8s} {: <8s} {: <18s} {: <18s}".format(
- "iie", "type", "refs", "made", "#kmsgs", "#inherits"
- )
-)
-@lldb_type_summary(["ipc_importance_elem *"])
-def GetIPCImportanceElemSummary(iie):
- """describes an ipc_importance_elem * object"""
-
- if GetIPCImportanceElemSummary.recursion_count > 500:
- GetIPCImportanceElemSummary.recursion_count = 0
- return "Recursion of 500 reached"
-
- out_str = ""
- fmt = "{: <#18x} {: <4s} {: <8d} {: <8d} {: <#18x} {: <#18x}"
- if unsigned(iie.iie_bits) & xnudefines.IIE_TYPE_MASK:
- type_str = "INH"
- inherit_count = 0
- else:
- type_str = "TASK"
- iit = Cast(iie, "struct ipc_importance_task *")
- inherit_count = sum(
- 1
- for i in IterateQueue(
- iit.iit_inherits, "struct ipc_importance_inherit *", "iii_inheritance"
- )
- )
-
- refs = unsigned(iie.iie_bits) >> xnudefines.IIE_TYPE_BITS
- made_refs = unsigned(iie.iie_made)
- kmsg_count = sum(
- 1 for i in IterateQueue(iie.iie_kmsgs, "struct ipc_kmsg *", "ikm_inheritance")
- )
- out_str += fmt.format(iie, type_str, refs, made_refs, kmsg_count, inherit_count)
- if config["verbosity"] > vHUMAN:
- if kmsg_count > 0:
- out_str += "\n\t" + GetKMsgSummary.header
- for k in IterateQueue(
- iie.iie_kmsgs, "struct ipc_kmsg *", "ikm_inheritance"
- ):
- out_str += (
- "\t"
- + "{: <#18x}".format(GetKmsgHeader(k).msgh_remote_port)
- + " "
- + GetKMsgSummary(k, "\t").lstrip()
- )
- out_str += "\n"
- if inherit_count > 0:
- out_str += "\n\t" + GetIPCImportanceInheritSummary.header + "\n"
- for i in IterateQueue(
- iit.iit_inherits, "struct ipc_importance_inherit *", "iii_inheritance"
- ):
- out_str += "\t" + GetIPCImportanceInheritSummary(i) + "\n"
- out_str += "\n"
- if type_str == "INH":
- iii = Cast(iie, "struct ipc_importance_inherit *")
- out_str += "Inherit from: " + GetIPCImportanceElemSummary(iii.iii_from_elem)
-
- return out_str
-
-
-@header("{: <18s} {: <18s} {: <32}".format("iit", "task", "name"))
-@lldb_type_summary(["ipc_importance_task *"])
-def GetIPCImportantTaskSummary(iit):
- """iit is a ipc_importance_task value object."""
- fmt = "{: <#18x} {: <#18x} {: <32}"
- out_str = ""
- pname = GetProcNameForTask(iit.iit_task)
- if hasattr(iit, "iit_bsd_pid"):
- pname = "({:d}) {:s}".format(iit.iit_bsd_pid, iit.iit_procname)
- out_str += fmt.format(iit, iit.iit_task, pname)
- return out_str
-
-
-@lldb_command("showallimportancetasks")
-def ShowIPCImportanceTasks(cmd_args=[], cmd_options={}):
- """display a list of all tasks with ipc importance information.
- Usage: (lldb) showallimportancetasks
- Tip: add "-v" to see detailed information on each kmsg or inherit elems
- """
- print(
- " "
- + GetIPCImportantTaskSummary.header
- + " "
- + GetIPCImportanceElemSummary.header
- )
- for t in kern.tasks:
- s = ""
- if unsigned(t.task_imp_base):
- s += " " + GetIPCImportantTaskSummary(t.task_imp_base)
- s += " " + GetIPCImportanceElemSummary(addressof(t.task_imp_base.iit_elem))
- print(s)
-
-
-@lldb_command("showipcimportance", "")
-def ShowIPCImportance(cmd_args=[], cmd_options={}):
- """Describe an importance from <ipc_importance_elem_t> argument.
- Usage: (lldb) showimportance <ipc_importance_elem_t>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("Please provide valid argument")
-
- elem = kern.GetValueFromAddress(cmd_args[0], "ipc_importance_elem_t")
- print(GetIPCImportanceElemSummary.header)
- print(GetIPCImportanceElemSummary(elem))
-
-
-@header(
- "{: <18s} {: <18s} {: <8s} {: <5s} {: <5s} {: <8s}".format(
- "ivac", "tbl", "tblsize", "index", "Grow", "freelist"
- )
-)
-@lldb_type_summary(["ipc_voucher_attr_control *", "ipc_voucher_attr_control_t"])
-def GetIPCVoucherAttrControlSummary(ivac):
- """describes a voucher attribute control settings"""
- out_str = ""
- fmt = "{c: <#18x} {c.ivac_table: <#18x} {c.ivac_table_size: <8d} {c.ivac_key_index: <5d} {growing: <5s} {c.ivac_freelist: <8d}"
- growing_str = ""
-
- if ivac == 0:
- return "{: <#18x}".format(ivac)
-
- growing_str = "Y" if unsigned(ivac.ivac_is_growing) else "N"
- out_str += fmt.format(c=ivac, growing=growing_str)
- return out_str
-
-
-@lldb_command("showivac", "")
-def ShowIPCVoucherAttributeControl(cmd_args=[], cmd_options={}):
- """Show summary of voucher attribute contols.
- Usage: (lldb) showivac <ipc_voucher_attr_control_t>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("Please provide correct arguments.")
- ivac = kern.GetValueFromAddress(cmd_args[0], "ipc_voucher_attr_control_t")
- print(GetIPCVoucherAttrControlSummary.header)
- print(GetIPCVoucherAttrControlSummary(ivac))
- if config["verbosity"] > vHUMAN:
- cur_entry_index = 0
- last_entry_index = unsigned(ivac.ivac_table_size)
- print("index " + GetIPCVoucherAttributeEntrySummary.header)
- while cur_entry_index < last_entry_index:
- print(
- "{: <5d} ".format(cur_entry_index)
- + GetIPCVoucherAttributeEntrySummary(
- addressof(ivac.ivac_table[cur_entry_index])
- )
- )
- cur_entry_index += 1
-
-
-@header(
- "{: <18s} {: <30s} {: <30s} {: <30s} {: <30s}".format(
- "ivam", "get_value_fn", "extract_fn", "release_value_fn", "command_fn"
- )
-)
-@lldb_type_summary(["ipc_voucher_attr_manager *", "ipc_voucher_attr_manager_t"])
-def GetIPCVoucherAttrManagerSummary(ivam):
- """describes a voucher attribute manager settings"""
- out_str = ""
- fmt = "{: <#18x} {: <30s} {: <30s} {: <30s} {: <30s}"
-
- if unsigned(ivam) == 0:
- return "{: <#18x}".format(ivam)
-
- get_value_fn = kern.Symbolicate(unsigned(ivam.ivam_get_value))
- extract_fn = kern.Symbolicate(unsigned(ivam.ivam_extract_content))
- release_value_fn = kern.Symbolicate(unsigned(ivam.ivam_release_value))
- command_fn = kern.Symbolicate(unsigned(ivam.ivam_command))
- out_str += fmt.format(ivam, get_value_fn, extract_fn, release_value_fn, command_fn)
- return out_str
-
-
-def iv_key_to_index(key):
- """ref: osfmk/ipc/ipc_voucher.c: iv_key_to_index"""
- if (key == xnudefines.MACH_VOUCHER_ATTR_KEY_ALL) or (
- key > xnudefines.MACH_VOUCHER_ATTR_KEY_NUM
- ):
- return xnudefines.IV_UNUSED_KEYINDEX
- return key - 1
-
-
-def iv_index_to_key(index):
- """ref: osfmk/ipc/ipc_voucher.c: iv_index_to_key"""
- if index < xnudefines.MACH_VOUCHER_ATTR_KEY_NUM_WELL_KNOWN:
- return index + 1
- return xnudefines.MACH_VOUCHER_ATTR_KEY_NONE
-
-
-@header(
- "{: <3s} {: <3s} {:s} {:s}".format(
- "idx",
- "key",
- GetIPCVoucherAttrControlSummary.header.strip(),
- GetIPCVoucherAttrManagerSummary.header.strip(),
- )
-)
-@lldb_type_summary(
- ["ipc_voucher_global_table_element *", "ipc_voucher_global_table_element_t"]
-)
-def GetIPCVoucherGlobalTableElementSummary(idx, ivac, ivam):
- """describes a ipc_voucher_global_table_element object"""
- out_str = ""
- fmt = "{idx: <3d} {key: <3d} {ctrl_s:s} {mgr_s:s}"
- out_str += fmt.format(
- idx=idx,
- key=iv_index_to_key(idx),
- ctrl_s=GetIPCVoucherAttrControlSummary(addressof(ivac)),
- mgr_s=GetIPCVoucherAttrManagerSummary(ivam),
- )
- return out_str
-
-
-@lldb_command("showglobalvouchertable", "")
-def ShowGlobalVoucherTable(cmd_args=[], cmd_options={}):
- """show detailed information of all voucher attribute managers registered with vouchers system
- Usage: (lldb) showglobalvouchertable
- """
- entry_size = sizeof(kern.globals.ivac_global_table[0])
- elems = sizeof(kern.globals.ivac_global_table) // entry_size
- print(GetIPCVoucherGlobalTableElementSummary.header)
- for i in range(elems):
- ivac = kern.globals.ivac_global_table[i]
- ivam = kern.globals.ivam_global_table[i]
- if unsigned(ivam) == 0:
- continue
- print(GetIPCVoucherGlobalTableElementSummary(i, ivac, ivam))
-
-
-# Type summaries for Bag of Bits.
-
-
-@lldb_type_summary(["user_data_value_element", "user_data_element_t"])
-@header(
- "{0: <20s} {1: <16s} {2: <20s} {3: <20s} {4: <16s} {5: <20s}".format(
- "user_data_ve", "maderefs", "checksum", "hash value", "size", "data"
- )
-)
-def GetBagofBitsElementSummary(data_element):
- """Summarizes the Bag of Bits element
- params: data_element = value of the object of type user_data_value_element_t
- returns: String with summary of the type.
- """
- format_str = "{0: <#20x} {1: <16d} {2: <#20x} {3: <#20x} {4: <16d}"
- out_string = format_str.format(
- data_element,
- unsigned(data_element.e_made),
- data_element.e_sum,
- data_element.e_hash,
- unsigned(data_element.e_size),
- )
- out_string += " 0x"
-
- for i in range(0, (unsigned(data_element.e_size) - 1)):
- out_string += "{:02x}".format(int(data_element.e_data[i]))
- return out_string
-
-
-def GetIPCHandleSummary(handle_ptr):
- """converts a handle value inside a voucher attribute table to ipc element and returns appropriate summary.
- params: handle_ptr - uint64 number stored in handle of voucher.
- returns: str - string summary of the element held in internal structure
- """
- elem = kern.GetValueFromAddress(handle_ptr, "ipc_importance_elem_t")
- if elem.iie_bits & xnudefines.IIE_TYPE_MASK:
- iie = Cast(elem, "struct ipc_importance_inherit *")
- return GetIPCImportanceInheritSummary(iie)
- else:
- iit = Cast(elem, "struct ipc_importance_task *")
- return GetIPCImportantTaskSummary(iit)
-
-
-def GetATMHandleSummary(handle_ptr):
- """Convert a handle value to atm value and returns corresponding summary of its fields.
- params: handle_ptr - uint64 number stored in handle of voucher
- returns: str - summary of atm value
- """
- return "???"
-
-
-def GetBankHandleSummary(handle_ptr):
- """converts a handle value inside a voucher attribute table to bank element and returns appropriate summary.
- params: handle_ptr - uint64 number stored in handle of voucher.
- returns: str - summary of bank element
- """
- if handle_ptr == 1:
- return "Bank task of Current task"
- elem = kern.GetValueFromAddress(handle_ptr, "bank_element_t")
- if elem.be_type & 1:
- ba = Cast(elem, "struct bank_account *")
- return GetBankAccountSummary(ba)
- else:
- bt = Cast(elem, "struct bank_task *")
- return GetBankTaskSummary(bt)
-
-
-def GetBagofBitsHandleSummary(handle_ptr):
- """Convert a handle value to bag of bits value and returns corresponding summary of its fields.
- params: handle_ptr - uint64 number stored in handle of voucher
- returns: str - summary of bag of bits element
- """
- elem = kern.GetValueFromAddress(handle_ptr, "user_data_element_t")
- return GetBagofBitsElementSummary(elem)
-
-
-@static_var(
- "attr_managers",
- {
- 1: GetATMHandleSummary,
- 2: GetIPCHandleSummary,
- 3: GetBankHandleSummary,
- 7: GetBagofBitsHandleSummary,
- },
-)
-def GetHandleSummaryForKey(handle_ptr, key_num):
- """Get a summary of handle pointer from the voucher attribute manager.
- For example key 2 -> ipc and it puts either ipc_importance_inherit_t or ipc_important_task_t.
- key 3 -> Bank and it puts either bank_task_t or bank_account_t.
- key 7 -> Bag of Bits and it puts user_data_element_t in handle. So summary of it would be Bag of Bits content and refs etc.
- """
- key_num = int(key_num)
- if key_num not in GetHandleSummaryForKey.attr_managers:
- return "Unknown key %d" % key_num
- return GetHandleSummaryForKey.attr_managers[key_num](handle_ptr)
-
-
-@header(
- "{: <18s} {: <18s} {: <10s} {: <4s} {: <18s} {: <18s}".format(
- "ivace", "value_handle", "#refs", "rel?", "maderefs", "next_layer"
- )
-)
-@lldb_type_summary(["ivac_entry *", "ivac_entry_t"])
-def GetIPCVoucherAttributeEntrySummary(ivace, manager_key_num=0):
- """Get summary for voucher attribute entry."""
- out_str = ""
- fmt = "{e: <#18x} {e.ivace_value: <#18x} {e.ivace_refs: <10d} {release: <4s} {made_refs: <18s} {next_layer: <18s}"
- release_str = ""
- free_str = ""
- made_refs = ""
- next_layer = ""
-
- if unsigned(ivace.ivace_releasing):
- release_str = "Y"
- if unsigned(ivace.ivace_free):
- free_str = "F"
- if unsigned(ivace.ivace_layered):
- next_layer = "{: <#18x}".format(ivace.ivace_u.ivaceu_layer)
- else:
- made_refs = "{: <18d}".format(ivace.ivace_u.ivaceu_made)
-
- out_str += fmt.format(
- e=ivace, release=release_str, made_refs=made_refs, next_layer=next_layer
- )
- if config["verbosity"] > vHUMAN and manager_key_num > 0:
- out_str += " " + GetHandleSummaryForKey(
- unsigned(ivace.ivace_value), manager_key_num
- )
- if config["verbosity"] > vHUMAN:
- out_str += " {: <2s} {: <4d} {: <4d}".format(
- free_str, ivace.ivace_next, ivace.ivace_index
- )
- return out_str
-
-
-@lldb_command("showivacfreelist", "")
-def ShowIVACFreeList(cmd_args=[], cmd_options={}):
- """Walk the free list and print every entry in the list.
- usage: (lldb) showivacfreelist <ipc_voucher_attr_control_t>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("Please provide <ipc_voucher_attr_control_t>")
-
- ivac = kern.GetValueFromAddress(cmd_args[0], "ipc_voucher_attr_control_t")
- print(GetIPCVoucherAttrControlSummary.header)
- print(GetIPCVoucherAttrControlSummary(ivac))
- if unsigned(ivac.ivac_freelist) == 0:
- print("ivac table is full")
- return
- print("index " + GetIPCVoucherAttributeEntrySummary.header)
- next_free = unsigned(ivac.ivac_freelist)
- while next_free != 0:
- print(
- "{: <5d} ".format(next_free)
- + GetIPCVoucherAttributeEntrySummary(addressof(ivac.ivac_table[next_free]))
- )
- next_free = unsigned(ivac.ivac_table[next_free].ivace_next)
-
-
-@header(
- "{: <18s} {: <8s} {: <18s} {: <18s}".format(
- "ipc_voucher", "refs", "table", "voucher_port"
- )
-)
-@lldb_type_summary(["ipc_voucher *", "ipc_voucher_t"])
-def GetIPCVoucherSummary(voucher, show_entries=False):
- """describe a voucher from its ipc_voucher * object"""
- out_str = ""
- fmt = "{v: <#18x} {v.iv_refs: <8d} {table_addr: <#18x} {v.iv_port: <#18x}"
- out_str += fmt.format(v=voucher, table_addr=addressof(voucher.iv_table))
- entries_str = ""
- if show_entries or config["verbosity"] > vHUMAN:
- elems = sizeof(voucher.iv_table) // sizeof(voucher.iv_table[0])
- entries_header_str = (
- "\n\t"
- + "{: <5s} {: <3s} {: <16s} {: <30s}".format(
- "index", "key", "value_index", "manager"
- )
- + " "
- + GetIPCVoucherAttributeEntrySummary.header
- )
- fmt = "{: <5d} {: <3d} {: <16d} {: <30s}"
- for i in range(elems):
- voucher_entry_index = unsigned(voucher.iv_table[i])
- if voucher_entry_index:
- s = fmt.format(
- i,
- GetVoucherManagerKeyForIndex(i),
- voucher_entry_index,
- GetVoucherAttributeManagerNameForIndex(i),
- )
- e = GetVoucherValueHandleFromVoucherForIndex(voucher, i)
- if e is not None:
- s += " " + GetIPCVoucherAttributeEntrySummary(
- addressof(e), GetVoucherManagerKeyForIndex(i)
- )
- if entries_header_str:
- entries_str = entries_header_str
- entries_header_str = ""
- entries_str += "\n\t" + s
- if not entries_header_str:
- entries_str += "\n\t"
- out_str += entries_str
- return out_str
-
-
-def GetVoucherManagerKeyForIndex(idx):
- """Returns key number for index based on global table. Will raise index error if value is incorrect"""
- ret = iv_index_to_key(idx)
- if ret == xnudefines.MACH_VOUCHER_ATTR_KEY_NONE:
- raise IndexError("invalid voucher key")
- return ret
-
-
-def GetVoucherAttributeManagerForKey(k):
- """Return the attribute manager name for a given key
- params: k - int key number of the manager
- return: cvalue - the attribute manager object.
- None - if not found
- """
- idx = iv_key_to_index(k)
- if idx == xnudefines.IV_UNUSED_KEYINDEX:
- return None
- return kern.globals.ivam_global_table[idx]
-
-
-def GetVoucherAttributeControllerForKey(k):
- """Return the attribute controller for a given key
- params: k - int key number of the controller
- return: cvalue - the attribute controller object.
- None - if not found
- """
- idx = iv_key_to_index(k)
- if idx == xnudefines.IV_UNUSED_KEYINDEX:
- return None
- return kern.globals.ivac_global_table[idx]
-
-
-def GetVoucherAttributeManagerName(ivam):
- """find the name of the ivam object
- param: ivam - cvalue object of type ipc_voucher_attr_manager_t
- returns: str - name of the manager
- """
- return kern.Symbolicate(unsigned(ivam))
-
-
-def GetVoucherAttributeManagerNameForIndex(idx):
- """get voucher attribute manager name for index
- return: str - name of the attribute manager object
- """
- return GetVoucherAttributeManagerName(
- GetVoucherAttributeManagerForKey(GetVoucherManagerKeyForIndex(idx))
- )
-
-
-def GetVoucherValueHandleFromVoucherForIndex(voucher, idx):
- """traverse the voucher attrs and get value_handle in the voucher attr controls table
- params:
- voucher - cvalue object of type ipc_voucher_t
- idx - int index in the entries for which you wish to get actual handle for
- returns: cvalue object of type ivac_entry_t
- None if no handle found.
- """
- manager_key = GetVoucherManagerKeyForIndex(idx)
- voucher_num_elems = sizeof(voucher.iv_table) // sizeof(voucher.iv_table[0])
- if idx >= voucher_num_elems:
- debuglog("idx %d is out of range max: %d" % (idx, voucher_num_elems))
- return None
- voucher_entry_value = unsigned(voucher.iv_table[idx])
- debuglog("manager_key %d" % manager_key)
- ivac = GetVoucherAttributeControllerForKey(manager_key)
- if ivac is None or addressof(ivac) == 0:
- debuglog("No voucher attribute controller for idx %d" % idx)
- return None
-
- ivace_table = ivac.ivac_table
- if voucher_entry_value >= unsigned(ivac.ivac_table_size):
- print(
- "Failed to get ivace for value %d in table of size %d"
- % (voucher_entry_value, unsigned(ivac.ivac_table_size))
- )
- return None
- return ivace_table[voucher_entry_value]
-
-
-@lldb_command("showallvouchers")
-def ShowAllVouchers(cmd_args=[], cmd_options={}):
- """Display a list of all vouchers in the global voucher hash table
- Usage: (lldb) showallvouchers
- """
- print(GetIPCVoucherSummary.header)
- voucher_ty = gettype("struct ipc_voucher")
- for v in kmemory.Zone("ipc vouchers").iter_allocated(voucher_ty):
- print(GetIPCVoucherSummary(value(v.AddressOf())))
-
-
-@lldb_command("showvoucher", "")
-def ShowVoucher(cmd_args=[], cmd_options={}):
- """Describe a voucher from <ipc_voucher_t> argument.
- Usage: (lldb) showvoucher <ipc_voucher_t>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("Please provide valid argument")
-
- voucher = kern.GetValueFromAddress(cmd_args[0], "ipc_voucher_t")
- print(GetIPCVoucherSummary.header)
- print(GetIPCVoucherSummary(voucher, show_entries=True))
-
-
-@lldb_command("showportsendrights")
-def ShowPortSendRights(cmd_args=[], cmd_options={}):
- """Display a list of send rights across all tasks for a given port.
- Usage: (lldb) showportsendrights <ipc_port_t>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("no port address provided")
-
- port = kern.GetValueFromAddress(cmd_args[0], "struct ipc_port *")
- if not port or port == xnudefines.MACH_PORT_DEAD:
- return
-
- return FindPortRights(cmd_args=[unsigned(port)], cmd_options={"-R": "S"})
-
-
-@lldb_command("showtasksuspenders")
-def ShowTaskSuspenders(cmd_args=[], cmd_options={}):
- """Display the tasks and send rights that are holding a target task suspended.
- Usage: (lldb) showtasksuspenders <task_t>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("no task address provided")
-
- task = kern.GetValueFromAddress(cmd_args[0], "task_t")
-
- if task.suspend_count == 0:
- print(
- "task {:#x} ({:s}) is not suspended".format(
- unsigned(task), GetProcNameForTask(task)
- )
- )
- return
-
- # If the task has been suspended by the kernel (potentially by
- # kperf, using task_suspend_internal) or a client of task_suspend2
- # that does not convert its task suspension token to a port using
- # convert_task_suspension_token_to_port, then it's impossible to determine
- # which task did the suspension.
- port = task.itk_resume
- if task.pidsuspended:
- print(
- 'task {:#x} ({:s}) has been `pid_suspend`ed. (Probably runningboardd\'s fault. Go look at the syslog for "Suspending task.")'.format(
- unsigned(task), GetProcNameForTask(task)
- )
- )
- return
- elif not port:
- print(
- "task {:#x} ({:s}) is suspended but no resume port exists".format(
- unsigned(task), GetProcNameForTask(task)
- )
- )
- return
-
- return FindPortRights(cmd_args=[unsigned(port)], cmd_options={"-R": "S"})
-
-
-@lldb_command("showthreadsuspenders")
-def ShowThreadSuspenders(cmd_args=[], cmd_options={}):
- """Display the tasks and send rights that are holding a target thread suspended.
- Usage: (lldb) showthreadsuspenders <thread_t>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("no thread address provided")
-
- thread = kern.GetValueFromAddress(cmd_args[0], "thread_t")
- task = thread.t_tro.tro_task
-
- if thread.user_stop_count == 0:
- print(
- "thread {:#x} ({:s}) is not suspended".format(
- unsigned(thread), GetProcNameForTask(task)
- )
- )
- return
-
- # If the thread is not suspended by thread_suspend2, or if the client of
- # thread_suspend2 does not convert its thread suspension token to a port using
- # convert_thread_suspension_token_to_port, then it's impossible to determine
- # which task did the suspension.
- port = thread.thread_resume_port
- if not port:
- print(
- "thread {:#x} ({:s}) is suspended but no resume port exists".format(
- unsigned(thread), GetProcNameForTask(task)
- )
- )
- return
- print(
- "thread {:#x} ({:s}) is suspended. resume port {:#x}".format(
- unsigned(thread), GetProcNameForTask(task), unsigned(port)
- )
- )
- return FindPortRights(cmd_args=[unsigned(port)])
-
-
-# Macro: showmqueue:
-@lldb_command("showmqueue", fancy=True)
-def ShowMQueue(cmd_args=None, cmd_options={}, O=None):
- """Routine that lists details about a given mqueue.
- An mqueue is directly tied to a mach port, so it just shows the details of that port.
- Syntax: (lldb) showmqueue <address>
- """
- if cmd_args is None or len(cmd_args) == 0:
- raise ArgumentError("Missing mqueue argument")
-
- kern.GetValueFromAddress(cmd_args[0], "struct ipc_mqueue *")
- portoff = getfieldoffset("struct ipc_port", "ip_messages")
- port = unsigned(ArgumentStringToInt(cmd_args[0])) - unsigned(portoff)
- obj = kern.GetValueFromAddress(port, "struct ipc_object *")
- ShowPortOrPset(obj, O=O)
-
-
-# EndMacro: showmqueue