view src/parpg/grease/collision.py @ 39:8b3890f17f94

FifeManager no longer subclasses ApplicationBase PARPGApplication stores an instance of tbe manager instead
author KarstenBock@gmx.net
date Wed, 03 Aug 2011 22:57:52 +0200
parents 09b581087d68
children
line wrap: on
line source

#############################################################################
#
# Copyright (c) 2010 by Casey Duncan and contributors
# All Rights Reserved.
#
# This software is subject to the provisions of the MIT License
# A copy of the license should accompany this distribution.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
#
#############################################################################
"""
**Grease collision detection systems**

Grease uses two-phase broad and narrow collision detection. *Broad-phase*
collision systems are used to efficiently identify pairs that may be colliding
without resorting to a brute-force check of all possible pairs. *Narrow-phase*
collision systems use the pairs generated by the broad-phase and perform more
precise collision tests to determine if a collision has actually occurred. The
narrow-phase system also calculates more details about each collision,
including collision point and normal vector for use in collision response.

A typical collision detection system consists of a narrow-phase system that
contains a broad-phased system. The narrow-phase system is usually the only

one that the application directly interacts with, though the application is
free to use the broad-phased system directly if desired. This could be
useful in cases where speed, rather than precision is paramount.

The narrow-phase system can be assigned handler objects to run after
collision detection. These can perform tasks like handling collision response
or dispatching collision events to application handlers.

Note that broad-phase systems can return false positives, though they should
never return false negatives. Do not assume that all pairs returned by a
broad-phase system are actually in collision.
"""

__version__ = '$Id$'

from grease.geometry import Vec2d
from bisect import bisect_right


class Pair(tuple):
	"""Pair of entities in collision. This is an ordered sequence of two
	entities, that compares and hashes unordered.
	
	Also stores additional collision point and normal vectors
	for each entity.

	Sets of ``Pair`` objects are exposed in the ``collision_pairs``
	attribute of collision systems to indicate the entity pairs in
	collision.
	"""
	info = None
	"""A sequence of (entity, collision point, collision normal)
	for each entity in the pair
	"""

	def __new__(cls, entity1, entity2, point=None, normal=None):
		pair = tuple.__new__(cls, (entity1, entity2))
		return pair
	
	def __hash__(self):
		return hash(self[0]) ^ hash(self[1])
	
	def __eq__(self, other):
		other = tuple(other)
		return tuple(self) == other or (self[1], self[0]) == other
	
	def __repr__(self):
		return '%s%r' % (self.__class__.__name__, tuple(self))
	
	def set_point_normal(self, point0, normal0, point1, normal1):
		"""Set the collision point and normal for both entities"""
		self.info = (
			(self[0], point0, normal0),
			(self[1], point1, normal1),
		)


