1BRC, the challenge that went viral on GitHub: computing weather-station statistics

Preface

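In this era of exploding data, meteorology, a discipline closely tied to everyday life, is accumulating data at an unprecedented rate. Imagine tens of thousands of weather stations around the world recording key parameters such as temperature, humidity, wind speed and precipitation, minute after minute and hour after hour. Taken together, these records form a vast ocean of data.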

What is 1BRC?

1 Billion Row Challenge: calculate the min, max, and average of 1 billion measurements

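The goal of 1BRC is to write a Java program that reads temperature measurements from a text file with one billion rows and computes the minimum, mean and maximum temperature for each weather station. The file has a simple structure: each row holds one measurement in the form "station name;temperature".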
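The heart of the task is a single pass that keeps a running minimum, maximum, sum and count for each station. The mean is computed only at the end, as sum / count. The sketch below is a minimal pure-Python illustration, not a competitive entry. It assumes every temperature has exactly one fractional digit, so each value can be stored as an integer number of tenths. Many 1BRC entries use this trick to avoid floating-point parsing. The function names and sample lines are hypothetical.

```python
from typing import Dict, Iterable, List


def parse_tenths(text: str) -> int:
    """Parse a temperature such as "-12.3" into integer tenths (-123).

    Assumes exactly one fractional digit, as in the 1BRC input format.
    """
    negative = text.startswith("-")
    digits = text[1:] if negative else text
    whole, frac = digits.split(".")
    value = int(whole) * 10 + int(frac)
    return -value if negative else value


def aggregate(lines: Iterable[str]) -> Dict[str, List[int]]:
    """Single pass: keep [min, max, sum, count] per station, all in tenths."""
    stats: Dict[str, List[int]] = {}
    for line in lines:
        station, _, temp = line.rstrip("\n").partition(";")
        t = parse_tenths(temp)
        s = stats.get(station)
        if s is None:
            stats[station] = [t, t, t, 1]
        else:
            if t < s[0]:
                s[0] = t
            if t > s[1]:
                s[1] = t
            s[2] += t
            s[3] += 1
    return stats


# Hypothetical sample lines in the "station;temperature" format
sample = ["Hamburg;12.0", "Bulawayo;8.9", "Hamburg;34.2", "Hamburg;-3.4"]
print(aggregate(sample))
# {'Hamburg': [-34, 342, 428, 3], 'Bulawayo': [89, 89, 89, 1]}
```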

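Your mission, should you choose to accept it, is to write the fastest program for this task. Along the way, you are encouraged to exploit everything modern Java offers. That includes virtual threads, the Vector API and SIMD instructions, garbage-collector tuning, AOT compilation, and any other performance trick you can think of.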
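Many of the fastest entries, whatever their language, share one structural idea behind the techniques listed above. They split the file into byte ranges whose boundaries are moved forward to the next newline, aggregate each range independently, and then merge the partial results. A minimal Python sketch of that idea follows. The function names are hypothetical, and threads are used only to keep the demo simple. In CPython, the GIL means a ProcessPoolExecutor (or another multi-process setup) would be needed for real CPU parallelism.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Iterable, List, Tuple

Stats = Dict[str, List[float]]  # station -> [min, max, sum, count]


def chunk_ranges(path: str, n_chunks: int) -> List[Tuple[int, int]]:
    """Split a file into byte ranges that each end just after a newline."""
    size = os.path.getsize(path)
    step = max(1, size // n_chunks)
    ranges: List[Tuple[int, int]] = []
    start = 0
    with open(path, "rb") as f:
        while start < size:
            end = min(start + step, size)
            if end < size:
                f.seek(end)
                f.readline()  # move to the end of the line we landed in
                end = f.tell()
            ranges.append((start, end))
            start = end
    return ranges


def process_range(path: str, start: int, end: int) -> Stats:
    """Aggregate [min, max, sum, count] for the lines inside one byte range."""
    with open(path, "rb") as f:
        f.seek(start)
        data = f.read(end - start)
    stats: Stats = {}
    for raw in data.splitlines():
        if not raw:
            continue
        station, _, temp = raw.decode("utf-8").rpartition(";")
        t = float(temp)
        s = stats.setdefault(station, [t, t, 0.0, 0])
        s[0] = min(s[0], t)
        s[1] = max(s[1], t)
        s[2] += t
        s[3] += 1
    return stats


def merge(parts: Iterable[Stats]) -> Stats:
    """Combine partial results: min of mins, max of maxes, sums of sums and counts."""
    total: Stats = {}
    for part in parts:
        for station, (lo, hi, sm, cnt) in part.items():
            acc = total.get(station)
            if acc is None:
                total[station] = [lo, hi, sm, cnt]
            else:
                acc[0] = min(acc[0], lo)
                acc[1] = max(acc[1], hi)
                acc[2] += sm
                acc[3] += cnt
    return total


def run(path: str, n_chunks: int = 4) -> Stats:
    ranges = chunk_ranges(path, n_chunks)
    starts = [s for s, _ in ranges]
    ends = [e for _, e in ranges]
    # Threads keep the demo portable; ProcessPoolExecutor can be swapped in
    # for real CPU parallelism, since process_range is a top-level function.
    with ThreadPoolExecutor() as pool:
        return merge(pool.map(process_range, [path] * len(ranges), starts, ends))


if __name__ == "__main__":
    # Hypothetical sample data written to a temporary file
    lines = ["Hamburg;12.0", "Bulawayo;8.9", "Hamburg;34.2", "Hamburg;-3.4"]
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False, encoding="utf-8") as tmp:
        tmp.write("\n".join(lines) + "\n")
    try:
        print(run(tmp.name, n_chunks=3))
    finally:
        os.remove(tmp.name)
```

Merging is cheap because min, max, sum and count can all be combined without revisiting the raw rows.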

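The program's output must be sorted alphabetically by station name and show the minimum, mean and maximum temperature of each station, like this: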

```
{Abha=5.0/18.0/27.4, Abidjan=15.7/26.0/34.1, Abéché=12.1/29.4/35.6, ...}
```
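To produce the required line, sort the station names and format each min/mean/max to one decimal place. The minimal sketch below assumes the per-station statistics are held as [min, max, sum, count] in integer tenths. The rounding helper is an assumption: it is meant to mimic the Java baseline's Math.round(x * 10.0) / 10.0 (halves rounded up), whereas Python's round() rounds halves to even. Python's default string sort compares code points. This matches the example ordering above, where Abéché comes after Abidjan. The function names are hypothetical.

```python
import math
from typing import Dict, List


def round_tenth(x: float) -> float:
    """Round to one decimal place with halves rounded up (towards +infinity).

    Assumption: intended to mimic Java's Math.round(x * 10.0) / 10.0;
    Python's built-in round() rounds halves to even instead.
    """
    return math.floor(x * 10.0 + 0.5) / 10.0


def format_result(stats: Dict[str, List[int]]) -> str:
    """Render {station=min/mean/max, ...}; stats holds [min, max, sum, count] in tenths."""
    parts = []
    for station in sorted(stats):  # default str sort compares code points
        lo, hi, total, count = stats[station]
        mean = round_tenth(total / count / 10.0)
        parts.append(f"{station}={lo / 10:.1f}/{mean:.1f}/{hi / 10:.1f}")
    return "{" + ", ".join(parts) + "}"


print(format_result({"Hamburg": [-34, 342, 428, 3], "Bulawayo": [89, 89, 89, 1]}))
# {Bulawayo=8.9/8.9/8.9, Hamburg=-3.4/14.3/34.2}
```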

The challenge became so popular that other programming languages soon joined in with entries of their own.

The project

Naturally, some people have now taken on the challenge in Python as well.

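I looked on GitHub for the original data but could only find a version with about 40,000 rows. If anyone has the original data, feel free to share it on Heywhale (和鲸).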

Below is code that processes the data with Dask and with Polars.

1BRC website: https://1brc.dev/


```
!pip install polars -i https://pypi.mirrors.ustc.edu.cn/simple/
```

```
Looking in indexes: https://pypi.mirrors.ustc.edu.cn/simple/
Collecting polars
  Downloading https://mirrors.bfsu.edu.cn/pypi/web/packages/b8/ca/5280387f8aa6e62618e47882d3836324e2584611e1cea86ddf3ab4aa7f5c/polars-0.20.26-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (28.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 28.0/28.0 MB 52.5 MB/s eta 0:00:00
Installing collected packages: polars
Successfully installed polars-0.20.26
```

Data preview


```python
import pandas as pd

# Semicolon-separated file; column names are supplied via names=
file_path = '/home/mw/input/1brc8235/weather_stations (1).csv'
df = pd.read_csv(file_path, sep=';', names=['station', 'measurement'])
df.describe()
```

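The data I found is a scaled-down file with only 44,691 rows, whereas the full one-billion-row version is about 13 GB. This file is not really a sample of measurements. Most station names occur only once, so min, mean and max coincide in the outputs below. The numeric column also appears to be each city's latitude (for example A Coruña 43.3667). In the 1brc repository, the billion-row measurements file is generated locally with a script rather than downloaded.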
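The file starts with two attribution and licence lines, which appear as bogus stations with nan values in the Dask output below. The following is a minimal sketch of a reader that drops them. It assumes, as the sort order of that output suggests, that these lines begin with '#'. The function name and sample lines are illustrative.

```python
import io
from typing import Iterable, Iterator, Tuple


def read_stations(lines: Iterable[str]) -> Iterator[Tuple[str, float]]:
    """Yield (station, value) pairs, skipping blank lines and '#' comment lines."""
    for line in lines:
        line = line.rstrip("\r\n")
        if not line or line.startswith("#"):
            continue  # e.g. the attribution/licence header (assumed to start with '#')
        station, _, value = line.rpartition(";")
        yield station, float(value)


# Hypothetical sample mimicking the layout of the file
sample = io.StringIO(
    "# Adapted from https://simplemaps.com/data/world-cities\n"
    "# Licensed under Creative Commons Attribution 4.0\n"
    "Jakarta;-6.175\n"
    "Delhi;28.61\n"
)
print(list(read_stations(sample)))  # [('Jakarta', -6.175), ('Delhi', 28.61)]
```

With pandas, passing comment='#' or skiprows=2 to pd.read_csv has the same effect.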

dask

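Dask is an old friend for parallel computing. I often use it for parallel interpolation, parallel data processing and similar tasks.

For example:

Advanced! Solving the problem of reading and plotting ultra-high-resolution TIFs with Dask
Dask again! How to process large geographic data with dask-geopandas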


```python
%%timeit
import dask.dataframe as dd

file_path = "/home/mw/input/1brc8235/weather_stations (1).csv"

def process_data_with_dask(file_path):
    # Read the CSV file into a Dask DataFrame
    df = dd.read_csv(file_path, sep=';', names=['station', 'measurement'])

    # Convert the measurement column to float
    df['measurement'] = df['measurement'].astype(float)

    # Aggregate: group by station and compute min, mean and max
    agg_result = df.groupby('station')['measurement'].agg(['min', 'mean', 'max']).compute()

    # Sort by station name and format each row as min/mean/max
    formatted_result = agg_result.sort_index().apply(
        lambda x: f"{x['min']}/{x['mean']}/{x['max']}", axis=1
    )

    return formatted_result

if __name__ == "__main__":
    result = process_data_with_dask(file_path)
    print(result)
```

```
/opt/conda/lib/python3.9/site-packages/dask/dataframe/_pyarrow_compat.py:17: FutureWarning: Minimal version of pyarrow will soon be increased to 14.0.1. You are using 11.0.0. Please consider upgrading.
  warnings.warn(
<magic-timeit>:1: DeprecationWarning: The current Dask DataFrame implementation is deprecated.
In a future release, Dask DataFrame will use new implementation that
contains several improvements including a logical query planning.
The user-facing DataFrame API will remain unchanged.

The new implementation is already available and can be enabled by
installing the dask-expr library:

$ pip install dask-expr

and turning the query planning option on:

>>> import dask
>>> dask.config.set({'dataframe.query-planning': True})
>>> import dask.dataframe as dd

API documentation for the new implementation is available at
https://docs.dask.org/en/stable/dask-expr-api.html

Any feedback can be reported on the Dask issue tracker
https://github.com/dask/dask/issues

station
# Adapted from https://simplemaps.com/data/world-cities nan/nan/nan
# Licensed under Creative Commons Attribution 4.0 (https://creativecommons.org/licenses/by/4.0/) nan/nan/nan
A Coruña 43.3667/43.3667/43.3667
A Yun Pa 13.3939/13.3939/13.3939
Aabenraa 55.0444/55.0444/55.0444
...
’Tlat Bni Oukil 32.577/32.577/32.577
’s-Gravendeel 51.7833/51.7833/51.7833
’s-Gravenzande 52.0/52.0/52.0
’s-Heerenberg 51.8764/51.8764/51.8764
’s-Hertogenbosch 51.6833/51.6833/51.6833
Length: 41345, dtype: object
```
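534 ms ± 20.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In this output, the two attribution and licence lines at the top of the file were parsed as station names without a measurement, which produces the nan/nan/nan rows. Passing skiprows=2 to dd.read_csv, like the skip_rows=2 used in the Polars code below, would drop them.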
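Because Dask splits the data into partitions, its groupby cannot simply average per-partition means. Each partition has to contribute partial results, and the mean is finished only after they are combined. As far as I know, Dask computes a grouped mean from partial sums and counts, although its internals differ in detail. The plain-Python sketch below, with made-up numbers and hypothetical function names, shows why carrying sum and count matters.

```python
from typing import List, Tuple

Partial = Tuple[float, float, float, int]  # (min, max, sum, count)


def partial(values: List[float]) -> Partial:
    """Per-partition result for one station."""
    return min(values), max(values), sum(values), len(values)


def combine(parts: List[Partial]) -> Tuple[float, float, float]:
    """Merge partials, then finish with mean = total sum / total count."""
    lo = min(p[0] for p in parts)
    hi = max(p[1] for p in parts)
    total = sum(p[2] for p in parts)
    count = sum(p[3] for p in parts)
    return lo, total / count, hi


# Two hypothetical partitions holding readings for the same station
partitions = [[10.0, 20.0, 30.0], [40.0]]
parts = [partial(p) for p in partitions]
print(combine(parts))  # (10.0, 25.0, 40.0)

# Averaging the per-partition means instead gives the wrong answer
naive = sum(p[2] / p[3] for p in parts) / len(parts)
print(naive)  # 30.0
```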

polars

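The Polars version comes from: https://github.com/ifnesi/1brc
Polars is an open-source data-analysis library for Rust and Python, designed for fast, memory-efficient data processing. It is built on the Apache Arrow columnar memory format and aims to be a high-performance alternative to pandas.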


```python
import polars as pl

file_path = "/home/mw/input/1brc8235/weather_stations (1).csv"
df_test = pl.read_csv(file_path, separator=';', skip_rows=2, new_columns=['station', 'measurement'])
print(df_test)
```
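```
shape: (44_690, 2)
┌─────────────┬─────────────┐
│ station     ┆ measurement │
│ ---         ┆ ---         │
│ str         ┆ f64         │
╞═════════════╪═════════════╡
│ Jakarta     ┆ -6.175      │
│ Delhi       ┆ 28.61       │
│ Guangzhou   ┆ 23.13       │
│ Mumbai      ┆ 19.0761     │
│ Manila      ┆ 14.5958     │
│ …           ┆ …           │
│ Numto       ┆ 63.6667     │
│ Nord        ┆ 81.7166     │
│ Timmiarmiut ┆ 62.5333     │
│ San Rafael  ┆ -16.7795    │
│ Nordvik     ┆ 74.0165     │
└─────────────┴─────────────┘
```

In this preview, pl.read_csv keeps its default has_header=True. After the two comment lines are skipped, the first data row is therefore consumed as the header and then renamed via new_columns. That is why the frame has 44,690 rows instead of 44,691. Adding has_header=False, as the scan_csv call below does, keeps that row.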


```python
%%timeit
import polars as pl

# Read data file lazily (nothing is loaded until collect())
df = pl.scan_csv(
    "/home/mw/input/1brc8235/weather_stations (1).csv",
    separator=";",
    has_header=False,
    skip_rows=2,
    with_column_names=lambda cols: ["station_name", "measurement"],
)

# Group data and compute min/mean/max per station
grouped = (
    df.group_by("station_name")
    .agg(
        pl.min("measurement").alias("min_measurement"),
        pl.mean("measurement").alias("mean_measurement"),
        pl.max("measurement").alias("max_measurement"),
    )
    .sort("station_name")
    .collect(streaming=True)
)

# Create or overwrite the output file
with open("./output.txt", "w", encoding="utf-8") as file:
    # Write the opening brace
    file.write("{\n")
    for data in grouped.iter_rows():
        # Write each group's statistics to the file
        file.write(f" {data[0]}={data[1]:.1f}/{data[2]:.1f}/{data[3]:.1f},\n")
    # Write the closing brace; the with block closes the file
    file.write("}\n")
```

131 ms ± 3.19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Summary

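I used to think that mastering a parallel-computing powerhouse like Dask had put me at the top of the data-processing world, but there is always someone better. 1BRC was a real eye-opener, like pushing open the door to a new world. Even on this small sample, the Polars version took about 131 ms per run versus about 534 ms for Dask.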

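The boundaries of technology keep expanding, and there are always more efficient and smarter methods waiting for us to explore and master.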