pyspark.pandas.merge_asof

pyspark.pandas.merge_asof(left: Union[pyspark.pandas.frame.DataFrame, pyspark.pandas.series.Series], right: Union[pyspark.pandas.frame.DataFrame, pyspark.pandas.series.Series], on: Union[Any, Tuple[Any, …], None] = None, left_on: Union[Any, Tuple[Any, …], None] = None, right_on: Union[Any, Tuple[Any, …], None] = None, left_index: bool = False, right_index: bool = False, by: Union[Any, Tuple[Any, …], List[Union[Any, Tuple[Any, …]]], None] = None, left_by: Union[Any, Tuple[Any, …], List[Union[Any, Tuple[Any, …]]], None] = None, right_by: Union[Any, Tuple[Any, …], List[Union[Any, Tuple[Any, …]]], None] = None, suffixes: Tuple[str, str] = ('_x', '_y'), tolerance: Optional[Any] = None, allow_exact_matches: bool = True, direction: str = 'backward') → pyspark.pandas.frame.DataFrame

Perform an asof merge.

This is similar to a left join, except that rows are matched on the nearest key rather than on equal keys.

For each row in the left DataFrame:

  • A “backward” search selects the last row in the right DataFrame whose ‘on’ key is less than or equal to the left’s key.

  • A “forward” search selects the first row in the right DataFrame whose ‘on’ key is greater than or equal to the left’s key.

  • A “nearest” search selects the row in the right DataFrame whose ‘on’ key is closest in absolute distance to the left’s key.
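
The three search directions above can be sketched in plain Python with the standard-library bisect module. This only illustrates the matching rule on a sorted list of keys; it is not how pandas-on-Spark implements the join. The helper name asof_lookup is hypothetical, and breaking “nearest” ties toward the earlier key is an assumption of this sketch.

```python
from bisect import bisect_left, bisect_right


def asof_lookup(right_keys, key, direction="backward"):
    """Return the position in sorted right_keys matched to key, or None.

    Hypothetical helper for illustration only.
    """
    if direction == "backward":
        # Last right key that is <= key.
        i = bisect_right(right_keys, key) - 1
        return i if i >= 0 else None
    if direction == "forward":
        # First right key that is >= key.
        i = bisect_left(right_keys, key)
        return i if i < len(right_keys) else None
    if direction == "nearest":
        back = asof_lookup(right_keys, key, "backward")
        fwd = asof_lookup(right_keys, key, "forward")
        if back is None or fwd is None:
            return back if fwd is None else fwd
        # Assumption of this sketch: ties go to the backward candidate.
        if key - right_keys[back] <= right_keys[fwd] - key:
            return back
        return fwd
    raise ValueError(f"unknown direction: {direction!r}")


right_keys = [1, 2, 3, 6, 7]
right_vals = [1, 2, 3, 6, 7]
for direction in ("backward", "forward", "nearest"):
    positions = [asof_lookup(right_keys, k, direction) for k in (1, 5, 10)]
    print(direction, [None if i is None else right_vals[i] for i in positions])
# backward [1, 3, 7]
# forward [1, 6, None]
# nearest [1, 6, 7]
```

The keys and values mirror the first example in the Examples section, so the printed rows can be compared with the right_val columns shown there.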

Optionally match on equivalent keys with ‘by’ before searching with ‘on’.

New in version 3.3.0.

Parameters
left : DataFrame or named Series
right : DataFrame or named Series
on : label

Field name to join on. Must be found in both DataFrames. The data MUST be ordered. This must be a numeric column, such as datetimelike, integer, or float. Either on or left_on/right_on must be given.

left_on : label

Field name to join on in the left DataFrame.

right_on : label

Field name to join on in the right DataFrame.

left_index : bool

Use the index of the left DataFrame as the join key.

right_index : bool

Use the index of the right DataFrame as the join key.

by : column name or list of column names

Match on these columns before performing the merge operation.

left_by : column name

Field names to match on in the left DataFrame.

right_by : column name

Field names to match on in the right DataFrame.

suffixes : 2-length sequence (tuple, list, …)

Suffixes to apply to overlapping column names on the left and right sides, respectively.

tolerance : int or Timedelta, optional, default None

Select asof tolerance within this range; must be compatible with the merge index.

allow_exact_matches : bool, default True
  • If True, allow matching with the same ‘on’ value (i.e., less-than-or-equal-to / greater-than-or-equal-to).

  • If False, don’t match the same ‘on’ value (i.e., strictly less-than / strictly greater-than).

direction : ‘backward’ (default), ‘forward’, or ‘nearest’

Whether to search for prior, subsequent, or closest matches.
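
How tolerance and allow_exact_matches narrow a backward search can be sketched in plain Python. This is an illustration of the rule, not the library's implementation. The helper backward_match is hypothetical, times are written as integer millisecond offsets for brevity, and treating the tolerance bound as inclusive is an assumption of this sketch.

```python
from bisect import bisect_left, bisect_right


def backward_match(right_keys, key, tolerance=None, allow_exact_matches=True):
    """Return the position of the backward match in sorted right_keys, or None.

    Hypothetical helper for illustration only.
    """
    # bisect_right keeps keys equal to `key` as candidates;
    # bisect_left excludes them, giving a strictly-less-than search.
    if allow_exact_matches:
        cut = bisect_right(right_keys, key)
    else:
        cut = bisect_left(right_keys, key)
    i = cut - 1
    if i < 0:
        return None  # no earlier key at all
    # Assumption: a candidate exactly `tolerance` away still matches.
    if tolerance is not None and key - right_keys[i] > tolerance:
        return None
    return i


# MSFT quote times from the Examples section, as millisecond offsets.
quote_times = [23, 30, 41, 75]

print(backward_match(quote_times, 38))                # 1: the quote at 30
print(backward_match(quote_times, 38, tolerance=2))   # None: 8 ms away
print(backward_match(quote_times, 23, tolerance=10,
                     allow_exact_matches=False))      # None: only an exact match exists
print(backward_match(quote_times, 38, tolerance=10,
                     allow_exact_matches=False))      # 1: 8 ms away, strictly earlier
```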

Returns
merged : DataFrame

See also

merge

Merge with a database-style join.

merge_ordered

Merge with optional filling/interpolation.

Examples

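>>> import pandas as pd
>>> import pyspark.pandas as ps
>>> from pyspark.sql import functions as sf
>>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})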
>>> left
    a left_val
0   1        a
1   5        b
2  10        c
>>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})
>>> right
   a  right_val
0  1          1
1  2          2
2  3          3
3  6          6
4  7          7
>>> ps.merge_asof(left, right, on="a").sort_values("a").reset_index(drop=True)
    a left_val  right_val
0   1        a          1
1   5        b          3
2  10        c          7
>>> ps.merge_asof(
...     left,
...     right,
...     on="a",
...     allow_exact_matches=False
... ).sort_values("a").reset_index(drop=True)
    a left_val  right_val
0   1        a        NaN
1   5        b        3.0
2  10        c        7.0
>>> ps.merge_asof(
...     left,
...     right,
...     on="a",
...     direction="forward"
... ).sort_values("a").reset_index(drop=True)
    a left_val  right_val
0   1        a        1.0
1   5        b        6.0
2  10        c        NaN
>>> ps.merge_asof(
...     left,
...     right,
...     on="a",
...     direction="nearest"
... ).sort_values("a").reset_index(drop=True)
    a left_val  right_val
0   1        a          1
1   5        b          6
2  10        c          7

We can use indexed DataFrames as well.

>>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
>>> left
   left_val
1         a
5         b
10        c
>>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 6, 7])
>>> right
   right_val
1          1
2          2
3          3
6          6
7          7
>>> ps.merge_asof(left, right, left_index=True, right_index=True).sort_index()
   left_val  right_val
1         a          1
5         b          3
10        c          7

Here is a real-world time-series example:

>>> quotes = ps.DataFrame(
...     {
...         "time": [
...             pd.Timestamp("2016-05-25 13:30:00.023"),
...             pd.Timestamp("2016-05-25 13:30:00.023"),
...             pd.Timestamp("2016-05-25 13:30:00.030"),
...             pd.Timestamp("2016-05-25 13:30:00.041"),
...             pd.Timestamp("2016-05-25 13:30:00.048"),
...             pd.Timestamp("2016-05-25 13:30:00.049"),
...             pd.Timestamp("2016-05-25 13:30:00.072"),
...             pd.Timestamp("2016-05-25 13:30:00.075")
...         ],
...         "ticker": [
...                "GOOG",
...                "MSFT",
...                "MSFT",
...                "MSFT",
...                "GOOG",
...                "AAPL",
...                "GOOG",
...                "MSFT"
...            ],
...            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
...            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03]
...     }
... )
>>> quotes
                     time ticker     bid     ask
0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
>>> trades = ps.DataFrame(
...     {
...         "time": [
...             pd.Timestamp("2016-05-25 13:30:00.023"),
...             pd.Timestamp("2016-05-25 13:30:00.038"),
...             pd.Timestamp("2016-05-25 13:30:00.048"),
...             pd.Timestamp("2016-05-25 13:30:00.048"),
...             pd.Timestamp("2016-05-25 13:30:00.048")
...         ],
...         "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
...         "price": [51.95, 51.95, 720.77, 720.92, 98.0],
...         "quantity": [75, 155, 100, 100, 100]
...     }
... )
>>> trades
                     time ticker   price  quantity
0 2016-05-25 13:30:00.023   MSFT   51.95        75
1 2016-05-25 13:30:00.038   MSFT   51.95       155
2 2016-05-25 13:30:00.048   GOOG  720.77       100
3 2016-05-25 13:30:00.048   GOOG  720.92       100
4 2016-05-25 13:30:00.048   AAPL   98.00       100

By default, each trade is matched with the most recent quote for the same ticker:

>>> ps.merge_asof(
...    trades, quotes, on="time", by="ticker"
... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
                     time ticker   price  quantity     bid     ask
0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
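
The effect of by="ticker" in the call above can be pictured as splitting the quotes into one sorted list per ticker and running the backward search inside the matching list only. The following plain-Python sketch shows that idea; it is not the library's implementation. The names quotes_by_ticker and latest_bid are hypothetical, times are written as millisecond offsets, and only the bid column is carried along.

```python
from bisect import bisect_right

# (time in ms, ticker, bid), already ordered by time as in the quotes frame.
quotes = [
    (23, "GOOG", 720.50), (23, "MSFT", 51.95), (30, "MSFT", 51.97),
    (41, "MSFT", 51.99), (48, "GOOG", 720.50), (49, "AAPL", 97.99),
    (72, "GOOG", 720.50), (75, "MSFT", 52.01),
]
# (time in ms, ticker) for each trade, in input order.
trades = [(23, "MSFT"), (38, "MSFT"), (48, "GOOG"), (48, "GOOG"), (48, "AAPL")]

# Step 1: group quotes on the 'by' key, keeping times sorted per group.
quotes_by_ticker = {}
for time_ms, ticker, bid in quotes:
    times, bids = quotes_by_ticker.setdefault(ticker, ([], []))
    times.append(time_ms)
    bids.append(bid)


def latest_bid(time_ms, ticker):
    """Backward search restricted to quotes of the same ticker."""
    times, bids = quotes_by_ticker.get(ticker, ([], []))
    i = bisect_right(times, time_ms) - 1
    return bids[i] if i >= 0 else None


# Step 2: search within the matching group for each trade.
matched = [latest_bid(time_ms, ticker) for time_ms, ticker in trades]
print(matched)  # [51.95, 51.97, 720.5, 720.5, None]
```

The AAPL trade gets no bid because the only AAPL quote arrives 1 ms after the trade, which agrees with the NaN row in the output above. The sketch lists results in trade input order rather than the sorted order used in the doctest.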

Here we match only quotes that fall within 2 ms of the trade time:

>>> ps.merge_asof(
...     trades,
...     quotes,
...     on="time",
...     by="ticker",
...     tolerance=sf.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
                     time ticker   price  quantity     bid     ask
0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93

Here we match only quotes that fall within 10 ms of the trade time, and we exclude exact matches on time. However, prior data still propagates forward:

>>> ps.merge_asof(
...     trades,
...     quotes,
...     on="time",
...     by="ticker",
...     tolerance=sf.expr("INTERVAL 10 MILLISECONDS"),  # pd.Timedelta("10ms")
...     allow_exact_matches=False
... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
                     time ticker   price  quantity     bid     ask
0 2016-05-25 13:30:00.023   MSFT   51.95        75     NaN     NaN
1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
3 2016-05-25 13:30:00.048   GOOG  720.77       100     NaN     NaN
4 2016-05-25 13:30:00.048   GOOG  720.92       100     NaN     NaN