Kedro与Streamlit集成:构建动态数据管道的实践指南


kedro与streamlit集成:构建动态数据管道的实践指南

本教程详细阐述了如何在Streamlit应用中有效集成Kedro数据管道,实现动态数据加载与处理。核心在于通过KedroSession.run()方法的data_catalog参数传递自定义的DataCatalog,以管理Streamlit中加载的DataFrame数据。文章还深入分析了常见的集成误区,如直接修改KedroContext属性,并提供了正确的代码示例和最佳实践,确保数据流的顺畅与高效。

引言:Kedro与Streamlit的强大结合

在现代数据应用开发中,数据管道的自动化与交互式界面的结合变得日益重要。Kedro作为一个生产级的数据管道框架,能够帮助我们构建可维护、可测试和可重用的数据处理逻辑。而Streamlit则以其简洁的API,使得Python开发者能够快速构建美观的数据应用。将Kedro管道集成到Streamlit应用中,可以实现用户通过Web界面上传数据,并实时触发复杂的Kedro数据处理流程,从而构建功能强大且用户友好的数据产品。

本教程的目标是指导您如何在Streamlit应用中运行特定的Kedro管道,并向该管道传递在Streamlit中动态加载的数据,通过自定义的DataCatalog进行管理。

理解Kedro的数据流管理核心:DataCatalog与KedroSession

在深入集成之前,理解Kedro的两个核心概念至关重要:

  1. DataCatalog (数据目录): Kedro的DataCatalog是管理所有数据源的中心枢纽。它定义了数据集的名称、类型以及加载/保存数据的方式。在动态数据场景中,MemoryDataSet特别有用,它允许我们将Python对象(如Pandas DataFrame)作为数据集在内存中传递,而无需写入磁盘。
  2. KedroSession (会话): KedroSession是Kedro项目的入口点,负责加载项目上下文、配置以及运行管道。它是执行Kedro操作的主要接口。KedroSession.run()方法是启动管道执行的关键。

正确姿势:通过KedroSession.run()传递自定义DataCatalog

在Streamlit中运行Kedro管道并传递动态数据,最核心且正确的方法是利用KedroSession.run()方法的data_catalog参数。这个参数允许您在运行时提供一个临时的、自定义的DataCatalog,它将覆盖或扩展Kedro项目默认的catalog.yml中定义的同名数据集。

实现步骤:

  1. 在Streamlit中加载数据: 使用Streamlit的文件上传器或其他输入组件获取用户数据,并将其转换为Pandas DataFrame。
  2. 创建MemoryDataSet: 将这些DataFrame封装成MemoryDataSet实例。MemoryDataSet允许Kedro管道在内存中处理这些数据。
  3. 构建自定义DataCatalog: 创建一个新的DataCatalog实例,并将您的MemoryDataSets添加到其中,使用它们在Kedro管道中对应的输入数据集名称作为键。
  4. 创建KedroSession并运行管道: 使用KedroSession.create()初始化一个会话,然后调用session.run(),并将自定义的DataCatalog通过data_catalog参数传递进去,同时指定要运行的pipeline_name。

示例代码:

以下是一个在Streamlit中集成Kedro管道的完整示例,展示了如何动态加载数据并传递给Kedro:

import streamlit as st
import pandas as pd
from kedro.framework.session import KedroSession
from kedro.io import DataCatalog, MemoryDataSet
import os

# 假设您的Kedro项目位于当前工作目录下的 'my_kedro_project'
# 请根据实际情况调整 project_path
project_path = os.path.join(os.getcwd(), 'my_kedro_project')

st.title("Kedro与Streamlit数据处理应用")

st.header("上传您的数据")

# 模拟Streamlit文件上传和DataFrame创建
# 在实际应用中,这里会是 st.file_uploader 和 pd.read_csv/excel 等
uploaded_file1 = st.file_uploader("上传 reagentes_raw.csv", type=['csv'])
uploaded_file2 = st.file_uploader("上传 balanco_de_massas_raw.csv", type=['csv'])
# ... 更多文件上传器

df1, df2, df3, df4, df5, df6 = None, None, None, None, None, None

if uploaded_file1:
    df1 = pd.read_csv(uploaded_file1)
    st.write("reagentes_raw 数据预览:")
    st.dataframe(df1.head())
