前言
在这个数据爆炸的时代,气象科学作为一门与人类生活息息相关的学科,正以前所未有的速度积累着庞大的数据量。想象一下,全球数以万计的气象站点,每分钟、每小时不间断地记录着气温、湿度、风速、降水量等关键气象参数,这些数据汇聚起来,形成了一个浩瀚的数据海洋
1brc是什么
1 Billion Row Challenge Calculate the min, max, and average of 1 billion measurements
1BRC挑战的目标是编写一个Java程序,该程序从一个包含十亿行数据的文本文件中读取温度测量值,并计算每个气象站的最低温度、平均温度和最高温度。文本文件结构简单,每行记录一个温度值,格式为“城市名;温度值”。
你的任务,如果你选择接受的话,就是创造出执行这个任务速度最快的程序。过程中,你需要充分利用现代Java的各种优势,探索诸如虚拟线程、Vector API及SIMD指令集、垃圾回收优化、AOT编译等技术,以及任何你能想到的性能提升技巧。
程序最终输出应按城市名字母顺序排列,展示每个站点的最低、平均、最高温度值,形如:
{Abha=5.0/18.0/27.4, Abidjan=15.7/26.0/34.1, Abéché=12.1/29.4/35.6, ...}
当然活动的火爆以至于其他编程语言也不甘寂寞,纷纷进行挑战
项目内容
当然,现在也有人使用Python进行相关活动
小编去github上找了下原数据,但是只找到一个4万行的版本,如果有朋友有原数据欢迎分享到和鲸
下面展示基于dask和polars进行数据处理的代码
1brc网站:https://1brc.dev/
In [1]:
!pip install polars -i https://pypi.mirrors.ustc.edu.cn/simple/
Looking in indexes: https://pypi.mirrors.ustc.edu.cn/simple/
Collecting polars
Downloading https://mirrors.bfsu.edu.cn/pypi/web/packages/b8/ca/5280387f8aa6e62618e47882d3836324e2584611e1cea86ddf3ab4aa7f5c/polars-0.20.26-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (28.0 MB) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 28.0/28.0 MB 52.5 MB/s eta 0:00:0000:0100:01
Installing collected packages: polars
Successfully installed polars-0.20.26
数据展示
In [4]:
import pandas as pd
file_path='/home/mw/input/1brc8235/weather_stations (1).csv'
df = pd.read_csv(file_path, sep=';', names=['station', 'measurement'])
df.describe()
Out[4]:
找到的数据是仅有44691行的缩水版本,当然十亿行的版本有13GB大小
dask
dask是大家并行计算的老朋友,博主经常用来并行插值,并行处理数据等等
例如
进阶!dask解决超高精度tif读取与绘图难问题 又见dask! 如何使用dask-geopandas处理大型地理数据
In [2]:
%%timeit import dask.dataframe as dd
file_path = "/home/mw/input/1brc8235/weather_stations (1).csv"
def process_data_with_dask(file_path):
# 读取CSV文件到Dask DataFrame
df = dd.read_csv(file_path, sep=';', names=['station', 'measurement'])# 转换measurement列为浮点数 df['measurement'] = df['measurement'].astype(float) # 聚合操作:按station分组并计算min、mean、max agg_result = df.groupby('station')['measurement'].agg(['min', 'mean', 'max']).compute() # 排序结果并格式化输出 formatted_result = agg_result.sort_index().apply(lambda x: f"{x['min']}/{x['mean']}/{x['max']}", axis=1) return formatted_result
if name == "main":
result = process_data_with_dask(file_path)
print(result)
/opt/conda/lib/python3.9/site-packages/dask/dataframe/_pyarrow_compat.py:17: FutureWarning: Minimal version of pyarrow will soon be increased to 14.0.1. You are using 11.0.0. Please consider upgrading.
warnings.warn(
<magic-timeit>:1: DeprecationWarning: The current Dask DataFrame implementation is deprecated.
In a future release, Dask DataFrame will use new implementation that
contains several improvements including a logical query planning.
The user-facing DataFrame API will remain unchanged.The new implementation is already available and can be enabled by
installing the dask-expr library:$ pip install dask-expr
and turning the query planning option on:
>>> import dask >>> dask.config.set({'dataframe.query-planning': True}) >>> import dask.dataframe as dd
API documentation for the new implementation is available at
https://docs.dask.org/en/stable/dask-expr-api.html
Any feedback can be reported on the Dask issue tracker
https://github.com/dask/dask/issues
station
Adapted from https://simplemaps.com/data/world-cities nan/nan/nan
Licensed under Creative Commons Attribution 4.0 (https://creativecommons.org/licenses/by/4.0/) nan/nan/nan
A Coruña 43.3667/43.3667/43.3667
A Yun Pa 13.3939/13.3939/13.3939
Aabenraa 55.0444/55.0444/55.0444
...
’Tlat Bni Oukil 32.577/32.577/32.577
’s-Gravendeel 51.7833/51.7833/51.7833
’s-Gravenzande 52.0/52.0/52.0
’s-Heerenberg 51.8764/51.8764/51.8764
’s-Hertogenbosch 51.6833/51.6833/51.6833
Length: 41345, dtype: object
stationAdapted from https://simplemaps.com/data/world-cities nan/nan/nan
Licensed under Creative Commons Attribution 4.0 (https://creativecommons.org/licenses/by/4.0/) nan/nan/nan
A Coruña 43.3667/43.3667/43.3667
A Yun Pa 13.3939/13.3939/13.3939
Aabenraa 55.0444/55.0444/55.0444
...
’Tlat Bni Oukil 32.577/32.577/32.577
’s-Gravendeel 51.7833/51.7833/51.7833
’s-Gravenzande 52.0/52.0/52.0
’s-Heerenberg 51.8764/51.8764/51.8764
’s-Hertogenbosch 51.6833/51.6833/51.6833
Length: 41345, dtype: object
stationAdapted from https://simplemaps.com/data/world-cities nan/nan/nan
Licensed under Creative Commons Attribution 4.0 (https://creativecommons.org/licenses/by/4.0/) nan/nan/nan
A Coruña 43.3667/43.3667/43.3667
A Yun Pa 13.3939/13.3939/13.3939
Aabenraa 55.0444/55.0444/55.0444
...
’Tlat Bni Oukil 32.577/32.577/32.577
’s-Gravendeel 51.7833/51.7833/51.7833
’s-Gravenzande 52.0/52.0/52.0
’s-Heerenberg 51.8764/51.8764/51.8764
’s-Hertogenbosch 51.6833/51.6833/51.6833
Length: 41345, dtype: object
stationAdapted from https://simplemaps.com/data/world-cities nan/nan/nan
Licensed under Creative Commons Attribution 4.0 (https://creativecommons.org/licenses/by/4.0/) nan/nan/nan
A Coruña 43.3667/43.3667/43.3667
A Yun Pa 13.3939/13.3939/13.3939
Aabenraa 55.0444/55.0444/55.0444
...
’Tlat Bni Oukil 32.577/32.577/32.577
’s-Gravendeel 51.7833/51.7833/51.7833
’s-Gravenzande 52.0/52.0/52.0
’s-Heerenberg 51.8764/51.8764/51.8764
’s-Hertogenbosch 51.6833/51.6833/51.6833
Length: 41345, dtype: object
stationAdapted from https://simplemaps.com/data/world-cities nan/nan/nan
Licensed under Creative Commons Attribution 4.0 (https://creativecommons.org/licenses/by/4.0/) nan/nan/nan
A Coruña 43.3667/43.3667/43.3667
A Yun Pa 13.3939/13.3939/13.3939
Aabenraa 55.0444/55.0444/55.0444
...
’Tlat Bni Oukil 32.577/32.577/32.577
’s-Gravendeel 51.7833/51.7833/51.7833
’s-Gravenzande 52.0/52.0/52.0
’s-Heerenberg 51.8764/51.8764/51.8764
’s-Hertogenbosch 51.6833/51.6833/51.6833
Length: 41345, dtype: object
stationAdapted from https://simplemaps.com/data/world-cities nan/nan/nan
Licensed under Creative Commons Attribution 4.0 (https://creativecommons.org/licenses/by/4.0/) nan/nan/nan
A Coruña 43.3667/43.3667/43.3667
A Yun Pa 13.3939/13.3939/13.3939
Aabenraa 55.0444/55.0444/55.0444
...
’Tlat Bni Oukil 32.577/32.577/32.577
’s-Gravendeel 51.7833/51.7833/51.7833
’s-Gravenzande 52.0/52.0/52.0
’s-Heerenberg 51.8764/51.8764/51.8764
’s-Hertogenbosch 51.6833/51.6833/51.6833
Length: 41345, dtype: object
stationAdapted from https://simplemaps.com/data/world-cities nan/nan/nan
Licensed under Creative Commons Attribution 4.0 (https://creativecommons.org/licenses/by/4.0/) nan/nan/nan
A Coruña 43.3667/43.3667/43.3667
A Yun Pa 13.3939/13.3939/13.3939
Aabenraa 55.0444/55.0444/55.0444
...
’Tlat Bni Oukil 32.577/32.577/32.577
’s-Gravendeel 51.7833/51.7833/51.7833
’s-Gravenzande 52.0/52.0/52.0
’s-Heerenberg 51.8764/51.8764/51.8764
’s-Hertogenbosch 51.6833/51.6833/51.6833
Length: 41345, dtype: object
stationAdapted from https://simplemaps.com/data/world-cities nan/nan/nan
Licensed under Creative Commons Attribution 4.0 (https://creativecommons.org/licenses/by/4.0/) nan/nan/nan
A Coruña 43.3667/43.3667/43.3667
A Yun Pa 13.3939/13.3939/13.3939
Aabenraa 55.0444/55.0444/55.0444
...
’Tlat Bni Oukil 32.577/32.577/32.577
’s-Gravendeel 51.7833/51.7833/51.7833
’s-Gravenzande 52.0/52.0/52.0
’s-Heerenberg 51.8764/51.8764/51.8764
’s-Hertogenbosch 51.6833/51.6833/51.6833
Length: 41345, dtype: object
534 ms ± 20.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
polars
polars版本来源在这里:https://github.com/ifnesi/1brc
Polars 是一个开源的数据分析库,专为 Rust 和 Python 编程语言设计,旨在提供高速、内存高效的数据处理能力。它受到了 Apache Arrow 的启发,旨在作为 Pandas 库的一个高性能替代品
In [5]:
import polars as pl
file_path = "/home/mw/input/1brc8235/weather_stations (1).csv"
df_test= pl.read_csv(file_path, separator=';',skip_rows =2, new_columns=['station', 'measurement'])
print(df_test)
shape: (44_690, 2)
┌─────────────┬─────────────┐
│ station ┆ measurement │
│ --- ┆ --- │
│ str ┆ f64 │
╞═════════════╪═════════════╡
│ Jakarta ┆ -6.175 │
│ Delhi ┆ 28.61 │
│ Guangzhou ┆ 23.13 │
│ Mumbai ┆ 19.0761 │
│ Manila ┆ 14.5958 │
│ … ┆ … │
│ Numto ┆ 63.6667 │
│ Nord ┆ 81.7166 │
│ Timmiarmiut ┆ 62.5333 │
│ San Rafael ┆ -16.7795 │
│ Nordvik ┆ 74.0165 │
└─────────────┴─────────────┘
In [1]:
%%timeit
import polars as plRead data file
df = pl.scan_csv(
"/home/mw/input/1brc8235/weather_stations (1).csv",
separator=";",
has_header=False,skip_rows=2,
with_column_names=lambda cols: ["station_name", "measurement"],
)Group data
grouped = (
df.group_by("station_name")
.agg(
pl.min("measurement").alias("min_measurement"),
pl.mean("measurement").alias("mean_measurement"),
pl.max("measurement").alias("max_measurement"),
)
.sort("station_name")
.collect(streaming=True)
)Create or overwrite the output file
with open("./output.txt", "w") as file:
# Initialize writing to the file
file.write("{\n")
for data in grouped.iter_rows():
# Write each group's statistics to the file
file.write(f" {data[0]}={data[1]:.1f}/{data[2]:.1f}/{data[3]:.1f},\n")
# Close the file after writing all groups
file.write("}\n")
131 ms ± 3.19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
收藏
评论
小结
原本以为掌握了Dask这样的并行计算神器,就已经站在了数据处理界的巅峰,却不曾想,天外有天,高手如云。1brc,真是让人眼界大开,仿佛推开了一扇通往新世界的大门。
技术的边界永远在不断拓展,总有更高效、更智能的方法等待我们去探索和掌握。