Library

Powerhose is organized into a simple hierarchy of classes and a few functions:

  • get_cluster() – A function that creates a whole cluster.
  • Job – A class that holds a job to be performed.
  • Broker – A class that passes the jobs it receives to workers.
  • Worker – A class that connects to a broker and passes the jobs it receives to a Python callable.
  • Heartbeat and Stethoscope – Classes that implement a heartbeat.
  • Client – A class that connects to a broker and lets you run jobs against it.

get_cluster

The get_cluster() function creates a Broker and several Worker instances. It can be run in the background for convenience.

powerhose.get_cluster(target, numprocesses=5, frontend='ipc:///tmp/powerhose-front.ipc', backend='ipc:///tmp/powerhose-back.ipc', heartbeat='ipc:///tmp/powerhose-beat.ipc', register='ipc:///tmp/powerhose-reg.ipc', working_dir='.', logfile='stdout', debug=False, background=False, worker_params=None, timeout=7.5, max_age=-1, max_age_delta=0)

Runs a Powerhose cluster.

Options:

  • target: the Python callable that will be called when the broker receives a job.
  • numprocesses: the number of workers. Defaults to 5.
  • frontend: the ZMQ socket to receive jobs.
  • backend: the ZMQ socket to communicate with workers.
  • register: the ZMQ socket to register workers.
  • heartbeat: the ZMQ socket to receive heartbeat requests.
  • working_dir: the working directory. Defaults to ".".
  • logfile: the file to log into. Defaults to stdout.
  • debug: if True, the logs are at the DEBUG level. Defaults to False.
  • background: if True, the cluster is run in the background. Defaults to False.
  • worker_params: a dict of params to pass to the worker. Defaults to None.
  • timeout: the maximum time allowed before the thread stacks are dumped and the job result is not sent back. Defaults to 7.5.
  • max_age: maximum age for a worker in seconds. After that delay, the worker will simply quit. When set to -1, never quits. Defaults to -1.
  • max_age_delta: maximum value in seconds added to max_age. The worker will quit after max_age + random(0, max_age_delta). This is done to avoid having all workers quit at the same instant. Defaults to 0.
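
The way max_age and max_age_delta combine can be sketched as follows. This is only an illustration of the arithmetic described above, not Powerhose's own code: the helper name effective_max_age is hypothetical, and drawing the offset with random.randint is an assumption.

```python
import random


def effective_max_age(max_age, max_age_delta, rng=random):
    """Return the age in seconds after which a worker would quit, or None for never.

    Hypothetical helper mirroring max_age + random(0, max_age_delta).
    """
    if max_age == -1:
        # -1 means the worker never quits because of its age.
        return None
    # Each worker draws its own offset, so workers started together
    # do not all quit at the same instant.
    return max_age + rng.randint(0, max_age_delta)


# Five workers with max_age=60 and max_age_delta=10 quit between 60s and 70s.
ages = [effective_max_age(60, 10) for _ in range(5)]
```

With max_age_delta left at 0, every worker started at the same time would reach max_age together, which is the situation the delta is meant to avoid.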

Example:

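from powerhose import get_cluster
from powerhose.client import Client


# Start a broker and its workers in the background; jobs go to example.echo.
cluster = get_cluster('example.echo', background=True)
cluster.start()

# The client connects to the default frontend socket.
client = Client()

for i in range(10):
    print(client.execute(str(i)))

cluster.stop()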
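
The example refers to a target named 'example.echo', whose code is not shown in this section. A minimal sketch of what such a callable might look like follows. It assumes the worker calls the target with the received Job and sends the returned string back as the result; FakeJob is a hypothetical stand-in so that the snippet runs without a cluster.

```python
class FakeJob:
    """Hypothetical stand-in exposing only the `data` attribute of a Job."""

    def __init__(self, data=''):
        self.data = data


def echo(job):
    # Return the raw job data unchanged; assumed to become the job result.
    return job.data


result = echo(FakeJob('3'))
```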

Job

class powerhose.job.Job(data='', headers=None)

A Job is just a container that is passed over the wire.

A job is composed of headers and raw data, and offers serialization.

Options:

  • data: the raw string data (default: '')
  • headers: a mapping of headers (default: None)

add_header(name, value)

Adds a header.

Options:

  • name: header name
  • value: value

Both values should be strings. If the header already exists, it is overwritten.

serialize()

Serializes the job.

The output can be sent over a wire. A serialized job can be read with a cursor with no specific preprocessing.

classmethod load_from_string(data)

Loads a job from a serialized string and returns a Job instance.

Options:

  • data: the serialized string.

Example:

>>> from powerhose.job import Job
>>> job = Job('4*2')
>>> job.serialize()
'NONE:::4*2'
>>> Job.load_from_string('NONE:::4*2')
<powerhose.job.Job object at 0x107b78c50>
>>> Job.load_from_string('NONE:::4*2').data
'4*2'
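
Judging only from the output above, a serialized job appears to be a header section, a ':::' separator, and then the raw data. The sketch below splits such a string by hand. The helper split_serialized is hypothetical and not part of Powerhose, and because this section does not show how headers are encoded when present, the header section is left unparsed.

```python
SEPARATOR = ':::'  # assumed from the example output 'NONE:::4*2'


def split_serialized(raw):
    """Split a serialized job into (header_section, data).

    Hypothetical helper; it splits on the first separator only, so data
    that itself contains ':::' stays intact.
    """
    header_section, found, data = raw.partition(SEPARATOR)
    if not found:
        raise ValueError('no %r separator in serialized job' % SEPARATOR)
    return header_section, data


header_section, data = split_serialized('NONE:::4*2')
```

In real code, Job.load_from_string() remains the way to read a serialized job; the sketch only makes the wire layout visible.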

Broker

class powerhose.broker.Broker(frontend='ipc:///tmp/powerhose-front.ipc', backend='ipc:///tmp/powerhose-back.ipc', heartbeat='ipc:///tmp/powerhose-beat.ipc', register='ipc:///tmp/powerhose-reg.ipc', io_threads=1)

Class that routes jobs to workers.

Options:

  • frontend: the ZMQ socket to receive jobs.
  • backend: the ZMQ socket to communicate with workers.
  • heartbeat: the ZMQ socket to receive heartbeat requests.
  • register: the ZMQ socket to register workers.

start()

Starts the broker.

stop()

Stops the broker.

Worker

class powerhose.worker.Worker(target, backend='ipc:///tmp/powerhose-back.ipc', heartbeat='ipc:///tmp/powerhose-beat.ipc', register='ipc:///tmp/powerhose-reg.ipc', ping_delay=1.0, ping_retries=3, params=None, timeout=7.5, max_age=-1, max_age_delta=0)

Class that links a callable to a broker.

Options:

  • target: the Python callable that will be called when the broker sends a job.
  • backend: the ZMQ socket to connect to the broker.
  • heartbeat: the ZMQ socket to perform PINGs on the broker to make sure it's still alive.
  • register: the ZMQ socket to register workers.
  • ping_delay: the delay in seconds between two pings.
  • ping_retries: the number of attempts to ping the broker before quitting.
  • params: a dict containing the params to set for this worker.
  • timeout: the maximum time allowed before the thread stacks are dumped and the job result is not sent back.
  • max_age: maximum age for a worker in seconds. After that delay, the worker will simply quit. When set to -1, never quits. Defaults to -1.
  • max_age_delta: maximum value in seconds added to max_age. The worker will quit after max_age + random(0, max_age_delta). This is done to avoid having all workers quit at the same instant. Defaults to 0. The value must be an integer.

start()

Starts the worker.

stop()

Stops the worker.

Heartbeat

The Broker class runs a Heartbeat instance that regularly sends a BEAT message on a PUB channel. Each worker has a Stethoscope instance that subscribes to the channel, to check if the Broker is still around.

class powerhose.heartbeat.Heartbeat(endpoint='ipc:///tmp/powerhose-beat.ipc', interval=2.0, io_loop=None, ctx=None)

Class that implements a ZMQ heartbeat server.

This class sends regular beats on a ZMQ socket.

Options:

  • endpoint: the ZMQ socket to call.
  • interval: the interval between two beats. Defaults to 2.0.

stop()

Stops the Pong service

start()

Starts the Pong service

class powerhose.heartbeat.Stethoscope(endpoint='ipc:///tmp/powerhose-beat.ipc', warmup_delay=0.5, delay=3.0, retries=3, onbeatlost=None, onbeat=None, io_loop=None, ctx=None)

Class that implements a ZMQ heartbeat client.

Options:

  • endpoint: the ZMQ socket to call.
  • warmup_delay: the delay before starting to ping. Defaults to 0.5s.
  • delay: the delay between two pings. Defaults to 3s.
  • retries: the number of attempts to ping. Defaults to 3.
  • onbeatlost: a callable that will be called when a ping fails. If the callable returns True, pinging stops. Defaults to None.
  • onbeat: a callable that will be called when a ping succeeds. Defaults to None.

stop()

Stops the Pinger
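
The interplay of retries, onbeat and onbeatlost can be sketched without any sockets. The class below is a hypothetical stand-in, not Powerhose code. It assumes that onbeatlost is only invoked once retries consecutive beats have been missed, and that the watcher stops when no callback is set; neither detail is confirmed by the text above.

```python
class BeatWatcher:
    """Hypothetical stand-in for the retry bookkeeping; not Powerhose code."""

    def __init__(self, retries=3, onbeat=None, onbeatlost=None):
        self.retries = retries
        self.onbeat = onbeat
        self.onbeatlost = onbeatlost
        self.missed = 0
        self.running = True

    def beat_received(self):
        # A beat arrived in time: reset the count of consecutive misses.
        self.missed = 0
        if self.onbeat is not None:
            self.onbeat()

    def beat_missed(self):
        # No beat arrived within the delay.
        self.missed += 1
        if self.missed < self.retries:
            return
        # Assumption: the callback fires only once all retries are used up,
        # and with no callback the watcher simply stops.
        if self.onbeatlost is None or self.onbeatlost():
            self.running = False
        self.missed = 0


events = []
watcher = BeatWatcher(retries=3, onbeatlost=lambda: events.append('lost') or True)
watcher.beat_missed()
watcher.beat_received()    # resets the miss counter
for _ in range(3):
    watcher.beat_missed()  # the third consecutive miss triggers onbeatlost
```

A worker could use such a callback to shut itself down once the broker has disappeared.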

Client

class powerhose.client.Client(frontend='ipc:///tmp/powerhose-front.ipc', timeout=5.0, timeout_max_overflow=7.5, timeout_overflows=1, debug=False, ctx=None)

Class to call a Powerhose cluster.

Options:

  • frontend: ZMQ socket to call.
  • timeout: maximum allowed time for a job to run. Defaults to 5s.
  • timeout_max_overflow: maximum timeout overflow allowed. Defaults to 7.5s.
  • timeout_overflows: number of times in a row the timeout value can be overflowed per worker. The client keeps a counter of executions that were longer than the regular timeout but shorter than timeout_max_overflow. When the number goes over timeout_overflows, the usual TimeoutError is raised. When a worker returns on time, the counter is reset. Defaults to 1.

execute(job, timeout=None)

Runs the job

Options:

  • job: Job to be performed. Can be a Job instance or a string. If it is a string, a Job instance is automatically created from it.
  • timeout: maximum allowed time for a job to run. If not provided, uses the one defined in the constructor.

If the job does not complete within the timeout, a TimeoutError is raised.

This method is thread-safe and uses a lock. If you need to execute a lot of jobs simultaneously on a broker, use the Pool class.

ping(timeout=1.0)

Can be used to simply ping the broker to make sure it’s responsive.

Returns the broker PID
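
The overflow counter described by the Client's timeout, timeout_max_overflow and timeout_overflows options can be sketched as below. OverflowTracker is a hypothetical class written for this illustration, and Python 3's built-in TimeoutError stands in for the exception the library raises. How the real client identifies workers is not shown here, so a plain worker id string is assumed.

```python
class OverflowTracker:
    """Hypothetical per-worker counter of slow-but-tolerated executions."""

    def __init__(self, timeout=5.0, timeout_max_overflow=7.5, timeout_overflows=1):
        self.timeout = timeout
        self.timeout_max_overflow = timeout_max_overflow
        self.timeout_overflows = timeout_overflows
        self.counts = {}  # worker id -> consecutive overflows

    def record(self, worker_id, duration):
        if duration <= self.timeout:
            # The worker answered on time: reset its counter.
            self.counts[worker_id] = 0
            return
        if duration > self.timeout_max_overflow:
            # Slower than the hard limit: never tolerated.
            raise TimeoutError('job took %.1fs' % duration)
        # Between timeout and timeout_max_overflow: tolerated a few times.
        self.counts[worker_id] = self.counts.get(worker_id, 0) + 1
        if self.counts[worker_id] > self.timeout_overflows:
            raise TimeoutError('too many overflows for %s' % worker_id)


tracker = OverflowTracker()
tracker.record('w1', 6.0)      # first overflow: tolerated
tracker.record('w1', 1.0)      # on time: counter reset
tracker.record('w1', 6.0)      # tolerated again
try:
    tracker.record('w1', 6.5)  # second overflow in a row
    raised = False
except TimeoutError:
    raised = True
```

With the default timeout_overflows of 1, a worker may run over the regular timeout once; a second consecutive overflow is reported as a timeout.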

Client Pool

class powerhose.client.Pool(size=10, frontend='ipc:///tmp/powerhose-front.ipc', timeout=5.0, timeout_max_overflow=7.5, timeout_overflows=1, debug=False, ctx=None)

The Pool class manages several Client instances and publishes the same interface.

Options:

  • size: size of the pool. Defaults to 10.
  • frontend: ZMQ socket to call.
  • timeout: maximum allowed time for a job to run. Defaults to 5s.
  • timeout_max_overflow: maximum timeout overflow allowed. Defaults to 7.5s.
  • timeout_overflows: number of times in a row the timeout value can be overflowed per worker. The client keeps a counter of executions that were longer than the regular timeout but shorter than timeout_max_overflow. When the number goes over timeout_overflows, the usual TimeoutError is raised. When a worker returns on time, the counter is reset.
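
The idea of a pool that manages several clients behind a single execute() call can be sketched with the standard library alone. Everything below is hypothetical: EchoClient stands in for Client, SketchPool is not the Powerhose Pool, and handing out idle clients through a queue.Queue is an assumption about one reasonable design, not a description of the library's internals. The snippet is written for Python 3.

```python
import queue
from contextlib import contextmanager


class EchoClient:
    """Hypothetical stand-in for Client; returns the job unchanged."""

    def execute(self, job, timeout=None):
        return job


class SketchPool:
    """Hypothetical pool: hands out one idle client per call."""

    def __init__(self, size=10, factory=EchoClient):
        self._idle = queue.Queue()
        for _ in range(size):
            self._idle.put(factory())

    @contextmanager
    def _reserve(self):
        client = self._idle.get()   # blocks while every client is busy
        try:
            yield client
        finally:
            self._idle.put(client)  # always hand the client back

    def execute(self, job, timeout=None):
        # Same call signature as the client's execute().
        with self._reserve() as client:
            return client.execute(job, timeout=timeout)


pool = SketchPool(size=2)
results = [pool.execute(str(i)) for i in range(5)]
```

Because each call borrows its own client, several threads can run jobs at the same time instead of waiting on a single client's lock.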