if uploaded_file2:
    df2 = pd.read_csv(uploaded_file2)
    st.write("balanco_de_massas_raw 数据预览:")
    st.dataframe(df2.head())
# ... 处理其他上传文件

# 确保所有必需的DataFrame都已加载 (这里仅为演示,实际应根据管道输入进行检查)
if st.button('处理输入数据') and df1 is not None and df2 is not None: # 简化检查
    st.info('正在执行Kedro管道...')

    # 模拟其他DataFrame,实际应通过上传获取
    if df3 is None: df3 = pd.DataFrame({'colA': [1,2], 'colB': ['x','y']})
    if df4 is None: df4 = pd.DataFrame({'colC': [3,4], 'colD': ['a','b']})
    if df5 is None: df5 = pd.DataFrame({'colE': [5,6], 'colF': ['m','n']})
    if df6 is None: df6 = pd.DataFrame({'colG': [7,8], 'colH': ['p','q']})

    try:
        # 创建自定义DataCatalog,包含MemoryDataSet
        custom_catalog = DataCatalog({
            "reagentes_raw": MemoryDataSet(df1),
            "balanco_de_massas_raw": MemoryDataSet(df2),
            "laboratorio_raw": MemoryDataSet(df3), # 示例数据
            "laboratorio_raiox_raw": MemoryDataSet(df4), # 示例数据
            "carta_controle_pims_raw": MemoryDataSet(df5), # 示例数据
            "blend_raw": MemoryDataSet(df6) # 示例数据
        })

        # 创建KedroSession并运行管道
        with KedroSession.create(project_path=project_path) as session:
            # 关键:通过 data_catalog 参数传递自定义目录
            session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")

        st.success('数据处理成功!')

        # 从自定义的catalog中加载管道的输出数据
        # 假设管道的输出数据集名为 "merged_raw_data_process"
        merged_data = custom_catalog.load("merged_raw_data_process")
        st.header('处理结果预览')
        st.dataframe(merged_data.head())

        # 示例:显示最后更新时间,假设输出数据包含 'timestamp' 列
        if 'timestamp' in merged_data.columns:
            last_update = pd.to_datetime(merged_data['timestamp']).max()
            st.write(f"数据集中最新信息的时间: {last_update.strftime('%Y/%m/%d %H:%M:%S')}")
        else:
            st.write("输出数据中未找到 'timestamp' 列。")

    except Exception as e:
        st.error(f"Kedro管道执行失败: {e}")
        st.exception(e)

注意:

  • 请将project_path替换为您的Kedro项目的实际路径。
  • my_kedro_project目录下需要有一个名为tag_web_app的Kedro管道,并且该管道的输入数据集名称(例如reagentes_raw)需要与custom_catalog中定义的键名一致。
  • 管道的输出数据集(例如merged_raw_data_process)也需要在custom_catalog中定义或由管道生成,以便后续加载。

结果的获取与处理

当Kedro管道通过session.run(data_catalog=custom_catalog, ...)执行完毕后,管道的输出数据集(如果它们被定义为写入catalog)将存储在您传入的custom_catalog对象中。这意味着,您可以直接从该custom_catalog实例中加载管道处理后的结果,并在Streamlit应用中进行展示或进一步处理。

如上例所示:

merged_data = custom_catalog.load("merged_raw_data_process")
st.dataframe(merged_data.head())

这行代码从之前传入的custom_catalog中加载了名为merged_raw_data_process的数据集,该数据集是Kedro管道的最终输出。

常见错误与解决方案

在集成Kedro与Streamlit时,开发者可能会遇到一些常见的AttributeError。这些错误通常源于尝试以不正确的方式修改Kedro的内部状态。

Manus Manus

全球首款通用型AI Agent,可以将你的想法转化为行动。

Manus 250 查看详情 Manus

错误1: AttributeError: can't set attribute 'catalog'

问题描述: 尝试直接对KedroSession或KedroContext的catalog属性进行赋值操作,例如 context.catalog = custom_catalog。

错误原因: KedroSession.catalog和KedroContext.catalog属性在Kedro的设计中是只读的。它们在会话或上下文创建时被初始化,并且不应该在运行时被直接外部修改。Kedro通过配置(catalog.yml)和session.run()方法的参数来管理数据目录的生命周期和内容。

