wingfoil.wingfoil

Wingfoil is a blazingly fast, highly scalable stream processing framework designed for latency-critical use cases such as electronic trading and real-time AI systems.

class Graph(nodes)

Bases: object

run(realtime, start=None, duration=None, cycles=None)
class Latency(stages)

Bases: object

static from_bytes(data, stages)
stages
stamps
to_bytes()
class Node

Bases: object

count()

Counts how many times the upstream node has ticked.

run(realtime, start=None, duration=None, cycles=None)
class PrometheusExporter(addr)

Bases: object

Serves a Prometheus-compatible GET /metrics endpoint.

Usage:

    exporter = PrometheusExporter("0.0.0.0:9091")
    port = exporter.serve()
    node = exporter.register("my_metric", stream)

register(name, stream)

Register a stream as a named gauge metric. Returns a Node.

serve()

Start the HTTP server. Returns the port that was bound.

class Stream(inner)

Bases: object

average()
buffer(capacity)
collect()
count()
csv_write(path)

Write this stream of dicts to a CSV file.

Each dict becomes one CSV row. Headers are inferred from the first dict’s keys, and a time column is prepended with the graph time in nanoseconds.

Parameters:

path – Output file path

Returns:

A Node that drives the write operation.
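The row layout described above can be modeled in plain Python with the standard csv module to show what the output file looks like. This is a sketch of the output shape only, not wingfoil's writer; model_csv_write is a hypothetical helper, and both the name of the prepended column ("time" here) and the handling of keys missing from later dicts are assumptions.

```python
import csv
import io


def model_csv_write(ticks):
    """Model the CSV layout described for Stream.csv_write.

    ticks: list of (graph_time_ns, dict) pairs, one per tick.
    Assumptions: the prepended column is called "time", and keys missing
    from later dicts are written as empty cells.
    """
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    headers = None
    for time_ns, row in ticks:
        if headers is None:
            # Headers are taken from the first dict's keys only.
            headers = list(row.keys())
            writer.writerow(["time"] + headers)
        # Each dict becomes one row, with the graph time (ns) prepended.
        writer.writerow([time_ns] + [row.get(h, "") for h in headers])
    return out.getvalue()


text = model_csv_write([
    (1_000_000_000, {"sym": "EURUSD", "px": 1.1}),
    (2_000_000_000, {"sym": "EURUSD", "px": 1.2}),
])
print(text, end="")
```

Since headers come from the first dict only, every dict on the stream should share the same keys in the same order.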

dataframe()
delay(delay_secs)

Propagates its source delayed by the specified duration (milliseconds).

difference()

Emits the difference in its source from one cycle to the next (pass-through of PyElement).
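To make the per-cycle semantics concrete, here is a plain-Python model over a list of tick values. It is an illustration, not the library's code; what wingfoil emits on the very first tick is not stated above, and this sketch assumes it emits nothing.

```python
def model_difference(values):
    """Emit current minus previous value for each tick after the first.

    Assumption: the first tick has no predecessor, so it emits nothing.
    """
    return [curr - prev for prev, curr in zip(values, values[1:])]


print(model_difference([10.0, 12.5, 12.0]))  # [2.5, -0.5]
```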

distinct()

Only propagates its source when its value has changed (uses PartialEq on PyElement).
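The same kind of plain-Python model shows which ticks survive distinct. It assumes the first value always counts as a change, which the description above does not state explicitly; model_distinct is a hypothetical name.

```python
def model_distinct(values):
    """Forward a value only when it differs from the last forwarded value.

    Assumption: the first value always counts as a change.
    """
    out = []
    last = object()  # sentinel that compares unequal to ordinary tick values
    for v in values:
        if v != last:  # plays the role of the PartialEq comparison
            out.append(v)
            last = v
    return out


print(model_distinct([1, 1, 2, 2, 1]))  # [1, 2, 1]
```

Note that a value reappearing after a different one (the final 1 above) is forwarded again: distinct compares against the previous value only, not against everything seen so far.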

etcd_pub(endpoint, lease_ttl=None, force=True)

Publish this stream of dicts to etcd via PUT.

Stream values must be dicts with “key” (str) and “value” (bytes), or lists of such dicts for multiple writes per tick.

Parameters:
  • endpoint – etcd endpoint, e.g. “http://localhost:2379”

  • lease_ttl – optional lease TTL in seconds; keys expire after this duration and vanish immediately on clean shutdown. Pass None for persistent keys (default).

  • force – if True (default), silently overwrite existing keys. If False, fail if any key already exists.

Returns:

A Node that drives the write operation.
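The value shape etcd_pub expects can be produced with a small helper like the one below. make_put is a hypothetical name used only for illustration; it checks types on the Python side before a value reaches the stream, and the keys and values are made up.

```python
def make_put(key: str, value: bytes) -> dict:
    """Build one write in the shape etcd_pub expects: {"key": str, "value": bytes}."""
    if not isinstance(key, str):
        raise TypeError("key must be str")
    if not isinstance(value, bytes):
        raise TypeError("value must be bytes")
    return {"key": key, "value": value}


# One write per tick: a single dict.
single = make_put("/myapp/price", b"101.5")

# Several writes in the same tick: a list of such dicts.
batch = [
    make_put("/myapp/bid", b"101.4"),
    make_put("/myapp/ask", b"101.6"),
]
```

In a graph, a function like this would be applied with map so that the resulting stream carries these dicts before etcd_pub is called on it.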

filter(keep_func)

Propagates only the source values for which keep_func (a Python callable) returns True; all other values are dropped.

finally(func)
for_each(func)
inspect(func)
kdb_write(host, port, table, columns)

Write this stream to a KDB+ table.

Parameters:
  • host – KDB+ server hostname

  • port – KDB+ server port

  • table – Name of the target KDB+ table

  • columns – List of (name, type) tuples for non-time columns. Supported types: “symbol”, “float”, “long”, “int”, “bool”

Returns:

A Node that drives the write operation.
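A columns argument in the documented format is shown below, together with a hypothetical check_columns helper that rejects type names outside the supported set before the graph runs. The column names are made up for the example.

```python
SUPPORTED_KDB_TYPES = {"symbol", "float", "long", "int", "bool"}


def check_columns(columns):
    """Validate a columns spec: a list of (name, type) tuples for non-time columns."""
    for name, kind in columns:
        if kind not in SUPPORTED_KDB_TYPES:
            raise ValueError(f"unsupported KDB+ type {kind!r} for column {name!r}")
    return columns


# The time column is not listed; columns covers the non-time columns only.
columns = check_columns([("sym", "symbol"), ("price", "float"), ("size", "long")])
```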

latency_report(stages, print_on_teardown=True)

Install a latency report sink. The stream must carry TracedBytes values. Per-stage delta statistics (count/min/mean/p50/p99/max) are printed on graph shutdown.

Parameters:
  • stages – Stage names in order (same list used for Latency).

  • print_on_teardown – Whether to print the report on shutdown (default True).

Returns:

A Node that drives the report sink.
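The per-stage delta statistics can be reproduced from raw stamps to see what each row of the report measures. This sketch assumes each message carries one nanosecond timestamp per stage, in the order of stages, and that a stage's delta is its stamp minus the previous stage's stamp. The percentile method (nearest rank) is also an assumption and may differ from wingfoil's; latency_stats and nearest_rank are hypothetical helpers.

```python
import math
import statistics


def nearest_rank(sorted_vals, pct):
    """Nearest-rank percentile of an ascending list (assumed method)."""
    k = max(1, math.ceil(pct / 100 * len(sorted_vals)))
    return sorted_vals[k - 1]


def latency_stats(stages, stamps_per_msg):
    """Per-stage delta statistics from per-message stamps.

    stamps_per_msg: one list of ns timestamps per message, ordered like stages.
    """
    report = {}
    if not stamps_per_msg:
        return report
    for i in range(1, len(stages)):
        # Delta between consecutive stages for every message, sorted for percentiles.
        deltas = sorted(s[i] - s[i - 1] for s in stamps_per_msg)
        report[f"{stages[i - 1]}->{stages[i]}"] = {
            "count": len(deltas),
            "min": deltas[0],
            "mean": statistics.fmean(deltas),
            "p50": nearest_rank(deltas, 50),
            "p99": nearest_rank(deltas, 99),
            "max": deltas[-1],
        }
    return report


stages = ["recv", "parse", "send"]
stamps = [[0, 100, 250], [1000, 1120, 1300], [2000, 2090, 2200]]
for stage, row in latency_stats(stages, stamps).items():
    print(stage, row)
```

With only a few samples p99 collapses to the maximum; the tail percentiles become meaningful only over many messages.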

latency_report_if(stages, enabled, print_on_teardown=True)

Like latency_report, but only installs the sink when enabled is True. When False, returns the upstream as a Node (no report sink).

limit(limit)

Propagates its source at most limit times.

logged(label)

Logs its source and propagates it. The default log level is INFO.

map(func)

Maps its source into a new Stream using the supplied Python callable.

not()
otlp_push(metric_name, endpoint, service_name)

Push this stream as an OTLP gauge metric.

Parameters:
  • metric_name – Name of the metric to report

  • endpoint – OTLP HTTP endpoint, e.g. “http://localhost:4318”

  • service_name – Service name reported in OTLP resource attributes

Returns:

A Node that drives the push operation.

peek_value()
run(realtime, start=None, duration=None, cycles=None)
sample(trigger)
stamp(stage)

Stamp a named latency stage on each tick using the cycle-start wall-clock time. The stream must carry TracedBytes values.

Parameters:

stage – Stage name (must match one of the names in the Latency).

Returns:

A new Stream with the stage stamped.

stamp_if(stage, enabled)

Like stamp, but only inserts the stamp node when enabled is True. When False, returns the stream unchanged, at zero runtime cost.

stamp_precise(stage)

Stamp a named latency stage with a precise wall-clock read (a TSC read costing roughly 5-10 ns per tick). Unlike stamp, which uses the cycle-start time, this gives intra-cycle resolution.

stamp_precise_if(stage, enabled)

Like stamp_precise, but only inserts the stamp node when enabled is True. When False, returns the stream unchanged.

sum()

Sums the stream (extracts f64 values before summing).

web_pub(server, topic)

Publish this stream to a WebServer topic over WebSocket.

Values must be JSON-compatible Python objects (dict / list / str / int / float / bool / bytes / None). Connected browser clients that subscribed to topic receive the frames.

Parameters:
  • server – A WebServer instance.

  • topic – The topic name to publish on.

Returns:

A Node that drives the publish operation.

with_time()

Pairs each value with the graph time as a (float, value) tuple, where the float is seconds since Unix epoch.
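A plain-Python sketch of the conversion, assuming graph time is held as integer nanoseconds (as csv_write's time column suggests) and that with_time divides it down to float seconds. with_time_pair is a hypothetical helper, not part of wingfoil.

```python
def with_time_pair(graph_time_ns: int, value):
    """Pair a value with graph time as float seconds since the Unix epoch."""
    return (graph_time_ns / 1_000_000_000, value)


t, v = with_time_pair(1_700_000_000_500_000_000, {"px": 1.0})
print(t)  # 1700000000.5
```

A float64 near the current epoch (about 1.7e9 seconds) resolves only about 0.24 µs, so nanosecond detail in graph time is rounded away in the tuple's float.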

zmq_pub(port)

Publish this stream of bytes to a ZMQ PUB socket bound on the given port.

The stream values must be bytes objects. Only supported in real-time mode.

Parameters:

port – TCP port to bind the PUB socket on

Returns:

A Node that drives the publish operation.

zmq_pub_etcd(name, port, endpoint)

Publish this stream of bytes and register as name in etcd.

Binds on 127.0.0.1; use zmq_pub_etcd_on for multi-host deployments where 127.0.0.1 is not routable by subscribers on other hosts.

Parameters:
  • name – Name / etcd key to register under (e.g. “quotes”)

  • port – TCP port to bind the PUB socket on

  • endpoint – etcd endpoint (e.g. “http://localhost:2379”)

Returns:

A Node that drives the publish operation.

zmq_pub_etcd_on(name, address, port, endpoint)

Like zmq_pub_etcd but binds on address instead of 127.0.0.1.

Parameters:
  • name – Name / etcd key to register under

  • address – Routable bind address (e.g. “192.168.1.10”)

  • port – TCP port to bind the PUB socket on

  • endpoint – etcd endpoint

Returns:

A Node that drives the publish operation.

class TracedBytes(payload, latency)

Bases: object

latency
payload
class WebServer(addr, codec=None, static_dir=None, historical=False)

Bases: object

Python-facing handle to a running WebServer.

Mirrors the builder on the Rust side: construct with a bind address, optionally set the codec and static-file directory, then register publish / subscribe streams.

codec_name()

Return the codec currently used on the wire (“bincode” or “json”).

port()

The bound port (0 when historical=True).

sub(topic)

Subscribe to topic. Returns a source Stream of dicts / lists / primitives decoded from the clients’ frames.

To publish a stream, use the fluent stream.web_pub(server, topic) method on the stream itself.

bimap(a, b, func)

Maps streams a and b into a new stream using func (e.g. lambda a, b: a + b).

build_dataframe(stream_dict: Dict[str, Any]) → DataFrame

Builds a combined DataFrame from a dict of streams.

constant(val)

A stream that ticks once, on the first engine cycle.

py_csv_read(path, time_column)

Read a CSV file into a stream of dicts.

Each tick yields a dict where keys are column headers and values are strings. Rows sharing the same timestamp are collapsed to one-per-tick.

Parameters:
  • path – Path to the CSV file (headers required)

  • time_column – Name of the column containing the timestamp (parsed as integer nanoseconds since epoch)
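py_csv_read needs a header row and a time column holding integer nanoseconds since the epoch. The sketch below prepares such input with the standard library; the column names and prices are illustrative, to_epoch_ns is a hypothetical helper, and the in-memory buffer stands in for the file you would pass as path.

```python
import csv
import io
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ns(dt: datetime) -> int:
    """Integer nanoseconds since the Unix epoch for a timezone-aware datetime.

    Uses exact timedelta arithmetic rather than float timestamp() to avoid rounding.
    """
    delta = dt - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000_000 + delta.microseconds * 1_000


# Build CSV text with a header row and a "time" column in integer nanoseconds.
buf = io.StringIO()
writer = csv.writer(buf, lineterminator="\n")
writer.writerow(["time", "sym", "px"])
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
writer.writerow([to_epoch_ns(t0), "EURUSD", "1.1043"])
writer.writerow([to_epoch_ns(t0 + timedelta(milliseconds=5)), "EURUSD", "1.1044"])
print(buf.getvalue(), end="")
```

Saved to disk, this file would be read with py_csv_read(path, "time"). Every value, including px, then arrives as a string, so numeric fields need converting downstream (for example in map).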

py_etcd_sub(endpoint, prefix)

Subscribe to etcd keys matching a prefix.

Emits a snapshot of all existing keys under the prefix, then delivers live watch events as they arrive. Each tick yields a list of event dicts:

    {"kind": "put" | "delete", "key": str, "value": bytes, "revision": int}

Parameters:
  • endpoint – etcd endpoint, e.g. “http://localhost:2379”

  • prefix – key prefix to watch, e.g. “/myapp/”
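Because the first tick carries the snapshot and later ticks carry watch events in the same shape, a consumer can keep a local mirror of the prefix by folding each tick into a dict. A sketch, with apply_events as a hypothetical helper; the revision numbers and the empty value on the delete event are made up for the example.

```python
def apply_events(view: dict, events: list) -> dict:
    """Fold one tick of py_etcd_sub events into a local key -> value mirror."""
    for ev in events:
        if ev["kind"] == "put":
            view[ev["key"]] = ev["value"]
        elif ev["kind"] == "delete":
            # Deleting a key that was never seen is harmless.
            view.pop(ev["key"], None)
    return view


view = {}
# First tick: snapshot of existing keys under the prefix.
apply_events(view, [
    {"kind": "put", "key": "/myapp/a", "value": b"1", "revision": 7},
    {"kind": "put", "key": "/myapp/b", "value": b"2", "revision": 8},
])
# Later tick: a live watch event.
apply_events(view, [{"kind": "delete", "key": "/myapp/a", "value": b"", "revision": 9}])
print(view)  # {'/myapp/b': b'2'}
```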

py_fix_accept(port, sender_comp_id, target_comp_id)

Bind a FIX acceptor and return (data_stream, status_stream).

Parameters:
  • port – Port to listen on

  • sender_comp_id – SenderCompID for this session

  • target_comp_id – Expected TargetCompID of the connecting initiator

py_fix_connect(host, port, sender_comp_id, target_comp_id)

Connect to a FIX acceptor and return (data_stream, status_stream).

Parameters:
  • host – FIX acceptor hostname

  • port – FIX acceptor port

  • sender_comp_id – SenderCompID for this session

  • target_comp_id – TargetCompID of the counterparty

Returns:

A tuple of (data_stream, status_stream) where data_stream yields lists of message dicts and status_stream yields session status strings.

py_fix_connect_tls(host, port, sender_comp_id, target_comp_id, password=None)

Connect to a TLS-secured FIX acceptor (e.g. LMAX) and return (data_stream, status_stream, injector_func).

The injector is a callable that sends a FIX message dict on the session:

    injector({"msg_type": "V", "fields": [(262, "req1"), (263, "1")]})

Parameters:
  • host – FIX acceptor hostname

  • port – FIX acceptor port (typically 443 for TLS)

  • sender_comp_id – SenderCompID (usually your username)

  • target_comp_id – TargetCompID of the counterparty

  • password – Optional password (tag 554 in Logon)
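The injector takes a dict with msg_type and a list of (tag, value) pairs. A small builder for the documented example message is sketched below. market_data_request is a hypothetical helper, and a real venue will usually require more tags than these two (for example 264 MarketDepth and the instrument group), which are left out here.

```python
def market_data_request(req_id: str, subscribe: bool = True) -> dict:
    """Build a MarketDataRequest (35=V) dict in the injector's format.

    Tag 262 is MDReqID; tag 263 is SubscriptionRequestType, where
    "1" means snapshot plus updates and "2" disables a previous subscription.
    """
    return {
        "msg_type": "V",
        "fields": [(262, req_id), (263, "1" if subscribe else "2")],
    }


msg = market_data_request("req1")
# data_stream, status_stream, injector = py_fix_connect_tls(...)
# injector(msg)
```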

py_kdb_read(host, port, query, time_col, chunk_size=3600)

Read data from a KDB+ database.

Returns a PyStream where each tick is a dict with column names as keys.

Parameters:
  • host – KDB+ server hostname

  • port – KDB+ server port

  • query – q query to execute (e.g. “select from trade”)

  • time_col – Name of the time column for time-slice filtering

  • chunk_size – Duration of each time slice in seconds (default: 3600)

Returns:

A Stream where each tick yields a dict of {column_name: value}.

Requires RunMode::HistoricalFrom with a non-zero start time and RunFor::Duration.
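The interaction between chunk_size and the run window can be pictured as follows: the historical run's start and duration are cut into consecutive windows, each of which would be fetched with query restricted on time_col. This is an assumed model of the slicing; the half-open windows and the shorter final window are guesses, not documented behavior, and time_slices is a hypothetical helper.

```python
NS_PER_SEC = 1_000_000_000


def time_slices(start_ns: int, duration_s: int, chunk_size_s: int = 3600):
    """Split [start, start + duration) into half-open windows of chunk_size_s seconds."""
    end_ns = start_ns + duration_s * NS_PER_SEC
    step = chunk_size_s * NS_PER_SEC
    slices = []
    lo = start_ns
    while lo < end_ns:
        hi = min(lo + step, end_ns)  # the last window may be shorter
        slices.append((lo, hi))
        lo = hi
    return slices


start = 1_704_067_200 * NS_PER_SEC  # 2024-01-01T00:00:00Z
for lo, hi in time_slices(start, duration_s=9000):
    print(lo, hi, (hi - lo) // NS_PER_SEC, "s")
```

A smaller chunk_size means more, smaller queries against the KDB+ server; a larger one means fewer round trips but more rows held per slice.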

py_kdb_write(host, port, table, columns, upstream)

Write stream data to a KDB+ table.

Parameters:
  • host – KDB+ server hostname

  • port – KDB+ server port

  • table – Name of the target KDB+ table

  • columns – List of (name, type) tuples for non-time columns. Supported types: “symbol”, “float”, “long”, “int”, “bool”

  • upstream – The PyStream to consume

Returns:

A Node that drives the write operation.

py_zmq_sub(address)

Subscribe to a ZMQ PUB socket.

Returns a (data_stream, status_stream) tuple.

Each tick of data_stream yields a list[bytes] containing all messages received in that cycle. Each tick of status_stream yields the string “connected” or “disconnected”.

Parameters:

address – ZMQ endpoint to connect to (e.g. “tcp://localhost:5556”)

Returns:

Tuple of (data_stream, status_stream)

py_zmq_sub_etcd(name, endpoint)

Subscribe to a named ZMQ publisher via etcd service discovery.

Looks up name in etcd and connects to the publisher at the stored address. The lookup happens at call time, so ensure the publisher is registered before calling this function.

Parameters:
  • name – Publisher name / etcd key (e.g. “quotes”)

  • endpoint – etcd endpoint (e.g. “http://localhost:2379”)

Returns:

Tuple of (data_stream, status_stream)

Raises:

RuntimeError – if the key is absent or etcd is unreachable

ticker(seconds)

A node that ticks at the specified period (in seconds).

to_dataframe(data: List[Any] | Dict[str, List[Any]]) → DataFrame

Converts a list of values, or a dict mapping names to lists of values, into a DataFrame.