在工作区级别的所有 Databricks 笔记本中进行字符串搜索

String search in all Databricks notebook in workspace level

用例:能否检查某个特定 Databricks 工作区中的所有笔记本,并获取其中通过 ADLS 路径(而非元存储)直接访问数据的位置清单?

要捕获的信息:

引用“abfss://”的笔记本名称

每个笔记本中引用的具体文件路径

如果可能,请帮助我。

此功能在 Azure Databricks 上不可用——您只能按笔记本或文件夹名称进行搜索。但是,您仍然可以通过将笔记本导出到本地磁盘再搜索字符串来实现您的目的。导出时可以使用 Databricks CLI 的 workspace export_dir 命令,如下所示:

databricks workspace export_dir '/Shared/' ~/tmp/databricks-files

最后我们找到了一个合适的解决方案。虽然 workspace export_dir 命令也是此用例的解决方案之一,但它要求把所有笔记本从工作区下载到本地,而这并不被我们的安全团队推荐。因此,我们改为在 Databricks 笔记本内部通过 2.0/workspace/export REST API 实现了这一需求。

步骤:

  1. 找出所有带有笔记本路径、对象 ID 等笔记本对象的笔记本
  2. 迭代所有笔记本路径以编码格式获取这些笔记本的内容
  3. 对这些笔记本详细信息进行解码,并检查各个解码的笔记本详细信息是否包含搜索字符串。
  4. 根据对象 id 获取笔记本所有者
  5. 生成最终结果。

参考:https://docs.databricks.com/dev-tools/api/latest/workspace.html#export

完整代码供参考:

from pyspark.sql.types import IntegerType
from pyspark.sql.types import *
from pyspark.sql import Row
import base64
import requests
import json

# Workspace base URL; replace with your own Azure Databricks instance.
databricks_instance ="https://databricks_instance.azuredatabricks.net"

# Workspace API endpoints: `list` enumerates objects, `export` returns
# notebook source (base64-encoded).
url_list = f"{databricks_instance}/api/2.0/workspace/list"
url_export = f"{databricks_instance}/api/2.0/workspace/export"


# Root path to start the recursive notebook scan from.
payload = json.dumps({
  "path": "/Users/"
})
# NOTE(review): 'Bearer user_token' looks like a placeholder — substitute a
# real personal access token before running.
headers = {
  'Authorization': 'Bearer user_token',
  'Content-Type': 'application/json'
}

# Initial listing of the root path; the result feeds list_notebooks() below.
response = requests.request("GET", url_list, headers=headers, data=payload).json()
# Module-level accumulator that list_notebooks() appends NOTEBOOK objects to.
notebooks = []

# Collect every notebook object reachable from the root path.

def list_notebooks(mylist):
    """Recursively collect every NOTEBOOK object under a workspace listing.

    Parameters
    ----------
    mylist : dict
        JSON response of the 2.0/workspace/list API. An empty directory
        yields ``{}`` (no 'objects' key), which the original code did not
        handle and would raise KeyError on.

    Returns
    -------
    list
        The module-level ``notebooks`` accumulator, with one dict
        (path, object_id, object_type, ...) per notebook found.
    """
    # .get() tolerates responses without an 'objects' key (empty dirs).
    for element in mylist.get('objects', []):
        if element['object_type'] == 'NOTEBOOK':
            notebooks.append(element)
        elif element['object_type'] == 'DIRECTORY':
            # Recurse into sub-directories via another list call.
            payload_inner = json.dumps({"path": element['path']})
            response_inner = requests.request(
                "GET", url_list, headers=headers, data=payload_inner).json()
            if response_inner:  # skip empty {} responses
                list_notebooks(response_inner)
    return notebooks

# Walk the workspace tree from the initial "/Users/" listing; `result` holds
# every notebook object found.
result = list_notebooks(response)
# Sanity check — NOTE(review): raises IndexError if no notebooks were found.
print(result[0])

#     
class BearerAuth(requests.auth.AuthBase):
    """requests auth helper that attaches a Bearer token to each request.

    Fix: the original ``__call__`` hard-coded the literal string
    ``'Bearer user_token'`` and ignored the token passed to ``__init__``,
    so any token supplied by the caller was silently discarded.
    """

    def __init__(self, token):
        # Token string supplied by the caller (a Databricks PAT).
        self.token = token

    def __call__(self, r):
        # Use the stored token instead of a hard-coded placeholder.
        r.headers["authorization"] = f"Bearer {self.token}"
        return r
# Define the function to get the permission 