解决方案: 绝对不要尝试直接设置context.catalog。正确的做法是,在调用session.run()时,通过data_catalog参数传递您自定义的DataCatalog。如前文示例所示:

with KedroSession.create(project_path=project_path) as session:
    session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")

这种方式是Kedro官方推荐且唯一支持的在运行时注入自定义数据目录的方法。

错误2: AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'

问题描述: 尝试从KedroContext对象中访问一个名为pipeline_registry的属性,例如 context.pipeline_registry.get("tag_web_app")。

错误原因: KedroContext对象本身不直接暴露pipeline_registry属性。管道的注册和管理是Kedro内部框架的一部分,通常通过KedroSession的run()方法或context.pipelines属性来间接访问和执行。尝试直接访问pipeline_registry是错误的API使用方式。

解决方案: 避免直接操作pipeline_registry。如果您需要运行特定的管道,只需在session.run()方法中通过pipeline_name参数指定即可:

with KedroSession.create(project_path=project_path) as session:
    session.run(pipeline_name="tag_web_app", data_catalog=custom_catalog)

如果您确实需要获取管道对象(例如用于更高级的调试或自定义运行),可以通过context.pipelines字典来访问,例如 context.pipelines["tag_web_app"],但通常情况下,直接使用session.run()更为简洁和推荐。

最佳实践与注意事项

  1. Kedro项目结构清晰: 确保您的Kedro项目结构良好,管道定义清晰,输入输出数据集命名规范,这有助于Streamlit应用与Kedro管道的顺利对接。
  2. 数据隔离: 使用MemoryDataSet确保每次Streamlit触发的Kedro管道运行时,数据都是独立的,不会相互干扰。
  3. 错误处理: 在Streamlit应用中加入健壮的try-except块,捕获Kedro管道执行过程中可能出现的错误,并向用户提供友好的反馈。
  4. 异步处理(高级): 对于长时间运行的Kedro管道,考虑在Streamlit中使用异步任务队列(如Celery)来避免UI阻塞,提升用户体验。
  5. 环境管理: 确保Streamlit应用运行的环境与Kedro项目所需的依赖一致,避免版本冲突。
  6. 安全性: 如果涉及到敏感数据,请确保数据上传、处理和存储过程符合安全规范。

总结

将Kedro的强大数据管道能力与Streamlit的便捷交互界面相结合,能够为数据科学家和工程师提供一个高效且灵活的开发环境。本教程强调了在Streamlit应用中通过KedroSession.run()方法的data_catalog参数传递自定义DataCatalog的正确方法,这是处理动态数据的核心。同时,通过深入解析常见的AttributeError,我们明确了Kedro的API设计原则,即避免直接修改只读属性或访问不存在的内部组件。遵循这些指导原则和最佳实践,您将能够构建稳定、高效且易于维护的Kedro-Streamlit集成应用。

以上就是Kedro与Streamlit集成:构建动态数据管道的实践指南的详细内容,更多请关注其它相关文章!


# python  # 如何在  # 上传  # 数据处理  # 您的  # 加载  # 自定义  # 敏感  # 开发环境  # 应用开发  # stream  # ai  # csv  # session  # app  # 大数据  # excel  # 异步任务  # 贵南县公司网站建设  # 推广网站去哪里找平台的  # 绵阳seo优化公司业务  # 钢铁网站推广哪个好  # 山东建设网站有哪些  # 修水网站网络推广的效果  # 萧山区推广网站哪家好  # 宝塔建设个人网站  # 邳州推广网络营销前景  # 广州家具网站seo优化  # 提供一个  # 所示  # 并将  # 文件上传 


相关栏目: 【 Google疑问12 】 【 Facebook疑问10 】 【 优化推广96088 】 【 技术知识133117 】 【 IDC资讯59369 】 【 网络运营7196 】 【 IT资讯61894