class BroadSweepAndPrune(object):
	"""2D Broad-phase sweep and prune bounding box collision detector

	This algorithm is efficient for collision detection between many
	moving bodies. It has linear algorithmic complexity and takes
	advantage of temporal coherence between frames. It also does
	not suffer from bad worst-case performance (like RDC can). 
	Unlike spacial hashing, it does not need to be optimized for 
	specific space and body sizes.

	Other algorithms may be more efficient for collision detection with
	stationary bodies, bodies that are always evenly distributed, or ad-hoc
	queries.

	:param collision_component: Name of the collision component used by this
		system, defaults to 'collision'. This component supplies each
		entities' aabb and collision masks.
	:type collision_component: str
	"""
	world = None
	"""|World| object this system belongs to"""

	collision_component = None
	"""Name of world's collision component used by this system"""

	LEFT_ATTR = "left"
	RIGHT_ATTR = "right"
	TOP_ATTR = "top"
	BOTTOM_ATTR = "bottom"

	def __init__(self, collision_component='collision'):
		self.collision_component = collision_component
		self._by_x = None
		self._by_y = None
		self._collision_pairs = None
	
	def set_world(self, world):
		"""Bind the system to a world"""
		self.world = world
	
	def step(self, dt):
		"""Update the system for this time step, updates and sorts the 
		axis arrays.
		"""
		component = getattr(self.world.components, self.collision_component)
		LEFT = self.LEFT_ATTR
		RIGHT = self.RIGHT_ATTR
		TOP = self.TOP_ATTR
		BOTTOM = self.BOTTOM_ATTR
		if self._by_x is None:
			# Build axis lists from scratch
			# Note we cache the box positions here
			# so that we can perform hit tests efficiently
			# it also isolates us from changes made to the 
			# box positions after we run
			by_x = self._by_x = []
			append_x = by_x.append
			by_y = self._by_y = []
			append_y = by_y.append
			for data in component.itervalues():
				append_x([data.aabb.left, LEFT, data])
				append_x([data.aabb.right, RIGHT, data])
				append_y([data.aabb.bottom, BOTTOM, data])
				append_y([data.aabb.top, TOP, data])
		else:
			by_x = self._by_x
			by_y = self._by_y
			removed = []
			for entry in by_x:
				entry[0] = getattr(entry[2].aabb, entry[1])
			for entry in by_y:
				entry[0] = getattr(entry[2].aabb, entry[1])
			# Removing entities is inefficient, but expected to be rare
			if component.deleted_entities:
				deleted_entities = component.deleted_entities
				deleted_x = []
				deleted_y = []
				for i, (_, _, data) in enumerate(by_x):
					if data.entity in deleted_entities:
						deleted_x.append(i)
				deleted_x.reverse()
				for i in deleted_x:
					del by_x[i]
				for i, (_, _, data) in enumerate(by_y):
					if data.entity in deleted_entities:
						deleted_y.append(i)
				deleted_y.reverse()
				for i in deleted_y:
					del by_y[i]
			# Tack on new entities
			for entity in component.new_entities:
				data = component[entity]
				by_x.append([data.aabb.left, LEFT, data])
				by_x.append([data.aabb.right, RIGHT, data])
				by_y.append([data.aabb.bottom, BOTTOM, data])
				by_y.append([data.aabb.top, TOP, data])
				
		# Tim-sort is highly efficient with mostly sorted lists.
		# Because positions tend to change little each frame
		# we take advantage of this here. Obviously things are
		# less efficient with very fast moving, or teleporting entities
		by_x.sort()
		by_y.sort()
		self._collision_pairs = None
	
	@property
	def collision_pairs(self):
		"""Set of candidate collision pairs for this timestep"""
		if self._collision_pairs is None:
			if self._by_x is None:
				# Axis arrays not ready
				return set()

			LEFT = self.LEFT_ATTR
			RIGHT = self.RIGHT_ATTR
			TOP = self.TOP_ATTR
			BOTTOM = self.BOTTOM_ATTR
			# Build candidates overlapping along the x-axis
			component = getattr(self.world.components, self.collision_component)
			xoverlaps = set()
			add_xoverlap = xoverlaps.add
			discard_xoverlap = xoverlaps.discard
			open = {}
			for _, side, data in self._by_x:
				if side is LEFT:
					for open_entity, (from_mask, into_mask) in open.iteritems():
						if data.from_mask & into_mask or from_mask & data.into_mask:
							add_xoverlap(Pair(data.entity, open_entity))
					open[data.entity] = (data.from_mask, data.into_mask)
				elif side is RIGHT:
					del open[data.entity]

			if len(xoverlaps) <= 10 and len(xoverlaps)*4 < len(self._by_y):
				# few candidates were found, so just scan the x overlap candidates
				# along y. This requires an additional sort, but it should
				# be cheaper than scanning everyone and its simpler
				# than a separate brute-force check
				entities = set([entity for entity, _ in xoverlaps] 
					+ [entity for _, entity in xoverlaps])
				by_y = []
				for entity in entities:
					data = component[entity]
					# We can use tuples here, which are cheaper to create
					by_y.append((data.aabb.bottom, BOTTOM, data))
					by_y.append((data.aabb.top, TOP, data))
				by_y.sort()
			else:
				by_y = self._by_y

			# Now check the candidates along the y-axis
			open = set()
			add_open = open.add
			discard_open = open.discard
			self._collision_pairs = set()
			add_pair = self._collision_pairs.add
			for _, side, data in by_y:
				if side is BOTTOM:
					for open_entity in open:
						pair = Pair(data.entity, open_entity)
						if pair in xoverlaps:
							discard_xoverlap(pair)
							add_pair(pair)
							if not xoverlaps:
								# No more candidates, bail
								return self._collision_pairs
					add_open(data.entity)
				elif side is TOP:
					discard_open(data.entity)
		return self._collision_pairs
	
	def query_point(self, x_or_point, y=None, from_mask=0xffffffff):
		"""Hit test at the point specified. 

		:param x_or_point: x coordinate (float) or sequence of (x, y) floats.

		:param y: y coordinate (float) if x is not a sequence

		:param from_mask: Bit mask used to filter query results. This value
			is bit ANDed with candidate entities' ``collision.into_mask``.
			If the result is non-zero, then it is considered a hit. By
			default all entities colliding with the input point are
			returned.

		:return: A set of entities where the point is inside their bounding
			boxes as of the last time step.
		"""
		if self._by_x is None:
			# Axis arrays not ready
			return set()
		if y is None:
			x, y = x_or_point
		else:
			x = x_or_point
		LEFT = self.LEFT_ATTR
		RIGHT = self.RIGHT_ATTR
		TOP = self.TOP_ATTR
		BOTTOM = self.BOTTOM_ATTR
		x_index = bisect_right(self._by_x, [x])
		x_hits = set()
		add_x_hit = x_hits.add
		discard_x_hit = x_hits.discard
		if x_index <= len(self._by_x) // 2:
			# closer to the left, scan from left to right
			while (x == self._by_x[x_index][0] 
				and self._by_x[x_index][1] is LEFT 
				and x_index < len(self._by_x)):
				# Ensure we hit on exact left edge matches
				x_index += 1
			for _, side, data in self._by_x[:x_index]:
				if side is LEFT and from_mask & data.into_mask:
					add_x_hit(data.entity)
				else:
					discard_x_hit(data.entity)
		else:
			# closer to the right
			for _, side, data in reversed(self._by_x[x_index:]):
				if side is RIGHT and from_mask & data.into_mask:
					add_x_hit(data.entity)
				else:
					discard_x_hit(data.entity)
		if not x_hits:
			return x_hits

		y_index = bisect_right(self._by_y, [y])
		y_hits = set()
		add_y_hit = y_hits.add
		discard_y_hit = y_hits.discard
		if y_index <= len(self._by_y) // 2:
			# closer to the bottom
			while (y == self._by_y[y_index][0] 
				and self._by_y[y_index][1] is BOTTOM 
				and y_index < len(self._by_y)):
				# Ensure we hit on exact bottom edge matches
				y_index += 1
			for _, side, data in self._by_y[:y_index]:
				if side is BOTTOM:
					add_y_hit(data.entity)
				else:
					discard_y_hit(data.entity)
		else:
			# closer to the top
			for _, side, data in reversed(self._by_y[y_index:]):
				if side is TOP:
					add_y_hit(data.entity)
				else:
					discard_y_hit(data.entity)
		if y_hits:
			return x_hits & y_hits
		else:
			return y_hits