def get_permission(var_object_id):
    """Return the access-control list for one notebook object.

    Calls the preview Permissions API for the given notebook object id and
    extracts the 'access_control_list' entry from the JSON response.
    """
    perm_url = f"{databricks_instance}/api/2.0/preview/permissions/notebooks/{var_object_id}"
    # NOTE(review): reuses the module-level `payload` (the workspace-list
    # path body) as the request body; the permissions GET presumably ignores
    # it — confirm before removing.
    perm_response = requests.request("GET", perm_url, headers=headers, data=payload).json()
    return perm_response['access_control_list']
    
# Define the function to get the user and permission of the notebooks 

def get_control_user_list(access_control_list):
    """Flatten an access-control list into ``[name, permission, ...]`` pairs.

    Each entry contributes the principal name ('group_name' wins over
    'user_name' when both are present, matching the original precedence)
    followed by the first permission level in its 'all_permissions' list.

    Fix: entries carrying neither 'user_name' nor 'group_name' are now
    skipped — the original left ``var_user_name`` unbound (NameError on the
    first such entry) or silently reused the previous entry's name.
    """
    access_control_user_list = []
    for var_idx in access_control_list:
        var_user_name = var_idx.get('group_name', var_idx.get('user_name'))
        if var_user_name is None:
            # No recognizable principal on this entry; skip rather than
            # carry over a stale name.
            continue
        var_user_permission = var_idx['all_permissions'][0]['permission_level']
        access_control_user_list.append(var_user_name)
        access_control_user_list.append(var_user_permission)
    return access_control_user_list
  
# Path of the notebook running this script, obtained via the Databricks
# dbutils context; used below to exclude this notebook from its own report.

var_current_notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()

# Label written into the report for notebooks matched by the Oracle pattern.
var_search_type_oracle ='Oracle'


# Substring searched for inside decoded notebook source to detect Oracle
# JDBC connection strings.

var_oracle_search_pattern = 'jdbc:oracle'

# Directory the final report file is written to (DBFS/ADLS mount path).
var_report_target_location = '/writing/into/target/location'

# Define the function to formatting the final results

def get_format_result(var_notebook_path,var_search_type,var_access_user_control_list):
    """Build one pipe-delimited report line.

    Format: ``<notebook path>|<search type>|<name-perm-name-perm...>`` where
    the access-control entries are joined with '-'.
    """
    joined_acl = '-'.join(var_access_user_control_list)
    return '|'.join([var_notebook_path, var_search_type, joined_acl])

# Counters summarising how many notebooks were processed / matched per source
# type (only the Oracle counter is incremented in the visible code).

var_total_notebook_count_processed =0
var_total_notebook_count_oracle    =0
var_total_notebook_count_teradata  =0
var_total_notebook_count_datalakeservice  =0
var_total_notebook_count_oauth  =0

# Fix: this pattern was referenced in the original loop but never defined
# anywhere in the snippet, causing a NameError at runtime. 'jdbc:' is a
# conservative superset of the Oracle pattern — TODO(review): confirm the
# intended on-prem search pattern.
var_other_onprem_search_pattern = 'jdbc:'

# Fix: open the report file with `with` so the handle is closed even if the
# scan raises (the original leaked the handle).
with open(f"{var_report_target_location}/mytarget", 'w') as f:
    # Export and inspect every notebook object discovered above.
    for notebook_items in result:
        var_notebook_path = notebook_items["path"]
        var_object_id     = notebook_items["object_id"]
        var_note_payload  = json.dumps({"path": var_notebook_path})
        var_response = requests.request("GET", url_export, headers=headers, data=var_note_payload).json()

        # Some notebooks cannot be exported (e.g. content over the 10 MB API
        # limit returns an error payload with no 'content' key); skip them.
        # Fix: explicit key check replaces the original bare `except:`.
        if 'content' not in var_response:
            continue

        # The export API returns base64-encoded notebook source.
        var_response_content_str = base64.b64decode(var_response['content']).decode("utf-8")
        var_total_notebook_count_processed += 1

        # Only report notebooks that embed a connection string AND a password,
        # excluding the notebook running this scan.
        if (var_response_content_str.find(var_other_onprem_search_pattern) != -1
                and var_response_content_str.find("password") != -1
                and var_notebook_path != var_current_notebook_path):
            # Oracle-specific matches (the outer guard already checked the
            # password and self-exclusion conditions).
            if var_response_content_str.find(var_oracle_search_pattern) != -1:
                print(var_notebook_path)
                var_access_control_list      = get_permission(var_object_id)
                var_access_user_control_list = get_control_user_list(var_access_control_list)
                var_final_format_record      = get_format_result(var_notebook_path, var_search_type_oracle, var_access_user_control_list)
                print(var_final_format_record)
                f.write(var_final_format_record + '\n')
                var_total_notebook_count_oracle += 1

我构建了一个 Web 应用程序,它使用 Elasticsearch 在您的 Databricks 工作区或 Repos 中搜索代码片段。我在工作中一直使用它。

https://github.com/gardnmi/databricks_snippets