相关推荐: yy漫画官方网站登录入口_yy漫画在线阅读页面地址  J*aScript类型数组_TypedArray使用  行者app怎样导出日志  TikTok收藏夹无法删除视频如何解决 TikTok收藏管理优化方法  mysql归档数据怎么导出为csv_mysql归档数据导出为csv文件的方法  mysql导入sql文件能分批导入吗_mysql分批次导入大sql文件的实用技巧  《荔枝fm》导出文件教程  《优志愿》修改手机号方法  如何在CSS中使用伪类:valid实现表单验证提示_结合:valid改变边框颜色  《一起考教师》账号注销方法  服装短视频如何起号推广?服装短视频起号推广有什么要求?  苹果手机如何清理系统缓存数据 iPhone非越狱清理垃圾文件的技巧【系统优化】  三星M34录音变声问题_Samsung M34麦克风调整  修复UI元素交互障碍:从“开始”按钮到信息框的平滑过渡实现  Go语言中方法接收器的选择:值类型还是指针类型?  从HTML表单获取逗号分隔值并转换为NumPy数组进行预测  Golang中的rune与byte类型区别是什么_Golang字符与字节处理详解  智慧团建活动报名入口 智慧团建活动报名入口手机端官网​  高效调试PHP大型嵌套数组:JSON序列化与可视化工具实践  sublime如何撤销关闭的标签页_sublime重新打开已关闭文件技巧  cad怎么隐藏指定的图层_cad隐藏或冻结图层方法  抖音小程序怎么开通?小程序开通条件是什么?  斯宾塞称XGP云游戏“蒸蒸日上”:正在构建一个游戏从未如此唾手可得的未来  Go语言反射机制下访问嵌入结构体中的被遮蔽方法  C++如何实现矩阵乘法_C++二维数组矩阵运算代码示例  获取WooCommerce产品在后台编辑页面的分类ID  电脑视频号|直播|如何分享屏幕  macosmonterey系统外接显示器驱动怎么安装_macosmonterey外接显示器驱动与分辨率调整  mysql中外键约束如何使用_mysql FOREIGN KEY操作  《糖豆》添加舞曲方法  vivo手机视频通话美颜怎么设置_vivo视频通话美颜开启方法  PointNet++语义分割模型中类别变更引发的断言错误及标签处理策略  智慧职教mooc平台登录网址 智慧职教mooc官网直达  PHP中获取HTTP响应状态消息:方法与限制  Win11便笺在哪打开 Win11桌面便笺(Sticky Notes)使用方法【详解】  Golang如何实现HTTP请求重试机制_Golang HTTP请求错误处理策略  小红书网页版首页入口 小红书网页版电脑端官方登录链接  安居客移动经纪人怎么设置自动回复?-安居客移动经纪人设置自动回复的方法  Go Goroutine调度与并发执行深度解析  夸克浏览器资源嗅探怎么用 夸克浏览器网页资源下载技巧【教程】  mysql中如何配置字符集和排序规则_mysql字符集排序配置  在J*a中如何实现类的继承与方法重用_OOP继承方法重用技巧分享  毒蘑菇VOLUMESHADER_BM官网首页登录入口 毒蘑菇VOLUMESHADER_BM官网首页登录入口说明  芒果TV官网登录入口 芒果TV官方网站登录入口  mail.qq.com登录入口 QQ邮箱网页版直达  Golang如何使用crypto/md5生成哈希_Golang MD5哈希生成方法  《密马》发布账号方法  微星主板BIOS怎么调整内存时序_内存参数手动优化BIOS设置教程  快递优选如何查优选物流_快递优选专属物流渠道查询与配送时效  J*a里如何处理ArithmeticException并防止除零_算术异常防护策略解析 

 2025-11-12

了解您产品搜索量及市场趋势,制定营销计划

同行竞争及网站分析保障您的广告效果

点击免费数据支持

提交您的需求,1小时内享受我们的专业解答。

运城市盐湖区信雨科技有限公司


运城市盐湖区信雨科技有限公司

运城市盐湖区信雨科技有限公司是一家深耕海外推广领域十年的专业服务商,作为谷歌推广与Facebook广告全球合作伙伴,聚焦外贸企业出海痛点,以数字化营销为核心,提供一站式海外营销解决方案。公司凭借十年行业沉淀与平台官方资源加持,打破传统外贸获客壁垒,助力企业高效开拓全球市场,成为中小企业出海的可靠合作伙伴。

 8156699

 13765294890

 8156699@qq.com

Notice

We and selected third parties use cookies or similar technologies for technical purposes and, with your consent, for other purposes as specified in the cookie policy.
You can consent to the use of such technologies by closing this notice, by interacting with any link or button outside of this notice or by continuing to browse otherwise.