Hello, I need to download a NetCDF file from the CDS servers directly to an AWS S3 storage location.
To do that I would usually use the following code:
import fsspec
import cdsapi

ct = cdsapi.Client(quiet=True)
file_to_download_path = ("simplecache::s3://" + aws_bucket_name + '/'
                         + global_climate_monthly_aws_s3_prefix_nc
                         + 'lat%s_%s_%s.nc' % (lat, year, m))
with fsspec.open(file_to_download_path, mode='wb') as f:
    # ... do something that writes to file f
But when I try to use the following command:

ct.retrieve(dataset, kwargs, f)

it doesn't work, because ct.retrieve expects a file name, not a file instance. I need to open my file with fsspec.open, otherwise I can't write directly to AWS S3.
Any idea how I can make ct.retrieve write directly to an S3 file?
I am desperately trying to avoid having to write the file locally and then upload it.
Thank you
It’s not that easy.
You would have to rewrite the parts of cdsapi that interact with the filesystem to add the option to write to S3 with boto3.
I tried to do this over the last few weeks, because I thought it was going to be as easy as with the ECMWF Web API Python client, where I managed to get it working, but the reality is something else.
While digging through the cdsapi code I discovered that it calls classes from other modules (datapi and others), together with awful deprecation patches, so I would need to patch those libraries as well. I decided it was not worth the effort.
To be fair, the current state of the Python clients that ECMWF "maintains" (Web API, cdsapi, opendata) is so fragmented and inconsistent that they should consider reorganizing them. For the moment it's not worth patching them up to write to S3.
Hi Guido, Claude suggested the following script, which works for me: calling cdsapi's retrieve without the target argument (target=None) returns a Results object with .location (the direct HTTPS URL) and .session (an already-authenticated requests.Session), which is all you need to drive a boto3 multipart upload.
import boto3
import cdsapi

CHUNK_SIZE = 8 * 1024 * 1024  # 8 MB; boto3 multipart parts must be at least 5 MB

def cds_to_s3(dataset: str, request: dict, bucket: str, s3_key: str) -> None:
    """Retrieve a CDS dataset and stream it directly to S3, no local disk used."""
    result = cdsapi.Client().retrieve(dataset, request)
    s3 = boto3.client("s3")
    mpu = s3.create_multipart_upload(Bucket=bucket, Key=s3_key)
    upload_id = mpu["UploadId"]
    parts, part_number, buf = [], 1, b""
    try:
        with result.session.get(result.location, stream=True) as resp:
            resp.raise_for_status()
            for chunk in resp.iter_content(chunk_size=CHUNK_SIZE):
                buf += chunk
                if len(buf) >= CHUNK_SIZE:
                    part = s3.upload_part(Bucket=bucket, Key=s3_key,
                                          UploadId=upload_id,
                                          PartNumber=part_number, Body=buf)
                    parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
                    part_number += 1
                    buf = b""
        if buf:  # flush whatever remains as the final (possibly short) part
            part = s3.upload_part(Bucket=bucket, Key=s3_key,
                                  UploadId=upload_id,
                                  PartNumber=part_number, Body=buf)
            parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
        s3.complete_multipart_upload(Bucket=bucket, Key=s3_key,
                                     UploadId=upload_id,
                                     MultipartUpload={"Parts": parts})
    except Exception:
        s3.abort_multipart_upload(Bucket=bucket, Key=s3_key, UploadId=upload_id)
        raise
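In case it helps anyone reviewing this: the only non-obvious part of the script is the buffering, which accumulates HTTP chunks until at least CHUNK_SIZE bytes are ready (S3 rejects non-final multipart parts under 5 MB). That logic can be checked in isolation without boto3 or cdsapi; this is a minimal sketch with a hypothetical helper name (split_parts) and a toy 4-byte threshold:

```python
def split_parts(chunks, chunk_size):
    """Accumulate incoming byte chunks and yield parts of at least
    chunk_size bytes; the final part may be smaller. Mirrors the
    buffering used in the S3 multipart upload above."""
    buf = b""
    for chunk in chunks:
        buf += chunk
        if len(buf) >= chunk_size:
            yield buf
            buf = b""
    if buf:  # final, possibly short, part
        yield buf

# A 10-byte stream split with a 4-byte threshold:
parts = list(split_parts([b"ab", b"cd", b"ef", b"gh", b"ij"], 4))
# parts == [b"abcd", b"efgh", b"ij"]
```

Each yielded part maps to one upload_part call, so the part sizes are what S3 validates, not the sizes of the incoming HTTP chunks.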