class Circular(object):
	"""Basic narrow-phase collision detector which treats all entities as
	circles with their radius defined in the collision component.

	:param handlers: A sequence of collision handler functions that are invoked
		after collision detection.
	:type handlers: sequence of functions
	
	:param collision_component: Name of collision component for this system,
		defaults to 'collision'. This supplies each entity's collision
		radius and masks.
	:type collision_component: str

	:param position_component: Name of position component for this system,
		defaults to 'position'. This supplies each entity's position.
	:type position_component: str

	:param update_aabbs: If True (the default), then the entities'
		`collision.aabb` fields will be updated using their position
		and collision radius before invoking the broad phase system. 
		Set this False if another system updates the aabbs.
	:type update_aabbs: bool

	:param broad_phase: A broad-phase collision system to use as a source
		for collision pairs. If not specified, a :class:`BroadSweepAndPrune`
		system will be created automatically.
	"""
	world = None
	"""|World| object this system belongs to"""

	position_component = None
	"""Name of world's position component used by this system"""

	collision_component = None
	"""Name of world's collision component used by this system"""

	update_aabbs = True
	"""Flag to indicate whether the system updates the entities' `collision.aabb`
	field before invoking the broad phase collision system
	"""
	
	handlers = None
	"""A sequence of collision handler functions invoke after collision
	detection
	"""

	broad_phase = None
	"""Broad phase collision system used as a source for collision pairs"""

	def __init__(self, handlers=(), position_component='position', 
		collision_component='collision', update_aabbs=True, broad_phase=None):
		self.handlers = tuple(handlers)
		if broad_phase is None:
			broad_phase = BroadSweepAndPrune(collision_component)
		self.collision_component = collision_component
		self.position_component = position_component
		self.update_aabbs = bool(update_aabbs)
		self.broad_phase = broad_phase
		self._collision_pairs = None
	
	def set_world(self, world):
		"""Bind the system to a world"""
		self.world = world
		self.broad_phase.set_world(world)
		for handler in self.handlers:
			if hasattr(handler, 'set_world'):
				handler.set_world(world)
	
	def step(self, dt):
		"""Update the collision system for this time step and invoke
		the handlers
		"""
		if self.update_aabbs:
			for position, collision in self.world.components.join(
				self.position_component, self.collision_component):
				aabb = collision.aabb
				x, y = position.position
				radius = collision.radius
				aabb.left = x - radius
				aabb.right = x + radius
				aabb.bottom = y - radius
				aabb.top = y + radius
		self.broad_phase.step(dt)
		self._collision_pairs = None
		for handler in self.handlers:
			handler(self)
	
	@property
	def collision_pairs(self):
		"""The set of entity pairs in collision in this timestep"""
		if self._collision_pairs is None:
			position = getattr(self.world.components, self.position_component)
			collision = getattr(self.world.components, self.collision_component)
			pairs = self._collision_pairs = set()
			for pair in self.broad_phase.collision_pairs:
				entity1, entity2 = pair
				position1 = position[entity1].position
				position2 = position[entity2].position
				radius1 = collision[entity1].radius
				radius2 = collision[entity2].radius
				separation = position2 - position1
				if separation.get_length_sqrd() <= (radius1 + radius2)**2:
					normal = separation.normalized()
					pair.set_point_normal(
						normal * radius1 + position1, normal,
						normal * -radius2 + position2, -normal)
					pairs.add(pair)
		return self._collision_pairs
	
	def query_point(self, x_or_point, y=None, from_mask=0xffffffff):
		"""Hit test at the point specified. 

		:param x_or_point: x coordinate (float) or sequence of (x, y) floats.

		:param y: y coordinate (float) if x is not a sequence

		:param from_mask: Bit mask used to filter query results. This value
			is bit ANDed with candidate entities' ``collision.into_mask``.
			If the result is non-zero, then it is considered a hit. By
			default all entities colliding with the input point are
			returned.

		:return: A set of entities where the point is inside their collision
			radii as of the last time step.

		"""
		if y is None:
			point = Vec2d(x_or_point)
		else:
			point = Vec2d(x_or_point, y)
		hits = set()
		position = getattr(self.world.components, self.position_component)
		collision = getattr(self.world.components, self.collision_component)
		for entity in self.broad_phase.query_point(x_or_point, y, from_mask):
			separation = point - position[entity].position
			if separation.get_length_sqrd() <= collision[entity].radius**2:
				hits.add(entity)
		return hits


