Commit 36f979e8 authored by lixiao's avatar lixiao

Initial commit

parents
Pipeline #9830 canceled with stages
FROM python:3.9-slim
# 设置工作目录
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt
# 复制项目文件
COPY . .
# 运行主程序
CMD ["python", "main.py"]
import rasterio
import numpy as np
class IndexCalculate:
def __init__(self,band_data:list):
self.b1 = np.float32(band_data[0])
self.b2 = np.float32(band_data[1])
def deviation_index(self):
mask = (self.b1 - self.b2)/(self.b1 + self.b2)
return mask
import logging
def set_log(log_path,api_name):
logger = logging.getLogger(api_name)
logger.setLevel(level=logging.DEBUG)
formatter = logging.Formatter('{"logL":"%(levelname)s","logT":"%(asctime)s","logM":"%(name)s", "%(message)s"}',
datefmt='%Y-%m-%d %H:%M:%S')
handler = logging.FileHandler(log_path,mode='w',encoding='utf-8')
handler.setLevel(level=logging.INFO)
handler.setFormatter(formatter)
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.addHandler(console)
return logger
import os
import yaml
from algorithms.IndexCalculate import IndexCalculate
from utils.ImageProcessor import ImageProcessor
from utils.SarProcessor import SarProcessor
from utils.out_json import out_json
import logSet
import json
from datetime import datetime
visible_satellite = ["sentinel2","landsat"]
sar_satellite = ["sentinel1"]
ref_config_path = os.path.join(os.getcwd(),"reference.yaml")
with open(ref_config_path, 'r') as file:
ref_config = yaml.safe_load(file)
def search_band(input_config,ref_config):
input_satellite = input_config["satellite"]
input_xml_path = input_config["input_path"]
current_dir = os.path.dirname(input_xml_path)
ref_band = ref_config['satellite'].get(input_satellite)
if input_satellite in visible_satellite:
if input_satellite == "sentinel2":
green_band_path = os.path.join(current_dir,"R10m",f'{ref_band["Green"]}.jp2')
red_band_path = os.path.join(current_dir,"R10m",f'{ref_band["Red"]}.jp2')
nir_band_path = os.path.join(current_dir,"R10m",f'{ref_band["NIR"]}.jp2')
cloud_band_path = os.path.join(current_dir,"qi",'CLD_20m.jp2')
elif input_satellite == "landsat":
green_band_path = input_xml_path.replace('MTL.xml',f'SR_{ref_band["Green"]}.TIF')
red_band_path = input_xml_path.replace('MTL.xml',f'SR_{ref_band["Red"]}.TIF')
nir_band_path =input_xml_path.replace('MTL.xml',f'SR_{ref_band["NIR"]}.TIF')
cloud_band_path = input_xml_path.replace('MTL.xml',f'QA_PIXEL.TIF')
# print('current_dir:',current_dir)
# print('green_band_path',green_band_path)
else:
raise ValueError(f"Unsupported satellite type: {ref_band}")
return {
"green_band_path": green_band_path,
"red_band_path" : red_band_path,
"nir_band_path" : nir_band_path,
"cloud_band_path": cloud_band_path
}
elif input_satellite in sar_satellite:
vv_band_path = os.path.join(current_dir,"measurement",f'{ref_band["vv"]}.tiff')
vh_band_path = os.path.join(current_dir,"measurement",f'{ref_band["vh"]}.tiff')
return{
"vv_band_path": vv_band_path,
"vh_band_path": vh_band_path
}
def get_s2_tif_name(input_config):
product_info_path = input_config['input_path'].replace('metadata.xml','productInfo.json')
with open(product_info_path, 'r') as file:
data = json.load(file)
name = data['name']
parts = name.split('T')
print(parts)
date = parts[0]
number = parts[2].split('_')[0]
# 获取当前时间并格式化
current_time = datetime.now().strftime("%Y%m%d%H%M%S")
if input_config['index'] == 'NDWI':
index_name = 'WaterExtraction'
elif input_config['index'] == 'NDVI':
index_name = 'NDVI'
output_name = "_".join([date, number, index_name, current_time]) + '.tif'
os.makedirs(input_config["output_path"], exist_ok=True)
output_path = os.path.join(input_config['output_path'],output_name)
return output_path
def get_landsat_tif_name(input_config):
parts = input_config['input_path'].split('_')
# 获取当前时间并格式化
if input_config['index'] == 'NDWI':
index_name = 'WaterExtraction'
elif input_config['index'] == 'NDVI':
index_name = 'NDVI'
current_time = datetime.now().strftime("%Y%m%d%H%M%S")
output_name = "_".join([parts[0], parts[1], parts[3], parts[2], index_name, current_time]) + '.tif'
os.makedirs(input_config["output_path"], exist_ok=True)
output_path = os.path.join(input_config['output_path'],output_name)
return output_path
def visible_go(input_config):
#拼写输出tif路径
if input_config['satellite'] =='senitnel2':
output_path = get_s2_tif_name(input_config)
elif input_config['satellite'] =='landsat':
output_path = get_landsat_tif_name(input_config)
print(output_path)
log_path = output_path.replace('.tif','.log')
logger = logSet.set_log(log_path,'开始读取数据')
band_path_dic = search_band(input_config,ref_config)
satellite = input_config['satellite']
print(band_path_dic)
#load tif数据
index = input_config["index"]
image_processor = ImageProcessor(band_path_dict = band_path_dic,index = index )
band_data_list = image_processor.load_bands()
logger.info(msg= 'logC": "加载数据完成,开始指数计算", "logP": "20%')
#指数计算
index_calculate = IndexCalculate(band_data_list)
mask = index_calculate.deviation_index()
cloud_data = image_processor.get_cloud(satellite=satellite)
masked_data = image_processor.remove_cloud(mask,cloud_data,satellite=satellite)
logger.info(msg= 'logC": "指数计算完成,开始写出tif", "logP": "60%')
#写出tif
image_processor.mask_save(masked_data,output_path)
logger.info(msg= 'logC": "写出tif完成,开始写出json", "logP": "80%')
if index == "NDWI":
#输出tif为二值时,去除破碎像元
print('去除破碎像元开始')
image_processor.sieve_filter(output_path)
#写出json
if input_config['satellite'] == 'sentinel2':
json_path = input_config['input_path'].replace('metadata.xml','tileInfo.json')
if input_config['satellite'] == 'landsat':
json_path = input_config['input_path'].replace('MTL.xml','stac.json')
outjson_path = out_json(input_config, json_path, output_path)
logger.info(msg= 'logC": f"写出json完成,{outjson_path}", "logP": "100%')
return 'success'
# def sar_go(input_config):
# band_path_dic = search_band(input_config,ref_config)
# satellite = input_config['satellite']
# print(band_path_dic)
# #load tif数据
# index = input_config["index"]
# sar_processor = SarProcessor(band_path_dict = band_path_dic,index = index )
# vv_array, vh_array = sar_processor.load_bands()
# #lee滤波
# filtered_vv_array = sar_processor.lee_filter(vv_array)
# filtered_vh_array = sar_processor.lee_filter(vh_array)
# #vv/vh比值
# water_ratio = sar_processor.compute_ratio(filtered_vv_array, filtered_vh_array)
# #Ostu阈值法提取水体
# water_mask = sar_processor.extract_water(water_ratio)
# output_path = input_config["output_path"]
# os.makedirs(os.path.dirname(input_config["output_path"]), exist_ok=True)
# sar_processor.mask_save(water_mask,output_path)
# return 'sar success'
if __name__ == "__main__":
input_config = {
"index": "NDWI",
"satellite": "sentinel2",
"input_path": r"D:\hunan\50RKQ_l2a\metadata.xml",
"output_path": r"D:\hunan\50RKQ_l2a\test2"
}
# input_config = {
# "index": "NDVI",
# "satellite": "landsat",
# "input_path": r"D:\hunan\landsat_data\LC08_L2SP_122043_20231227_20240104_02_T1\LC08_L2SP_122043_20231227_20240104_02_T1_MTL.xml",
# "output_path": r"D:\hunan\landsat_data\LC08_L2SP_122043_20231227_20240104_02_T1\test\test_ndvi7.tif"
# }
# input_config = {
# "index": "NDWI",
# "satellite": "sentinel1",
# # "input_path":r"D:\hunan\s1_20240116_aws\S1A_IW_GRDH_1SDV_20231031T103557_20231031T103622_051008_062673_CCD9\manifest.safe",
# # "output_path": r"D:\hunan\s1_20240116_aws\S1A_IW_GRDH_1SDV_20231031T103557_20231031T103622_051008_062673_CCD9\test\test_water3_mask.tif"
# "input_path":r"D:\hunan\S1A_IW_GRDH_1SDV_20231014T102655_20231014T102720_050760_061DF1_1AE9.SAFE\manifest.safe",
# # "output_path":r"D:\hunan\S1A_IW_GRDH_1SDV_20231014T102655_20231014T102720_050760_061DF1_1AE9.SAFE\test\test2.tif"
# "output_path":r"D:\hunan\S1A_IW_GRDH_1SDV_20231014T102655_20231014T102720_050760_061DF1_1AE9.SAFE\test"
# }
if input_config["satellite"] in visible_satellite:
result = visible_go(input_config)
# elif input_config["satellite"] in sar_satellite:
# result = sar_go(input_config)
# print(result)
\ No newline at end of file
data:
input_path:
output_path:
algorithms:
NDWI: (Green - NIR) / (GREEN + NIR)
NDVI: (NIR - Red) / (NIR + Red)
satellite:
sentinel2:
Green: B03
Red: B04
NIR: B08
landsat:
Green: B3
Red: B4
NIR: B5
sentinel1:
vv: iw-vv
vh: iw-vh
postprocessing:
import rasterio
import numpy as np
from osgeo import gdal
import scipy.ndimage
class ImageProcessor():
def __init__(self, band_path_dict:dict,index:str):
self.index = index
self.green_band_path = band_path_dict["green_band_path"]
self.red_band_path = band_path_dict["red_band_path"]
self.nir_band_path = band_path_dict["nir_band_path"]
self.cloud_band_path = band_path_dict["cloud_band_path"]
self.b1 = None
self.b2 = None
self.profile = None
def load_bands(self):
if self.index == "NDWI":
with rasterio.open(self.green_band_path) as src:
self.profile = src.profile.copy()
print('self.profile')
self.profile.update({
'driver':'GTiff',
'dtype':'uint8',
'nodata':0
})
print(self.profile)
self.b1 = src.read(1)
with rasterio.open(self.nir_band_path) as src:
self.b2 = src.read(1)
elif self.index == "NDVI":
with rasterio.open(self.nir_band_path) as src:
print('self.profile')
self.b1 = src.read(1)
self.profile = src.profile.copy()
self.profile.update({
'driver':'GTiff',
'dtype':'float32',
'nodata': -9999
})
print(self.profile)
with rasterio.open(self.red_band_path) as src:
self.b2 = src.read(1)
return [self.b1, self.b2]
def sieve_filter(self,mask_path):
mask_image = gdal.Open(mask_path,1)
mask_band = mask_image.GetRasterBand(1)
gdal.SieveFilter(srcBand = mask_band, maskBand=None, dstBand = mask_band,
threshold=500, connectedness=8)
del mask_image, mask_band
def get_cloud(self,satellite=None):
with rasterio.open(self.cloud_band_path) as src:
cloud_data = src.read(1)
if satellite == "sentinel2":
#CLD波段分辨率为20M,扩展为10m
cloud_data = scipy.ndimage.zoom(cloud_data,zoom=[2,2],order=0)
return cloud_data
def remove_cloud(self, mask_data, cloud_data, threshold = 0,satellite='sentinel2'):
if self.index == "NDWI":
#二值化
mask_data = np.where(mask_data > threshold, 1, 0)
if satellite == 'sentinel2':
#云概率大于0的都算作云
masked_data = np.where(cloud_data > 0, 0, mask_data)
elif satellite =='landsat':
cloud_values = [22280,23888,24344,24472,30048,54596,54852,55052]
masked_data = np.where(np.isin(cloud_data,cloud_values), 0, mask_data)
pass
elif self.index == "NDVI":
mask_data = np.where(((mask_data >= -1) & (mask_data <= 1)), mask_data, -9999)
if satellite == 'sentinel2':
masked_data = np.where(cloud_data > 0, -9999, mask_data)
elif satellite =='landsat':
cloud_values = [22280,23888,24344,24472,30048,54596,54852,55052]
masked_data = np.where(np.isin(cloud_data,cloud_values), -9999,mask_data)
return masked_data
def mask_save(self,mask,output_file):
#防止ZIPDecode:Decoding 问题
self.profile.update(
{'compress':'LZW'}
)
with rasterio.open(output_file, 'w', **self.profile) as dst:
if mask.ndim != 2:
raise ValueError("Array must be 2D")
dst.write(mask, 1)
print(self.profile)
# 输入json文件
# def out_json()
import rasterio
import numpy as np
from osgeo import gdal
from scipy.ndimage import uniform_filter, gaussian_filter
from skimage.filters import threshold_otsu
import os
# os.environ["PROJ_LIB"] = r"D:\Miniconda3\Library\share\proj\proj.db"
class SarProcessor():
def __init__(self,band_path_dict:dict, index:str):
self.index = index
self.vv_band_path = band_path_dict["vv_band_path"]
self.vh_band_path = band_path_dict["vh_band_path"]
self.vv = None
self.vh = None
self.profile = None
def load_bands(self):
with rasterio.open(self.vv_band_path,'r') as src:
crs = rasterio.crs.CRS.from_epsg(4326)
print('crs',crs)
print('crs',crs)
src.profile.update(crs = crs)
self.profile = src.profile.copy()
self.profile.update({
'driver':'GTiff',
'dtype':'float32',
'crs':crs
})
print('sar profile:')
print(self.profile)
self.vv = src.read(1)
with rasterio.open(self.vh_band_path) as src:
self.vh = src.read(1)
return self.vv, self.vh
# def load_bands(self):
# vv_dataset = gdal.Open(self.vv_band_path)
# print(self.vv_band_path)
# print(vv_dataset)
# vv_band = vv_dataset.GetRasterBand(1)
# print('vv_band',vv_band)
# vv_array = vv_band.ReadAsArray().astype(np.uint16)
# print('vv_array',vv_array)
# vh_dataset = gdal.Open(self.vh_band_path)
# print(self.vh_band_path)
# vh_array = vh_dataset.GetRasterBand(1).ReadAsArray().astype(np.uint16)
# geo_transform = vv_dataset.GetGeoTransform()
# projection = vv_dataset.GetProjection
# self.geo_transform = geo_transform
# self.projection = projection
# self.width = vv_dataset.RasterXSize
# self.height = vv_dataset.RasterYSize
# return vv_array,vh_array
# Lee 滤波函数(通过局部均值和方差减少斑点噪声)
def lee_filter(self,image, size=(7,7)):
mean = uniform_filter(image, size)
mean_sq = uniform_filter(image**2, size)
variance = mean_sq - mean**2
overall_variance = np.var(image)
weights = (variance / (variance + overall_variance)).astype(np.float32)
filtered_image = mean + weights * (image - mean)
return filtered_image
# 增强 VV 和 VH 图像,通过比值进行增强
def compute_ratio(self, vv, vh):
ratio = np.log10(10*vv*vh)
ratio = np.nan_to_num(ratio, nan=0.0, posinf=0.0, neginf=0.0)
# ratio = vv / (vh + 1e-6)
return ratio
# return vv / (vh + 1e-6) # 避免除以 0
# 使用 Otsu 方法提取水体
def extract_water(self, image):
threshold = threshold_otsu(image)
print('threshold',threshold)
water_mask = image < threshold
return water_mask
def mask_save(self,mask,output_file):
#防止ZIPDecode:Decoding 问题
self.profile.update(
{'compress':'LZW'}
)
with rasterio.open(output_file, 'w', **self.profile) as dst:
if mask.ndim != 2:
raise ValueError("Array must be 2D")
dst.write(mask, 1)
print(self.profile)
# def write_tiff(self, mask, output_file):
# driver = gdal.GetDriverByName('Gtiff')
# out_dataset = driver.Create(output_file, self.width, self.height, 1, gdal.GDT_Byte)
# out_dataset.SetGeoTransform(self.geo_transform)
# out_dataset.SetProjection(self.projection)
# out_band = out_dataset.GetRasterBand(1)
# out_band.WriteArray(mask)
# out_band.SetNoDataValue(0)
# out_band.FlushCache()
# out_dataset = None
# print('sar success')
if __name__ == "__main__":
path = r"D:\hunan\S1A_IW_GRDH_1SDV_20231014T102655_20231014T102720_050760_061DF1_1AE9.SAFE\manifest.safe"
sar_dataset = gdal.Open(path)
print(sar_dataset.GetProjection())
\ No newline at end of file
from osgeo import gdal, osr
from pyproj import Transformer
import rasterio
import numpy as np
import json
import re
def get_epsg_code_rasterio(filepath):
with rasterio.open(filepath) as dataset:
epsg = dataset.crs.to_epsg()
res = dataset.res[0]
wkt = dataset.crs.wkt
match = re.search(r'PROJCS\["([^"]+)"', wkt) or re.search(r'GEOGCS\["([^"]+)"', wkt)
if match:
wkt = match.group(1)
else:
return "Unknown Projection"
return epsg, res, wkt
def get_extent_wgs84(filepath):
with rasterio.open(filepath) as dataset:
# 获取影像的边界坐标
bounds = dataset.bounds
# 获取影像的 CRS 和转换为 WGS84 的 Transformer
transformer = Transformer.from_crs(dataset.crs, "EPSG:4326", always_xy=True)
# 将四个角转换到 WGS84
xmin_wgs84, ymin_wgs84 = transformer.transform(bounds.left, bounds.bottom)
xmax_wgs84, ymax_wgs84 = transformer.transform(bounds.right, bounds.top)
# 格式化输出
extent_wgs84 = {
"extent": {
"xmin": xmin_wgs84,
"xmax": xmax_wgs84,
"ymin": ymin_wgs84,
"ymax": ymax_wgs84
}
}
return extent_wgs84
def get_cloud_coverage(json_path, satellite):
with open(json_path, 'r') as file:
data = json.load(file)
if satellite == 'sentinel2':
cloud_coverage = data['cloudyPixelPercentage']
dataTime = data["timestamp"]
grid = str(data["utmZone"])+data["latitudeBand"]+data["gridSquare"]
elif satellite == 'landsat':
cloud_coverage = data["properties"]["eo:cloud_cover"]
dataTime = data["properties"]["datetime"]
grid = data["properties"]["landsat:wrs_path"]+data["properties"]["landsat:wrs_row"]
return cloud_coverage,dataTime,grid
def get_footprint(json_path, satellite):
#哨兵2 tileInfo.json 中 tileDataGeometry
#landsat SR_stac.json 中geometry
footprint = {
"coordinates":None,
'type': "MultiPolygon"
}
with open(json_path, 'r') as file:
data = json.load(file)
if satellite == 'sentinel2':
crs_name = data['tileDataGeometry']['crs']['properties']['name']
epsg_code = crs_name.split(':')[-1]
transformer = Transformer.from_crs(f"EPSG:{epsg_code}", "EPSG:4326")
coordinates_utm = data['tileGeometry']['coordinates']
coordinates_wgs84 = [transformer.transform(x, y) for x, y in coordinates_utm[0]]
elif satellite == 'landsat':
coordinates_wgs84 = data['geometry']['coordinates']
footprint.update({
"coordinates": [[coordinates_wgs84]]
})
return footprint
if __name__ == "__main__":
# filepath = r'D:\hunan\50RKQ_l2a\test\test6.tif'
# json_path = r'D:\hunan\50RKQ_l2a\tileInfo.json'
filepath = r'D:\hunan\landsat_data\LC08_L2SP_122043_20231227_20240104_02_T1\test\test_ndvi6.tif'
json_path = r'D:\hunan\landsat_data\LC08_L2SP_122043_20231227_20240104_02_T1\LC08_L2SP_122043_20231227_20240104_02_T1_SR_stac.json'
satellite = 'sentinel2'
# extent_wgs84 = get_extent_wgs84(filepath, background_value=0)
# print("Extent (WGS84 coordinates):", extent_wgs84)
# footprint_wgs84 = get_footprint(json_path,satellite = 'landsat')
# print("Footprint (WGS84 coordinates):", footprint_wgs84)
epsg, res, wkt = get_epsg_code_rasterio(filepath)
print(epsg,res,wkt)
{
"telephone": "",
"address": "",
"manufacture": "",
"softwareVersion": "8.8.3",
"inputDataUrl": "/testdata/s2-l2a/50RKQ_l2a/metadata.xml",
"productInfoList": [
{
"centerY": 26.60461,
"outputProductInfo": {
"dataExtend": {},
"outputRasterUrl": "S2B_MSIL2A_20240114_T50RKQ_WaterExtraction_20241025141332.tif",
"vectorDataUrl": "no_intersection ",
"thumbnailUrl": "S2B_MSIL2A_20240114_T50RKQ_WaterExtraction_20241025141332_thumbnail.png",
"thematicMapUrl": "",
"quickViewUrl": "S2B_MSIL2A_20240114_T50RKQ_WaterExtraction_20241025141332.png"
},
"subProductionType": "",
"extent": {
"xmin": 113.97448687513339,
"xmax": 115.0975785865723,
"ymin": 26.099861306733171,
"ymax": 27.109361416616217
},
"submodel": "",
"productionId": "",
"productionTime": "2024-10-25T06:14:40.25034517049788Z",
"footprint": {
"coordinates": [
[
[
[
113.97449719059938,
27.089873004266607
],
[
115.08109459169901,
27.109352255482413
],
[
115.09756844363261,
26.118530125887634
],
[
114.00044272042848,
26.099870531919972
],
[
113.97449719059938,
27.089873004266607
]
]
]
],
"type": "MultiPolygon"
},
"productionName": "",
"centerX": 114.53603,
"dataSize": 115.06055,
"dataId": ""
}
],
"imageMetadata": {
"wkid": "32650",
"sensor": "MSI",
"cloudCoverage": " 0.0000000",
"row": "",
"grid": "50RKQ",
"satelliteType": "光学",
"dataFormat": "TIFF",
"imageTime": "2024-01-14T03:10:30.681Z",
"resolution": "10.000000",
"satellite": "Sentinel2",
"wkt": "WGS_1984_UTM_Zone_50N",
"centerX": "114.53603",
"centerY": "26.604611",
"path": ""
},
"modelName": "JianJu_Sentinel2_WaterExtraction_l2a"
}
\ No newline at end of file
import json
import rasterio
from rasterio.transform import rowcol,xy
from pyproj import Transformer
from . import get_extent
import os
from datetime import datetime, timezone
# 获取当前文件的目录路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建 JSON 文件的路径
template_path = os.path.join(current_dir, 'out_template.json')
with open(template_path,'r',encoding='utf-8') as f:
input_template = json.load(f)
def calculate_center(tif_path):
with rasterio.open(tif_path) as dataset:
center_row = dataset.height // 2
center_col = dataset.width //2
center_x, center_y = xy(dataset.transform, center_row, center_col)
transformer = Transformer.from_crs(dataset.crs, "EPSG:4326", always_xy=True)
lon, lat = transformer.transform(center_x,center_y)
center_lon = lon
center_lat = lat
return center_lon,center_lat
def get_file_timestamp_iso(tif_path):
# 获取文件的最后修改时间戳
timestamp = os.path.getmtime(tif_path)
# 将时间戳转换为UTC的ISO 8601格式
timestamp_dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
timestamp_iso = timestamp_dt.isoformat(timespec='seconds') + "Z"
return timestamp_iso
def fill_productInfoList(tif_path, input_config,json_path):
outputProductInfo = {
"outputRasterUrl" : tif_path,
}
productInfoList = {
"centerX":calculate_center(tif_path)[0],
"centerY":calculate_center(tif_path)[1],
"outputProductInfo":outputProductInfo,
"extent":get_extent.get_extent_wgs84(tif_path),
"productionTime":get_file_timestamp_iso(tif_path),
"footprint":get_extent.get_footprint(json_path, input_config["satellite"])
}
input_template['productInfoList'][0].update(
productInfoList
)
return input_template['productInfoList']
def fill_imageMatadata(tif_path, input_config, json_path):
epsg, res, wkt = get_extent.get_epsg_code_rasterio(tif_path)
satellite = input_config['satellite']
cloudCoverage,dataTime,grid = get_extent.get_cloud_coverage(json_path, satellite)
if input_config['satellite'] == 'sentinel2':
sensor = "MSI"
elif input_config['satellite'] == 'landsat':
sensor = "OLI"
imageMetadata = {
"wkid" : str(epsg),
"sensor" : sensor,
'cloudCoverage':str(cloudCoverage),
"grid": grid,
"imageTime" :dataTime,
"resolution":str(res),
"satellite": input_config['satellite'],
"wkt":wkt,
"centerX":calculate_center(tif_path)[0],
"centerY":calculate_center(tif_path)[1]
}
input_template['imageMetadata'].update(
imageMetadata
)
return input_template['imageMetadata']
def out_json(input_config, json_path, tif_path):
productInfoList = fill_productInfoList(tif_path, input_config, json_path)
imageMetadata = fill_imageMatadata(tif_path, input_config, json_path)
input_template.update(
{
"inputDataUrl" : input_config['input_path'],
"outputRasterUrl" : tif_path,
"productInfoList" : productInfoList,
"imageMetadata" :imageMetadata
}
)
outjson_path = tif_path.replace('.tif','.json')
with open(outjson_path, 'w', encoding="utf-8") as file:
json.dump(input_template, file, ensure_ascii=False, indent=4) # indent=4 用于美化输出
return outjson_path
if __name__ == "__main__":
input_config = {
"index": "NDWI",
"satellite": "sentinel2",
"input_path": r"D:\hunan\50RKQ_l2a\metadata.xml",
"output_path": r"D:\hunan\50RKQ_l2a\test\test6.tif"
}
# input_config = {
# "index": "NDVI",
# "satellite": "landsat",
# "input_path": r"D:\hunan\landsat_data\LC08_L2SP_122043_20231227_20240104_02_T1\LC08_L2SP_122043_20231227_20240104_02_T1_MTL.xml",
# "output_path": r"D:\hunan\landsat_data\LC08_L2SP_122043_20231227_20240104_02_T1\test\test_ndvi7.tif"
# }
if input_config['satellite'] == 'sentinel2':
json_path = input_config['input_path'].replace('metadata.xml','tileInfo.json')
if input_config['satellite'] == 'landsat':
json_path = input_config['input_path'].replace('MTL.xml','stac.json')
tif_path = input_config['output_path']
outjson_path = out_json(input_config, json_path, tif_path)
{
"telephone": "",
"address": "",
"manufacture": "",
"softwareVersion": "",
"inputDataUrl": "",
"productInfoList": [
{
"centerY": 0,
"outputProductInfo": {
"dataExtend": {},
"outputRasterUrl": "",
"vectorDataUrl": "",
"thumbnailUrl": "",
"thematicMapUrl": "",
"quickViewUrl": ""
},
"subProductionType": "",
"extent": {
"xmin": 113.97448687513339,
"xmax": 115.0975785865723,
"ymin": 26.099861306733171,
"ymax": 27.109361416616217
},
"submodel": "",
"productionId": "",
"productionTime": "",
"footprint": {
"coordinates": [
],
"type": "MultiPolygon"
},
"productionName": "",
"centerX": 0,
"dataSize": 0,
"dataId": ""
}
],
"imageMetadata": {
"wkid": "",
"sensor": "",
"cloudCoverage": "",
"row": "",
"grid": "",
"satelliteType": "光学",
"dataFormat": "TIFF",
"imageTime": "",
"resolution": "",
"satellite": "",
"wkt": "",
"centerX": "",
"centerY": "",
"path": ""
},
"modelName": ""
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment