    Stream ingest data from Kafka to Amazon Bedrock Knowledge Bases using custom connectors

    April 20, 2025


    Retrieval Augmented Generation (RAG) enhances AI responses by combining the generative AI model’s capabilities with information from external data sources, rather than relying solely on the model’s built-in knowledge. In this post, we showcase the custom data connector capability in Amazon Bedrock Knowledge Bases that makes it straightforward to build RAG workflows with custom input data. Through this capability, Amazon Bedrock Knowledge Bases supports the ingestion of streaming data, which means developers can add, update, or delete data in their knowledge base through direct API calls.

    Consider examples such as clickstream data, credit card swipes, Internet of Things (IoT) sensor data, log analysis, and commodity prices, where both current data and historical trends are needed to make an informed decision. Previously, to feed such critical data inputs, you had to first stage them in a supported data source and then either initiate or schedule a data sync job. Depending on the quality and quantity of the data, the time to complete this process varied. With custom data connectors, you can quickly ingest specific documents from custom data sources without requiring a full sync and ingest streaming data without the need for intermediary storage. By avoiding time-consuming full syncs and storage steps, you gain faster access to data, reduced latency, and improved application performance.

    With streaming ingestion using custom connectors, Amazon Bedrock Knowledge Bases processes such streaming data without an intermediary data source, making it available almost immediately. The feature chunks the input data and converts it into embeddings using your chosen Amazon Bedrock model, then stores everything in the backend vector database. This automation applies to both newly created and existing databases, so you can focus on building AI applications without worrying about orchestrating data chunking, embeddings generation, or vector store provisioning and indexing. Additionally, the feature lets you ingest specific documents from custom data sources, all while reducing latency and operational costs for intermediary storage.

    Amazon Bedrock

    Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies such as Anthropic, Cohere, Meta, Stability AI, and Amazon through a single API, along with a broad set of capabilities you need to build generative AI applications with security, privacy, and responsible AI. Using Amazon Bedrock, you can experiment with and evaluate top FMs for your use case, privately customize them with your data using techniques such as fine-tuning and RAG, and build agents that execute tasks using your enterprise systems and data sources.

    Amazon Bedrock Knowledge Bases

    Amazon Bedrock Knowledge Bases allows organizations to build fully managed RAG pipelines by augmenting contextual information from private data sources to deliver more relevant, accurate, and customized responses. With Amazon Bedrock Knowledge Bases, you can build applications that are enriched by the context retrieved from querying a knowledge base. It enables faster time to product release by abstracting away the heavy lifting of building pipelines and providing an out-of-the-box RAG solution, reducing the build time for your application.

    Amazon Bedrock Knowledge Bases custom connector

    Amazon Bedrock Knowledge Bases supports custom connectors and the ingestion of streaming data, which means you can add, update, or delete data in your knowledge base through direct API calls.
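
    As a minimal illustration of these direct API calls, the following sketch uses a boto3 bedrock-agent client to add a document and then remove it by its custom document identifier. The knowledge base ID, data source ID, document ID, and sample text are placeholders for illustration only.

    import boto3

    bedrock_agent_client = boto3.client('bedrock-agent')

    kb_id = 'YOUR_KNOWLEDGE_BASE_ID'      # placeholder knowledge base ID
    ds_id = 'YOUR_CUSTOM_DATA_SOURCE_ID'  # placeholder custom data source ID
    doc_id = 'stock-quote-0001'           # placeholder custom document identifier

    # Add (or update) a document directly, without staging it in intermediary storage
    bedrock_agent_client.ingest_knowledge_base_documents(
        knowledgeBaseId=kb_id,
        dataSourceId=ds_id,
        documents=[{
            'content': {
                'dataSourceType': 'CUSTOM',
                'custom': {
                    'customDocumentIdentifier': {'id': doc_id},
                    'inlineContent': {
                        'type': 'TEXT',
                        'textContent': {'data': 'Sample text to index.'}
                    },
                    'sourceType': 'IN_LINE'
                }
            }
        }]
    )

    # Delete the same document by its custom identifier
    bedrock_agent_client.delete_knowledge_base_documents(
        knowledgeBaseId=kb_id,
        dataSourceId=ds_id,
        documentIdentifiers=[{
            'dataSourceType': 'CUSTOM',
            'custom': {'id': doc_id}
        }]
    )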

    Solution overview: Build a generative AI stock price analyzer with RAG

    For this post, we implement a RAG architecture with Amazon Bedrock Knowledge Bases using a custom connector and topics built with Amazon Managed Streaming for Apache Kafka (Amazon MSK) for a user who may be interested in understanding stock price trends. Amazon MSK is a streaming data service that manages Apache Kafka infrastructure and operations, making it straightforward to run Apache Kafka applications on Amazon Web Services (AWS). The solution enables real-time analysis of stock price data through vector embeddings and large language models (LLMs).

    The following architecture diagram has two components:

    Preprocessing streaming data workflow, shown at the top of the diagram:

    1. Mimicking streaming input, upload a .csv file with stock price data into the MSK topic
    2. The upload automatically triggers the consumer AWS Lambda function
    3. The Lambda function ingests the consumed data into the knowledge base
    4. The knowledge base transforms the data into a vector index using the chosen embeddings model
    5. The knowledge base stores the vector index in the vector database

    Runtime execution during user queries, shown at the bottom of the diagram:

    1. The user queries for stock prices
    2. The foundation model uses the knowledge base to search for an answer
    3. The knowledge base returns relevant documents
    4. The user receives a relevant answer

    [Architecture diagram: solution overview]

    Implementation design

    The implementation follows these high-level steps:

    1. Data source setup – Configure an MSK topic that streams input stock prices
    2. Amazon Bedrock Knowledge Bases setup – Create a knowledge base in Amazon Bedrock using the quick create a new vector store option, which automatically provisions and sets up the vector store
    3. Data consumption and ingestion – As and when data lands in the MSK topic, trigger a Lambda function that extracts stock indices, prices, and timestamp information and feeds into the custom connector for Amazon Bedrock Knowledge Bases
    4. Test the knowledge base – Evaluate stock price analysis using the knowledge base

    Solution walkthrough

    To build a generative AI stock analysis tool with the Amazon Bedrock Knowledge Bases custom connector, use the instructions in the following sections.

    Configure the architecture

    To try this architecture, deploy the AWS CloudFormation template from this GitHub repository in your AWS account. The template deploys the following components (a programmatic deployment sketch follows the list):

    1. Functional virtual private clouds (VPCs), subnets, security groups and AWS Identity and Access Management (IAM) roles
    2. An MSK cluster hosting the Apache Kafka input topic
    3. A Lambda function to consume Apache Kafka topic data
    4. An Amazon SageMaker Studio notebook for granular setup and enablement
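
    If you prefer to launch the stack programmatically instead of through the console, the following is a minimal boto3 sketch. The stack name and local template file name are placeholders, and the required capabilities depend on the IAM resources in the template.

    import boto3

    cfn = boto3.client('cloudformation')

    # Placeholder template path; use the template from the GitHub repository
    with open('bedrock-stream-ingest.yaml') as f:
        template_body = f.read()

    cfn.create_stack(
        StackName='bedrock-stream-ingest',      # placeholder stack name
        TemplateBody=template_body,
        Capabilities=['CAPABILITY_NAMED_IAM'],  # the template creates IAM roles
    )

    # Wait until the VPC, MSK cluster, Lambda function, and SageMaker Studio resources finish creating
    cfn.get_waiter('stack_create_complete').wait(StackName='bedrock-stream-ingest')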

    Create an Apache Kafka topic

    In the precreated MSK cluster, the required brokers are deployed and ready for use. The next step is to use a SageMaker Studio terminal instance to connect to the MSK cluster and create the test stream topic. Follow the detailed instructions at Create a topic in the Amazon MSK cluster. The general steps are as follows (a Python alternative using the kafka-python admin client is sketched after the list):

    1. Download and install the latest Apache Kafka client
    2. Connect to the MSK cluster broker instance
    3. Create the test stream topic on the broker instance
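
    If you'd rather create the topic from the SageMaker Studio notebook instead of the Kafka CLI, the following is a rough sketch using the kafka-python admin client. It assumes the kafka-python package is installed and that the brokers accept plaintext connections on the bootstrap broker string from the CloudFormation outputs; if your cluster uses TLS or IAM authentication, adjust the client configuration accordingly.

    from kafka.admin import KafkaAdminClient, NewTopic

    # Placeholder bootstrap broker string; use the value for your MSK cluster
    admin = KafkaAdminClient(bootstrap_servers='b-1.example.kafka.us-east-1.amazonaws.com:9092')

    # Create the input topic used by the rest of the walkthrough
    admin.create_topics([NewTopic(name='streamtopic', num_partitions=1, replication_factor=2)])
    admin.close()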

    Create a knowledge base in Amazon Bedrock

    To create a knowledge base in Amazon Bedrock, follow these steps:

    1. On the Amazon Bedrock console, in the left navigation pane under Builder tools, choose Knowledge Bases.

    [Screenshot: Amazon Bedrock Knowledge Bases console]

    2. To initiate knowledge base creation, on the Create dropdown menu, choose Knowledge Base with vector store, as shown in the following screenshot.

    [Screenshot: creating a knowledge base]

    3. In the Provide Knowledge Base details pane, enter BedrockStreamIngestKnowledgeBase as the Knowledge Base name.
    4. Under IAM permissions, choose the default option, Create and use a new service role, and optionally provide a Service role name, as shown in the following screenshot.

    [Screenshot: knowledge base details]

    5. On the Choose data source pane, select Custom as the data source where your dataset is stored.
    6. Choose Next, as shown in the following screenshot.

    [Screenshot: data source details]

    7. On the Configure data source pane, enter BedrockStreamIngestKBCustomDS as the Data source name.
    8. Under Parsing strategy, select Amazon Bedrock default parser, and for Chunking strategy, choose Default chunking. Choose Next, as shown in the following screenshot.

    [Screenshot: parsing and chunking strategy]

    9. On the Select embeddings model and configure vector store pane, for Embeddings model, choose Titan Text Embeddings v2. For Embeddings type, choose Floating-point vector embeddings. For Vector dimensions, select 1024, as shown in the following screenshot. Make sure you have requested and received access to the chosen FM in Amazon Bedrock. To learn more, refer to Add or remove access to Amazon Bedrock foundation models.

    [Screenshot: embeddings model selection]

    10. On the Vector database pane, select Quick create a new vector store and choose Amazon OpenSearch Serverless as the vector store.

    [Screenshot: vector store selection]

    11. On the next screen, review your selections. To finalize the setup, choose Create.
    12. Within a few minutes, the console will display your newly created knowledge base. Note its knowledge base ID and the custom data source ID, or look them up through the API as sketched below.
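
    The Lambda configuration in the next section needs both IDs. You can copy them from the console, or look them up by name with boto3, as in this sketch (the names match the values entered above):

    import boto3

    bedrock_agent_client = boto3.client('bedrock-agent')

    # Find the knowledge base created above by its name
    kbs = bedrock_agent_client.list_knowledge_bases()['knowledgeBaseSummaries']
    kb_id = next(kb['knowledgeBaseId'] for kb in kbs
                 if kb['name'] == 'BedrockStreamIngestKnowledgeBase')

    # Find the custom data source attached to it
    dss = bedrock_agent_client.list_data_sources(knowledgeBaseId=kb_id)['dataSourceSummaries']
    ds_id = next(ds['dataSourceId'] for ds in dss
                 if ds['name'] == 'BedrockStreamIngestKBCustomDS')

    print(kb_id, ds_id)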

    Configure AWS Lambda Apache Kafka consumer

    Now, using API calls, you configure the consumer Lambda function so it gets triggered as soon as the input Apache Kafka topic receives data.

    1. Configure the manually created Amazon Bedrock knowledge base ID and its custom data source ID as environment variables within the Lambda function. When you use the sample notebook, the referenced function name and IDs are filled in automatically; the variable names in the following snippet are placeholders.
    response = lambda_client.update_function_configuration(
        FunctionName=lambda_function_name,   # placeholder: name of the consumer Lambda function
        Environment={
            'Variables': {
                'KBID': knowledge_base_id,   # placeholder: Amazon Bedrock knowledge base ID
                'DSID': data_source_id       # placeholder: custom data source ID
            }
        }
    )

    2. When that’s complete, tie the Lambda consumer function to the source Apache Kafka topic so it listens for events. The ARN and function name in the snippet are placeholders; a quick status check follows the code:
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=msk_cluster_arn,      # placeholder: ARN of the MSK cluster
        FunctionName=lambda_function_name,   # placeholder: name of the consumer Lambda function
        StartingPosition='LATEST',
        Enabled=True,
        Topics=['streamtopic']
    )
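
    The mapping takes a short while to move from Creating to Enabled. A quick way to check, assuming the response from the call above is still in scope:

    # Poll the event source mapping state until Lambda reports it as Enabled
    status = lambda_client.get_event_source_mapping(UUID=response['UUID'])
    print(status['State'])   # for example, 'Creating' or 'Enabled'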

    Review AWS Lambda Apache Kafka consumer

    The Apache Kafka consumer Lambda function reads data from the Apache Kafka topic, decodes it, extracts stock price information, and ingests it into the Amazon Bedrock knowledge base using the custom connector.

    1. Extract the knowledge base ID and the data source ID:
    kb_id = os.environ['KBID']
    ds_id = os.environ['DSID']

    2. Define a Python function to decode input events:
    import base64
    import json

    def decode_payload(event_data):
        # Kafka record values arrive base64-encoded in the Lambda event payload
        agg_data_bytes = base64.b64decode(event_data)
        decoded_data = agg_data_bytes.decode(encoding="utf-8")
        event_payload = json.loads(decoded_data)
        return event_payload

    3. Decode and parse the required data from the input event received from the Apache Kafka topic, and use it to create a payload to be ingested into the knowledge base:
    records = event['records']['streamtopic-0']
    for rec in records:
        # Each record carries its own eventID, offset, and base64-encoded value
        event_payload = decode_payload(rec['value'])
        ticker = event_payload['ticker']
        price = event_payload['price']
        timestamp = event_payload['timestamp']
        myuuid = uuid.uuid4()
        payload_ts = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
        payload_string = "At " + payload_ts + " the price of " + ticker + " is " + str(price) + "."

    4. Ingest the payload into Amazon Bedrock Knowledge Bases using the custom connector:
    response = bedrock_agent_client.ingest_knowledge_base_documents(
                    knowledgeBaseId = kb_id,
                    dataSourceId = ds_id,
                    documents= [
                        {
                            'content': {
                                'custom' : {
                                    'customDocumentIdentifier': {
                                        'id' : str(myuuid)
                                    },
                                    'inlineContent' : {
                                        'textContent' : {
                                            'data' : payload_string
                                        },
                                        'type' : 'TEXT'
                                    },
                                    'sourceType' : 'IN_LINE'
                                },
                                'dataSourceType' : 'CUSTOM'
                            }
                        }
                    ]
                )

    Testing

    Now that the required setup is done, trigger the workflow by ingesting test data into the Apache Kafka topic hosted on the MSK cluster. For best results, repeat this section while changing the .csv input file to show stock price increases or decreases.

    1. Prepare the test data. In my case, I had the following data input as a .csv file with a header.
    ticker price
    OOOO $44.50
    ZVZZT $3,413.23
    ZNTRX $22.34
    ZNRXX $208.76
    NTEST $0.45
    ZBZX $36.23
    ZEXIT $942.34
    ZIEXT $870.23
    ZTEST $23.75
    ZVV $2,802.86
    ZXIET $63.00
    ZAZZT $18.86
    ZBZZT $998.26
    ZCZZT $72.34
    ZVZZC $90.32
    ZWZZT $698.24
    ZXZZT $932.32
    2. Define a Python function to put data to the topic, using the pykafka client to ingest data:
    def put_to_topic(kafka_host, topic_name, ticker, amount, timestamp):    
        client = KafkaClient(hosts = kafka_host)
        topic = client.topics[topic_name]
        payload = {
            'ticker': ticker,
            'price': amount,
            'timestamp': timestamp
        }
        ret_status = True
        data = json.dumps(payload)
        encoded_message = data.encode("utf-8")
        print(f'Sending ticker data: {ticker}...')
        with topic.get_sync_producer() as producer:
            result=producer.produce(encoded_message)        
        return ret_status

    3. Read the .csv file and push the records to the topic:
    df = pd.read_csv('TestData.csv')
    start_test_time = time.time() 
    print(datetime.utcfromtimestamp(start_test_time).strftime('%Y-%m-%d %H:%M:%S'))
    df = df.reset_index()
    for index, row in df.iterrows():
        put_to_topic(BootstrapBrokerString, KafkaTopic, row['ticker'], row['price'], time.time())
    end_test_time = time.time()
    print(datetime.utcfromtimestamp(end_test_time).strftime('%Y-%m-%d %H:%M:%S'))

    Verification

    If the data ingestion and subsequent processing are successful, navigate to the Amazon Bedrock Knowledge Bases data source page to check the uploaded information, or verify it through the API as sketched below.

    [Screenshot: data source upload verification]
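
    The following sketch uses the ListKnowledgeBaseDocuments operation to check the ingested documents programmatically; the exact response field names are an assumption and may differ slightly.

    # List documents ingested through the custom data source and print their status
    resp = bedrock_agent_client.list_knowledge_base_documents(
        knowledgeBaseId=kb_id,
        dataSourceId=ds_id
    )
    for doc in resp['documentDetails']:
        print(doc['identifier']['custom']['id'], doc['status'])   # for example, 'INDEXED'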

    Querying the knowledge base

    Within the Amazon Bedrock Knowledge Bases console, you can query the ingested data immediately, as shown in the following screenshot.

    [Screenshot: testing the knowledge base]

    To do that, select an Amazon Bedrock FM that you have access to. In my case, I chose Amazon Nova Lite 1.0, as shown in the following screenshot.

    [Screenshot: choosing the foundation model]

    After you choose the model, the question, “How is ZVZZT trending?”, yields results based on the ingested data. Note how Amazon Bedrock Knowledge Bases shows how it derived the answer, even pointing to the granular data element from its source. A programmatic equivalent using the RetrieveAndGenerate API is sketched after the screenshot.

    [Screenshot: knowledge base query results]
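
    You can ask the same question programmatically through the RetrieveAndGenerate API on the Bedrock agent runtime client, as in this sketch. The model ARN is a placeholder; substitute any FM you have been granted access to.

    import boto3

    bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')

    response = bedrock_agent_runtime.retrieve_and_generate(
        input={'text': 'How is ZVZZT trending?'},
        retrieveAndGenerateConfiguration={
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': kb_id,
                # Placeholder model ARN; use a model you have access to
                'modelArn': 'arn:aws:bedrock:us-east-1::foundation-model/amazon.nova-lite-v1:0'
            }
        }
    )
    print(response['output']['text'])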

    Cleanup

    To avoid incurring charges, delete and clean up the resources you created (a partial API sketch follows the list).

    1. Delete the Amazon Bedrock knowledge base.
    2. Delete the automatically created Amazon OpenSearch Serverless cluster.
    3. Delete the automatically created Amazon Elastic File System (Amazon EFS) shares backing the SageMaker Studio environment.
    4. Delete the automatically created security groups associated with the Amazon EFS share. You might need to remove the inbound and outbound rules before they can be deleted.
    5. Delete the automatically created elastic network interfaces attached to the Amazon MSK security group for Lambda traffic.
    6. Delete the automatically created Amazon Bedrock Knowledge Bases execution IAM role.
    7. Stop the kernel instances in Amazon SageMaker Studio.
    8. Delete the CloudFormation stack.
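
    Several of these steps can also be performed through the API. The following partial sketch deletes the knowledge base and the CloudFormation stack; the stack name is a placeholder matching whatever you used at deployment.

    import boto3

    # Delete the knowledge base (step 1)
    bedrock_agent_client = boto3.client('bedrock-agent')
    bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=kb_id)

    # Delete the CloudFormation stack (step 8); placeholder stack name
    cfn = boto3.client('cloudformation')
    cfn.delete_stack(StackName='bedrock-stream-ingest')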

    Conclusion

    In this post, we showed you how Amazon Bedrock Knowledge Bases supports custom connectors and the ingestion of streaming data, through which developers can add, update, or delete data in their knowledge base through direct API calls. Amazon Bedrock Knowledge Bases offers fully managed, end-to-end RAG workflows to create highly accurate, low-latency, secure, and custom generative AI applications by incorporating contextual information from your company’s data sources. With this capability, you can quickly ingest specific documents from custom data sources without requiring a full sync, and ingest streaming data without the need for intermediary storage.

    Send feedback to AWS re:Post for Amazon Bedrock or through your usual AWS contacts, and engage with the generative AI builder community at community.aws.


    About the Author

    author-image Prabhakar Chandrasekaran is a Senior Technical Account Manager with AWS Enterprise Support. Prabhakar enjoys helping customers build cutting-edge AI/ML solutions on the cloud. He also works with enterprise customers providing proactive guidance and operational assistance, helping them improve the value of their solutions when using AWS. Prabhakar holds eight AWS and seven other professional certifications. With over 22 years of professional experience, Prabhakar was a data engineer and a program leader in the financial services space prior to joining AWS.