def dispatch_events(collision_system):
	"""Collision handler that dispatches `on_collide()` events to entities
	marked for collision by the specified collision system. The `on_collide()`
	event handler methods are defined by the application on the desired entity
	classes. These methods should have the following signature::

		def on_collide(self, other_entity, collision_point, collision_normal):
			'''Handle A collision between this entity and `other_entity`

			- other_entity (Entity): The other entity in collision with 
			  `self`

			- collision_point (Vec2d): The point on this entity (`self`)
			  where the collision occurred. Note this may be `None` for 
			  some collision systems that do not report it.

			- collision_normal (Vec2d): The normal vector at the point of
			  collision. As with `collision_point`, this may be None for
			  some collision systems.
			'''

	Note the arguments to `on_collide()` are always passed positionally, so you
	can use different argument names than above if desired.

	If a pair of entities are in collision, then the event will be dispatched
	to both objects in arbitrary order if all of their collision masks align.
	"""
	collision = getattr(collision_system.world.components, 
		collision_system.collision_component)
	for pair in collision_system.collision_pairs:
		entity1, entity2 = pair
		if pair.info is not None:
			args1, args2 = pair.info
		else:
			args1 = entity1, None, None
			args2 = entity2, None, None
		try:
			on_collide = entity1.on_collide
			masks_align = collision[entity2].from_mask & collision[entity1].into_mask
		except (AttributeError, KeyError):
			pass
		else:
			if masks_align:
				on_collide(*args2)
		try:
			on_collide = entity2.on_collide
			masks_align = collision[entity1].from_mask & collision[entity2].into_mask
		except (AttributeError, KeyError):
			pass
		else:
			if masks_align:
				on_collide(*args1)